Unnamed Skill

ChainGraph execution tRPC layer for flow execution management. Use when working on packages/chaingraph-executor or apps/chaingraph-execution-api. Covers execution lifecycle (create/start/pause/resume/stop), event streaming, DBOS workflow integration, signal pattern, API vs Worker modes. Triggers: execution procedure, subscribeToExecutionEvents, ExecutionService, chaingraph-executor, execution-api, execution workflow, DBOS signal, taskQueue, eventBus.

$ Instalar

git clone https://github.com/chaingraphlabs/chaingraph /tmp/chaingraph && cp -r /tmp/chaingraph/.claude/skills/trpc-execution ~/.claude/skills/chaingraph

// tip: Run this command in your terminal to install the skill


name: trpc-execution description: ChainGraph execution tRPC layer for flow execution management. Use when working on packages/chaingraph-executor or apps/chaingraph-execution-api. Covers execution lifecycle (create/start/pause/resume/stop), event streaming, DBOS workflow integration, signal pattern, API vs Worker modes. Triggers: execution procedure, subscribeToExecutionEvents, ExecutionService, chaingraph-executor, execution-api, execution workflow, DBOS signal, taskQueue, eventBus.

tRPC Execution Layer

This skill covers the tRPC procedures for execution management in ChainGraph - the API for controlling flow execution with real-time event streaming.

Architecture Overview

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                 Frontend (React + XYFlow)                       โ”‚
โ”‚                        โ”‚                                        โ”‚
โ”‚              tRPC Client (WebSocket)                            โ”‚
โ”‚                   (Port 4021)                                   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                         โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚         chaingraph-execution-api OR execution-worker            โ”‚
โ”‚                        โ”‚                                        โ”‚
โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚   โ”‚               executionRouter                            โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ create         (starts workflow, waits for signal)  โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ start          (sends START_SIGNAL)                 โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ stop           (cancels workflow)                   โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ pause          (sends PAUSE command)                โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ resume         (sends RESUME command)               โ”‚  โ”‚
โ”‚   โ”‚  โ””โ”€ subscribeToExecutionEvents (event streaming)        โ”‚  โ”‚
โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                        โ”‚                                        โ”‚
โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚   โ”‚              ServiceFactory                              โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ API Mode: DBOSClient (enqueue only)                 โ”‚  โ”‚
โ”‚   โ”‚  โ””โ”€ Worker Mode: Full DBOS runtime                      โ”‚  โ”‚
โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                        โ”‚                                        โ”‚
โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚   โ”‚              DBOS Workflows                              โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ ExecutionWorkflow (orchestration)                   โ”‚  โ”‚
โ”‚   โ”‚  โ”œโ”€ executeFlowAtomic (step - runs ExecutionEngine)     โ”‚  โ”‚
โ”‚   โ”‚  โ””โ”€ PostgreSQL (state + event streams)                  โ”‚  โ”‚
โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Directory Structure

packages/chaingraph-executor/server/
โ”œโ”€โ”€ trpc/
โ”‚   โ”œโ”€โ”€ router.ts          # Main execution router
โ”‚   โ””โ”€โ”€ context.ts         # tRPC context with services
โ”‚
โ”œโ”€โ”€ services/
โ”‚   โ”œโ”€โ”€ ExecutionService.ts    # Core execution logic
โ”‚   โ”œโ”€โ”€ IExecutionService.ts   # Service interface
โ”‚   โ””โ”€โ”€ ServiceFactory.ts      # API vs Worker mode setup
โ”‚
โ”œโ”€โ”€ implementations/
โ”‚   โ”œโ”€โ”€ dbos/
โ”‚   โ”‚   โ”œโ”€โ”€ DBOSEventBus.ts    # DBOS event streaming
โ”‚   โ”‚   โ”œโ”€โ”€ DBOSTaskQueue.ts   # Worker queue (DBOS.startWorkflow)
โ”‚   โ”‚   โ”œโ”€โ”€ APITaskQueue.ts    # API queue (DBOSClient.enqueue)
โ”‚   โ”‚   โ””โ”€โ”€ streaming/
โ”‚   โ”‚       โ””โ”€โ”€ StreamBridge.ts # PostgreSQL LISTEN/NOTIFY
โ”‚   โ””โ”€โ”€ local/
โ”‚       โ”œโ”€โ”€ InMemoryEventBus.ts
โ”‚       โ””โ”€โ”€ InMemoryTaskQueue.ts
โ”‚
โ”œโ”€โ”€ dbos/
โ”‚   โ”œโ”€โ”€ workflows/
โ”‚   โ”‚   โ””โ”€โ”€ ExecutionWorkflows.ts  # Main DBOS workflow
โ”‚   โ”œโ”€โ”€ steps/
โ”‚   โ”‚   โ”œโ”€โ”€ ExecuteFlowAtomicStep.ts  # Core execution step
โ”‚   โ”‚   โ””โ”€โ”€ UpdateStatusStep.ts
โ”‚   โ””โ”€โ”€ queue.ts                      # DBOS queue config
โ”‚
โ”œโ”€โ”€ interfaces/
โ”‚   โ”œโ”€โ”€ IEventBus.ts
โ”‚   โ””โ”€โ”€ ITaskQueue.ts
โ”‚
โ””โ”€โ”€ ws-server.ts           # WebSocket server

apps/chaingraph-execution-api/
โ””โ”€โ”€ src/
    โ”œโ”€โ”€ index.ts           # Entry point
    โ””โ”€โ”€ server/index.ts    # Uses createServicesForAPI()

Execution Procedures

File: packages/chaingraph-executor/server/trpc/router.ts

Create Execution

Lines: 148-271

create: authedProcedure
  .input(z.object({
    flowId: z.string(),
    options: ExecutionOptionsSchema.optional(),
    integration: IntegrationContextSchema.optional(),
    events: z.array(ExecutionExternalEventSchema).optional(),
  }))
  .mutation(async ({ input, ctx }) => {
    const { executionStore, taskQueue, flowStore } = ctx
    const userId = ctx.session?.user?.id

    // 1. Validate user owns the flow
    const flow = await flowStore.getFlow(input.flowId)
    if (!flow) {
      throw new TRPCError({
        code: 'NOT_FOUND',
        message: `Flow ${input.flowId} not found`,
      })
    }

    if (flow.metadata.ownerID !== userId) {
      throw new TRPCError({
        code: 'FORBIDDEN',
        message: 'User does not own this flow',
      })
    }

    // 2. Create execution row in database
    const executionId = `EX${generateShortId(16)}`
    const execution = await executionStore.create({
      id: executionId,
      flowId: input.flowId,
      userId,
      status: ExecutionStatus.Created,
      createdAt: new Date(),
      options: input.options,
      integration: input.integration,
    })

    if (!execution) {
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to create execution record',
      })
    }

    // 3. Start DBOS workflow (SIGNAL PATTERN)
    // Workflow writes EXECUTION_CREATED and waits for START_SIGNAL
    await taskQueue.publishTask({
      executionId,
      flowId: input.flowId,
      userId,
      options: input.options,
      integration: input.integration,
      externalEvents: input.events,
    })

    return { executionId }
  })

Start Execution

Lines: 273-318

start: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
  }))
  .mutation(async ({ input, ctx }) => {
    const { executionStore, dbosClient } = ctx

    // 1. Validate status
    const execution = await executionStore.get(input.executionId)
    if (!execution) {
      throw new TRPCError({
        code: 'NOT_FOUND',
        message: `Execution ${input.executionId} not found`,
      })
    }

    if (execution.status !== ExecutionStatus.Created) {
      throw new TRPCError({
        code: 'BAD_REQUEST',
        message: `Cannot start execution in status ${execution.status}`,
      })
    }

    // 2. Send START_SIGNAL to waiting workflow
    // Uses DBOSClient in API mode, DBOS.send() in Worker mode
    if (dbosClient) {
      // API Mode: Use DBOSClient
      await dbosClient.send(input.executionId, START_SIGNAL, 'START_SIGNAL')
    } else {
      // Worker Mode: Use DBOS directly
      await DBOS.send(input.executionId, START_SIGNAL, 'START_SIGNAL')
    }

    return { success: true }
  })

Stop Execution

Lines: 320-378

stop: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
    reason: z.string().optional(),
  }))
  .mutation(async ({ input, ctx }) => {
    const { executionStore, dbosClient } = ctx

    const execution = await executionStore.get(input.executionId)
    if (!execution) {
      throw new TRPCError({
        code: 'NOT_FOUND',
        message: `Execution ${input.executionId} not found`,
      })
    }

    // Can't stop if already terminal
    const terminalStatuses = [
      ExecutionStatus.Completed,
      ExecutionStatus.Failed,
      ExecutionStatus.Stopped,
    ]
    if (terminalStatuses.includes(execution.status)) {
      throw new TRPCError({
        code: 'BAD_REQUEST',
        message: `Cannot stop execution in status ${execution.status}`,
      })
    }

    // Cancel DBOS workflow (built-in feature)
    if (dbosClient) {
      await dbosClient.cancelWorkflow(input.executionId)
    } else {
      await DBOS.cancelWorkflow(input.executionId)
    }

    // Update database status
    await executionStore.updateStatus(input.executionId, ExecutionStatus.Stopped)

    return { success: true }
  })

Pause Execution

Lines: 380-437

pause: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
    reason: z.string().optional(),
  }))
  .mutation(async ({ input, ctx }) => {
    const { executionStore, dbosClient } = ctx

    const execution = await executionStore.get(input.executionId)
    if (!execution || execution.status !== ExecutionStatus.Running) {
      throw new TRPCError({
        code: 'BAD_REQUEST',
        message: 'Can only pause running executions',
      })
    }

    // Send PAUSE command via DBOS messaging
    const command = { command: 'PAUSE', reason: input.reason }

    if (dbosClient) {
      await dbosClient.send(input.executionId, command, 'COMMAND')
    } else {
      await DBOS.send(input.executionId, command, 'COMMAND')
    }

    return { success: true }
  })

Resume Execution

Lines: 439-494

resume: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
  }))
  .mutation(async ({ input, ctx }) => {
    const { executionStore, dbosClient } = ctx

    const execution = await executionStore.get(input.executionId)
    if (!execution || execution.status !== ExecutionStatus.Paused) {
      throw new TRPCError({
        code: 'BAD_REQUEST',
        message: 'Can only resume paused executions',
      })
    }

    // Send RESUME command via DBOS messaging
    const command = { command: 'RESUME' }

    if (dbosClient) {
      await dbosClient.send(input.executionId, command, 'COMMAND')
    } else {
      await DBOS.send(input.executionId, command, 'COMMAND')
    }

    return { success: true }
  })

Subscribe to Execution Events

Lines: 574-644

subscribeToExecutionEvents: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
    fromIndex: z.number().optional().default(0),
    eventTypes: z.array(z.string()).optional().default([]),
    batchSize: z.number().min(1).max(1000).optional().default(100),
    batchTimeoutMs: z.number().min(0).max(1000).optional().default(25),
  }))
  .subscription(async function* ({ input, ctx, signal }) {
    const { executionStore, eventBus } = ctx

    // 1. Verify execution exists
    const instance = await executionStore.get(input.executionId)
    if (!instance) {
      throw new TRPCError({
        code: 'NOT_FOUND',
        message: `Execution with id ${input.executionId} not found`,
      })
    }

    // 2. Subscribe to event stream with batching
    const iterator = eventBus.subscribeToEvents(
      input.executionId,
      input.fromIndex,
      {
        maxSize: input.batchSize,
        timeoutMs: input.batchTimeoutMs,
      },
    )

    let eventCount = 0

    try {
      for await (const events of iterator) {
        // Check for client disconnect
        if (signal?.aborted) {
          logger.info({ executionId: input.executionId, eventsSent: eventCount },
            'Client disconnected')
          break
        }

        // Filter by event types
        const filtered = events.filter((event) => {
          if (input.eventTypes.length === 0) return true
          return input.eventTypes.includes(event.type)
        })

        // Only yield non-empty batches
        if (filtered.length > 0) {
          eventCount += filtered.length
          yield filtered
        }
      }
    } catch (error) {
      logger.error({ error, executionId: input.executionId },
        'Error in event subscription')
      throw error
    } finally {
      logger.info({ executionId: input.executionId, eventsSent: eventCount },
        'Subscription ended, cleaning up')
      await eventBus.unsubscribe(input.executionId)
    }
  })

Signal Pattern (CRITICAL)

The signal pattern solves a race condition where clients might subscribe before the event stream exists.

WITHOUT SIGNAL PATTERN (BROKEN):
1. create() โ†’ Returns immediately
2. subscribe() โ†’ Stream doesn't exist yet! โŒ
3. Workflow starts โ†’ EXECUTION_CREATED lost!

WITH SIGNAL PATTERN (CORRECT):
1. create() โ†’ Workflow starts โ†’ EXECUTION_CREATED โ†’ Stream exists! โœ…
2. subscribe() โ†’ Stream exists โ†’ Receives EXECUTION_CREATED โœ…
3. start() โ†’ Sends START_SIGNAL โ†’ Workflow continues

Implementation

In create procedure:

// Workflow starts but PAUSES after writing EXECUTION_CREATED
await taskQueue.publishTask({ executionId, ... })

In ExecutionWorkflow:

// Phase 1: Write event BEFORE waiting
await DBOS.writeStream('events', {
  type: ExecutionEventEnum.EXECUTION_CREATED,
  executionId,
  index: -1,  // Special index
})

// Wait for START_SIGNAL
const signal = await DBOS.recv<string>('START_SIGNAL', 60)
if (signal !== START_SIGNAL) {
  throw new Error('Expected START_SIGNAL')
}
// Phase 2: Continue execution...

In start procedure:

// Send signal to waiting workflow
await DBOS.send(executionId, START_SIGNAL, 'START_SIGNAL')

Dual-Mode Architecture

ChainGraph execution supports two deployment modes:

API Mode

File: packages/chaingraph-executor/server/services/ServiceFactory.ts:202-298

export async function createServicesForAPI(): Promise<ServiceInstances> {
  // NO DBOS runtime initialization!
  // Uses external DBOSClient instead

  const dbosClient = new DBOSClient(config.dbos)

  return {
    executionService: new ExecutionService(flowStore, nodeRegistry),
    executionStore: new PostgresExecutionStore(db),
    eventBus: new DBOSEventBus(streamBridge),
    taskQueue: new APITaskQueue(dbosClient),  // Uses DBOSClient.enqueue()
    flowStore,
    ownershipResolver,
    dbosClient,  // Available in context
  }
}

Characteristics:

  • NO local workflow execution
  • Uses DBOSClient for remote enqueue
  • Can send signals via DBOSClient.send()
  • Can cancel workflows via DBOSClient.cancelWorkflow()
  • Subscribes to events via DBOSEventBus

Worker Mode

File: packages/chaingraph-executor/server/services/ServiceFactory.ts:306-429

export async function createServicesForWorker(): Promise<ServiceInstances> {
  // Import workflow class (REQUIRED for DBOS registration)
  const { ExecutionWorkflows } = await import('../dbos/workflows/ExecutionWorkflows')

  // Create queue BEFORE DBOS.launch() for dequeue capability
  const executionQueue = new WorkflowQueue('executionQueue', {
    workerConcurrency: config.dbos.workerConcurrency,
  })

  // Initialize DBOS runtime
  await initializeDBOS()

  return {
    executionService: new ExecutionService(flowStore, nodeRegistry),
    executionStore: new PostgresExecutionStore(db),
    eventBus: new DBOSEventBus(streamBridge),
    taskQueue: new DBOSTaskQueue(executionQueue),  // Uses DBOS.startWorkflow()
    flowStore,
    ownershipResolver,
    // NO dbosClient - use DBOS directly
  }
}

Characteristics:

  • Full DBOS runtime initialization
  • Executes workflows locally
  • Queue created BEFORE DBOS.launch() (critical!)
  • Uses DBOS.send(), DBOS.startWorkflow() directly

Event Streaming

Event Bus Interface

File: packages/chaingraph-executor/server/interfaces/IEventBus.ts

interface IEventBus {
  publishEvent(executionId: string, event: ExecutionEventImpl): Promise<void>

  subscribeToEvents(
    executionId: string,
    fromIndex: number,
    batchConfig?: EventBatchConfig,
  ): AsyncIterable<ExecutionEventImpl[]>

  unsubscribe(executionId: string): Promise<void>

  close(): Promise<void>
}

interface EventBatchConfig {
  maxSize: number      // Max events per batch (default: 100)
  timeoutMs: number    // Max wait before flush (default: 25ms)
}

DBOS Event Bus

File: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts

export class DBOSEventBus implements IEventBus {
  private readonly streamBridge: StreamBridge

  async publishEvent(executionId: string, event: ExecutionEventImpl): Promise<void> {
    // Uses DBOS.writeStream() - allowed from STEP context!
    await DBOS.writeStream('events', {
      ...event,
      executionId,
    })
  }

  subscribeToEvents(
    executionId: string,
    fromIndex: number,
    batchConfig?: EventBatchConfig,
  ): AsyncIterable<ExecutionEventImpl[]> {
    return this.streamBridge.subscribe(executionId, fromIndex, batchConfig)
  }

  async unsubscribe(executionId: string): Promise<void> {
    await this.streamBridge.unsubscribe(executionId)
  }
}

Stream Bridge (PostgreSQL LISTEN/NOTIFY)

File: packages/chaingraph-executor/server/implementations/dbos/streaming/StreamBridge.ts

Event Published (DBOS.writeStream)
        โ”‚
        โ–ผ
PostgreSQL (dbos_workflow_event_queue table)
        โ”‚
        โ–ผ
NOTIFY dbos_workflow_events (with executionId)
        โ”‚
        โ–ผ
PGListenerPool (10 listeners, sharded)
        โ”‚
        โ–ผ
DBOSStreamSubscriber
        โ”‚
        โ–ผ
StreamBridge (batching accumulator)
        โ”‚
        โ–ผ
DBOSEventBus
        โ”‚
        โ–ผ
tRPC Subscription (WebSocket)

Key Features:

  • Real-time via PostgreSQL LISTEN/NOTIFY
  • Fallback polling if LISTEN unavailable
  • Automatic listener sharding (10 listeners)
  • Configurable batching for network efficiency

Execution Context

File: packages/chaingraph-executor/server/trpc/context.ts

export interface ExecutorContext {
  session: Session
  executionService: IExecutionService
  executionStore: IExecutionStore
  eventBus: IEventBus
  taskQueue: ITaskQueue
  flowStore: IFlowStore
  ownershipResolver: IOwnershipResolver
  dbosClient?: DBOSClient  // Only in API mode
}

export async function createContext(opts: CreateHTTPContextOptions): Promise<ExecutorContext> {
  // Get services from ServiceFactory (singleton)
  const services = await getServices()

  // Extract auth token and validate session
  const token = getAuthToken(opts)
  const session = await authService.validateSession(token)

  return {
    session: {
      user: session?.user,
      isAuthenticated: !!session?.user,
    },
    ...services,
  }
}

Query Procedures

Get Execution Details

Lines: 496-513

getExecutionDetails: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
  }))
  .query(async ({ input, ctx }) => {
    const execution = await ctx.executionStore.get(input.executionId)
    if (!execution) {
      throw new TRPCError({
        code: 'NOT_FOUND',
        message: `Execution ${input.executionId} not found`,
      })
    }
    return execution
  })

Get Executions Tree

Lines: 515-532

getExecutionsTree: executionContextProcedure
  .input(z.object({
    executionId: z.string(),
  }))
  .query(async ({ input, ctx }) => {
    return ctx.executionStore.getTree(input.executionId)
  })

Get Root Executions

Lines: 534-572

getRootExecutions: authedProcedure
  .input(z.object({
    flowId: z.string(),
    limit: z.number().min(1).max(100).default(50),
    after: z.date().optional(),
  }))
  .query(async ({ input, ctx }) => {
    // Returns paginated list of root executions for a flow
    return ctx.executionStore.listRootExecutions(
      input.flowId,
      ctx.session.user.id,
      input.limit,
      input.after,
    )
  })

Comparison: Flow Editing vs Execution

AspectFlow EditingExecution
Port30014021
Packagechaingraph-trpcchaingraph-executor
Appchaingraph-backendchaingraph-execution-api
PurposeCRUD flows, nodes, edgesRun flows, stream events
StorageFlow definitions (JSONB)Execution state + events
Real-timeflow.onEvent() โ†’ WebSocketDBOS streams โ†’ WebSocket
OrchestrationNoneDBOS workflows
ModesSingle modeAPI + Worker modes

Key Files

FilePurpose
packages/chaingraph-executor/server/trpc/router.ts:148-644All execution procedures
packages/chaingraph-executor/server/trpc/context.tsExecution context
packages/chaingraph-executor/server/services/ServiceFactory.tsAPI vs Worker setup
packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.tsEvent streaming
packages/chaingraph-executor/server/implementations/dbos/streaming/StreamBridge.tsLISTEN/NOTIFY
packages/chaingraph-executor/server/dbos/workflows/ExecutionWorkflows.tsDBOS workflow
packages/chaingraph-executor/server/ws-server.tsWebSocket server
apps/chaingraph-execution-api/src/index.tsAPI entry point

Quick Reference

OperationProcedureKey Pattern
CreatecreateStart workflow, wait for signal
StartstartSend START_SIGNAL
StopstopDBOS.cancelWorkflow()
PausepauseSend PAUSE command
ResumeresumeSend RESUME command
SubscribesubscribeToExecutionEventsStream with batching
ModeTaskQueueSignal Method
APIAPITaskQueue โ†’ DBOSClient.enqueue()dbosClient.send()
WorkerDBOSTaskQueue โ†’ DBOS.startWorkflow()DBOS.send()

Related Skills

  • trpc-patterns - General tRPC framework patterns
  • trpc-flow-editing - Flow editing procedures
  • dbos-patterns - DBOS constraints and patterns
  • executor-architecture - Package overview