Unnamed Skill

Real-time data synchronization patterns for ChainGraph frontend. Use when working on WebSocket subscriptions, event buffers, tRPC subscriptions, flow synchronization, or execution event streaming. Covers subscription lifecycle, event buffering, race condition solutions. Triggers: subscription, sync, real-time, websocket, event buffer, tRPC subscription, flow events, onData, patronum interval.

$ Installer

git clone https://github.com/chaingraphlabs/chaingraph /tmp/chaingraph && cp -r /tmp/chaingraph/.claude/skills/subscription-sync ~/.claude/skills/chaingraph


name: subscription-sync

Subscription Sync Patterns

This skill covers real-time data synchronization between the ChainGraph backend and frontend over WebSocket subscriptions.

Architecture Overview

┌──────────────────────────────────────────────────────────────┐
│                      BACKEND (tRPC)                           │
│                                                               │
│  Flow Subscription           Execution Subscription           │
│  ├─ FlowInitStart            ├─ EXECUTION_CREATED            │
│  ├─ NodesAdded               ├─ FLOW_STARTED                 │
│  ├─ EdgesAdded               ├─ NODE_STARTED                 │
│  ├─ FlowInitEnd              ├─ NODE_COMPLETED               │
│  ├─ NodeUpdated              ├─ EDGE_TRANSFER_COMPLETED      │
│  ├─ PortUpdated              └─ FLOW_COMPLETED               │
│  └─ ...                                                       │
└──────────────────┬───────────────────────┬───────────────────┘
                   │ WebSocket             │ WebSocket
                   ▼                       ▼
┌──────────────────────────────────────────────────────────────┐
│                      FRONTEND                                  │
│                                                               │
│  ┌─────────────────────┐    ┌─────────────────────┐          │
│  │ $trpcClient         │    │ $trpcClientExecutor │          │
│  │ ws://localhost:3001 │    │ ws://localhost:4021 │          │
│  └──────────┬──────────┘    └──────────┬──────────┘          │
│             │                          │                      │
│             ▼                          ▼                      │
│  ┌─────────────────────┐    ┌─────────────────────┐          │
│  │ Flow Event Buffer   │    │ Execution Events    │          │
│  │ (50ms batching)     │    │ (direct processing) │          │
│  └──────────┬──────────┘    └──────────┬──────────┘          │
│             │                          │                      │
│             ▼                          ▼                      │
│  ┌────────────────────────────────────────────────┐          │
│  │              Effector Stores                    │          │
│  │  $nodes, $edges, $portValues, $execution        │          │
│  └────────────────────────────────────────────────┘          │
└──────────────────────────────────────────────────────────────┘

Two tRPC Clients

The ChainGraph frontend maintains two separate WebSocket connections, one to the main server and one to the executor server.

Files:

  • Main Server Client: apps/chaingraph-frontend/src/store/trpc/store.ts
  • Executor Server Client: apps/chaingraph-frontend/src/store/trpc/execution-client.ts

// Main Server - Flow editing operations (store.ts)
export const $trpcClient = trpcDomain.createStore<TRPCClient | null>(null)
// Connects to: ws://localhost:3001

// Executor Server - Execution events (execution-client.ts)
export const $trpcClientExecutor = trpcDomain.createStore<TRPCClient | null>(null)
// Connects to: ws://localhost:4021

Why Two Clients?

  1. Separation of Concerns: Flow editing and execution are independent
  2. Load Distribution: Heavy execution traffic doesn't block editing
  3. Independent Scaling: Executor can scale separately
  4. Failure Isolation: Execution server crash doesn't break editing

Flow Subscription Lifecycle

Files:

  • Subscription: apps/chaingraph-frontend/src/store/flow/subscription.ts
  • Event Buffer: apps/chaingraph-frontend/src/store/flow/event-buffer.ts

Event Sequence

1. FlowInitStart
   └─ Clear existing nodes/edges
   └─ Set status: CONNECTING → SUBSCRIBED

2. NodesAdded (batch)
   └─ Buffer accumulates events

3. EdgesAdded (batch)
   └─ Buffer accumulates events

4. FlowInitEnd (COMMIT SIGNAL)
   └─ Buffer flushes immediately
   └─ All events processed atomically
   └─ Nodes render BEFORE edges (race condition solved)

5. Live Updates (ongoing)
   └─ Buffer with 50ms interval
   └─ NodeUpdated, PortUpdated, EdgeAdded, etc.
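
The initialization sequence above can be modelled as a single batch applied in one pass. The following is a minimal sketch, not the code in subscription.ts: the event shapes, the `FlowState` type and the `applyBatch` function are hypothetical simplifications, and it assumes the whole init batch is applied once FlowInitEnd has arrived.

```typescript
// Hypothetical, simplified event shapes; the real FlowEvent types carry more data.
type InitEvent =
  | { type: 'FlowInitStart' }
  | { type: 'NodesAdded'; nodeIds: string[] }
  | { type: 'EdgesAdded'; edgeIds: string[] }
  | { type: 'FlowInitEnd' }

type FlowStatus = 'idle' | 'connecting' | 'subscribed'

interface FlowState {
  status: FlowStatus
  nodes: string[]
  edges: string[]
}

// Apply one committed batch in a single pass and return a fresh state object,
// so observers only ever see the state before or after the whole batch.
function applyBatch(state: FlowState, batch: InitEvent[]): FlowState {
  let next: FlowState = { ...state, nodes: [...state.nodes], edges: [...state.edges] }
  for (const event of batch) {
    switch (event.type) {
      case 'FlowInitStart':
        // Clear existing nodes/edges and mark the subscription as live.
        next = { status: 'subscribed', nodes: [], edges: [] }
        break
      case 'NodesAdded':
        next.nodes.push(...event.nodeIds)
        break
      case 'EdgesAdded':
        next.edges.push(...event.edgeIds)
        break
      case 'FlowInitEnd':
        // Commit signal only; it carries no state change in this sketch.
        break
    }
  }
  return next
}

const committed = applyBatch(
  { status: 'connecting', nodes: ['stale'], edges: ['stale-edge'] },
  [
    { type: 'FlowInitStart' },
    { type: 'NodesAdded', nodeIds: ['n1', 'n2'] },
    { type: 'EdgesAdded', edgeIds: ['e1'] },
    { type: 'FlowInitEnd' },
  ],
)
```

Because nodes and edges land in the same returned state, nothing downstream can observe edges without their nodes.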

Subscription Status

enum FlowSubscriptionStatus {
  IDLE = 'idle',
  CONNECTING = 'connecting',
  SUBSCRIBED = 'subscribed',
  ERROR = 'error',
  DISCONNECTED = 'disconnected',
}

Event Buffer Pattern

Problem: Race condition where edges render before nodes during flow initialization.

Root Cause:

1. addNodes triggers xyflowStructureChanged, which is debounced by 50ms
2. setEdges updates $xyflowEdges immediately
3. $xyflowEdges filters out the new edges because $xyflowNodes is still empty
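
The effect of step 3 can be reproduced in isolation. This sketch assumes the edge store keeps only edges whose source and target nodes are already present; the `Edge` shape and the `visibleEdges` function are illustrative names, not ChainGraph code.

```typescript
// Hypothetical minimal edge shape.
interface Edge {
  id: string
  source: string
  target: string
}

// Keep only edges whose endpoints are both present in the rendered node set.
function visibleEdges(nodeIds: Set<string>, edges: Edge[]): Edge[] {
  return edges.filter(e => nodeIds.has(e.source) && nodeIds.has(e.target))
}

const incomingEdges: Edge[] = [{ id: 'e1', source: 'a', target: 'b' }]

// Edges evaluated while the debounced node update is still pending: nothing survives.
const tooEarly = visibleEdges(new Set<string>(), incomingEdges)

// The same edges evaluated after the nodes have landed: the edge is kept.
const afterNodes = visibleEdges(new Set<string>(['a', 'b']), incomingEdges)
```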

Solution: Buffer ALL FlowEvents and flush atomically on FlowInitEnd.

File: apps/chaingraph-frontend/src/store/flow/event-buffer.ts

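import { sample } from 'effector'
import { interval } from 'patronum'

// flowDomain, FlowEvent, FlowEventType and the events flowEventReceived,
// tickerStart, tickerStop and flushBuffer are assumed to be declared
// elsewhere in this module.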

// Buffer accumulates events
export const $flowEventBuffer = flowDomain.createStore<FlowEvent[]>([])
  .on(flowEventReceived, (buffer, event) => [...buffer, event])

// Ticker runs every 50ms (configurable via VITE_FLOW_EVENT_BUFFER_INTERVAL)
const ticker = interval({
  timeout: 50,  // BUFFER_INTERVAL_MS
  start: tickerStart,
  stop: tickerStop,
})

// Auto-start ticker when first event arrives
sample({
  clock: flowEventReceived,
  source: $flowEventBuffer,
  filter: buffer => buffer.length === 1,  // Buffer was empty
  target: tickerStart,
})

// Auto-stop ticker when buffer is empty
sample({
  clock: $flowEventBuffer,
  filter: buffer => buffer.length === 0,
  target: tickerStop,
})

// CRITICAL: Flush immediately on FlowInitEnd
sample({
  clock: flowEventReceived,
  filter: event => event.type === FlowEventType.FlowInitEnd,
  target: flushBuffer,
})
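
For readers less familiar with Effector, the same idea can be expressed without any library. This is a framework-free sketch under stated assumptions: the `CommitBuffer` class and its method names are hypothetical, and `tick()` stands in for the 50ms patronum ticker so the behaviour is deterministic.

```typescript
// Framework-free model of the buffer; class and method names are hypothetical.
class CommitBuffer<E extends { type: string }> {
  private pending: E[] = []

  constructor(
    private readonly commitType: string,
    private readonly onBatch: (batch: E[]) => void,
  ) {}

  // Queue an event; a commit event flushes everything queued so far, itself included.
  push(event: E): void {
    this.pending.push(event)
    if (event.type === this.commitType) this.flush()
  }

  // Would be driven by a timer in real use; does nothing when the buffer is empty.
  tick(): void {
    this.flush()
  }

  private flush(): void {
    if (this.pending.length === 0) return
    const batch = this.pending
    this.pending = []
    this.onBatch(batch)
  }
}

const delivered: string[][] = []
const flowBuffer = new CommitBuffer<{ type: string }>('FlowInitEnd', (batch) => {
  delivered.push(batch.map(e => e.type))
})

flowBuffer.push({ type: 'FlowInitStart' })
flowBuffer.push({ type: 'NodesAdded' })
flowBuffer.push({ type: 'EdgesAdded' })
const deliveredBeforeCommit = delivered.length // still 0: nothing has been handed on

flowBuffer.push({ type: 'FlowInitEnd' }) // commit: one batch with all four events
flowBuffer.push({ type: 'NodeUpdated' })
flowBuffer.tick() // live update flushed by the ticker as a second batch
```

Swapping the pending array before calling `onBatch` means events pushed by a handler during a flush go into the next batch rather than being lost.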

Buffer Processing Flow

Subscription → flowEventReceived → $flowEventBuffer
                                         │
                    ┌────────────────────┴────────────────────┐
                    │                                          │
              [FlowInitEnd]                               [50ms tick]
                    │                                          │
                    ▼                                          ▼
             flushBuffer (immediate)              processBufferFx (batched)
                    │                                          │
                    └────────────────┬─────────────────────────┘
                                     │
                                     ▼
                              newFlowEvents (batch of FlowEvent[])
                                     │
                                     ▼
                              Event Handlers in stores.ts

Execution Subscription

File: apps/chaingraph-frontend/src/store/execution/subscription.ts

Execution events are processed directly (no buffering needed):

// Subscribe to execution events
// Note: No .execution namespace - procedures are at router root
const subscription = trpcClientExecutor.subscribeToExecutionEvents.subscribe(
  { executionId, fromIndex: 0 },
  {
    onData: (event) => {
      executionEventReceived(event)  // Direct dispatch
    },
    onError: (error) => {
      executionError(error)
    },
  }
)
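
The `fromIndex` input suggests that execution events are indexed and can be replayed. The sketch below shows one way a client could track its position and skip duplicates after a resubscribe. It rests on assumptions the source does not confirm: that every event carries a numeric `index`, and that `fromIndex` is inclusive. `IndexedEvent` and `ReplayCursor` are hypothetical names.

```typescript
// Hypothetical event shape: only the fields this sketch needs.
interface IndexedEvent {
  index: number
  type: string
}

class ReplayCursor {
  // Highest non-negative index dispatched so far; -1 means none yet.
  private lastIndex = -1

  // Returns true when the event is new and should be dispatched.
  // Events with a negative index (such as EXECUTION_CREATED at -1) always pass.
  accept(event: IndexedEvent): boolean {
    if (event.index < 0) return true
    if (event.index <= this.lastIndex) return false
    this.lastIndex = event.index
    return true
  }

  // Value to send as `fromIndex` on the next subscribe call (assumed inclusive).
  nextFromIndex(): number {
    return this.lastIndex + 1
  }
}

const cursor = new ReplayCursor()
const firstPass = [
  { index: -1, type: 'EXECUTION_CREATED' },
  { index: 0, type: 'FLOW_STARTED' },
  { index: 1, type: 'NODE_STARTED' },
].map(e => cursor.accept(e))

// A replayed event with an index that was already seen is rejected.
const replayAccepted = cursor.accept({ index: 1, type: 'NODE_STARTED' })
const resumeFrom = cursor.nextFromIndex()
```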

Execution Event Types

enum ExecutionEventEnum {
  EXECUTION_CREATED = 'EXECUTION_CREATED',  // index -1
  FLOW_STARTED = 'FLOW_STARTED',
  NODE_STARTED = 'NODE_STARTED',
  NODE_COMPLETED = 'NODE_COMPLETED',
  NODE_FAILED = 'NODE_FAILED',
  EDGE_TRANSFER_COMPLETED = 'EDGE_TRANSFER_COMPLETED',
  FLOW_COMPLETED = 'FLOW_COMPLETED',
  FLOW_FAILED = 'FLOW_FAILED',
  CHILD_EXECUTION_SPAWNED = 'CHILD_EXECUTION_SPAWNED',
}
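
As an illustration of how these event types might drive UI state, the sketch below folds node-level events into a per-node status map. The payload shape (a bare `nodeId`) and the names `NodeRunState`, `NodeEvent` and `reduceNodeStates` are assumptions made for the example; the real events carry richer data.

```typescript
type NodeRunState = 'running' | 'completed' | 'failed'

// Hypothetical payload: only the event type and the node it refers to.
interface NodeEvent {
  type: 'NODE_STARTED' | 'NODE_COMPLETED' | 'NODE_FAILED'
  nodeId: string
}

// Fold events in arrival order; the latest event for a node wins.
function reduceNodeStates(events: NodeEvent[]): Map<string, NodeRunState> {
  const states = new Map<string, NodeRunState>()
  for (const e of events) {
    switch (e.type) {
      case 'NODE_STARTED':
        states.set(e.nodeId, 'running')
        break
      case 'NODE_COMPLETED':
        states.set(e.nodeId, 'completed')
        break
      case 'NODE_FAILED':
        states.set(e.nodeId, 'failed')
        break
    }
  }
  return states
}

const nodeStates = reduceNodeStates([
  { type: 'NODE_STARTED', nodeId: 'a' },
  { type: 'NODE_STARTED', nodeId: 'b' },
  { type: 'NODE_COMPLETED', nodeId: 'a' },
  { type: 'NODE_FAILED', nodeId: 'b' },
])
```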

Key Files

File                                  Purpose
src/store/trpc/store.ts               tRPC client stores
src/store/flow/subscription.ts        Flow subscription management
src/store/flow/event-buffer.ts        Event buffering with patronum
src/store/execution/subscription.ts   Execution event subscription
src/store/flow/stores.ts              Event handlers (newFlowEvents)

Common Patterns

Subscribe to Flow

import { subscribeToFlowFx, unsubscribeFromFlowFx } from '@/store/flow/subscription'

// Subscribe
subscribeToFlowFx(flowId)

// Unsubscribe (cleanup)
unsubscribeFromFlowFx()

Handle Flow Events

// In stores.ts
sample({
  clock: newFlowEvents,
  filter: events => events.some(e => e.type === FlowEventType.NodeUpdated),
  fn: events => events.filter(e => e.type === FlowEventType.NodeUpdated),
  target: processNodeUpdates,
})
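
When several handlers each filter the same batch, it can help to partition the batch once. The helper below is a hypothetical, library-free sketch of that step; `groupByType` is not part of ChainGraph.

```typescript
// Partition a batch by event type, preserving arrival order inside each group.
function groupByType<E extends { type: string }>(batch: E[]): Map<string, E[]> {
  const groups = new Map<string, E[]>()
  for (const event of batch) {
    const bucket = groups.get(event.type)
    if (bucket) bucket.push(event)
    else groups.set(event.type, [event])
  }
  return groups
}

const grouped = groupByType([
  { type: 'NodeUpdated', id: 'n1' },
  { type: 'PortUpdated', id: 'p1' },
  { type: 'NodeUpdated', id: 'n2' },
])
const nodeUpdateIds = (grouped.get('NodeUpdated') ?? []).map(e => e.id)
```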

Subscribe to Execution

import { subscribeToExecutionFx } from '@/store/execution/subscription'

// Subscribe and wait for EXECUTION_CREATED
await subscribeToExecutionFx({ executionId })

// Start execution after subscription is ready
startExecution({ executionId })

Anti-Patterns

Anti-Pattern #1: Processing events without buffering

// ❌ BAD: Direct dispatch causes race conditions
onData: (event) => {
  newFlowEvents([event])  // Edges may render before nodes!
}

// ✅ GOOD: Use buffer
onData: (event) => {
  flowEventReceived(event)  // Buffer handles ordering
}

Anti-Pattern #2: Not waiting for EXECUTION_CREATED

// ❌ BAD: Start before subscription is ready
startExecution({ executionId })
subscribeToExecutionFx({ executionId })  // Might miss events!

// ✅ GOOD: Subscribe first, then start
await subscribeToExecutionFx({ executionId })
startExecution({ executionId })
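
The ordering problem can be demonstrated with a toy event feed. This sketch assumes a feed that does not replay past events to late subscribers; `LiveFeed` and `observedEvents` are hypothetical names used only for the demonstration.

```typescript
// Toy feed with no replay: listeners only receive events emitted after they attach.
class LiveFeed {
  private listeners: Array<(event: string) => void> = []

  subscribe(listener: (event: string) => void): void {
    this.listeners.push(listener)
  }

  emit(event: string): void {
    for (const listener of this.listeners) listener(event)
  }
}

function observedEvents(order: 'subscribe-first' | 'start-first'): string[] {
  const feed = new LiveFeed()
  const seen: string[] = []
  const subscribe = () => feed.subscribe(event => { seen.push(event) })
  const start = () => {
    feed.emit('FLOW_STARTED')
    feed.emit('NODE_STARTED')
  }

  if (order === 'subscribe-first') {
    subscribe()
    start()
  } else {
    start()
    subscribe()
  }
  feed.emit('FLOW_COMPLETED')
  return seen
}

const withSubscribeFirst = observedEvents('subscribe-first')
const withStartFirst = observedEvents('start-first')
```

With `start-first`, only the final event is observed; the two events emitted before the listener attached are gone.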

Anti-Pattern #3: Not cleaning up subscriptions

// ❌ BAD: Memory leak
useEffect(() => {
  subscribeToFlowFx(flowId)
  // No cleanup!
}, [flowId])

// ✅ GOOD: Cleanup on unmount/change
useEffect(() => {
  subscribeToFlowFx(flowId)
  return () => {
    unsubscribeFromFlowFx()
  }
}, [flowId])
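
Outside React, the same cleanup rule can be enforced by a small holder that closes the previous subscription before opening the next. This is a sketch with hypothetical names (`SingleSubscription`, `openFlow`), not an API from the ChainGraph stores.

```typescript
type Unsubscribe = () => void

// Holds at most one live subscription at a time.
class SingleSubscription {
  private current: Unsubscribe | null = null

  // Tear down the active subscription (if any), then open the new one.
  switchTo(open: () => Unsubscribe): void {
    this.close()
    this.current = open()
  }

  close(): void {
    if (this.current) {
      this.current()
      this.current = null
    }
  }
}

// Track which flows are currently subscribed to verify nothing leaks.
const activeFlows = new Set<string>()
const openFlow = (flowId: string) => (): Unsubscribe => {
  activeFlows.add(flowId)
  return () => { activeFlows.delete(flowId) }
}

const holder = new SingleSubscription()
holder.switchTo(openFlow('flow-1'))
holder.switchTo(openFlow('flow-2')) // flow-1 is closed before flow-2 opens
const activeAfterSwitch = [...activeFlows]
holder.close()
const activeAfterClose = activeFlows.size
```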

Quick Reference

Need                       Pattern                      File
Subscribe to flow          subscribeToFlowFx(flowId)    flow/subscription.ts
Buffer events              flowEventReceived(event)     flow/event-buffer.ts
Process buffered events    newFlowEvents event          flow/stores.ts
Subscribe to execution     subscribeToExecutionFx()     execution/subscription.ts
Get subscription status    $flowSubscriptionStatus      flow/stores.ts

Related Skills

  • effector-patterns - Effector patterns used in subscriptions
  • frontend-architecture - Overall frontend structure
  • executor-architecture - Backend event emission
  • dbos-patterns - DBOS event streaming
  • trpc-patterns - General tRPC framework patterns
  • trpc-flow-editing - Flow editing tRPC procedures
  • trpc-execution - Execution tRPC procedures