Section 2.5

Concurrency

Until now there's been one walk of the tree at a time — flatMap is strictly sequential. This section introduces the Fiber: one independent walk, and the ability to run several at once, wait on them, and cancel the ones you stop caring about.

Methods: fork · raceFirst · forEachConcurrent · sleep · timeout

Follow along with the code

This section's runtime is self-contained. Read it, run example.ts with bun, and step through it with a debugger as you read.

[Figure: sequential vs. concurrent timelines. Sequential: tasks A, B, and C take 50ms each, 150ms in total. Concurrent: the same three 50ms tasks overlap and finish in 50ms.] All three are forked before any finishes, so they overlap: same total work, a third of the wall-clock time.

A Fiber is the bundle of state

Up to now the runtime kept its state — current, stack, value, failure, context — in local variables. To run more than one walk at a time, that state has to be a thing you can have many of. So it moves into a class:

class FiberRuntime {
  current; stack; value; failure; inFailure; context  // the same registers as before
  done; exitValue; observers                            // how it reports its result
  suspended; canceler                                   // for pausing and interruption
  step() { /* the exact same loop from 2.4, plus one case: Fork */ }
}
What a fiber actually is

A fiber is not a thread — it's the registers and stack we've had all along, wrapped so you can have more than one. observers is the only new piece: a list of "who wants this fiber's result." When it finishes, it calls each observer with the exit. That's how one fiber waits on another.
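As a rough sketch of just the observer part, here is a cut-down fiber that holds only the result-reporting fields. The names MiniFiber and finish are hypothetical, and answering a late observer immediately is an assumption of this sketch rather than something the section's runtime is known to do.

```typescript
// Hypothetical, stripped-down fiber: only the result-reporting part.
type Exit<A> =
  | { _tag: "Success"; value: A }
  | { _tag: "Failure"; error: unknown }

type Observer<A> = (exit: Exit<A>) => void

class MiniFiber<A> {
  done = false
  exitValue: Exit<A> | undefined = undefined
  observers: Observer<A>[] = []

  // Register interest in the result. If the fiber already finished,
  // answer right away instead of queueing forever (assumption of this sketch).
  addObserver(observer: Observer<A>): void {
    if (this.exitValue !== undefined) observer(this.exitValue)
    else this.observers.push(observer)
  }

  // Called once by the step loop when the walk ends.
  finish(exit: Exit<A>): void {
    if (this.done) return
    this.done = true
    this.exitValue = exit
    const waiting = this.observers
    this.observers = []
    for (const observer of waiting) observer(exit)
  }
}
```

Every observer sees the same exit exactly once, whether it subscribed before or after the fiber finished.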

fork: start a second walk

case "Fork": {
  const child = new FiberRuntime(node.self, this.context)
  queueMicrotask(() => child.step())  // run it independently, soon
  this.value = child                  // hand back the handle, right now
}

fork does three things: it makes a new fiber for self, starts it running on its own, and right away hands the parent a handle to that fiber as the value — without waiting for it to finish. So the parent keeps going while the child runs.

On its own, fork only starts work. To use the result you fork, then wait for it. Waiting is just an Async node that adds an observer to the child; when the child finishes, it notifies its observers, which resumes the parent. "The child finished" is an event that arrives later — so waiting on a fiber uses the exact same pause/resume machinery from 2.2.
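The waiting step described above can be sketched as a small helper. The name join is chosen here for illustration, and the Effect, Exit, and FiberHandle types are minimal stand-ins (this Async node omits the canceler); only async, addObserver, and exitToEffect are names the section itself uses.

```typescript
// Minimal stand-ins for the section's types; only what join needs.
type Exit<A> =
  | { _tag: "Success"; value: A }
  | { _tag: "Failure"; error: unknown }

type Resume<A> = (next: Effect<A>) => void

type Effect<A> =
  | { _tag: "Succeed"; value: A }
  | { _tag: "Fail"; error: unknown }
  | { _tag: "Async"; register: (resume: Resume<A>) => void }

interface FiberHandle<A> {
  addObserver(observer: (exit: Exit<A>) => void): void
}

const succeed = <A>(value: A): Effect<A> => ({ _tag: "Succeed", value })
const fail = <A>(error: unknown): Effect<A> => ({ _tag: "Fail", error })
const async = <A>(register: (resume: Resume<A>) => void): Effect<A> =>
  ({ _tag: "Async", register })

// Turn a finished fiber's exit back into an effect the parent can continue with.
const exitToEffect = <A>(exit: Exit<A>): Effect<A> =>
  exit._tag === "Success" ? succeed(exit.value) : fail<A>(exit.error)

// join (hypothetical name): pause the parent until the child reports its exit.
const join = <A>(fiber: FiberHandle<A>): Effect<A> => {
  const register = (resume: Resume<A>): void => {
    fiber.addObserver((exit) => resume(exitToEffect(exit)))
  }
  return async(register)
}
```

With something like this, fork-then-wait reads as flatMap(fork(task), (child) => join(child)), assuming the section's flatMap and fork.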

raceFirst: fork both, take the first, interrupt the loser

const raceFirst = (a, b) =>
  flatMap(fork(a), (fa) =>
    flatMap(fork(b), (fb) =>
      async((resume) => {
        let settled = false
        const onSettle = (loser) => (exit) => {
          if (settled) return
          settled = true
          loser.interrupt()           // cancel the one we don't need
          resume(exitToEffect(exit))  // continue with the winner's result
        }
        fa.addObserver(onSettle(fb))
        fb.addObserver(onSettle(fa))
      })))
[Figure: a race between fast (20ms) and slow (100ms). fast wins and the parent resumes; slow is interrupted and its canceler fires.] A settled flag means only the first exit counts; the loser is cancelled at once.
timeout is just raceFirst against a sleep that fails — if the sleep wins, you get a Timeout failure and self is interrupted.
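To make that concrete without the whole runtime, here is a sketch on a simplified model: a Task is a function that starts work, reports one exit, and returns a canceler. Task, sleepThenTimeout, and this callback-style raceFirst are assumptions made for illustration; the section's version works on fibers and observers instead.

```typescript
// Simplified stand-in for an effect: start the work, report one exit,
// and return a canceler. No fibers or step loop here.
type Exit<A> =
  | { _tag: "Success"; value: A }
  | { _tag: "Failure"; error: unknown }
type Canceler = () => void
type Task<A> = (done: (exit: Exit<A>) => void) => Canceler

// A sleep that fails with a Timeout once `ms` have passed.
const sleepThenTimeout = (ms: number): Task<never> => (done) => {
  const timer = setTimeout(
    () => done({ _tag: "Failure", error: { _tag: "Timeout" } }),
    ms
  )
  return () => clearTimeout(timer)
}

const raceFirst = <A>(a: Task<A>, b: Task<A>): Task<A> => (done) => {
  let settled = false
  let cancelB: Canceler | undefined
  const onSettle = (cancelLoser: Canceler) => (exit: Exit<A>) => {
    if (settled) return // only the first exit counts
    settled = true
    cancelLoser()
    done(exit)
  }
  const cancelA = a(onSettle(() => cancelB?.()))
  // If `a` already finished synchronously there is nothing left to race.
  if (!settled) cancelB = b(onSettle(() => cancelA()))
  // Cancelling the race cancels whatever is still running.
  return () => {
    if (settled) return
    settled = true
    cancelA()
    cancelB?.()
  }
}

// timeout: race the work against a sleep that fails.
const timeout = <A>(self: Task<A>, ms: number): Task<A> =>
  raceFirst(self, sleepThenTimeout(ms))
```

If self wins, the sleep's canceler clears the timer; if the sleep wins, self's canceler runs and the caller sees the Timeout failure.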

forEachConcurrent is the same idea with more fibers: fork an effect for every item, then an Async that waits for every observer to fire and collects the results in order. If they all succeed you get the array; if one fails, the rest are interrupted and the whole thing fails.
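The collect-in-order and fail-fast behaviour can be sketched on a simplified model, where a Task starts work, reports one exit, and returns a canceler. The Task type and this callback-style forEachConcurrent are illustrative assumptions, not the section's fiber-based implementation.

```typescript
// Simplified stand-in for an effect: start, report one exit, return a canceler.
type Exit<A> =
  | { _tag: "Success"; value: A }
  | { _tag: "Failure"; error: unknown }
type Canceler = () => void
type Task<A> = (done: (exit: Exit<A>) => void) => Canceler

const forEachConcurrent = <A, B>(
  items: readonly A[],
  f: (item: A) => Task<B>
): Task<B[]> => (done) => {
  const results = new Array<B>(items.length)
  const cancelers: Canceler[] = []
  const finished = new Set<number>()
  let settled = false

  if (items.length === 0) {
    done({ _tag: "Success", value: [] })
    return () => {}
  }

  // Cancel every task that was started and has not reported an exit yet.
  const cancelPending = () =>
    cancelers.forEach((cancel, index) => {
      if (!finished.has(index)) cancel()
    })

  items.forEach((item, index) => {
    if (settled) return // an earlier task already failed synchronously
    cancelers[index] = f(item)((exit) => {
      if (settled || finished.has(index)) return
      finished.add(index)
      if (exit._tag === "Failure") {
        settled = true
        cancelPending() // one failure interrupts the rest
        done(exit)
        return
      }
      results[index] = exit.value // slot by input position, not finish order
      if (finished.size === items.length) {
        settled = true
        done({ _tag: "Success", value: results })
      }
    })
  })

  return () => {
    if (settled) return
    settled = true
    cancelPending()
  }
}
```

Writing each result into its input slot is what keeps the output ordered even when tasks finish out of order.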

Interruption: the canceler

Back in 2.2, async's register could return a value we ignored. Now it returns a canceler — how to abort the work it started:

const sleep = (ms) => async((resume) => {
  const timer = setTimeout(() => resume(succeed(undefined)), ms)
  return () => clearTimeout(timer)   // the canceler
})

interrupt() {
  if (this.canceler) this.canceler()  // clearTimeout — stop the real work
  this.failure = { _tag: "Interrupted" }
  this.inFailure = true
  this.step()                         // unwind from here
}

This is why the race doesn't leave a dangling timer keeping the process alive. Without the canceler, interrupting would only ignore the loser's result while its underlying work — the pending timer — kept running.
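One detail worth sketching: when several fibers are interrupted at once (as in forEachConcurrent), some may already be finished, and a fiber may be interrupted twice. The guards below are an assumption of this sketch, not something the section's runtime is confirmed to include; InterruptibleFiber and its stand-in step are hypothetical.

```typescript
type Canceler = () => void

// Hypothetical cut-down fiber: only the fields interrupt() touches.
class InterruptibleFiber {
  done = false
  inFailure = false
  failure: { _tag: string } | undefined = undefined
  canceler: Canceler | undefined = undefined

  interrupt(): void {
    if (this.done) return        // already finished: nothing to stop
    const cancel = this.canceler
    this.canceler = undefined    // a second interrupt must not cancel twice
    if (cancel) cancel()         // e.g. clearTimeout: stop the real work
    this.failure = { _tag: "Interrupted" }
    this.inFailure = true
    this.step()                  // unwind from here
  }

  // Stand-in for the real loop: with no handlers to run,
  // unwinding a failure simply ends the fiber.
  step(): void {
    if (this.inFailure) this.done = true
  }
}
```

With these guards, interrupting a finished fiber is a no-op and the canceler runs at most once.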

"Does interrupt just call a cleanup function? Is this where AbortSignal comes in?"

Yes — exactly. interrupt just calls whatever canceler the async handed back, and the canceler is whatever that work needs to stop. sleep returns () => clearTimeout(timer). A real fetch would return () => controller.abort(). The runtime doesn't know or care which — it only knows "to cancel, call this function."

AbortSignal is the generalized version. Real Effect hands the register an AbortSignal along with resume. So instead of inventing your own canceler, you pass the signal straight into the work — fetch(url, { signal }) — and the work aborts itself when the signal fires. Same mechanism, one standard channel: our return-a-canceler is the toy version of real Effect's give-you-a-signal.
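A sketch of how the two styles relate, on a simplified model where a Task starts work and returns a canceler. asyncWithSignal, Task, and fetchText are hypothetical names for illustration; AbortController, AbortSignal, and fetch's signal option are the standard web APIs. This is not real Effect's implementation.

```typescript
type Exit<A> =
  | { _tag: "Success"; value: A }
  | { _tag: "Failure"; error: unknown }
type Canceler = () => void
type Task<A> = (done: (exit: Exit<A>) => void) => Canceler

type SignalRegister<A> = (
  resume: (exit: Exit<A>) => void,
  signal: AbortSignal
) => void

// Hypothetical bridge: give the work a signal, and make "cancel" mean "abort".
const asyncWithSignal = <A>(register: SignalRegister<A>): Task<A> => (done) => {
  const controller = new AbortController()
  // Ignore results that arrive after cancellation (e.g. fetch's AbortError).
  const resume = (exit: Exit<A>) => {
    if (!controller.signal.aborted) done(exit)
  }
  register(resume, controller.signal)
  return () => controller.abort()
}

// sleep, rewritten to listen for the signal instead of returning a canceler.
const sleep = (ms: number): Task<void> =>
  asyncWithSignal<void>((resume, signal) => {
    const timer = setTimeout(
      () => resume({ _tag: "Success", value: undefined }),
      ms
    )
    signal.addEventListener("abort", () => clearTimeout(timer), { once: true })
  })

// fetch needs no custom cleanup: the signal is passed straight through.
const fetchText = (url: string): Task<string> =>
  asyncWithSignal<string>((resume, signal) => {
    fetch(url, { signal })
      .then((response) => response.text())
      .then(
        (text) => resume({ _tag: "Success", value: text }),
        (error) => resume({ _tag: "Failure", error })
      )
  })
```

The canceler still exists, but it is always the same function, controller.abort(); each piece of work decides for itself what aborting means.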

Where this maps in real Effect

Real Effect's fiber is the same idea (a FiberRuntime with registers, a stack, observers, and a step loop), just richer: scheduling, interruption flags, supervision. fork isn't a standalone op-code; forking happens through the runtime. raceFirst, forEach (our forEachConcurrent corresponds to forEach with its concurrency option set), and timeout are all library functions built on fork-and-wait, exactly as we built them. The cancelable async is the real OP_ASYNC: its register really does receive an AbortSignal and can return a canceler.