Orchestration

Orchestrator

class Orchestrator : public Application

Orchestrator for edge computing workloads with admission control and scheduling.

Orchestrator manages the execution of computational tasks on a cluster of backend workers, applying admission control and scheduling to incoming workloads.

The orchestrator supports mixed task types through a task type registry. Each task type is registered via RegisterTaskType() with its deserializer callbacks, enabling DAGs containing different task types (e.g., ImageTask and LlmTask in the same workflow).

Example usage:

Ptr<Orchestrator> orchestrator = CreateObject<Orchestrator>();
orchestrator->SetCluster(cluster);
orchestrator->SetAttribute("Scheduler", PointerValue(scheduler));
orchestrator->SetAttribute("AdmissionPolicy", PointerValue(policy));

Public Types

typedef Callback<Ptr<Task>, Ptr<Packet>, uint64_t&> DeserializerCallback

Callback type for deserializing tasks from packet buffers.

The deserializer handles both boundary detection and task creation. It knows its own header format, so it can determine message boundaries.

Param packet:

The packet buffer (may contain multiple messages or partial data).

Param consumedBytes:

Output: bytes consumed from packet (0 if not enough data).

Return:

The deserialized task, or nullptr if not enough data for complete message.
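The boundary-detection contract above can be sketched without ns-3. This is a minimal illustration, not the module's deserializer: SketchTask, DeserializeSketch, and the 5-byte header layout (1-byte type, 4-byte big-endian payload length) are assumptions made for the example, and a std::vector<uint8_t> stands in for Ptr<Packet>.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a task; the real callback returns Ptr<Task>.
struct SketchTask
{
    uint8_t type;
    std::vector<uint8_t> payload;
};

// Assumed wire format (not from the source): 1-byte task type,
// 4-byte big-endian payload length, then the payload bytes.
static const std::size_t kHeaderSize = 5;

// Follows the documented contract: on success, return the task and set
// consumedBytes; if the buffer does not yet hold a complete message,
// return nullptr and leave consumedBytes at 0.
std::unique_ptr<SketchTask>
DeserializeSketch(const std::vector<uint8_t>& buffer, uint64_t& consumedBytes)
{
    consumedBytes = 0;
    if (buffer.size() < kHeaderSize)
    {
        return nullptr; // header incomplete
    }
    const uint32_t payloadLen = (static_cast<uint32_t>(buffer[1]) << 24) |
                                (static_cast<uint32_t>(buffer[2]) << 16) |
                                (static_cast<uint32_t>(buffer[3]) << 8) |
                                static_cast<uint32_t>(buffer[4]);
    const uint64_t total = kHeaderSize + static_cast<uint64_t>(payloadLen);
    if (buffer.size() < total)
    {
        return nullptr; // payload incomplete
    }
    std::unique_ptr<SketchTask> task(new SketchTask());
    task->type = buffer[0];
    task->payload.assign(buffer.begin() + static_cast<std::ptrdiff_t>(kHeaderSize),
                         buffer.begin() + static_cast<std::ptrdiff_t>(total));
    consumedBytes = total; // trailing bytes belong to the next message
    return task;
}
```

A caller would typically invoke the deserializer in a loop, dropping consumedBytes from the front of its buffer after each success and stopping when nullptr is returned.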

typedef void (*WorkloadAdmittedTracedCallback)(uint64_t workloadId, uint32_t taskCount)

TracedCallback signature for workload admitted events.

Param workloadId:

The admitted workload ID.

Param taskCount:

Number of tasks in the workload.

typedef void (*WorkloadRejectedTracedCallback)(uint32_t taskCount, const std::string &reason)

TracedCallback signature for workload rejected events.

Param taskCount:

Number of tasks in the rejected workload.

Param reason:

Human-readable rejection reason.

typedef void (*TaskDispatchedTracedCallback)(uint64_t workloadId, uint64_t taskId, uint32_t backendIdx)

TracedCallback signature for task dispatched events.

Param workloadId:

The workload this task belongs to.

Param taskId:

The dispatched task ID.

Param backendIdx:

The backend index the task was dispatched to.

typedef void (*TaskCompletedTracedCallback)(uint64_t workloadId, uint64_t taskId, uint32_t backendIdx)

TracedCallback signature for task completed events.

Param workloadId:

The workload this task belongs to.

Param taskId:

The completed task ID.

Param backendIdx:

The backend index that processed the task.

typedef void (*WorkloadCancelledTracedCallback)(uint64_t workloadId)

TracedCallback signature for workload cancelled events.

Param workloadId:

The cancelled workload ID.

typedef void (*WorkloadCompletedTracedCallback)(uint64_t workloadId)

TracedCallback signature for workload completed events.

Param workloadId:

The completed workload ID.

Public Functions

void SetCluster(const Cluster &cluster)

Set the cluster of backend workers.

Parameters:

cluster – The cluster configuration.

const Cluster &GetCluster() const

Get the cluster.

Returns:

The cluster configuration.

void RegisterTaskType(uint8_t taskType, DeserializerCallback fullDeserializer, DeserializerCallback metadataDeserializer)

Register deserializers for a task type.

Associates a 1-byte task type identifier with full and metadata-only deserializer callbacks. Must be called before StartApplication() for custom task types. If no types are registered, SimpleTask is registered automatically at startup.

Parameters:
  • taskType – The task type identifier (from Task::GetTaskType()).

  • fullDeserializer – Callback to deserialize a complete task (header + payload).

  • metadataDeserializer – Callback to deserialize task metadata only (header).
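As a rough sketch of how a registry keyed by a 1-byte type identifier could dispatch to the registered callbacks, consider the following. All names here (SketchTaskTypeRegistry, SketchDeserializer, SketchTask) are hypothetical, std::function replaces ns-3's Callback, and the assumption that the type identifier is the first byte of each message is not confirmed by the source.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task.
struct SketchTask
{
    uint8_t type;
};

// Stand-in for DeserializerCallback, using a byte vector instead of Ptr<Packet>.
using SketchDeserializer =
    std::function<std::unique_ptr<SketchTask>(const std::vector<uint8_t>&, uint64_t&)>;

// Same two fields as the documented TaskTypeEntry.
struct SketchTaskTypeEntry
{
    SketchDeserializer fullDeserializer;
    SketchDeserializer metadataDeserializer;
};

class SketchTaskTypeRegistry
{
  public:
    // Associate a type identifier with its two deserializers.
    void Register(uint8_t taskType, SketchDeserializer full, SketchDeserializer metadata)
    {
        m_entries[taskType] = SketchTaskTypeEntry{std::move(full), std::move(metadata)};
    }

    bool IsRegistered(uint8_t taskType) const
    {
        return m_entries.find(taskType) != m_entries.end();
    }

    // Assumption: the first byte of the buffer identifies the task type.
    // Unknown types and empty buffers yield nullptr with consumedBytes == 0.
    std::unique_ptr<SketchTask> Deserialize(const std::vector<uint8_t>& buffer,
                                            uint64_t& consumedBytes) const
    {
        consumedBytes = 0;
        if (buffer.empty())
        {
            return nullptr;
        }
        auto it = m_entries.find(buffer[0]);
        if (it == m_entries.end())
        {
            return nullptr;
        }
        return it->second.fullDeserializer(buffer, consumedBytes);
    }

  private:
    std::map<uint8_t, SketchTaskTypeEntry> m_entries;
};
```

Keeping the full and metadata-only callbacks side by side in one entry lets a caller pick the cheaper header-only path without a second lookup.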

uint64_t GetWorkloadsAdmitted() const

Get number of workloads admitted.

Returns:

Count of admitted workloads.

uint64_t GetWorkloadsRejected() const

Get number of workloads rejected.

Returns:

Count of rejected workloads.

uint64_t GetWorkloadsCompleted() const

Get number of workloads completed.

Returns:

Count of completed workloads.

uint32_t GetActiveWorkloadCount() const

Get number of active workloads.

Returns:

Count of currently executing workloads.

uint64_t GetWorkloadsCancelled() const

Get number of workloads cancelled.

Returns:

Count of cancelled workloads (e.g., due to client disconnect).

Ptr<ClusterScheduler> GetScheduler() const

Get the configured scheduler.

Returns:

The scheduler, or nullptr if not set.

Ptr<AdmissionPolicy> GetAdmissionPolicy() const

Get the configured admission policy.

Returns:

The admission policy, or nullptr if not set (always admit).

Public Static Functions

static TypeId GetTypeId()

Get the type ID.

Returns:

The object TypeId.

struct TaskTypeEntry

Entry in the task type registry.

Public Members

DeserializerCallback fullDeserializer

Full task deserializer (header + payload)

DeserializerCallback metadataDeserializer

Header-only deserializer.

Cluster

class Cluster

Represents a cluster of backend server nodes for distributed computing.

A Cluster holds references to server nodes and their addresses. It provides iteration and access patterns similar to NodeContainer. The Cluster is used by ClusterScheduler implementations to select which backend should handle incoming tasks.

Each backend in the cluster is represented by a Backend struct containing:

  • A pointer to the server Node (which may have a GpuAccelerator aggregated)

  • The network Address (InetSocketAddress with IP and port) for TCP connections

Example usage:

Cluster cluster;
cluster.AddBackend(serverNode1, InetSocketAddress(addr1, 9000));
cluster.AddBackend(serverNode2, InetSocketAddress(addr2, 9000));
cluster.AddBackend(serverNode3, InetSocketAddress(addr3, 9000));

for (auto it = cluster.Begin(); it != cluster.End(); ++it)
{
    Ptr<Node> node = it->node;
    Address addr = it->address;
}

Public Types

typedef std::vector<Backend>::const_iterator Iterator

Iterator type for traversing backends.

Public Functions

Cluster()

Create an empty cluster.

void AddBackend(Ptr<Node> node, const Address &address, const std::string &acceleratorType = "")

Add a backend to the cluster.

Parameters:
  • node – The backend node. This node should have an application installed and a GpuAccelerator aggregated.

  • address – The address clients should connect to (typically InetSocketAddress with the server’s IP and port).

  • acceleratorType – The type of accelerator on this backend (e.g., “GPU”, “TPU”). Empty string means any/unspecified.

uint32_t GetN() const

Get the number of backends in the cluster.

Returns:

The number of backend servers.

const Backend &Get(uint32_t i) const

Get backend at the specified index.

Parameters:

i – The index of the backend (0 to GetN()-1).

Returns:

Reference to the Backend struct at the given index.

Iterator Begin() const

Get an iterator to the first backend.

Returns:

Iterator pointing to the first backend, or End() if empty.

Iterator End() const

Get an iterator past the last backend.

Returns:

Iterator pointing past the last backend.

Iterator begin() const

Get an iterator to the first backend (lowercase for range-based for).

Returns:

Iterator pointing to the first backend, or end() if empty.

Iterator end() const

Get an iterator past the last backend (lowercase for range-based for).

Returns:

Iterator pointing past the last backend.

bool IsEmpty() const

Check if the cluster is empty.

Returns:

true if the cluster has no backends, false otherwise.

void Clear()

Remove all backends from the cluster.

const std::vector<uint32_t> &GetBackendsByType(const std::string &acceleratorType) const

Get backend indices for a specific accelerator type.

Returns a vector of indices into the cluster for backends matching the specified accelerator type. This enables efficient scheduling decisions without iterating through all backends.

Parameters:

acceleratorType – The accelerator type to filter by (e.g., “GPU”, “TPU”).

Returns:

Vector of backend indices matching the type. Empty if none match.
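One way to answer type queries without scanning every backend is to maintain a per-type index as backends are added. The sketch below illustrates that idea with a hypothetical SketchCluster that stores only accelerator types (no Node or Address); how the real Cluster builds its index, and whether untyped backends are indexed under the empty string, are assumptions.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, reduced stand-in for Cluster that tracks accelerator types only.
class SketchCluster
{
  public:
    // Append a backend and record its index under its accelerator type.
    void AddBackend(const std::string& acceleratorType = "")
    {
        const uint32_t idx = static_cast<uint32_t>(m_types.size());
        m_types.push_back(acceleratorType);
        m_byType[acceleratorType].push_back(idx);
    }

    uint32_t GetN() const
    {
        return static_cast<uint32_t>(m_types.size());
    }

    // Returns a reference, so a shared empty vector is needed for the
    // "no backend matches" case.
    const std::vector<uint32_t>& GetBackendsByType(const std::string& acceleratorType) const
    {
        static const std::vector<uint32_t> kEmpty;
        auto it = m_byType.find(acceleratorType);
        return it == m_byType.end() ? kEmpty : it->second;
    }

    bool HasAcceleratorType(const std::string& acceleratorType) const
    {
        return !GetBackendsByType(acceleratorType).empty();
    }

  private:
    std::vector<std::string> m_types;                        // type per backend index
    std::map<std::string, std::vector<uint32_t>> m_byType;   // type -> backend indices
};
```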

bool HasAcceleratorType(const std::string &acceleratorType) const

Check if the cluster has backends of a specific accelerator type.

Parameters:

acceleratorType – The accelerator type to check for.

Returns:

true if at least one backend has this accelerator type.

int32_t GetBackendIndex(const Address &address) const

Look up a backend index by its network address.

Parameters:

address – The backend address to search for.

Returns:

Backend index (0 to GetN()-1), or -1 if not found.

struct Backend

Information about a backend server in the cluster.

Public Members

Ptr<Node> node

The backend server node (may have GpuAccelerator aggregated)

Address address

Server address (InetSocketAddress with IP and port)

std::string acceleratorType

Type of accelerator (e.g., “GPU”, “TPU”). Empty = any.

ClusterState

class ClusterState

Centralized view of per-backend load and device metrics for decision-makers.

ClusterState is a plain data container owned by Orchestrator. It aggregates orchestrator-tracked dispatch/completion counts, derived backend capability, and device-reported metrics into a single object that is passed to ScalingPolicy, ClusterScheduler, and AdmissionPolicy on each call.

Public Functions

void Resize(uint32_t n)

Resize the backend state vector.

Parameters:

n – Number of backends.

uint32_t GetN() const

Get the number of backends.

Returns:

Number of backends.

const BackendState &Get(uint32_t idx) const

Get the state of a specific backend.

Parameters:

idx – Backend index.

Returns:

Const reference to the backend state.

void NotifyTaskDispatched(uint32_t backendIdx, double computeDemand = 0.0)

Record that a task was dispatched to a backend.

Parameters:
  • backendIdx – The backend index.

  • computeDemand – Estimated compute work added to the backend in FLOPs.

void UndoTaskDispatched(uint32_t backendIdx, double computeDemand = 0.0)

Roll back a dispatch notification (e.g., on send failure).

Parameters:
  • backendIdx – The backend index.

  • computeDemand – Estimated compute work removed from the backend in FLOPs.

void NotifyTaskCompleted(uint32_t backendIdx, double computeDemand = 0.0)

Record that a task completed on a backend.

Parameters:
  • backendIdx – The backend index.

  • computeDemand – Estimated compute work removed from the backend in FLOPs.
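The three notifications above suggest simple per-backend bookkeeping: a dispatch adds a task and its estimated work, while a rollback or a completion removes them. The following sketch shows that pattern with hypothetical SketchClusterState and SketchBackendLoad types; clamping at zero is an assumption made here to keep the counters well-defined, not documented behavior.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical mirror of the documented Orchestration fields.
struct SketchBackendLoad
{
    uint32_t activeTasks = 0;                   // dispatched but not yet completed
    double estimatedRemainingComputeWork = 0.0; // FLOPs
};

class SketchClusterState
{
  public:
    void Resize(uint32_t n)
    {
        m_backends.resize(n);
    }

    const SketchBackendLoad& Get(uint32_t idx) const
    {
        assert(idx < m_backends.size());
        return m_backends[idx];
    }

    void NotifyTaskDispatched(uint32_t idx, double computeDemand = 0.0)
    {
        assert(idx < m_backends.size());
        ++m_backends[idx].activeTasks;
        m_backends[idx].estimatedRemainingComputeWork += computeDemand;
    }

    // A rollback and a completion have the same effect on the counters.
    void UndoTaskDispatched(uint32_t idx, double computeDemand = 0.0)
    {
        Remove(idx, computeDemand);
    }

    void NotifyTaskCompleted(uint32_t idx, double computeDemand = 0.0)
    {
        Remove(idx, computeDemand);
    }

  private:
    void Remove(uint32_t idx, double computeDemand)
    {
        assert(idx < m_backends.size());
        SketchBackendLoad& b = m_backends[idx];
        if (b.activeTasks > 0)
        {
            --b.activeTasks;
        }
        // Assumption: never let the work estimate go negative.
        b.estimatedRemainingComputeWork =
            std::max(0.0, b.estimatedRemainingComputeWork - computeDemand);
    }

    std::vector<SketchBackendLoad> m_backends;
};
```

Passing the same computeDemand to the removal call that was passed at dispatch keeps the work estimate balanced over a task's lifetime.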

void SetBackendCapability(uint32_t backendIdx, double nominalComputeRate)

Initialize backend capability information used by schedulers.

Parameters:
  • backendIdx – The backend index.

  • nominalComputeRate – Compute rate at base state (index 0) in FLOP/s.

void SetDeviceMetrics(uint32_t backendIdx, Ptr<DeviceMetrics> metrics)

Store device metrics for a backend.

Parameters:
  • backendIdx – The backend index.

  • metrics – The device metrics.

void SetCurrentState(uint32_t backendIdx, uint32_t stateIdx, double effectiveComputeRate)

Set the current performance state for a backend.

Parameters:
  • backendIdx – The backend index.

  • stateIdx – Performance state index.

  • effectiveComputeRate – Compute rate at this state in FLOP/s.

void SetPerformanceStates(uint32_t backendIdx, uint32_t numStates, double maxComputeRate)

Set the performance state range for a backend.

Parameters:
  • backendIdx – The backend index.

  • numStates – Number of available performance states.

  • maxComputeRate – Compute rate at the highest state in FLOP/s.

void SetActiveWorkloadCount(uint32_t count)

Set the active workload count.

Parameters:

count – Number of active workloads.

uint32_t GetActiveWorkloadCount() const

Get the active workload count.

Returns:

Number of active workloads.

void ReserveTasks(uint32_t count, const std::string &acceleratorType = "")

Reserve capacity for tasks pending admission (not yet dispatched).

Parameters:
  • count – Number of tasks to reserve.

  • acceleratorType – Required accelerator type for the reservation. Empty = any backend.

void ReleaseTasks(uint32_t count, const std::string &acceleratorType = "")

Release previously reserved task capacity.

Parameters:
  • count – Number of tasks to release.

  • acceleratorType – Required accelerator type for the reservation. Empty = any backend.

uint32_t GetPendingAdmissionTasks() const

Get the total number of tasks reserved by pending admissions.

Returns:

Total pending admission task count across all accelerator types.

uint32_t GetPendingAdmissionTasksForType(const std::string &acceleratorType) const

Get the number of tasks reserved for a specific accelerator type.

Parameters:

acceleratorType – Required accelerator type. Empty = any backend.

Returns:

Pending admission task count for the given type.
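The reservation calls can be pictured as a per-type counter plus a running total. This is a minimal sketch under assumptions: SketchReservations is a hypothetical class, and capping a release at the amount currently reserved is a choice made for the example rather than documented behavior.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical tracker for tasks reserved by pending admissions.
// The empty string is the key for "any backend".
class SketchReservations
{
  public:
    void ReserveTasks(uint32_t count, const std::string& acceleratorType = "")
    {
        m_pending[acceleratorType] += count;
        m_total += count;
    }

    void ReleaseTasks(uint32_t count, const std::string& acceleratorType = "")
    {
        auto it = m_pending.find(acceleratorType);
        if (it == m_pending.end())
        {
            return; // nothing reserved for this type
        }
        // Assumption: release at most what is currently reserved.
        const uint32_t released = std::min(count, it->second);
        it->second -= released;
        m_total -= released;
        if (it->second == 0)
        {
            m_pending.erase(it);
        }
    }

    uint32_t GetPendingAdmissionTasks() const
    {
        return m_total;
    }

    uint32_t GetPendingAdmissionTasksForType(const std::string& acceleratorType) const
    {
        auto it = m_pending.find(acceleratorType);
        return it == m_pending.end() ? 0u : it->second;
    }

  private:
    std::map<std::string, uint32_t> m_pending; // type -> reserved task count
    uint32_t m_total = 0;                      // sum across all types
};
```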

void Clear()

Clear all state.

struct BackendState

Per-backend state split by write authority.

Device fields are written exclusively by DeviceManager. Orchestration fields are written exclusively by Orchestrator.

struct Device

Public Members

double nominalComputeRate = {0.0}

Compute rate at base state (index 0) in FLOP/s.

uint32_t numPerformanceStates = {0}

Available performance states (0 = no scaling)

double maxComputeRate = {0.0}

Compute rate at highest state in FLOP/s.

uint32_t currentStateIdx = {0}

Current performance state index.

double effectiveComputeRate = {0.0}

Current compute rate in FLOP/s.

Ptr<DeviceMetrics> deviceMetrics

Latest device-reported metrics (nullable)

struct Orchestration

Public Members

uint32_t activeTasks = {0}

Dispatched but not yet completed.

double estimatedRemainingComputeWork = {0}

Estimated queued compute work in FLOPs.

ClusterScheduler

class ClusterScheduler : public Object

Abstract base class for task scheduling policies.

ClusterScheduler determines which backend in a cluster should execute a given task.

ClusterScheduler is used by Orchestrator for task placement decisions during DAG execution.

Example usage:

Ptr<ClusterScheduler> scheduler = CreateObject<RoundRobinScheduler>();

Ptr<Task> task = CreateObject<SimpleTask>();
task->SetRequiredAcceleratorType("GPU");

// state is the ClusterState holding per-backend load for this cluster.
int32_t backendIdx = scheduler->ScheduleTask(task, cluster, state);
if (backendIdx >= 0)
{
    // Dispatch task to cluster.Get(backendIdx)
}

Subclassed by ns3::LeastLoadedScheduler, ns3::RoundRobinScheduler

Public Functions

virtual int32_t ScheduleTask(Ptr<Task> task, const Cluster &cluster, const ClusterState &state) = 0

Select a backend to execute the given task.

Examines the task’s requirements (accelerator type, compute demand, etc.) and cluster state to select an appropriate backend.

Parameters:
  • task – The task to schedule.

  • cluster – The cluster of available backends.

  • state – Per-backend load and device metrics.

Returns:

Index into cluster (0 to GetN()-1), or -1 if no suitable backend found.

virtual bool CanScheduleTask(Ptr<Task> task, const Cluster &cluster, const ClusterState &state) const

Check if a task can be scheduled without side effects.

Returns true if ScheduleTask() would find a suitable backend. The default implementation checks whether any backend matches the task’s required accelerator type. Other schedulers can override this with custom behaviour.

Parameters:
  • task – The task to check.

  • cluster – The cluster of available backends.

  • state – Per-backend load and device metrics.

Returns:

true if the task can be scheduled, false otherwise.

virtual void NotifyTaskCompleted(uint32_t backendIdx, Ptr<Task> task)

Notify scheduler that a task completed on a backend.

Called when a task finishes execution. Stateful schedulers can use this to update internal state (e.g., decrement pending count, track latency). Default implementation does nothing.

Parameters:
  • backendIdx – The backend index where the task completed.

  • task – The task that completed.

virtual std::string GetName() const = 0

Get the scheduler name for logging and debugging.

Returns:

A string identifying this scheduler type.

Public Static Functions

static TypeId GetTypeId()

Get the type ID.

Returns:

The object TypeId.

RoundRobinScheduler

class RoundRobinScheduler : public ns3::ClusterScheduler

Scheduler that dispatches tasks across backends in round-robin order.

RoundRobinScheduler keeps a per-accelerator-type index that advances with every scheduled task, distributing workloads evenly across all matching backends in the cluster.

Public Functions

virtual int32_t ScheduleTask(Ptr<Task> task, const Cluster &cluster, const ClusterState &state) override

Select a backend for the task in round-robin order.

Advances the per-accelerator-type index and returns the next matching backend. Returns -1 if the cluster has no backend matching the task’s accelerator requirement.

Parameters:
  • task – The task to schedule.

  • cluster – The cluster of backends.

  • state – The current state of the cluster.

Returns:

Backend index, or -1 if no suitable backend.
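A per-accelerator-type round-robin cursor can be sketched as follows. SketchRoundRobin and its Next() method are hypothetical; the list of matching backend indices is passed in directly (in the spirit of Cluster::GetBackendsByType()), and whether the real scheduler advances its index before or after selecting is not stated in the source.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical round-robin selector with one cursor per accelerator type.
class SketchRoundRobin
{
  public:
    // matching: backend indices compatible with acceleratorType.
    // Returns the chosen backend index, or -1 if nothing matches.
    int32_t Next(const std::string& acceleratorType, const std::vector<uint32_t>& matching)
    {
        if (matching.empty())
        {
            return -1;
        }
        std::size_t& cursor = m_next[acceleratorType]; // starts at 0 on first use
        const uint32_t chosen = matching[cursor % matching.size()];
        cursor = (cursor + 1) % matching.size();
        return static_cast<int32_t>(chosen);
    }

  private:
    std::map<std::string, std::size_t> m_next; // type -> position in matching list
};
```

Because each type has its own cursor, a burst of tasks for one accelerator type does not disturb the rotation for another.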

virtual std::string GetName() const override

Get the scheduler name.

Returns:

“RoundRobin”

Public Static Functions

static TypeId GetTypeId()

Get the type ID.

Returns:

The object TypeId.

LeastLoadedScheduler

class LeastLoadedScheduler : public ns3::ClusterScheduler

Scheduler that selects the backend with the lowest predicted compute burden.

LeastLoadedScheduler picks the backend with the minimum predicted compute burden, as tracked in ClusterState. The score combines the backend’s estimated remaining compute work and current effective compute rate. If the task specifies a required accelerator type, only matching backends are considered. Ties are broken deterministically by backend index.

Public Functions

virtual int32_t ScheduleTask(Ptr<Task> task, const Cluster &cluster, const ClusterState &state) override

Select the backend with the lowest predicted compute burden.

Parameters:
  • task – The task to schedule.

  • cluster – The cluster of backends.

  • state – Per-backend load state.

Returns:

Backend index, or -1 if no suitable backend.
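The source does not give the exact scoring formula. As one plausible reading, the sketch below scores each backend by estimated remaining work divided by effective compute rate (an estimate of seconds until the backend drains its queue) and breaks ties by the lower index. SketchBackendState, PredictedBurden, and PickLeastLoaded are hypothetical names, and the formula itself is an assumption.

```cpp
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical subset of the per-backend state relevant to scoring.
struct SketchBackendState
{
    double estimatedRemainingComputeWork; // FLOPs
    double effectiveComputeRate;          // FLOP/s
};

// Assumed score: time to drain the queued work at the current rate.
// A backend with no usable rate is treated as infinitely loaded.
double PredictedBurden(const SketchBackendState& b)
{
    if (b.effectiveComputeRate <= 0.0)
    {
        return std::numeric_limits<double>::infinity();
    }
    return b.estimatedRemainingComputeWork / b.effectiveComputeRate;
}

// candidates: backend indices matching the task's accelerator requirement.
// Returns the index with the lowest score (lower index wins ties), or -1.
int32_t PickLeastLoaded(const std::vector<SketchBackendState>& state,
                        const std::vector<uint32_t>& candidates)
{
    bool found = false;
    uint32_t best = 0;
    double bestScore = 0.0;
    for (uint32_t idx : candidates)
    {
        if (idx >= state.size())
        {
            continue; // ignore indices with no recorded state
        }
        const double score = PredictedBurden(state[idx]);
        const bool better =
            !found || score < bestScore || (score == bestScore && idx < best);
        if (better)
        {
            found = true;
            best = idx;
            bestScore = score;
        }
    }
    return found ? static_cast<int32_t>(best) : -1;
}
```

Dividing by the effective rate means a fast backend with a long queue can still beat a slow backend with a short one.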

virtual std::string GetName() const override

Get the scheduler name.

Returns:

“LeastLoaded”

Public Static Functions

static TypeId GetTypeId()

Get the type ID.

Returns:

The object TypeId.

AdmissionPolicy

class AdmissionPolicy : public Object

Abstract base class for admission control policies.

AdmissionPolicy determines whether a workload should be accepted for execution.

Implementations are stateless: the orchestrator tracks active workloads and passes per-backend state to ShouldAdmit(). This follows real-world patterns from Kubernetes and Spark, where admission controllers are stateless.

Example usage:

Ptr<AdmissionPolicy> policy = CreateObject<MaxActiveTasksPolicy>();

if (policy->ShouldAdmit(dag, cluster, state))
{
    // Accept workload...
}

Subclassed by ns3::MaxActiveTasksPolicy

Public Functions

virtual bool ShouldAdmit(Ptr<DagTask> dag, const Cluster &cluster, const ClusterState &state) = 0

Check if a workload should be admitted.

This method evaluates the workload against the current cluster state and policy rules to decide admission.

Parameters:
  • dag – The workload DAG (single tasks are wrapped as 1-node DAGs)

  • cluster – The cluster of available backends

  • state – Per-backend load and device metrics

Returns:

true if the workload should be admitted, false if rejected

virtual std::map<std::string, uint32_t> GetReservationTaskCounts(Ptr<DagTask> dag) const

Get pending-admission reservation counts for this workload.

The orchestrator uses this to reserve admission capacity between Phase 1 admission and Phase 2 data upload without needing policy-specific logic. The default implementation reserves the whole workload against the generic/unspecified accelerator type.

Parameters:

dag – The workload DAG.

Returns:

Required reservation counts grouped by accelerator type.

virtual std::string GetName() const = 0

Get the policy name for logging and debugging.

Returns:

A string identifying this policy type.

Public Static Functions

static TypeId GetTypeId()

Get the type ID.

Returns:

The object TypeId.

MaxActiveTasksPolicy

class MaxActiveTasksPolicy : public ns3::AdmissionPolicy

Type-aware admission policy that rejects workloads when compatible backends lack capacity.

MaxActiveTasksPolicy counts the incoming workload’s tasks per required accelerator type and checks whether compatible backends have enough remaining capacity after accounting for active tasks and pending admission reservations. Tasks with no required type are matched against any backend.

Public Functions

virtual bool ShouldAdmit(Ptr<DagTask> dag, const Cluster &cluster, const ClusterState &state) override

Admit if compatible backends have enough capacity for the incoming workload.

Parameters:
  • dag – The workload DAG (inspected for per-type task counts).

  • cluster – The cluster (used for type-based backend lookup).

  • state – Per-backend load, metrics, and pending admission reservations.

Returns:

true if capacity exists for all required types, false otherwise.
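To make the capacity check concrete, here is a simplified sketch. It assumes a fixed per-backend cap (the hypothetical maxActiveTasksPerBackend parameter, which the source does not name) and evaluates each required type independently, so it ignores contention between typed and untyped tasks for the same backend. SketchBackend and ShouldAdmitSketch are likewise illustrative names.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-backend view: accelerator type and current active tasks.
struct SketchBackend
{
    std::string acceleratorType;
    uint32_t activeTasks;
};

// required: task counts of the incoming workload, grouped by accelerator type
//           ("" means the task can run on any backend).
// pending:  tasks already reserved by pending admissions, grouped the same way.
// Returns true only if every required type fits in the remaining capacity.
bool ShouldAdmitSketch(const std::map<std::string, uint32_t>& required,
                       const std::vector<SketchBackend>& backends,
                       const std::map<std::string, uint32_t>& pending,
                       uint32_t maxActiveTasksPerBackend)
{
    for (const auto& entry : required)
    {
        const std::string& type = entry.first;
        uint64_t freeSlots = 0;
        for (const auto& b : backends)
        {
            // Untyped tasks are compatible with every backend.
            const bool compatible = type.empty() || b.acceleratorType == type;
            if (compatible && b.activeTasks < maxActiveTasksPerBackend)
            {
                freeSlots += maxActiveTasksPerBackend - b.activeTasks;
            }
        }
        auto it = pending.find(type);
        const uint64_t reserved = (it == pending.end()) ? 0u : it->second;
        if (freeSlots < reserved + entry.second)
        {
            return false; // not enough room for this type
        }
    }
    return true;
}
```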

virtual std::map<std::string, uint32_t> GetReservationTaskCounts(Ptr<DagTask> dag) const override

Get type-scoped reservation counts for the incoming workload.

Parameters:

dag – The workload DAG.

Returns:

Required task counts grouped by accelerator type.
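The difference between type-scoped counts and the base-class default (which reserves the whole workload against the unspecified type) can be illustrated with two small helpers. Both function names are hypothetical, and the workload is represented simply as a list of each task's required accelerator type rather than a DagTask.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Type-scoped counts: one entry per distinct required accelerator type.
// requiredTypes holds the required type of each task ("" = unspecified).
std::map<std::string, uint32_t>
CountByAcceleratorType(const std::vector<std::string>& requiredTypes)
{
    std::map<std::string, uint32_t> counts;
    for (const auto& type : requiredTypes)
    {
        ++counts[type];
    }
    return counts;
}

// Default-style counts: the whole workload is reserved against the
// generic/unspecified type, represented by the empty string.
std::map<std::string, uint32_t>
CountAsUnspecified(std::size_t taskCount)
{
    std::map<std::string, uint32_t> counts;
    counts[""] = static_cast<uint32_t>(taskCount);
    return counts;
}
```

The resulting map has the same shape as the arguments to ClusterState::ReserveTasks(), one call per (type, count) entry.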

virtual std::string GetName() const override

Get the policy name.

Returns:

“MaxActiveTasks”

Public Static Functions

static TypeId GetTypeId()

Get the type ID.

Returns:

The object TypeId.