Workload Arbiter

In this section, you will learn about the following components in Spatial:

  • Stream controllers

  • Forever Counters

  • Controller Breaks

Note that a large collection of Spatial applications can be found here.


Here we give an example application in which a scheduler dynamically determines, at runtime, which worker should receive which packet. We create the architecture shown in the animation on the right.

Architectural Description: A scheduler is responsible for reading data from a work list in DRAM. It distributes this data by issuing each packet to one of three workers, based on a schedule that attempts to issue the newest packet to the “least busy” worker. Each worker processes to the end of its inbound work queue, and remains idle if the queue is empty. After all packets have been processed, the scheduler broadcasts a “done” packet to all of its workers. When each worker sees this packet, it issues a signal to the final stage to wrap up the computation. This final stage has three inbound streams and will begin only when all three have valid data.

Function Descriptions (not shown in animation): The scheduling function we use keeps a score for each worker: the “length” of a packet is added to a worker's score when a new packet is issued to it, and each score decays steadily over time to approximate work being completed. Each worker parses the integer “payload” of the packet and increments an external counter that many times. It also records the “index” of the packet in a local history so that we can reconstruct afterwards which worker processed which packet. The finisher reports the status of the architecture and interleaves the histories of the three workers into a single list.

Note, there are cleaner ways to express this particular application, but we choose to sprawl much of the code to demonstrate the concepts. The functions in this particular architecture are contrived, but learning the concepts from this example will help you generalize to more complicated apps without making common mistakes.


Basic implementation

import spatial.dsl._

@spatial object DummyWorkload extends SpatialApp {
  // Set up properties
  val maxQueue = 256
  val HEAVY = 16
  val MEDIUM = 8
  val LIGHT = 1
  val DONE = -1

  // Define packet struct
  @struct case class full_packet(payload: Int, idx: Int) 

  def main(args: Array[String]): Unit = {
    // Set up size of queue
    val queueSize = args(0).to[Int]
    val QS = ArgIn[Int]
    assert(queueSize <= maxQueue)
    setArg(QS, queueSize)

    // Create "packets", where each "packet" is just an Int 
    //   that roughly represents how long a worker will spend
    //   working on it
    val allPackets = DRAM[full_packet](QS)
    val initWeights = Array.tabulate(QS){i =>
      if (i % 7 == 0) full_packet(HEAVY, i)
      else if (i % 7 == 1) full_packet(MEDIUM, i)
      else full_packet(LIGHT, i)
    }
    setMem(allPackets, initWeights)
    printArray(initWeights, "Work profile:")

    // Create counters, which will make sure the workers shared their work properly
    val counter1 = HostIO[Int]
    val counter2 = HostIO[Int]
    val counter3 = HostIO[Int]
    setArg(counter1, 0)
    setArg(counter2, 0)
    setArg(counter3, 0)

    // Create memory to track which worker processed which packet
    val history = DRAM[Int](QS + 1)

    // Create status reg
    val status = ArgOut[Int]

    Accel {         
      // Fetch work
      val localQueue = SRAM[full_packet](maxQueue)
      localQueue load allPackets(0::QS)

      // Create worker worklist queues
      val worker1Queue = FIFO[full_packet](3)
      val worker2Queue = FIFO[full_packet](3)
      val worker3Queue = FIFO[full_packet](3)

      // Create worker history queues
      val worker1History = FIFO[Int](maxQueue + 1)
      val worker2History = FIFO[Int](maxQueue + 1)
      val worker3History = FIFO[Int](maxQueue + 1)
      // Put one element in each history to guarantee it won't stall the FINISH stage
      worker1History.enq(0) // dummy value; it is dequeued before the real merge begins
      worker2History.enq(0)
      worker3History.enq(0)

      // Create break regs
      val break1 = Reg[Bit](false)
      val break2 = Reg[Bit](false)
      val break3 = Reg[Bit](false)

      // Create done reporters, these are FIFOs of depth 1, used for controlling streams
      val done1 = FIFOReg[Bit]
      val done2 = FIFOReg[Bit]
      val done3 = FIFOReg[Bit]

      def schedule(p: full_packet): Unit = {
        // Worker estimates
        val workLoads = List.fill(3)(Reg[Int](0))
        if (workLoads(0).value <= workLoads(1).value && workLoads(0).value <= workLoads(2).value) {
          workLoads(0) :+= p.payload; worker1Queue.enq(p)
        }
        else if (workLoads(1).value <= workLoads(0).value && workLoads(1).value <= workLoads(2).value) {
          workLoads(1) :+= p.payload; worker2Queue.enq(p)
        }
        else {
          workLoads(2) :+= p.payload; worker3Queue.enq(p)
        }

        // Decrement workLoads each "cycle" for cheap estimate
        workLoads.foreach{ w => if (w.value > 5) w :-= 5 }
      }

      def processPacket(p: full_packet, workerHistory: FIFO[Int], counter: Reg[Int]): Unit = {
        // Log packet
        workerHistory.enq(p.idx)
        // Process payload
        Foreach(p.payload by 1){i =>
          counter :+= 1
        }
      }


      Stream {

        'SCHEDULER.Sequential.Foreach(QS by 1){packet =>
          val p: full_packet = localQueue(packet)

          // Issue packet to correct worker, based on schedule rule
          schedule(p)

          // If all packets sent, give everyone "done" signal
          // NOTE: Pipe here is to guarantee the enqs above finish
          //       before these enqs start.  Otherwise, retiming
          //       doesn't recognize the dependency
          Pipe {
            if (packet == QS.value-1) {
              worker1Queue.enq(full_packet(DONE, 0))
              worker2Queue.enq(full_packet(DONE, 0))
              worker3Queue.enq(full_packet(DONE, 0))
            }
          }
        }

        // Indicate that controller should execute sequentially,
        //    and provide the register that will break the loop
        'WORKER1.Sequential(breakWhen = break1).Foreach(*){_ =>
          val p = worker1Queue.deq()
          // Do "work" on valid packet
          if (p.payload > 0) {
            processPacket(p, worker1History, counter1)
          }
          // Handle done packet
          else if (p.payload == DONE) {
            break1 := true
            done1.enq(true)
          }
        }

        // Indicate that controller should execute sequentially,
        //    and provide the register that will break the loop
        'WORKER2.Sequential(breakWhen = break2).Foreach(*){_ =>
          val p = worker2Queue.deq()
          // Do "work" on valid packet
          if (p.payload > 0) {
            processPacket(p, worker2History, counter2)
          }
          // Handle done packet
          else if (p.payload == DONE) {
            break2 := true
            done2.enq(true)
          }
        }

        // Indicate that controller should execute sequentially,
        //    and provide the register that will break the loop
        'WORKER3.Sequential(breakWhen = break3).Foreach(*){_ =>
          val p = worker3Queue.deq()
          // Do "work" on valid packet
          if (p.payload > 0) {
            processPacket(p, worker3History, counter3)
          }
          // Handle done packet
          else if (p.payload == DONE) {
            break3 := true
            done3.enq(true)
          }
        }

        // When all 3 done FIFORegs are valid, we can finish up
        'FINISHER.Pipe {
          val a = done1.deq()
          val b = done2.deq()
          val c = done3.deq()
          if (a && b && c) status := 1 // Be sure to use a,b,c to avoid DCE

          // Interleave history
          val aggregatedHistory = FIFO[Int](maxQueue + 4)
          val w1 = Reg[Int]; w1 := worker1History.deq
          val w2 = Reg[Int]; w2 := worker2History.deq
          val w3 = Reg[Int]; w3 := worker3History.deq
          FSM(1)(_ != 0) { _ =>
            if (w1.value <= w2.value && w1.value <= w3.value) {
              aggregatedHistory.enq(1)
              if (worker1History.isEmpty) w1 := maxQueue + 999 else w1 := worker1History.deq
            }
            else if (w2.value <= w1.value && w2.value <= w3.value) {
              aggregatedHistory.enq(2)
              if (worker2History.isEmpty) w2 := maxQueue + 999 else w2 := worker2History.deq
            }
            else {
              aggregatedHistory.enq(3)
              if (worker3History.isEmpty) w3 := maxQueue + 999 else w3 := worker3History.deq
            }
          }{whil => mux(worker1History.isEmpty && worker2History.isEmpty && worker3History.isEmpty, 0, 1)}
          // Export history
          history store aggregatedHistory
        }
      }
    }

    println(r"Status = ${getArg(status)}")
    println("\"Good\" behavior is when each worker processes roughly the same amount.")
    println(r"Worker1 processed: ${getArg(counter1)}")
    println(r"Worker2 processed: ${getArg(counter2)}")
    println(r"Worker3 processed: ${getArg(counter3)}")
    println(r"Total work sent: ${initWeights.map(_.payload).reduce{_+_}}")
    println(r"Total work done: ${getArg(counter1) + getArg(counter2) + getArg(counter3)}")
    println(r"Pass: ${initWeights.map(_.payload).reduce{_+_} == getArg(counter1) + getArg(counter2) + getArg(counter3)}")
    printArray(getMem(history), "Worker responsible for each packet")
    assert(getMem(history).map(_ != 0).reduce{_&&_}, "At least one packet apparently not processed!")
    assert(getArg(counter1) > 0)
    assert(getArg(counter2) > 0)
    assert(getArg(counter3) > 0)
    assert(initWeights.map(_.payload).reduce{_+_} == getArg(counter1) + getArg(counter2) + getArg(counter3))
    assert(getArg(status) == 1)
  }
}


We start by defining a full_packet type, which describes the data fields we will have in our “packet.”

We then define some properties that we will use later in the application. maxQueue is a sizing parameter for on-chip memories. HEAVY, MEDIUM, LIGHT, and DONE are values that the payload field of a packet may take.

We allow the total number of packets to be set at runtime, as long as it is no larger than maxQueue. We set up the DRAM that will hold all of the packets. For no particular reason, we choose to have a heavy packet, followed by a medium packet, followed by five light packets, repeating this pattern every 7 packets.
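To see what the host should expect from this profile, the 7-packet pattern and its total work can be modeled outside of Spatial. Below is a small Python sketch (a host-side model, not Spatial code; the name work_profile is ours) mirroring the Array.tabulate initializer:

```python
# Host-side Python model of the 7-periodic work profile (not Spatial code).
HEAVY, MEDIUM, LIGHT = 16, 8, 1

def work_profile(qs):
    """Mirror of the Array.tabulate initializer: one (payload, idx) pair per packet."""
    def payload(i):
        if i % 7 == 0:
            return HEAVY      # every 7th packet is heavy
        elif i % 7 == 1:
            return MEDIUM     # followed by one medium packet
        else:
            return LIGHT      # and five light packets
    return [(payload(i), i) for i in range(qs)]

profile = work_profile(14)
total_work = sum(p for p, _ in profile)  # each group of 7 contributes 16 + 8 + 5 = 29
```

Because each worker logs one counter increment per unit of payload, total_work here is exactly the value the three counters should sum to at the end of the run.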

We create counters as HostIOs, which will be responsible for logging the amount of “work” each worker has done. We initialize them all to 0 (don’t want any workers to cheat!)

We also create a status reg, which we use to guarantee that the final stage has a side effect and that none of the primitives inside of it will be eliminated as dead code.

While this app can be written as a tiled application, we choose to load the entire work list on chip in the localQueue.

We create a separate, tiny queue for each worker. The reason we chose to set the queue so small is to stress-test the case where the scheduler wants to issue a new packet to a worker who is already full. Because of implementation details of the backend, if the scheduler chooses a filled queue, it will wait until it drains one of its packets rather than issuing to a different worker.

We also allocate a history FIFO for each worker and size it so that we are guaranteed it will not fill up for any one worker and stall the entire app. We also inject a dummy value into each, because it will be read down below in the FINISHER stage of the processor. If, for some reason, one of the workers never activated, this pre-enqueue of a dummy value guarantees that the final stage will not be stalled forever while it waits for all three history FIFOs to have at least one element. We can choose to ignore the dummy value later, if we want.

We create boolean registers that will be used later in the application by each worker to forcibly break out of an infinite loop. These will only be triggered when the DONE packet is processed by the worker.

We create FIFORegs as the interface between each worker and the final stage. We know that each worker will need to send the done signal once, so we want a FIFO of size one to mediate the interaction between the workers and the final stage. We can create a FIFO of size one to handle this, or we can use the FIFOReg, which is a simplified FIFO with only one element.

We define what we want the scheduling function to be, given a new packet. In this example, we create a workLoad register for each worker. Each time we enqueue to that worker, we add the payload for the packet to this register. We decrement these registers each time this scheduler runs.
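The behavior of this heuristic is easier to audit in software than in hardware. Below is a minimal Python model of the same rule (a host-side sketch, not Spatial code; schedule_all is a name we invented): the least-busy worker wins, ties go to the lowest-numbered worker, and every score decays by 5 per scheduling step, matching the `if (w.value > 5) w :-= 5` decrement in the Spatial version.

```python
# Host-side Python model of the "least busy" scheduling heuristic (not Spatial code).
def schedule_all(payloads):
    scores = [0, 0, 0]   # one running "busyness" estimate per worker
    assignment = []      # which worker (0..2) receives each packet
    for p in payloads:
        w = min(range(3), key=lambda i: scores[i])  # least busy; ties -> lowest index
        scores[w] += p
        assignment.append(w)
        # Cheap decay: a stand-in for "work completed since the last packet was issued"
        scores = [s - 5 if s > 5 else s for s in scores]
    return assignment
```

Running this on one period of the work profile, schedule_all([16, 8, 1, 1, 1, 1, 1]), assigns the heavy packet to worker 0, the medium packet to worker 1, and spreads the light packets across the remaining capacity, which is the rough load balance we expect from the hardware.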

We also define the rules for processing a packet. Specifically, we just enqueue the idx part of the packet payload to the history for this worker, and increment the worker’s counter as many times as the payload dictates.

The key concept in this app is to bind all of the controllers outlined in the animation above in a Stream controller. We want the controller to run once, so we can dump the code for all of the children controllers directly into the body of the Stream. Each of the five controller blocks shown in the animation is tagged with its name: SCHEDULER, WORKER1, WORKER2, WORKER3, and FINISHER.

In the SCHEDULER controller, we iterate over each of the packets in the work list. We send the packet to our scheduler function, who will populate the correct queue with the new packet. The last thing this controller does is check if this is the last packet, and issue the DONE packet if so. The tricky part about this is that we need to guarantee that this special packet will not jump ahead of a true work packet. Without this Pipe explicitly gating the final packet generation from the rest of the controller, the retimer inside of Spatial will see a short chain of primitives being used to generate the DONE enqueue, and potentially a long but independent chain of primitives being used to generate the work packet enqueue (inside of schedule()). Retiming preserves program order in some cases, but this is not one of those cases in the current version of Spatial.

Next, we create three WORKERs. The only difference between each worker is that they operate on a different history queue and counter register. Each counter is set to run forever, using the Foreach(*) syntax. We choose to use an infinite counter in these controllers because we want each worker to run any time it receives a packet, regardless of how many packets it has processed already. The direct parent of these workers is the Stream controller, so as long as the inbound work queue has a valid element, the controller will get to run for another iteration. The only way these workers can send a done signal to their parent is when their respective breakWhen register raises. We allocated these registers previously, but we set them inside the body of the worker. Spatial only supports breakWhen for unparallelized Sequential and Stream controllers. Breaking a Sequential controller is the safest way to map the idea of a loop break to hardware, because the scheduling ensures there is no pipelining and no way for previous iterations to still be draining through the retime chains when the break condition is raised. Because of this, using this syntax for Stream controllers is strongly discouraged, because it can easily lead to unexpected behavior. There are only a few limited and specific cases where a Stream should be used with a breakWhen condition.

We finish by using the three done FIFORegs to trigger the FINISHER pipe. Only when all three of these registers have valid data will the controller enable and process its body. We start by setting the status ArgOut, and then we interleave the history data from each of the workers. We choose to use an FSM here, which dequeues from one FIFO at a time and determines which one has the lowest index. We then tag the aggregatedHistory with the id of the worker that worked on a particular packet, until all three history FIFOs have been drained. We then send this information out to DRAM.
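The FSM is essentially a three-way merge with sentinels, which we can model in a few lines of Python to check the logic (a host-side sketch, not Spatial code; it ignores the dummy element pre-enqueued into each history and uses an arbitrary large sentinel in place of maxQueue + 999):

```python
# Host-side Python model of the FINISHER's three-way interleave (not Spatial code).
SENTINEL = 10**9  # plays the role of maxQueue + 999 in the Spatial FSM

def interleave(h1, h2, h3):
    """Merge three ascending index lists, emitting the 1-based id of the
    worker that owns each index, smallest index first."""
    queues = [list(h) for h in (h1, h2, h3)]
    fronts = [q.pop(0) if q else SENTINEL for q in queues]  # one head element per worker
    out = []
    while any(f != SENTINEL for f in fronts):
        w = min(range(3), key=lambda i: fronts[i])   # worker holding the smallest pending index
        out.append(w + 1)                            # tag output with 1-based worker id
        fronts[w] = queues[w].pop(0) if queues[w] else SENTINEL
    return out
```

For example, if worker 1 processed packets 0, 3, and 4, worker 2 processed 1 and 5, and worker 3 processed 2 and 6, the merged history contains one worker id per packet index, in packet order.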

On the host, we check if the app did what we expected it to do. We ensure that all three workers combined have processed the entire work list by checking if the sum of the three counters is equal to the sum of the work list. We expect that each worker processed roughly the same amount, especially if a large number of packets was used. We also check to make sure each of the packets was truly claimed by one of the workers by looking at the aggregated history. We expect to see that the workload was distributed evenly among the workers by a somewhat random pattern in the history.