Parallel Workers — Fan-Out / Fan-In (MoonBit)

Overview

Golem agents process invocations sequentially — a single agent cannot run work in parallel. To execute work concurrently, distribute it across multiple agent instances. This skill covers two approaches:

  1. Child agents via the generated client's scoped constructor (AgentClient::scoped; e.g. WorkerClient::scoped for a Worker agent) — spawn separate agent instances, dispatch work, and collect results
  2. @api.fork() — clone the current agent at the current execution point for lightweight parallel execution
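The fan-out/fan-in shape common to both approaches can be sketched in plain Python, using a thread pool and futures as stand-ins for child agents and Golem promises. This is a conceptual analogy only — Golem has no threads, and `process`/`fan_out` here are illustrative names, not Golem APIs:

```python
from concurrent.futures import ThreadPoolExecutor

def process(item: str) -> str:
    # Stand-in for a child agent's work on one item
    return "processed-" + item

def fan_out(items: list[str]) -> list[str]:
    with ThreadPoolExecutor() as pool:
        # Fan-out: dispatch one unit of work per item
        futures = [pool.submit(process, item) for item in items]
        # Fan-in: collect results in dispatch order (like awaiting each promise)
        return [f.result() for f in futures]
```

In the Golem version below, `pool.submit` corresponds to a fire-and-forget trigger on a child agent, and `f.result()` corresponds to `@api.await_promise`.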

Approach 1: Child Agent Fan-Out

Spawn child agents, dispatch work via fire-and-forget triggers, and collect results via Golem promises.

Basic Pattern with Promises

#derive.agent
struct Coordinator {
  // no state needed
}

fn Coordinator::new() -> Coordinator { {  } }

/// Fan out work to child agents and collect results
pub fn Coordinator::fan_out(self : Self, items : Array[String]) -> Array[String] {
  // Create one promise per child
  let promise_ids : Array[@types.PromiseId] = []
  for _ in items {
    promise_ids.push(@api.create_promise())
  }

  // Fire-and-forget: trigger each child with its promise ID
  for i, item in items {
    WorkerClient::scoped(i.to_uint64(), fn(child) raise @common.AgentError {
      child.trigger_process(item, promise_ids[i])
    })
  }

  // Collect all results (agent suspends on each until completed)
  let results : Array[String] = []
  for pid in promise_ids {
    let bytes = @api.await_promise(pid)
    results.push(String::from_array(bytes.to_array().map(fn(b) { Char::from_int(b.to_int()) })))
  }
  results
}

#derive.agent
struct Worker {
  id : UInt64
}

fn Worker::new(id : UInt64) -> Worker { { id, } }

/// Process data and complete the promise with the result
pub fn Worker::process(
  self : Self,
  data : String,
  promise_id : @types.PromiseId,
) -> Unit {
  let result = "processed-\{data}"
  let payload = Bytes::from_array(result.to_array().map(fn(c) { c.to_int().to_byte() }))
  let _ = @api.complete_promise(promise_id, payload)
}

Chunked Fan-Out

Batch children to limit concurrency:

pub fn Coordinator::fan_out_chunked(
  self : Self,
  items : Array[String],
) -> Array[String] {
  let results : Array[String] = []
  let chunk_size = 5
  let mut offset = 0

  while offset < items.length() {
    let end = @math.minimum(offset + chunk_size, items.length())
    let promise_ids : Array[@types.PromiseId] = []
    for i = offset; i < end; i = i + 1 {
      let pid = @api.create_promise()
      promise_ids.push(pid)
      WorkerClient::scoped(i.to_uint64(), fn(child) raise @common.AgentError {
        child.trigger_process(items[i], pid)
      })
    }
    for pid in promise_ids {
      let bytes = @api.await_promise(pid)
      results.push(String::from_array(bytes.to_array().map(fn(c) { Char::from_int(c.to_int()) })))
    }
    offset = end
  }
  results
}
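The batching arithmetic above — dispatch a chunk, await all of its promises, then advance — can be modeled with a bounded thread pool, where `max_workers` plays the role of the chunk size. Again an analogy with hypothetical helper names, not Golem APIs:

```python
from concurrent.futures import ThreadPoolExecutor

def process(item: str) -> str:
    # Stand-in for a child agent's work
    return "processed-" + item

def fan_out_chunked(items: list[str], chunk_size: int = 5) -> list[str]:
    results: list[str] = []
    with ThreadPoolExecutor(max_workers=chunk_size) as pool:
        for offset in range(0, len(items), chunk_size):
            batch = items[offset:offset + chunk_size]
            # Dispatch at most chunk_size units of work at once
            futures = [pool.submit(process, it) for it in batch]
            # Await the whole batch before dispatching the next
            results.extend(f.result() for f in futures)
    return results
```

The trade-off is the same in both versions: smaller chunks cap resource usage but serialize the batches, so total latency grows with the number of chunks.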

Approach 2: @api.fork()

@api.fork() clones the current agent at the current execution point, creating a new agent instance with the same state but a unique phantom ID. Use Golem promises to synchronize between the original and forked agents.

Basic Fork Pattern

#derive.agent
struct ForkAgent {
  mut result : String
}

fn ForkAgent::new() -> ForkAgent { { result: "" } }

/// Fork the agent and collect result via a promise
pub fn ForkAgent::parallel_compute(self : Self) -> String {
  let promise_id = @api.create_promise()

  match @api.fork() {
    Original(_details) => {
      // Wait for the forked agent to complete the promise
      let bytes = @api.await_promise(promise_id)
      let forked_result = String::from_array(
        bytes.to_array().map(fn(b) { Char::from_int(b.to_int()) }),
      )
      "Combined: original + \{forked_result}"
    }
    Forked(_details) => {
      // Do work in the forked copy
      let computed = "forked-result"
      let payload = Bytes::from_array(
        computed.to_array().map(fn(c) { c.to_int().to_byte() }),
      )
      let _ = @api.complete_promise(promise_id, payload)
      "forked done" // Only seen by the forked agent
    }
  }
}
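If you know POSIX fork(), the control flow above should look familiar: both the original and the clone resume from the same point, and a match on the fork result separates their paths. The sketch below uses `os.fork` and a pipe in place of a Golem promise to show the same synchronization shape (an analogy only; `parallel_compute` is an illustrative name, and Golem agents are durable whereas OS processes are not):

```python
import os

def parallel_compute() -> str:
    # The pipe plays the role of the Golem promise
    r, w = os.pipe()
    pid = os.fork()
    if pid != 0:
        # Original: wait for the forked copy's result (like await_promise)
        os.close(w)
        forked_result = os.read(r, 1024).decode()
        os.close(r)
        os.waitpid(pid, 0)
        return "Combined: original + " + forked_result
    else:
        # Forked copy: do the work, send it back (like complete_promise), exit
        os.close(r)
        os.write(w, b"forked-result")
        os.close(w)
        os._exit(0)
```

As in the MoonBit version, only the original's return value reaches the caller; the forked path ends once it has delivered its result.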

Multi-Fork Fan-Out

Fork multiple times for N-way parallelism:

pub fn ForkAgent::multi_fork(self : Self, n : UInt64) -> Array[String] {
  let promise_ids : Array[@types.PromiseId] = []
  for j = 0L; j < n.to_int64(); j = j + 1L {
    promise_ids.push(@api.create_promise())
  }

  for i = 0; i < promise_ids.length(); i = i + 1 {
    match @api.fork() {
      Original(_) => {
        // Continue to next fork
      }
      Forked(_) => {
        // Each forked agent does its slice of work
        let output = "result-from-fork-\{i}"
        let payload = Bytes::from_array(
          output.to_array().map(fn(c) { c.to_int().to_byte() }),
        )
        let _ = @api.complete_promise(promise_ids[i], payload)
        return [] // Forked agent exits here
      }
    }
  }

  // Original agent collects all results
  let results : Array[String] = []
  for pid in promise_ids {
    let bytes = @api.await_promise(pid)
    results.push(String::from_array(
      bytes.to_array().map(fn(b) { Char::from_int(b.to_int()) }),
    ))
  }
  results
}

When to Use Which Approach

  Criteria                                  | Child Agents                           | @api.fork()
  ------------------------------------------|----------------------------------------|------------------------------------------
  Work is independent and stateless         | ✅ Best fit                            | Works but overkill
  Need to share current state with workers  | ❌ Must pass via args                  | ✅ Forked copy inherits state
  Workers need persistent identity          | ✅ Each has own ID                     | ❌ Forked agents are ephemeral phantoms
  Number of parallel tasks is dynamic       | ✅ Spawn as many as needed             | ✅ Fork in a loop
  Need simple error isolation               | ✅ Child failure doesn't crash parent  | ⚠️ Forked agent shares oplog lineage

Key Points

  • No threads: Golem is single-threaded per agent — parallelism is achieved by distributing across agent instances
  • Durability: All RPC calls, promises, and fork operations are durably recorded — work survives crashes
  • Deadlock avoidance: Never have two agents awaiting each other synchronously — use fire-and-forget trigger_* calls to break cycles
  • Cleanup: Child agents persist after the coordinator finishes; delete them explicitly if they hold unwanted state
  • Scoped clients: Always prefer AgentClient::scoped over manual get/drop for client lifecycle management
  • Aggregation: Collect results by iterating over promise IDs and calling @api.await_promise for each