Unnamed Skill

Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.

$ Installieren

git clone https://github.com/chaingraphlabs/chaingraph /tmp/chaingraph && cp -r /tmp/chaingraph/.claude/skills/executor-architecture ~/.claude/skills/chaingraph

// tip: Run this command in your terminal to install the skill


name: executor-architecture description: Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.

ChainGraph Executor Architecture

This skill provides architectural guidance for the @badaitech/chaingraph-executor package - the execution engine that runs ChainGraph flows with durable execution via DBOS.

Package Overview

Location: packages/chaingraph-executor/ Purpose: Flow execution engine with DBOS durable execution Key Feature: Exactly-once execution semantics with automatic recovery

Directory Structure

packages/chaingraph-executor/
โ”œโ”€โ”€ server/
โ”‚   โ”œโ”€โ”€ index.ts                    # Main exports
โ”‚   โ”‚
โ”‚   โ”œโ”€โ”€ dbos/                       # DBOS durable execution โญ
โ”‚   โ”‚   โ”œโ”€โ”€ config.ts               # DBOS initialization
โ”‚   โ”‚   โ”œโ”€โ”€ DBOSExecutionWorker.ts  # Worker lifecycle
โ”‚   โ”‚   โ”œโ”€โ”€ queue.ts                # Queue management
โ”‚   โ”‚   โ”œโ”€โ”€ workflows/
โ”‚   โ”‚   โ”‚   โ””โ”€โ”€ ExecutionWorkflows.ts  # Main orchestration
โ”‚   โ”‚   โ””โ”€โ”€ steps/
โ”‚   โ”‚       โ”œโ”€โ”€ ExecuteFlowAtomicStep.ts  # Core execution
โ”‚   โ”‚       โ””โ”€โ”€ UpdateStatusStep.ts  # Status updates
โ”‚   โ”‚
โ”‚   โ”œโ”€โ”€ services/                   # Business logic layer
โ”‚   โ”‚   โ”œโ”€โ”€ ExecutionService.ts     # Execution instance management
โ”‚   โ”‚   โ”œโ”€โ”€ RecoveryService.ts      # Failure recovery
โ”‚   โ”‚   โ””โ”€โ”€ ServiceFactory.ts       # Service initialization
โ”‚   โ”‚
โ”‚   โ”œโ”€โ”€ implementations/            # Interface implementations
โ”‚   โ”‚   โ”œโ”€โ”€ dbos/
โ”‚   โ”‚   โ”‚   โ”œโ”€โ”€ DBOSEventBus.ts     # DBOS event streaming
โ”‚   โ”‚   โ”‚   โ””โ”€โ”€ DBOSTaskQueue.ts    # DBOS task queue
โ”‚   โ”‚   โ””โ”€โ”€ local/
โ”‚   โ”‚       โ”œโ”€โ”€ InMemoryEventBus.ts # Dev/test event bus
โ”‚   โ”‚       โ””โ”€โ”€ InMemoryTaskQueue.ts
โ”‚   โ”‚
โ”‚   โ”œโ”€โ”€ interfaces/                 # Abstract interfaces
โ”‚   โ”‚   โ”œโ”€โ”€ IEventBus.ts            # Event streaming contract
โ”‚   โ”‚   โ””โ”€โ”€ ITaskQueue.ts           # Task queue contract
โ”‚   โ”‚
โ”‚   โ”œโ”€โ”€ stores/                     # Data access layer
โ”‚   โ”‚   โ”œโ”€โ”€ execution-store.ts      # Execution CRUD
โ”‚   โ”‚   โ”œโ”€โ”€ flow-store.ts           # Flow loading
โ”‚   โ”‚   โ””โ”€โ”€ postgres/
โ”‚   โ”‚       โ”œโ”€โ”€ schema.ts           # Drizzle schema
โ”‚   โ”‚       โ””โ”€โ”€ postgres-execution-store.ts
โ”‚   โ”‚
โ”‚   โ”œโ”€โ”€ trpc/                       # API layer
โ”‚   โ”‚   โ”œโ”€โ”€ router.ts               # tRPC procedures
โ”‚   โ”‚   โ””โ”€โ”€ context.ts              # Request context
โ”‚   โ”‚
โ”‚   โ””โ”€โ”€ utils/                      # Utilities
โ”‚       โ”œโ”€โ”€ config.ts               # Environment config
โ”‚       โ”œโ”€โ”€ db.ts                   # Database connection
โ”‚       โ””โ”€โ”€ logger.ts               # Logging
โ”‚
โ”œโ”€โ”€ client/                         # tRPC client exports
โ””โ”€โ”€ types/                          # TypeScript types

Architecture Layers

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Layer 1: API (tRPC)                                         โ”‚
โ”‚ โ”œโ”€ create()    โ†’ Start execution workflow                   โ”‚
โ”‚ โ”œโ”€ start()     โ†’ Send START_SIGNAL                          โ”‚
โ”‚ โ”œโ”€ stop()      โ†’ Cancel workflow                            โ”‚
โ”‚ โ”œโ”€ pause()     โ†’ Send PAUSE command                         โ”‚
โ”‚ โ””โ”€ subscribeToExecutionEvents() โ†’ Stream events             โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Layer 2: Services                                           โ”‚
โ”‚ โ”œโ”€ ExecutionService   โ†’ Instance management                 โ”‚
โ”‚ โ”œโ”€ RecoveryService    โ†’ Failure recovery                    โ”‚
โ”‚ โ””โ”€ ServiceFactory     โ†’ Dependency injection                โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Layer 3: DBOS (Durable Execution)                           โ”‚
โ”‚ โ”œโ”€ ExecutionWorkflow  โ†’ Orchestration + child spawning      โ”‚
โ”‚ โ””โ”€ ExecuteFlowAtomicStep โ†’ Core flow execution              โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Layer 4: Implementations                                    โ”‚
โ”‚ โ”œโ”€ DBOSEventBus       โ†’ PostgreSQL event streaming          โ”‚
โ”‚ โ””โ”€ DBOSTaskQueue      โ†’ PostgreSQL task queue               โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Layer 5: Stores                                             โ”‚
โ”‚ โ”œโ”€ ExecutionStore     โ†’ Execution row CRUD                  โ”‚
โ”‚ โ””โ”€ FlowStore          โ†’ Flow definition loading             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Two Execution Modes

The executor supports two modes controlled by ENABLE_DBOS_EXECUTION:

DBOS Mode (Production)

ENABLE_DBOS_EXECUTION=true

Features:
โ”œโ”€ Exactly-once execution via workflow IDs
โ”œโ”€ Automatic recovery from failures
โ”œโ”€ Real-time event streaming via PostgreSQL
โ”œโ”€ Durable task queue (no Kafka needed)
โ””โ”€ DBOS Admin UI at localhost:3022

Legacy/Local Mode (Development)

ENABLE_DBOS_EXECUTION=false

Features:
โ”œโ”€ In-memory event bus
โ”œโ”€ In-memory task queue
โ”œโ”€ Simpler debugging
โ””โ”€ No durability guarantees

Key Files

FilePurposeCritical?
server/dbos/workflows/ExecutionWorkflows.tsMain orchestrationโญโญโญ
server/dbos/steps/ExecuteFlowAtomicStep.tsCore execution stepโญโญโญ
server/services/ExecutionService.tsInstance managementโญโญ
server/services/ServiceFactory.tsService initializationโญโญ
server/implementations/dbos/DBOSEventBus.tsEvent streamingโญโญ
server/trpc/router.tsAPI proceduresโญโญ
server/stores/postgres/schema.tsDatabase schemaโญ
server/utils/config.tsEnvironment configโญ

Execution Lifecycle

1. CREATE (tRPC)
   โ””โ”€ ExecutionRow inserted โ†’ Workflow started
   โ””โ”€ Workflow writes EXECUTION_CREATED event
   โ””โ”€ Workflow waits for START_SIGNAL

2. SUBSCRIBE (tRPC)
   โ””โ”€ Client subscribes to DBOS stream
   โ””โ”€ Immediately receives EXECUTION_CREATED

3. START (tRPC)
   โ””โ”€ Sends START_SIGNAL via DBOS.send()
   โ””โ”€ Workflow continues

4. EXECUTE (Workflow)
   โ””โ”€ Step 1: updateToRunning()
   โ””โ”€ Step 2: executeFlowAtomic()
   โ”‚   โ””โ”€ Load flow from DB
   โ”‚   โ””โ”€ Create execution instance
   โ”‚   โ””โ”€ Execute flow (up to 30min)
   โ”‚   โ””โ”€ Stream events in real-time
   โ”‚   โ””โ”€ Collect child tasks
   โ””โ”€ Step 3: Spawn children
   โ””โ”€ Step 4: updateToCompleted()

5. COMPLETE
   โ””โ”€ DBOS auto-closes event stream
   โ””โ”€ Client receives all events

Service Layer

ExecutionService

Manages execution instances with event streaming setup:

// server/services/ExecutionService.ts
class ExecutionService {
  // Create execution instance with event handling
  async createExecutionInstance(params: {
    task: ExecutionTask
    flow: Flow
    executionRow: ExecutionRow
    abortController: AbortController
  }): Promise<ExecutionInstance>

  // Get event bus (DBOS or InMemory based on config)
  getEventBus(): IEventBus

  // Setup event handling (connects engine events โ†’ event bus)
  setupEventHandling(instance: ExecutionInstance): () => Promise<void>
}

ServiceFactory

Initializes all services with proper dependency injection:

// server/services/ServiceFactory.ts
async function initializeServices(): Promise<Services> {
  // 1. Create event bus (DBOS or InMemory)
  const eventBus = config.dbos.enabled
    ? new DBOSEventBus()
    : new InMemoryEventBus()

  // 2. Create task queue
  const taskQueue = config.dbos.enabled
    ? new DBOSTaskQueue()
    : new InMemoryTaskQueue()

  // 3. Create execution service
  const executionService = new ExecutionService(eventBus, taskQueue)

  // 4. Initialize DBOS steps (dependency injection)
  initializeExecuteFlowStep(executionService, executionStore)

  return { eventBus, taskQueue, executionService }
}

tRPC Router

File: server/trpc/router.ts

export const executionRouter = router({
  // Create execution (starts workflow immediately)
  create: procedure
    .input(CreateExecutionInput)
    .mutation(async ({ input }) => {
      // 1. Create execution row in DB
      // 2. Start DBOS workflow (writes EXECUTION_CREATED)
      // 3. Return executionId
    }),

  // Start execution (sends START_SIGNAL)
  start: procedure
    .input(z.object({ executionId: z.string() }))
    .mutation(async ({ input }) => {
      await DBOS.send(input.executionId, 'API', 'START_SIGNAL')
    }),

  // Subscribe to execution events (real-time streaming)
  subscribeToExecutionEvents: procedure
    .input(z.object({ executionId: z.string(), fromIndex: z.number() }))
    .subscription(async function* ({ input }) {
      // Yields events from DBOS stream
      for await (const event of DBOS.readStream(input.executionId, 'events')) {
        yield event
      }
    }),

  // Control commands
  pause: procedure.mutation(...),
  resume: procedure.mutation(...),
  stop: procedure.mutation(...),
})

Database Schema

File: server/stores/postgres/schema.ts

export const executions = pgTable('executions', {
  id: text('id').primaryKey(),              // EX123...
  flowId: text('flow_id').notNull(),
  ownerId: text('owner_id').notNull(),
  status: executionStatusEnum('status').notNull(),

  // Hierarchy
  rootExecutionId: text('root_execution_id'),
  parentExecutionId: text('parent_execution_id'),
  executionDepth: integer('execution_depth').default(0),

  // Timestamps
  createdAt: timestamp('created_at').notNull(),
  startedAt: timestamp('started_at'),
  completedAt: timestamp('completed_at'),

  // Error tracking
  errorMessage: text('error_message'),
  errorNodeId: text('error_node_id'),

  // Recovery
  failureCount: integer('failure_count').default(0),
  lastFailureAt: timestamp('last_failure_at'),

  // Context
  options: jsonb('options'),
  integration: jsonb('integration'),        // archai context
  externalEvents: jsonb('external_events'), // events for children
})

Environment Variables

# DBOS Mode
ENABLE_DBOS_EXECUTION=true

# Database
DATABASE_URL_EXECUTIONS=postgres://...

# DBOS Configuration
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022
DBOS_QUEUE_CONCURRENCY=100
DBOS_WORKER_CONCURRENCY=5

# Execution Limits
EXECUTION_MAX_DEPTH=100
EXECUTION_DEFAULT_TIMEOUT_MS=3600000  # 1 hour

Quick Reference

NeedWhereFile
Add new tRPC procedureAPI layerserver/trpc/router.ts
Modify execution logicDBOS stepserver/dbos/steps/ExecuteFlowAtomicStep.ts
Add orchestration logicDBOS workflowserver/dbos/workflows/ExecutionWorkflows.ts
Change event streamingImplementationserver/implementations/dbos/DBOSEventBus.ts
Modify schemaStoreserver/stores/postgres/schema.ts
Add serviceService layerserver/services/

Related Skills

  • dbos-patterns - CRITICAL DBOS constraints and patterns
  • chaingraph-concepts - Core domain concepts (Flow, Node, Port)
  • subscription-sync - Event streaming architecture
  • types-architecture - Execution types and events
  • trpc-execution - Execution tRPC procedures (API layer)
  • trpc-patterns - General tRPC framework patterns