execution-engine-analysis

Analyze control flow, concurrency models, and event architectures in agent frameworks. Use when (1) understanding async vs sync execution patterns, (2) classifying execution topology (DAG/FSM/Linear), (3) mapping event emission and observability hooks, (4) evaluating scalability characteristics, or (5) comparing execution models across frameworks.

$ 安裝

git clone https://github.com/Dowwie/agent_framework_study /tmp/agent_framework_study && cp -r /tmp/agent_framework_study/.claude/skills/execution-engine-analysis ~/.claude/skills/agent_framework_study

// tip: Run this command in your terminal to install the skill


name: execution-engine-analysis description: Analyze control flow, concurrency models, and event architectures in agent frameworks. Use when (1) understanding async vs sync execution patterns, (2) classifying execution topology (DAG/FSM/Linear), (3) mapping event emission and observability hooks, (4) evaluating scalability characteristics, or (5) comparing execution models across frameworks.

Execution Engine Analysis

Analyzes the control flow substrate and concurrency model.

Process

  1. Identify async model — Native async, sync-with-wrappers, or hybrid
  2. Classify topology — DAG, FSM, or linear chain
  3. Catalog events — Callbacks, listeners, generators
  4. Map observability — Pre/post hooks, interception points

Concurrency Model Classification

Native Async

# Signature: async/await throughout
async def run(self):
    result = await self.llm.agenerate(messages)
    return await self.process(result)

# Entry point uses asyncio
asyncio.run(agent.run())

Indicators: async def, await, asyncio.gather, aiohttp

Sync with Wrappers

# Signature: sync API wrapping async internals
def run(self):
    return asyncio.run(self._async_run())

# Or using thread pools
def run(self):
    with ThreadPoolExecutor() as pool:
        future = pool.submit(self._blocking_call)
        return future.result()

Indicators: asyncio.run() inside sync methods, ThreadPoolExecutor, run_in_executor

Hybrid

# Both sync and async APIs exposed
def invoke(self, input):
    return self._sync_invoke(input)

async def ainvoke(self, input):
    return await self._async_invoke(input)

Indicators: Paired methods (invoke/ainvoke), sync_to_async decorators

Execution Topology

DAG (Directed Acyclic Graph)

# Signature: Nodes with dependencies
class Node:
    def __init__(self, deps: list[Node]): ...

graph.add_edge(node_a, node_b)
result = graph.execute()  # Topological order

Indicators: Graph, Node, Edge classes, networkx, topological sort

FSM (Finite State Machine)

# Signature: Explicit states and transitions
class State(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    DONE = "done"

def transition(self, current: State, event: str) -> State:
    if current == State.THINKING and event == "action_chosen":
        return State.ACTING

Indicators: State enums, transition tables, current_state, state machine libraries

Linear Chain

# Signature: Sequential step execution
def run(self):
    result = self.step1()
    result = self.step2(result)
    result = self.step3(result)
    return result

# Or pipeline pattern
chain = step1 | step2 | step3
result = chain.invoke(input)

Indicators: Sequential calls, pipe operators (|), Chain, Pipeline classes

Event Architecture

Callbacks

class Callbacks:
    def on_llm_start(self, prompt): ...
    def on_llm_end(self, response): ...
    def on_tool_start(self, tool, input): ...
    def on_tool_end(self, output): ...
    def on_error(self, error): ...

Flexibility: Low — fixed hook points Traceability: Medium — easy to follow

Event Listeners/Emitters

emitter = EventEmitter()
emitter.on('llm:start', handler)
emitter.on('tool:*', wildcard_handler)
emitter.emit('llm:start', {'prompt': prompt})

Flexibility: High — dynamic registration Traceability: Low — harder to trace

Async Generators (Streaming)

async def run(self):
    async for chunk in self.llm.astream(prompt):
        yield {"type": "token", "content": chunk}
    yield {"type": "done"}

Flexibility: Medium — streaming-native Traceability: High — follows data flow

Observability Hooks Inventory

Hook PointPurposeInterception Level
Pre-LLMModify promptInput
Post-LLMAccess raw responseOutput
Pre-ToolValidate tool inputInput
Post-ToolTransform tool outputOutput
Pre-StepObserve stateRead-only
Post-StepModify next stepControl flow
On-ErrorHandle/transformError

Questions to Answer

  • Can you intercept tool input before execution?
  • Is the raw LLM response accessible (with token counts)?
  • Can you modify control flow from hooks?
  • Are hooks sync or async?

Output Template

## Execution Engine Analysis: [Framework Name]

### Concurrency Model
- **Type**: [Native Async / Sync-with-Wrappers / Hybrid]
- **Entry Point**: `path/to/main.py:run()`
- **Thread Safety**: [Yes/No/Partial]

### Execution Topology
- **Model**: [DAG / FSM / Linear Chain]
- **Implementation**: [Description with code refs]
- **Parallelization**: [Supported/Not Supported]

### Event Architecture
- **Pattern**: [Callbacks / Listeners / Generators]
- **Registration**: [Static / Dynamic]
- **Streaming**: [Supported / Not Supported]

### Observability Inventory

| Hook | Location | Async | Modifiable |
|------|----------|-------|------------|
| on_llm_start | callbacks.py:L23 | Yes | Input only |
| on_tool_end | callbacks.py:L45 | Yes | Output |
| ... | ... | ... | ... |

### Scalability Assessment
- **Blocking Operations**: [List any]
- **Resource Limits**: [Token counters, rate limits]
- **Recommended Concurrency**: [Threads/Processes/AsyncIO]

Integration

  • Prerequisite: codebase-mapping to identify execution files
  • Feeds into: comparative-matrix for async decisions
  • Related: control-loop-extraction for agent-specific flow