Unnamed Skill
Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.
$ Installieren
git clone https://github.com/chaingraphlabs/chaingraph /tmp/chaingraph && cp -r /tmp/chaingraph/.claude/skills/executor-architecture ~/.claude/skills/chaingraph// tip: Run this command in your terminal to install the skill
name: executor-architecture description: Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.
ChainGraph Executor Architecture
This skill provides architectural guidance for the @badaitech/chaingraph-executor package - the execution engine that runs ChainGraph flows with durable execution via DBOS.
Package Overview
Location: packages/chaingraph-executor/
Purpose: Flow execution engine with DBOS durable execution
Key Feature: Exactly-once execution semantics with automatic recovery
Directory Structure
packages/chaingraph-executor/
โโโ server/
โ โโโ index.ts # Main exports
โ โ
โ โโโ dbos/ # DBOS durable execution โญ
โ โ โโโ config.ts # DBOS initialization
โ โ โโโ DBOSExecutionWorker.ts # Worker lifecycle
โ โ โโโ queue.ts # Queue management
โ โ โโโ workflows/
โ โ โ โโโ ExecutionWorkflows.ts # Main orchestration
โ โ โโโ steps/
โ โ โโโ ExecuteFlowAtomicStep.ts # Core execution
โ โ โโโ UpdateStatusStep.ts # Status updates
โ โ
โ โโโ services/ # Business logic layer
โ โ โโโ ExecutionService.ts # Execution instance management
โ โ โโโ RecoveryService.ts # Failure recovery
โ โ โโโ ServiceFactory.ts # Service initialization
โ โ
โ โโโ implementations/ # Interface implementations
โ โ โโโ dbos/
โ โ โ โโโ DBOSEventBus.ts # DBOS event streaming
โ โ โ โโโ DBOSTaskQueue.ts # DBOS task queue
โ โ โโโ local/
โ โ โโโ InMemoryEventBus.ts # Dev/test event bus
โ โ โโโ InMemoryTaskQueue.ts
โ โ
โ โโโ interfaces/ # Abstract interfaces
โ โ โโโ IEventBus.ts # Event streaming contract
โ โ โโโ ITaskQueue.ts # Task queue contract
โ โ
โ โโโ stores/ # Data access layer
โ โ โโโ execution-store.ts # Execution CRUD
โ โ โโโ flow-store.ts # Flow loading
โ โ โโโ postgres/
โ โ โโโ schema.ts # Drizzle schema
โ โ โโโ postgres-execution-store.ts
โ โ
โ โโโ trpc/ # API layer
โ โ โโโ router.ts # tRPC procedures
โ โ โโโ context.ts # Request context
โ โ
โ โโโ utils/ # Utilities
โ โโโ config.ts # Environment config
โ โโโ db.ts # Database connection
โ โโโ logger.ts # Logging
โ
โโโ client/ # tRPC client exports
โโโ types/ # TypeScript types
Architecture Layers
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Layer 1: API (tRPC) โ
โ โโ create() โ Start execution workflow โ
โ โโ start() โ Send START_SIGNAL โ
โ โโ stop() โ Cancel workflow โ
โ โโ pause() โ Send PAUSE command โ
โ โโ subscribeToExecutionEvents() โ Stream events โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Layer 2: Services โ
โ โโ ExecutionService โ Instance management โ
โ โโ RecoveryService โ Failure recovery โ
โ โโ ServiceFactory โ Dependency injection โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Layer 3: DBOS (Durable Execution) โ
โ โโ ExecutionWorkflow โ Orchestration + child spawning โ
โ โโ ExecuteFlowAtomicStep โ Core flow execution โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Layer 4: Implementations โ
โ โโ DBOSEventBus โ PostgreSQL event streaming โ
โ โโ DBOSTaskQueue โ PostgreSQL task queue โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Layer 5: Stores โ
โ โโ ExecutionStore โ Execution row CRUD โ
โ โโ FlowStore โ Flow definition loading โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Two Execution Modes
The executor supports two modes controlled by ENABLE_DBOS_EXECUTION:
DBOS Mode (Production)
ENABLE_DBOS_EXECUTION=true
Features:
โโ Exactly-once execution via workflow IDs
โโ Automatic recovery from failures
โโ Real-time event streaming via PostgreSQL
โโ Durable task queue (no Kafka needed)
โโ DBOS Admin UI at localhost:3022
Legacy/Local Mode (Development)
ENABLE_DBOS_EXECUTION=false
Features:
โโ In-memory event bus
โโ In-memory task queue
โโ Simpler debugging
โโ No durability guarantees
Key Files
| File | Purpose | Critical? |
|---|---|---|
server/dbos/workflows/ExecutionWorkflows.ts | Main orchestration | โญโญโญ |
server/dbos/steps/ExecuteFlowAtomicStep.ts | Core execution step | โญโญโญ |
server/services/ExecutionService.ts | Instance management | โญโญ |
server/services/ServiceFactory.ts | Service initialization | โญโญ |
server/implementations/dbos/DBOSEventBus.ts | Event streaming | โญโญ |
server/trpc/router.ts | API procedures | โญโญ |
server/stores/postgres/schema.ts | Database schema | โญ |
server/utils/config.ts | Environment config | โญ |
Execution Lifecycle
1. CREATE (tRPC)
โโ ExecutionRow inserted โ Workflow started
โโ Workflow writes EXECUTION_CREATED event
โโ Workflow waits for START_SIGNAL
2. SUBSCRIBE (tRPC)
โโ Client subscribes to DBOS stream
โโ Immediately receives EXECUTION_CREATED
3. START (tRPC)
โโ Sends START_SIGNAL via DBOS.send()
โโ Workflow continues
4. EXECUTE (Workflow)
โโ Step 1: updateToRunning()
โโ Step 2: executeFlowAtomic()
โ โโ Load flow from DB
โ โโ Create execution instance
โ โโ Execute flow (up to 30min)
โ โโ Stream events in real-time
โ โโ Collect child tasks
โโ Step 3: Spawn children
โโ Step 4: updateToCompleted()
5. COMPLETE
โโ DBOS auto-closes event stream
โโ Client receives all events
Service Layer
ExecutionService
Manages execution instances with event streaming setup:
// server/services/ExecutionService.ts
class ExecutionService {
// Create execution instance with event handling
async createExecutionInstance(params: {
task: ExecutionTask
flow: Flow
executionRow: ExecutionRow
abortController: AbortController
}): Promise<ExecutionInstance>
// Get event bus (DBOS or InMemory based on config)
getEventBus(): IEventBus
// Setup event handling (connects engine events โ event bus)
setupEventHandling(instance: ExecutionInstance): () => Promise<void>
}
ServiceFactory
Initializes all services with proper dependency injection:
// server/services/ServiceFactory.ts
async function initializeServices(): Promise<Services> {
// 1. Create event bus (DBOS or InMemory)
const eventBus = config.dbos.enabled
? new DBOSEventBus()
: new InMemoryEventBus()
// 2. Create task queue
const taskQueue = config.dbos.enabled
? new DBOSTaskQueue()
: new InMemoryTaskQueue()
// 3. Create execution service
const executionService = new ExecutionService(eventBus, taskQueue)
// 4. Initialize DBOS steps (dependency injection)
initializeExecuteFlowStep(executionService, executionStore)
return { eventBus, taskQueue, executionService }
}
tRPC Router
File: server/trpc/router.ts
export const executionRouter = router({
// Create execution (starts workflow immediately)
create: procedure
.input(CreateExecutionInput)
.mutation(async ({ input }) => {
// 1. Create execution row in DB
// 2. Start DBOS workflow (writes EXECUTION_CREATED)
// 3. Return executionId
}),
// Start execution (sends START_SIGNAL)
start: procedure
.input(z.object({ executionId: z.string() }))
.mutation(async ({ input }) => {
await DBOS.send(input.executionId, 'API', 'START_SIGNAL')
}),
// Subscribe to execution events (real-time streaming)
subscribeToExecutionEvents: procedure
.input(z.object({ executionId: z.string(), fromIndex: z.number() }))
.subscription(async function* ({ input }) {
// Yields events from DBOS stream
for await (const event of DBOS.readStream(input.executionId, 'events')) {
yield event
}
}),
// Control commands
pause: procedure.mutation(...),
resume: procedure.mutation(...),
stop: procedure.mutation(...),
})
Database Schema
File: server/stores/postgres/schema.ts
export const executions = pgTable('executions', {
id: text('id').primaryKey(), // EX123...
flowId: text('flow_id').notNull(),
ownerId: text('owner_id').notNull(),
status: executionStatusEnum('status').notNull(),
// Hierarchy
rootExecutionId: text('root_execution_id'),
parentExecutionId: text('parent_execution_id'),
executionDepth: integer('execution_depth').default(0),
// Timestamps
createdAt: timestamp('created_at').notNull(),
startedAt: timestamp('started_at'),
completedAt: timestamp('completed_at'),
// Error tracking
errorMessage: text('error_message'),
errorNodeId: text('error_node_id'),
// Recovery
failureCount: integer('failure_count').default(0),
lastFailureAt: timestamp('last_failure_at'),
// Context
options: jsonb('options'),
integration: jsonb('integration'), // archai context
externalEvents: jsonb('external_events'), // events for children
})
Environment Variables
# DBOS Mode
ENABLE_DBOS_EXECUTION=true
# Database
DATABASE_URL_EXECUTIONS=postgres://...
# DBOS Configuration
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022
DBOS_QUEUE_CONCURRENCY=100
DBOS_WORKER_CONCURRENCY=5
# Execution Limits
EXECUTION_MAX_DEPTH=100
EXECUTION_DEFAULT_TIMEOUT_MS=3600000 # 1 hour
Quick Reference
| Need | Where | File |
|---|---|---|
| Add new tRPC procedure | API layer | server/trpc/router.ts |
| Modify execution logic | DBOS step | server/dbos/steps/ExecuteFlowAtomicStep.ts |
| Add orchestration logic | DBOS workflow | server/dbos/workflows/ExecutionWorkflows.ts |
| Change event streaming | Implementation | server/implementations/dbos/DBOSEventBus.ts |
| Modify schema | Store | server/stores/postgres/schema.ts |
| Add service | Service layer | server/services/ |
Related Skills
dbos-patterns- CRITICAL DBOS constraints and patternschaingraph-concepts- Core domain concepts (Flow, Node, Port)subscription-sync- Event streaming architecturetypes-architecture- Execution types and eventstrpc-execution- Execution tRPC procedures (API layer)trpc-patterns- General tRPC framework patterns
Repository
