temporal

Temporal.io workflow orchestration for SignalRoom. Use when designing workflows, debugging activities, managing schedules, or troubleshooting stuck workflows.

$ 安裝

git clone https://github.com/mmbianco78/signalroom /tmp/signalroom && cp -r /tmp/signalroom/.claude/skills/temporal ~/.claude/skills/signalroom

// tip: Run this command in your terminal to install the skill


name: temporal description: Temporal.io workflow orchestration for SignalRoom. Use when designing workflows, debugging activities, managing schedules, or troubleshooting stuck workflows.

Temporal Workflow Orchestration

Architecture

Temporal Cloud (signalroom-713.nzg5u)
    │
    ├── Schedules (cron triggers)
    │
    └── Workflows (orchestration)
            │
            └── Activities (actual work)
                    │
                    └── dlt Pipelines / Reports / Notifications

Key Concepts

ConceptPurposeLocation
WorkflowOrchestration logic (no I/O)temporal/workflows.py
ActivityRetryable unit of worktemporal/activities.py
WorkerProcess that executes workflows/activitiesworkers/main.py
ScheduleCron-like triggerTemporal Cloud UI

Workflows vs Activities

Workflows (pure orchestration):

  • No I/O, no network calls, no file access
  • Must be deterministic (same input = same output)
  • Can call activities and wait for results

Activities (actual work):

  • API calls, database writes, file operations
  • Retried automatically on failure
  • Can be long-running

SignalRoom Workflows

# Sync a single source
SyncSourceWorkflow(source_name, resources, notify_on_success, notify_on_failure)

# Sync multiple sources sequentially
ScheduledSyncWorkflow(sources)

# Run and send a report
RunReportWorkflow(report_name, channel, send)

Triggering Workflows

# Via script (recommended)
python scripts/trigger_workflow.py everflow -w

# Programmatically
from signalroom.temporal.config import get_temporal_client

client = await get_temporal_client()
await client.start_workflow(
    SyncSourceWorkflow.run,
    args=[...],
    id="sync-everflow-manual",
    task_queue="api-tasks"
)

Schedules

Active Schedules

Schedule IDCronWorkflow
hourly-sync-everflow-redtrack0 12-23 * * * (7am-11pm ET)ScheduledSyncWorkflow
daily-sync-s30 11 * * * (6am ET)SyncSourceWorkflow

Managing Schedules

# Setup/update schedules
python scripts/setup_schedules.py

# Delete all schedules
python scripts/setup_schedules.py --delete

Temporal Cloud UI

https://cloud.temporal.io/namespaces/signalroom-713.nzg5u/schedules

Retry Policy

Defined in temporal/config.py:

RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    maximum_interval=timedelta(minutes=5),
    backoff_coefficient=2.0,
    maximum_attempts=5,
    non_retryable_error_types=["ValueError", "KeyError"],
)

Debugging Stuck Workflows

1. Check Temporal UI

https://cloud.temporal.io/namespaces/signalroom-713.nzg5u/workflows

Look for:

  • Workflow status (Running, Completed, Failed)
  • Pending activities
  • Event history

2. Check Worker Logs

# Local
make logs-worker

# Fly.io
fly logs

3. Common Issues

"Activity timed out"

  • Pipeline took > 30 minutes
  • Worker crashed mid-activity

"No worker available"

  • Worker not running
  • Wrong task queue

"Workflow stuck in Running"

  • Activity repeatedly failing and retrying
  • Check activity error in event history

UnsandboxedWorkflowRunner

Temporal sandboxes workflows by default. If you get RestrictedWorkflowAccessError from imports like structlog:

from temporalio.worker import UnsandboxedWorkflowRunner

worker = Worker(
    client,
    task_queue=settings.temporal_task_queue,
    workflows=[...],
    activities=[...],
    workflow_runner=UnsandboxedWorkflowRunner(),  # Add this
)

Activity Patterns

Async Context

Activities run in async context. Don't use asyncio.run() inside:

# WRONG - nested event loop
@activity.defn
async def my_activity():
    result = some_sync_function_that_uses_asyncio_run()  # Fails!

# RIGHT - await directly
@activity.defn
async def my_activity():
    result = await some_async_function()

Heartbeats (Long-Running)

For activities > 1 minute:

@activity.defn
async def long_activity():
    for batch in batches:
        activity.heartbeat()  # Prevent timeout
        await process(batch)

Resources