DBOS durable execution patterns and CRITICAL constraints for ChainGraph executor. Use when working on workflows, steps, execution, or any DBOS-related code. Contains MUST-FOLLOW constraints about what can be called from workflows vs steps. Triggers: dbos, workflow, step, durable, execution, startWorkflow, writeStream, recv, send, runStep, atomic, checkpoint, WorkflowQueue, queue, cancelWorkflow, Promise.allSettled. (project)
$ Install
git clone https://github.com/chaingraphlabs/chaingraph /tmp/chaingraph && cp -r /tmp/chaingraph/.claude/skills/dbos-patterns ~/.claude/skills/chaingraph
// tip: Run this command in your terminal to install the skill
name: dbos-patterns
DBOS Patterns for ChainGraph
This skill covers DBOS (Database-Oriented Operating System) patterns used in the ChainGraph executor. CRITICAL: Contains constraints that agents MUST follow to avoid runtime errors.
CRITICAL Constraints
The Most Important Rule
DBOS context methods have strict calling restrictions based on WHERE you are:
// ============================================================
// WORKFLOW FUNCTIONS: All DBOS methods allowed
// ============================================================
async function myWorkflow(task: Task): Promise<Result> {
await DBOS.send(...) // ✅ Allowed
await DBOS.recv(...) // ✅ Allowed
await DBOS.startWorkflow(...) // ✅ Allowed
await DBOS.writeStream(...) // ✅ Allowed
await DBOS.setEvent(...) // ✅ Allowed
await DBOS.sleep(...) // ✅ Allowed
const result = await DBOS.runStep(() => myStep(task)) // ✅ Allowed
return result
}
// ============================================================
// STEP FUNCTIONS: ONLY writeStream() allowed!
// ============================================================
async function myStep(task: Task): Promise<StepResult> {
await DBOS.writeStream(...) // ✅ ONLY THIS ONE!
// ❌ NOT ALLOWED - Will throw runtime error:
// await DBOS.send(...) // ❌ Error!
// await DBOS.recv(...) // ❌ Error!
// await DBOS.startWorkflow(...) // ❌ Error!
// await DBOS.setEvent(...) // ❌ Error!
// await DBOS.sleep(...) // ❌ Error!
return { data: ... }
}
Constraint Reference Table
| DBOS Method | From Workflow | From Step |
|---|---|---|
| DBOS.send() | ✅ | ❌ |
| DBOS.recv() | ✅ | ❌ |
| DBOS.startWorkflow() | ✅ | ❌ |
| DBOS.setEvent() / getEvent() | ✅ | ❌ |
| DBOS.sleep() | ✅ | ❌ |
| DBOS.cancelWorkflow() | ✅ | ❌ |
| DBOS.runStep() | ✅ | ❌ |
| DBOS.writeStream() | ✅ | ✅ |
| DBOS.readStream() | ✅ | ❌ |
Promise Handling
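NEVER use Promise.all(): it rejects as soon as one promise rejects, while the remaining promises keep running unobserved, risking unhandled rejections. Use Promise.allSettled(), which waits for every promise and reports each outcome.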
// ❌ BAD: Promise.all() fails fast, other promises left dangling
const results = await Promise.all([step1(), step2(), step3()])
// ✅ GOOD: Promise.allSettled() waits for all, reports outcomes
const results = await Promise.allSettled([step1(), step2(), step3()])
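Because Promise.allSettled() never rejects, the caller has to inspect each outcome itself. A minimal sketch of that bookkeeping, assuming a hypothetical helper `partitionSettled` and a demo function `runAllSettledDemo` (neither is part of DBOS or ChainGraph):

```typescript
// Hypothetical helper (not part of DBOS or ChainGraph): split the outcomes of
// Promise.allSettled() into successful values and failure reasons.
function partitionSettled<T>(
  results: PromiseSettledResult<T>[],
): { values: T[], errors: unknown[] } {
  const values: T[] = []
  const errors: unknown[] = []
  for (const result of results) {
    if (result.status === 'fulfilled') {
      values.push(result.value)
    } else {
      errors.push(result.reason)
    }
  }
  return { values, errors }
}

// Demo: plain promises stand in for step1(), step2(), step3().
// All three are awaited to completion before anything is reported.
async function runAllSettledDemo(): Promise<{ values: number[], errors: unknown[] }> {
  const results = await Promise.allSettled([
    Promise.resolve(1),
    Promise.reject(new Error('step2 failed')),
    Promise.resolve(3),
  ])
  return partitionSettled(results)
}
```

The workflow can then decide what a partial failure means (retry, fail the execution, or continue) with every outcome in hand.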
Memory Isolation
Workflows and steps should NOT have side effects outside their own scope:
- ✅ Can READ global variables
- ❌ Must NOT create or update global variables
- ❌ Must NOT modify shared state outside return values
Queue Initialization Order
CRITICAL: WorkflowQueue MUST be created before DBOS.launch() is called!
// File: server/dbos/queue.ts:17-35
// Queue is created at module level BEFORE DBOS.launch()
export const executionQueue = new WorkflowQueue(QUEUE_NAME, {
workerConcurrency: config.dbos.workerConcurrency ?? 5,
concurrency: config.dbos.queueConcurrency ?? 100,
})
// If created AFTER DBOS.launch(), queue will NOT dequeue tasks!
Design Patterns
Pattern 1: Signal Pattern (Race Condition Fix)
Problem: Client subscribes to events before the stream exists.
Solution: Workflow writes initialization event BEFORE waiting for start signal.
File: packages/chaingraph-executor/server/dbos/workflows/ExecutionWorkflows.ts
Timeline:
1. create execution (tRPC)
└─ Workflow starts → writes EXECUTION_CREATED → stream exists! ✅
└─ Workflow waits for START_SIGNAL... ⏸️
2. subscribe events (tRPC)
└─ Stream already exists → immediately receives EXECUTION_CREATED ✅
3. start execution (tRPC)
└─ Sends START_SIGNAL → workflow continues ▶️
Implementation Pattern:
async function executionWorkflow(task: ExecutionTask): Promise<ExecutionResult> {
// Write event BEFORE waiting - stream now exists!
await DBOS.writeStream('events', {
executionId: task.executionId,
event: 'EXECUTION_CREATED',
timestamp: Date.now(),
})
// Now safe to wait - clients can subscribe
const signal = await DBOS.recv<string>('START_SIGNAL', 300)
if (!signal) {
throw new Error('Execution start timeout')
}
// Continue with execution...
}
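The ordering argument in the timeline can be checked without DBOS. The sketch below is an in-memory model only: `ReplayableStream` is a hypothetical stand-in that assumes, like the stream described above, that events already written are delivered to a subscriber that attaches later, and `EXECUTION_STARTED` is an illustrative event name.

```typescript
// In-memory stand-in for the event stream (assumption: written events are
// retained, so a late subscriber can replay them from the start).
class ReplayableStream<T> {
  private readonly events: T[] = []
  private readonly listeners: Array<(event: T) => void> = []

  write(event: T): void {
    this.events.push(event)
    for (const listener of this.listeners) listener(event)
  }

  subscribe(listener: (event: T) => void): void {
    for (const event of this.events) listener(event) // replay history first
    this.listeners.push(listener)
  }
}

async function simulateSignalPattern(): Promise<string[]> {
  const stream = new ReplayableStream<string>()
  let sendStart: () => void = () => {}
  const startSignal = new Promise<void>((resolve) => { sendStart = resolve })

  // 1. "create execution": the workflow writes first, then waits for the signal
  const workflow = (async () => {
    stream.write('EXECUTION_CREATED')
    await startSignal // stands in for DBOS.recv('START_SIGNAL', 300)
    stream.write('EXECUTION_STARTED') // illustrative event name
  })()

  // 2. "subscribe events": happens after the write, yet still receives it
  const received: string[] = []
  stream.subscribe((event) => { received.push(event) })

  // 3. "start execution": release the workflow
  sendStart()
  await workflow
  return received
}
```

If the workflow waited for the signal before writing, step 2 would find no events to replay, which is the race the pattern removes.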
Pattern 2: Shared State Pattern (Command System)
Problem: Cannot call DBOS.recv() from steps, but need to check for commands.
Solution: Workflow polls messages, updates shared state object that step reads.
Files:
- Workflow: server/dbos/workflows/ExecutionWorkflows.ts
- Step: server/dbos/steps/ExecuteFlowAtomicStep.ts
// Shared state object (passed from workflow to step)
interface CommandController {
currentCommand: 'PAUSE' | 'RESUME' | 'STEP' | null
}
// WORKFLOW LEVEL: Poll DBOS.recv() every 500ms
async function executionWorkflow(task: ExecutionTask) {
  const commandController: CommandController = { currentCommand: null }
  const abortController = new AbortController()
  let stepFinished = false
  // Polling loop (runs concurrently with the step)
  const pollCommands = async () => {
    while (!stepFinished && !abortController.signal.aborted) {
      const cmd = await DBOS.recv<{ command: 'PAUSE' | 'RESUME' | 'STEP' | 'STOP' }>('COMMAND', 0.5)
      if (cmd) {
        if (cmd.command === 'STOP') {
          abortController.abort()
        } else {
          commandController.currentCommand = cmd.command
        }
      }
    }
  }
  const polling = pollCommands() // Start polling before the step runs
  try {
    // Run step with shared state
    return await DBOS.runStep(() =>
      executeFlowAtomic(task, abortController, commandController)
    )
  } finally {
    stepFinished = true // Stop polling once the step is done
    await polling
  }
}
// STEP LEVEL: Check shared state every 100ms (no DBOS calls!)
async function executeFlowAtomic(
  task: ExecutionTask,
  abortController: AbortController,
  commandController: CommandController
) {
  // `flowDebugger` stands in for the engine's debugger handle
  // (`debugger` itself is a reserved word and cannot be used as an identifier)
  const checkCommands = setInterval(() => {
    if (commandController.currentCommand === 'PAUSE') {
      flowDebugger.pause()
    } else if (commandController.currentCommand === 'RESUME') {
      flowDebugger.continue()
    }
    commandController.currentCommand = null
  }, 100)
  try {
    // Execute flow...
    // Step reads shared state, never calls DBOS.recv()
  } finally {
    clearInterval(checkCommands) // Stop checking once the flow finishes
  }
}
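The division of labour (one side writes commands into a shared object, the other side only reads it) can be exercised without DBOS. The following is a simulation sketch under stated assumptions: `simulatedStep` and `simulatedWorkflow` are hypothetical names, an in-memory `inbox` array stands in for DBOS.recv(), and short timers stand in for the 500ms and 100ms intervals.

```typescript
type Command = 'PAUSE' | 'RESUME' | 'STEP'
interface CommandController { currentCommand: Command | null }

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Step stand-in: between units of work it consumes whatever command the
// workflow-level poller left in shared state. It never polls for messages.
async function simulatedStep(
  ticks: number,
  signal: AbortSignal,
  controller: CommandController,
): Promise<Command[]> {
  const seen: Command[] = []
  for (let i = 0; i < ticks && !signal.aborted; i++) {
    if (controller.currentCommand !== null) {
      seen.push(controller.currentCommand)
      controller.currentCommand = null // consume the command
    }
    await delay(5) // one unit of work
  }
  return seen
}

// Workflow stand-in: the poller starts before the step and is stopped in
// `finally`, so it cannot outlive the step.
async function simulatedWorkflow(inbox: Array<Command | 'STOP'>): Promise<Command[]> {
  const controller: CommandController = { currentCommand: null }
  const abortController = new AbortController()
  let polling = true

  const poller = (async () => {
    while (polling && !abortController.signal.aborted) {
      const cmd = inbox.shift() // stands in for `await DBOS.recv(...)`
      if (cmd === 'STOP') abortController.abort()
      else if (cmd !== undefined) controller.currentCommand = cmd
      await delay(12)
    }
  })()

  try {
    return await simulatedStep(1000, abortController.signal, controller)
  } finally {
    polling = false
    await poller
  }
}
```

One design consequence is visible in the model: the controller holds a single slot, so a command that arrives before the previous one was consumed overwrites it. That is acceptable for PAUSE/RESUME-style controls, but it is not a queue.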
Pattern 3: Collect & Spawn Pattern (Child Executions)
Problem: Cannot call DBOS.startWorkflow() from steps, but Event Emitter nodes need to spawn children.
Solution: Step collects child tasks and returns them, workflow spawns them.
Files:
- Step: server/dbos/steps/ExecuteFlowAtomicStep.ts:346-401
- Workflow: server/dbos/workflows/ExecutionWorkflows.ts
// STEP: Collect child tasks (don't spawn!)
async function executeFlowAtomic(task: ExecutionTask): Promise<ExecutionResult> {
const collectedChildTasks: ExecutionTask[] = []
// Execute flow, capture emitted events
await engine.execute()
// After execution, collect child tasks from emitted events
for (const event of context.emittedEvents.filter(e => !e.processed)) {
event.processed = true
// Create child execution row in DB (allowed in step)
const childTask = await createChildTask(instance, event, store)
collectedChildTasks.push(childTask)
}
// Return child tasks for workflow-level spawning
return {
status: 'completed',
childTasks: collectedChildTasks, // ← Workflow will spawn these
}
}
// WORKFLOW: Spawn collected children (DBOS.startWorkflow allowed here!)
async function executionWorkflow(task: ExecutionTask) {
  const result = await DBOS.runStep(() => executeFlowAtomic(task))
  // Spawn children at workflow level (a missing or empty list spawns nothing)
  for (const childTask of result.childTasks ?? []) {
    await DBOS.startWorkflow(executionWorkflow, {
      workflowID: childTask.executionId
    })(childTask)
  }
  return result
}
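A side benefit of this split is that the collecting logic can be tested with no DBOS runtime at all. A minimal sketch, assuming hypothetical names throughout (`collectStep`, `spawnCollected`, the `child-<index>-<event>` ID scheme), with an injected `spawn` callback standing in for DBOS.startWorkflow(...)(childTask):

```typescript
interface ChildTask { executionId: string }
interface StepResult { status: 'completed', childTasks: ChildTask[] }

// Step stand-in: only collects the children it wants, never spawns them.
async function collectStep(emittedEvents: string[]): Promise<StepResult> {
  const childTasks = emittedEvents.map((eventName, index) => ({
    executionId: `child-${index}-${eventName}`, // hypothetical ID scheme
  }))
  return { status: 'completed', childTasks }
}

// Workflow stand-in: spawns whatever the step returned, one child at a time.
async function spawnCollected(
  emittedEvents: string[],
  spawn: (task: ChildTask) => Promise<void>,
): Promise<string[]> {
  const result = await collectStep(emittedEvents)
  const spawned: string[] = []
  for (const childTask of result.childTasks) {
    await spawn(childTask)
    spawned.push(childTask.executionId)
  }
  return spawned
}
```

Calling `spawnCollected(['a', 'b'], async () => {})` resolves to the IDs of the two collected children in emission order.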
Pattern 4: Auto-Start Pattern (Child Execution Lifecycle)
Problem: Children need manual start call, slowing down execution tree.
Solution: Children skip the signal wait entirely and start immediately.
File: server/dbos/workflows/ExecutionWorkflows.ts:192-214
async function executionWorkflow(task: ExecutionTask) {
const executionRow = await store.get(task.executionId)
const isChildExecution = !!executionRow.parentExecutionId
// Write EXECUTION_CREATED first (Signal Pattern)
await DBOS.writeStream('events', { event: 'EXECUTION_CREATED', ... })
// Auto-start for children!
if (!isChildExecution) {
// Parents: wait for signal from tRPC (timeout: 5 minutes)
const startSignal = await DBOS.recv<string>('START_SIGNAL', 300)
if (!startSignal) {
throw new Error('Execution start timeout')
}
} else {
// Children: skip waiting, start immediately
DBOS.logger.info(`Child execution auto-start, beginning execution`)
}
// Continue execution...
}
Child Execution Lifecycle:
Parent spawns child via DBOS.startWorkflow()
└─ Child workflow starts
├─ Writes EXECUTION_CREATED event
├─ Detects parentExecutionId
├─ Skips signal wait (auto-start)
└─ Executes flow immediately
Pattern 5: WorkflowQueue Pattern (Managed Concurrency)
Problem: Need to manage concurrency and ensure idempotent workflow spawning.
Solution: Use WorkflowQueue with concurrency limits and deduplication.
File: server/dbos/queue.ts
import { WorkflowQueue } from '@dbos-inc/dbos-sdk'
// Create at module level BEFORE DBOS.launch()
export const executionQueue = new WorkflowQueue('chaingraph-executions', {
workerConcurrency: 5, // Max concurrent per worker process
concurrency: 100, // Max concurrent globally
})
// Use with deduplication to prevent duplicate workflows
await DBOS.startWorkflow(ExecutionWorkflows, {
queueName: executionQueue.name,
workflowID: childTask.executionId, // Unique ID
enqueueOptions: {
deduplicationID: childTask.executionId, // Idempotency key
},
}).executeChainGraph(childTask)
Pattern 6: Parent Monitoring Pattern (Child Stops if Parent Dies)
Problem: Child executions should stop if their parent completes or fails.
Solution: Background checker monitors parent workflow status.
File: server/dbos/workflows/ExecutionWorkflows.ts
async function monitorParentWorkflow(
parentExecutionId: string,
abortController: AbortController
) {
while (!abortController.signal.aborted) {
const parentStatus = await DBOS.getWorkflowStatus(parentExecutionId)
if (parentStatus?.status === 'COMPLETED' ||
parentStatus?.status === 'ERROR' ||
parentStatus?.status === 'CANCELLED') {
abortController.abort('Parent workflow has ended')
break
}
await DBOS.sleep(5000) // Check every 5 seconds (DBOS.sleep takes milliseconds)
}
}
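The loop above is easier to unit-test when its two DBOS calls are injected. The variant below is a sketch, not the ChainGraph implementation: `monitorParent` and `demoParentMonitor` are hypothetical, `getStatus` stands in for DBOS.getWorkflowStatus(), and `sleep` stands in for DBOS.sleep().

```typescript
// Statuses after which the parent has ended (the same three values the
// monitor above checks for).
const TERMINAL_STATUSES = new Set(['COMPLETED', 'ERROR', 'CANCELLED'])

// Hypothetical dependency-injected monitor. Returns how many status checks
// it made before the parent ended or the controller was aborted elsewhere.
async function monitorParent(
  getStatus: () => Promise<string | null>,
  sleep: () => Promise<void>,
  abortController: AbortController,
): Promise<number> {
  let checks = 0
  while (!abortController.signal.aborted) {
    checks++
    const status = await getStatus()
    if (status !== null && TERMINAL_STATUSES.has(status)) {
      abortController.abort()
      break
    }
    await sleep()
  }
  return checks
}

// Demo: the parent is pending for two checks, then fails.
async function demoParentMonitor(): Promise<{ checks: number, aborted: boolean }> {
  const statuses = ['PENDING', 'PENDING', 'ERROR']
  const abortController = new AbortController()
  const checks = await monitorParent(
    async () => statuses.shift() ?? null,
    async () => {}, // no real delay in the demo
    abortController,
  )
  return { checks, aborted: abortController.signal.aborted }
}
```

In the demo the monitor aborts on the third check, when the scripted status becomes ERROR.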
Three-Phase Workflow Structure
ChainGraph executions follow a three-phase structure:
┌──────────────────────────────────────────────────────────────┐
│ PHASE 1: Stream Initialization (Lines 148-214) │
│ ├─ Create CommandController │
│ ├─ Write EXECUTION_CREATED event (stream exists!) │
│ ├─ Children: skip the START_SIGNAL wait (auto-start) │
│ └─ Parents: wait for START_SIGNAL │
├──────────────────────────────────────────────────────────────┤
│ PHASE 2: Execution (Lines 216-374) │
│ ├─ Step 1: updateToRunning() │
│ ├─ Step 2: executeFlowAtomic() ← Core execution │
│ └─ Spawn children via DBOS.startWorkflow() │
├──────────────────────────────────────────────────────────────┤
│ PHASE 3: Cleanup (Lines 376-423) │
│ ├─ Step 3: updateToCompleted() │
│ ├─ Stop command polling │
│ └─ DBOS auto-closes event stream │
└──────────────────────────────────────────────────────────────┘
Key Files
| File | Purpose | Critical? |
|---|---|---|
| server/dbos/workflows/ExecutionWorkflows.ts | Main orchestration workflow | ⭐⭐⭐ |
| server/dbos/steps/ExecuteFlowAtomicStep.ts | Core execution step | ⭐⭐⭐ |
| server/dbos/queue.ts:17-35 | Queue initialization (MUST be before DBOS.launch) | ⭐⭐⭐ |
| server/dbos/config.ts | DBOS initialization | ⭐⭐ |
| server/dbos/DBOSExecutionWorker.ts | Worker lifecycle | ⭐⭐ |
| server/dbos/steps/UpdateStatusStep.ts | Status updates | ⭐ |
| server/implementations/dbos/DBOSEventBus.ts | Event streaming via DBOS.writeStream() | ⭐⭐ |
| server/utils/config.ts:70-139 | Environment config | ⭐⭐ |
Environment Variables
# Enable DBOS mode (default: false)
ENABLE_DBOS_EXECUTION=true
# DBOS Admin UI
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022 # Access at http://localhost:3022
# Concurrency Limits
DBOS_QUEUE_CONCURRENCY=100 # Global across all workers
DBOS_WORKER_CONCURRENCY=5 # Per worker process
# DBOS Conductor (optional, for production monitoring)
DBOS_CONDUCTOR_URL=https://conductor.dbos.dev
DBOS_APPLICATION_NAME=chaingraph-executor
DBOS_CONDUCTOR_KEY=your-api-key-here
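How these variables might be turned into a typed config object is sketched below. This is an illustration, not the contents of server/utils/config.ts: `loadDbosEnvConfig` and `parseIntOr` are hypothetical names. The concurrency defaults (100 and 5) and the "disabled unless set to true" behaviour come from this document. The admin defaults (disabled, port 3022) are assumptions based on the example values above.

```typescript
interface DbosEnvConfig {
  enabled: boolean
  adminEnabled: boolean
  adminPort: number
  queueConcurrency: number
  workerConcurrency: number
}

// Parse a base-10 integer, falling back when the variable is unset or invalid.
function parseIntOr(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10)
  return Number.isNaN(parsed) ? fallback : parsed
}

// Hypothetical loader; pass `process.env` in real code.
function loadDbosEnvConfig(env: Record<string, string | undefined>): DbosEnvConfig {
  return {
    enabled: env.ENABLE_DBOS_EXECUTION === 'true', // default: false
    adminEnabled: env.DBOS_ADMIN_ENABLED === 'true', // assumed default: false
    adminPort: parseIntOr(env.DBOS_ADMIN_PORT, 3022), // assumed default
    queueConcurrency: parseIntOr(env.DBOS_QUEUE_CONCURRENCY, 100),
    workerConcurrency: parseIntOr(env.DBOS_WORKER_CONCURRENCY, 5),
  }
}
```

Taking the environment as a parameter rather than reading `process.env` directly keeps the loader free of global state, in line with the memory isolation rule earlier in this document.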
Anti-Patterns
Anti-Pattern #1: Calling DBOS methods from steps
// ❌ BAD: Will throw runtime error
async function myStep(data: string) {
await DBOS.send('other-workflow', 'message', 'TOPIC') // ❌ Error!
}
// ✅ GOOD: Return data, let workflow send
async function myStep(data: string): Promise<{ toSend: Message }> {
return { toSend: { target: 'other-workflow', message: 'hello' } }
}
async function myWorkflow(data: string) {
  const result = await DBOS.runStep(() => myStep(data))
  await DBOS.send(result.toSend.target, result.toSend.message, 'TOPIC') // ✅
}
Anti-Pattern #2: Splitting atomic execution
// ❌ BAD: State lost between steps
await DBOS.runStep(() => loadFlow())
await DBOS.runStep(() => executeFlow()) // ❌ Flow state lost!
// ✅ GOOD: Single atomic step
await DBOS.runStep(() => executeFlowAtomic(task)) // ✅ All in one step
Anti-Pattern #3: Making children wait for START_SIGNAL
// ❌ BAD: Children timeout waiting for signal that never comes
async function executionWorkflow(task: ExecutionTask) {
const isChild = !!executionRow.parentExecutionId
// Always waiting - children have no one to send them the signal!
await DBOS.recv('START_SIGNAL', 300) // ❌ Times out for children
}
// ✅ GOOD: Children skip signal wait
async function executionWorkflow(task: ExecutionTask) {
const isChild = !!executionRow.parentExecutionId
if (!isChild) {
// Only parents wait for signal (from tRPC start() call)
await DBOS.recv('START_SIGNAL', 300)
}
// Children start immediately - no signal wait!
}
Anti-Pattern #4: Using Promise.all() for parallel steps
// ❌ BAD: Promise.all() fails fast, leaving other promises dangling
const results = await Promise.all([
DBOS.runStep(() => step1()),
DBOS.runStep(() => step2()),
DBOS.runStep(() => step3()),
])
// ✅ GOOD: Promise.allSettled() waits for all, handles all outcomes
const results = await Promise.allSettled([
DBOS.runStep(() => step1()),
DBOS.runStep(() => step2()),
DBOS.runStep(() => step3()),
])
Anti-Pattern #5: Memory side effects in workflows/steps
// ❌ BAD: Modifying global state
let globalCounter = 0
async function myWorkflow() {
globalCounter++ // ❌ Side effect outside scope!
}
// ✅ GOOD: Return values instead of mutating globals
async function myWorkflow(): Promise<{ count: number }> {
const count = calculateCount()
return { count } // ✅ Pure function, no side effects
}
Anti-Pattern #6: Creating queue after DBOS.launch()
// ❌ BAD: Queue created after DBOS is initialized
await DBOS.launch()
const queue = new WorkflowQueue('my-queue') // ❌ Won't dequeue!
// ✅ GOOD: Queue created at module level BEFORE DBOS.launch()
const queue = new WorkflowQueue('my-queue') // ✅ Module level
// ... later in main()
await DBOS.launch()
Quick Reference
| Need | Pattern | Where |
|---|---|---|
| Stream exists before subscribe | Signal Pattern | Write event before recv() |
| Commands during step execution | Shared State | Workflow polls, step reads object |
| Spawn child workflows | Collect & Spawn | Step collects, workflow spawns |
| Children start immediately | Auto-Start | Skip signal wait |
| Real-time events from step | DBOS.writeStream() | Only stream method allowed in steps |
| Managed concurrency | WorkflowQueue | Queue with workerConcurrency/concurrency |
| Child stops if parent dies | Parent Monitoring | Background status checker |
| Parallel steps safely | Promise.allSettled() | Never use Promise.all() |
DBOS Workflow Architecture
┌─────────────────────────────────────────────────────────────┐
│ WORKFLOW (can call ALL DBOS methods) │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DBOS.send() │ │ DBOS.recv() │ │startWorkflow│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ DBOS.runStep(() => ...) │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ STEP (ONLY writeStream allowed) │ │ │
│ │ │ │ │ │
│ │ │ ✅ DBOS.writeStream() │ │ │
│ │ │ ❌ DBOS.send/recv/startWorkflow/sleep/... │ │ │
│ │ │ │ │ │
│ │ │ return { childTasks: [...] } │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ // After step completes: │
│ for (child of result.childTasks) { │
│ await DBOS.startWorkflow(...)(child) // ✅ Allowed here│
│ } │
└─────────────────────────────────────────────────────────────┘
Advanced DBOS Features
For advanced DBOS features not currently used in ChainGraph (Debouncer, forkWorkflow, versioning, rate limiting, partitioned queues), see dbos-advanced.md in this skill directory.
Related Skills
- executor-architecture - Package overview
- chaingraph-concepts - Core domain concepts
- subscription-sync - Event streaming patterns
- trpc-execution - Execution tRPC procedures
- trpc-patterns - General tRPC framework patterns