claude-dispatch
A signed task bus for agent-to-agent delegation over a shared filesystem: one session sends, the other verifies, executes, and files the result.
A signed task bus for agent-to-agent delegation over a shared filesystem: one session sends, the other verifies, executes, and files the result.
If you run agent sessions on more than one machine, you eventually want them to delegate work to each other. A Claude Code session on a dev box should be able to hand a "run the test suite on the GPU box and report numbers" task to a peer session running there, then block until the result comes back as a structured file, without either session needing the other's terminal open or attached.
The transport is whatever shared filesystem both hosts can mount (NFS, SMB, anything). The integrity story is HMAC-SHA256 signing of every task envelope. There are no open ports, no broker process, and no network code in the critical path.
The sender runs dispatch-send, which builds a task envelope (a JSON object with id, from, to, priority, request, timestamps, and execution controls), signs it by HMAC-ing the six fields that matter (id | from | to | created | priority | request), and atomically writes it into the recipient's inbox/ via a .tmp-then-rename. The recipient's watcher polls its inbox, verifies the HMAC against the shared key, and routes the task to one of four states: pending-ack/ (if require_ack=true and no human has approved yet), processing/ (spawning the executor), done/ (result + log written), or rejected/ (bad signature).
A KILLSWITCH file at the dispatch root immediately stops both watchers from spawning new executors without interrupting anything already running. It is a hard stop that takes effect in under one poll cycle.
def _digest(task: dict) -> str:
payload = "|".join([
task["id"], task["from"], task["to"], task["created"],
task["priority"], task["request"],
]).encode()
return hmac.new(_key(), payload, hashlib.sha256).hexdigest()
def sign(task: dict) -> dict:
task["hmac"] = _digest(task)
return task
def verify(task: dict) -> bool:
provided = task.get("hmac", "")
if not provided:
return False
try:
return hmac.compare_digest(provided, _digest(task))
except KeyError:
return False
def enqueue(task: dict) -> pathlib.Path:
"""Atomically write a signed task to the recipient's inbox."""
lane_dir = inbox_for(task["to"])
lane_dir.mkdir(parents=True, exist_ok=True)
path = lane_dir / f"{task['id']}.json"
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(task, indent=2))
os.replace(tmp, path) # atomic on POSIX
return path
Three independent layers keep the system from running away:
rejected/ immediately.require_ack=true parks the task in pending-ack/ until a human (or an automation) drops an <id>.ack file next to it. Dangerous tasks default to this mode.DISPATCH_MAX_PARALLEL (default 2) concurrent executor processes per side. A sudden flood of tasks queues behind running ones rather than forking unbounded workers.The append-only session-log.jsonl at the dispatch root records every state transition, so there's a full audit trail even if the watcher process is restarted.