MCP_ORCHESTRATION

Tool discovery, routing, chaining, error handling, and composition for the 34+ MCP scheduling tools. Use when orchestrating complex multi-tool workflows, handling MCP errors, or discovering available capabilities.

$ 安裝

git clone https://github.com/Euda1mon1a/Autonomous-Assignment-Program-Manager /tmp/Autonomous-Assignment-Program-Manager && cp -r /tmp/Autonomous-Assignment-Program-Manager/.claude/skills/MCP_ORCHESTRATION ~/.claude/skills/Autonomous-Assignment-Program-Manager

// tip: Run this command in your terminal to install the skill


name: MCP_ORCHESTRATION description: Tool discovery, routing, chaining, error handling, and composition for the 34+ MCP scheduling tools. Use when orchestrating complex multi-tool workflows, handling MCP errors, or discovering available capabilities. model_tier: haiku parallel_hints: can_parallel_with: [code-review, test-writer, security-audit] must_serialize_with: [database-migration] preferred_batch_size: 5

MCP Orchestration Skill

Expert orchestration of Model Context Protocol (MCP) tools for medical residency scheduling. Handles tool discovery, intelligent routing, error recovery, and complex multi-tool composition.

When This Skill Activates

  • Multi-step workflows requiring 2+ MCP tools
  • Error recovery from failed MCP calls
  • Tool capability discovery needed
  • Complex scheduling operations requiring orchestration
  • Debugging MCP integration issues
  • Performance optimization of tool chains

Overview

The MCP server exposes 34+ specialized tools across 6 categories:

CategoryToolsPurpose
Core Scheduling5Validation, generation, conflict detection, swaps
Resilience Framework13Utilization, contingency, defense levels, homeostasis
Background Tasks4Celery task management (start, status, cancel, list)
Deployment7Validation, security, smoke tests, rollback
Empirical Testing5Benchmarking solvers, constraints, modules
Resources2Schedule status, compliance summary

Total: 36 tools available for orchestration.

Key Orchestration Phases

Phase 1: Discovery

  1. Identify available tools matching task requirements
  2. Check tool availability and health
  3. Verify prerequisites (DB connection, API availability)
  4. Map inputs/outputs between dependent tools

Phase 2: Planning

  1. Create execution DAG (directed acyclic graph)
  2. Identify parallel vs sequential dependencies
  3. Plan error handling checkpoints
  4. Estimate execution time and resource usage

Phase 3: Execution

  1. Execute tools in dependency order
  2. Handle transient errors with retry logic
  3. Propagate results through tool chain
  4. Monitor progress and resource utilization

Phase 4: Recovery

  1. Detect permanent vs transient failures
  2. Execute fallback strategies
  3. Rollback partial state changes if needed
  4. Log errors for human escalation

Orchestration Patterns

Pattern 1: Sequential Chain

Tool A → Tool B → Tool C

Each tool depends on previous tool's output.

Example: Schedule Generation Pipeline

validate_deployment → generate_schedule → validate_schedule → run_smoke_tests

Pattern 2: Parallel Fan-Out

       → Tool B
Tool A → Tool C
       → Tool D

Multiple tools execute concurrently on same input.

Example: Comprehensive Schedule Analysis

                → validate_schedule
schedule_status → detect_conflicts
                → check_utilization_threshold

Pattern 3: Map-Reduce

Tool A → [Tool B, Tool B, Tool B] → Tool C

Parallel execution followed by aggregation.

Example: Multi-Person Swap Analysis

For each faculty:
    analyze_swap_candidates → aggregate_results → rank_by_score

Pattern 4: Conditional Routing

Tool A → Decision → Tool B (if condition)
               → Tool C (else)

Example: Deployment Workflow

validate_deployment → (if valid) → promote_to_production
                   → (else)      → rollback_deployment

Key Files

FilePurpose
Workflows/tool-discovery.mdMCP endpoint scanning and capability mapping
Workflows/error-handling.mdRetry logic, fallback strategies, escalation
Workflows/tool-composition.mdDAG patterns, parallel execution, result synthesis
Reference/mcp-tool-index.mdComplete tool catalog with I/O schemas
Reference/tool-error-patterns.mdKnown failure modes and workarounds
Reference/composition-examples.mdReal-world multi-tool chains

Output

This skill produces:

  1. Execution Plans: DAG of tool dependencies with timing estimates
  2. Error Reports: Categorized failures with recovery recommendations
  3. Performance Metrics: Latency, throughput, resource usage
  4. Capability Maps: Which tools can satisfy which requirements

Error Handling Strategy

See Workflows/error-handling.md for complete strategy. Key principles:

  1. Retry Transient Errors: Network timeouts, rate limits, DB locks
  2. Fail Fast on Permanent Errors: Invalid inputs, missing resources
  3. Graceful Degradation: Use cached data or reduced functionality
  4. Human Escalation: Alert on unrecoverable errors

Integration with MCP Server

The MCP server runs in Docker container mcp-server and exposes tools via:

  • STDIO Transport: For Claude Desktop integration
  • HTTP Transport (dev mode): Port 8080 for debugging

Health Check

docker-compose logs -f mcp-server
docker-compose exec mcp-server python -c \
  "from scheduler_mcp.server import mcp; print(f'Tools: {len(mcp.tools)}')"

API Connectivity Test

docker-compose exec mcp-server curl -s http://backend:8000/health

Common Workflows

1. Schedule Safety Check

Goal: Comprehensive validation before deployment

Parallel:
  - validate_schedule(date_range)
  - detect_conflicts(date_range)
  - check_utilization_threshold()
  - run_contingency_analysis_resilience(N-1, N-2)

Aggregate results → Generate safety report

2. Emergency Coverage Response

Goal: Handle faculty absence with minimal disruption

1. run_contingency_analysis(scenario="faculty_absence", person_ids=[...])
2. For each resolution_option:
     analyze_swap_candidates(requester_id, assignment_id)
3. execute_sacrifice_hierarchy(target_level="yellow", simulate=True)
4. get_static_fallbacks() → Identify pre-computed schedules

3. Deployment Pipeline

Goal: Safe production deployment

1. validate_deployment(env="staging", git_ref="main")
2. run_security_scan(git_ref="main")
3. If all passed:
     run_smoke_tests(env="staging", suite="full")
4. If smoke tests passed:
     promote_to_production(staging_version, approval_token)
5. Monitor: get_deployment_status(deployment_id)

4. Performance Optimization

Goal: Identify and remove low-value code

Parallel:
  - benchmark_solvers(scenario_count=20)
  - benchmark_constraints(test_schedules="historical")
  - benchmark_resilience(modules=all)
  - module_usage_analysis(entry_points=["main", "api", "scheduling"])

Aggregate → Generate cut list → ablation_study(module_path)

Concrete Implementation Examples

The workflows above are high-level. This section provides detailed, runnable code examples.

Example 1: Parallel Safety Check with Error Handling

"""Complete safety check implementation with error handling."""
import asyncio
from typing import Dict, List, Any
from datetime import date, datetime

async def comprehensive_safety_check(
    start_date: date,
    end_date: date,
    timeout_seconds: int = 60
) -> Dict[str, Any]:
    """
    Orchestrate parallel safety checks with robust error handling.

    Args:
        start_date: Schedule validation start date
        end_date: Schedule validation end date
        timeout_seconds: Maximum time to wait for all checks

    Returns:
        Safety report with pass/fail status and recommendations
    """
    # Phase 1: Define all checks with timeout wrapper
    async def safe_call(coro, check_name: str):
        """Wrapper that handles timeouts and exceptions."""
        try:
            result = await asyncio.wait_for(coro, timeout=30)
            return {"success": True, "data": result, "check": check_name}
        except asyncio.TimeoutError:
            return {"success": False, "error": "Timeout after 30s", "check": check_name}
        except Exception as e:
            return {"success": False, "error": str(e), "check": check_name}

    # Phase 2: Execute checks in parallel
    checks = [
        safe_call(validate_schedule_mcp(start_date, end_date), "validation"),
        safe_call(detect_conflicts_mcp(start_date, end_date), "conflicts"),
        safe_call(check_utilization_threshold_mcp(), "utilization"),
        safe_call(run_contingency_analysis_mcp(["N-1", "N-2"]), "contingency")
    ]

    results = await asyncio.gather(*checks)

    # Phase 3: Aggregate and analyze results
    report = {
        "timestamp": datetime.utcnow().isoformat(),
        "date_range": f"{start_date} to {end_date}",
        "checks_run": len(results),
        "checks_passed": sum(1 for r in results if r["success"]),
        "details": {},
        "blocking_issues": [],
        "warnings": [],
        "status": "PASS"
    }

    for result in results:
        check_name = result["check"]
        if result["success"]:
            report["details"][check_name] = result["data"]

            # Check-specific logic
            if check_name == "validation" and not result["data"].get("is_valid"):
                report["blocking_issues"].append("ACGME validation failed")
                report["status"] = "FAIL"

            if check_name == "conflicts" and result["data"].get("conflict_count", 0) > 0:
                report["blocking_issues"].append(
                    f"{result['data']['conflict_count']} conflicts detected"
                )
                report["status"] = "FAIL"

            if check_name == "utilization" and result["data"].get("exceeds_threshold"):
                report["warnings"].append("Utilization exceeds 80% threshold")

        else:
            # Check failed - add to warnings (non-blocking)
            report["warnings"].append(f"{check_name} check failed: {result['error']}")
            report["details"][check_name] = {"error": result["error"]}

    # Phase 4: Generate recommendations
    if report["status"] == "FAIL":
        report["recommendation"] = "DO NOT DEPLOY"
        report["next_steps"] = ["Fix blocking issues", "Re-run safety check"]
    elif report["warnings"]:
        report["recommendation"] = "DEPLOY WITH CAUTION"
        report["next_steps"] = ["Review warnings", "Monitor after deployment"]
    else:
        report["recommendation"] = "SAFE TO DEPLOY"
        report["next_steps"] = ["Proceed with deployment"]

    return report

Example 2: Emergency Coverage with Tiered Strategies

"""Multi-tier emergency coverage orchestration."""

async def handle_emergency_absence(
    absent_faculty_id: str,
    absence_start: date,
    absence_end: date
) -> Dict[str, Any]:
    """
    Handle faculty absence using tiered fallback strategies.

    Strategy Tiers (in order):
    1. Swap-based coverage (least disruptive)
    2. Sacrifice hierarchy (controlled degradation)
    3. Static fallback schedule (pre-approved backup)
    4. Manual escalation (all automation failed)

    Returns:
        Resolution plan with selected strategy
    """
    response = {
        "absent_faculty_id": absent_faculty_id,
        "strategies_attempted": [],
        "selected_strategy": None,
        "execution_steps": []
    }

    # TIER 1: Try swap-based coverage
    print("Tier 1: Attempting swap matching...")
    try:
        # Get affected assignments
        contingency = await run_contingency_analysis_mcp(
            scenario="faculty_absence",
            person_ids=[absent_faculty_id],
            start_date=absence_start,
            end_date=absence_end
        )

        affected = contingency.get("affected_assignments", [])
        swap_success_count = 0

        # Try to find swaps for each affected assignment
        for assignment_id in affected[:10]:  # Limit to first 10
            try:
                candidates = await analyze_swap_candidates_mcp(
                    requester_id=absent_faculty_id,
                    assignment_id=assignment_id
                )

                if candidates.get("candidates"):
                    swap_success_count += 1

            except Exception as e:
                print(f"  Swap analysis failed for {assignment_id}: {e}")

        # If we can cover >= 80% via swaps, use this strategy
        coverage_ratio = swap_success_count / len(affected) if affected else 0
        response["strategies_attempted"].append({
            "tier": 1,
            "strategy": "swap_based",
            "coverage": f"{coverage_ratio:.0%}",
            "viable": coverage_ratio >= 0.8
        })

        if coverage_ratio >= 0.8:
            response["selected_strategy"] = "swap_based_coverage"
            response["execution_steps"] = [
                f"Notify {swap_success_count} potential swap partners",
                "Execute approved swaps",
                "Monitor remaining gaps"
            ]
            return response

    except Exception as e:
        response["strategies_attempted"].append({
            "tier": 1,
            "strategy": "swap_based",
            "error": str(e)
        })

    # TIER 2: Try sacrifice hierarchy
    print("Tier 2: Evaluating sacrifice hierarchy...")
    try:
        sacrifice = await execute_sacrifice_hierarchy_mcp(
            target_level="yellow",
            simulate=True
        )

        has_violations = bool(sacrifice.get("acgme_violations"))
        response["strategies_attempted"].append({
            "tier": 2,
            "strategy": "sacrifice_hierarchy",
            "acgme_compliant": not has_violations,
            "viable": not has_violations
        })

        if not has_violations:
            response["selected_strategy"] = "controlled_degradation"
            response["execution_steps"] = [
                "Activate YELLOW defense level",
                f"Suspend {len(sacrifice.get('suspended_services', []))} services",
                "Notify personnel",
                "Monitor compliance"
            ]
            return response

    except Exception as e:
        response["strategies_attempted"].append({
            "tier": 2,
            "strategy": "sacrifice_hierarchy",
            "error": str(e)
        })

    # TIER 3: Try static fallback
    print("Tier 3: Checking static fallbacks...")
    try:
        fallbacks = await get_static_fallbacks_mcp()

        for fallback in fallbacks.get("fallback_schedules", []):
            if (fallback["start_date"] <= absence_start.isoformat() and
                fallback["end_date"] >= absence_end.isoformat()):

                response["strategies_attempted"].append({
                    "tier": 3,
                    "strategy": "static_fallback",
                    "fallback_name": fallback["name"],
                    "viable": True
                })

                response["selected_strategy"] = "static_fallback"
                response["execution_steps"] = [
                    f"Activate fallback: {fallback['name']}",
                    "Notify all personnel",
                    "Update assignments",
                    "Monitor for 24h"
                ]
                return response

        response["strategies_attempted"].append({
            "tier": 3,
            "strategy": "static_fallback",
            "viable": False,
            "reason": "No suitable fallback found"
        })

    except Exception as e:
        response["strategies_attempted"].append({
            "tier": 3,
            "strategy": "static_fallback",
            "error": str(e)
        })

    # TIER 4: Manual escalation
    print("Tier 4: All automation failed - escalating")
    response["selected_strategy"] = "manual_escalation"
    response["execution_steps"] = [
        "Escalate to Program Director",
        "Provide impact analysis",
        "Request manual coverage assignment"
    ]

    return response

Common Failure Modes and Solutions

Real-world orchestration failures and how to handle them.

Failure Mode 1: MCP Server Unresponsive

Symptoms:

  • All MCP tool calls timeout
  • Connection refused errors
  • No response from health check

Detection:

async def check_mcp_health() -> bool:
    """Verify MCP server is responsive."""
    try:
        # Simple health check with short timeout
        result = await asyncio.wait_for(
            schedule_status_mcp(),
            timeout=5
        )
        return True
    except asyncio.TimeoutError:
        print("ERROR: MCP server not responding")
        return False
    except Exception as e:
        print(f"ERROR: MCP health check failed: {e}")
        return False

Recovery Steps:

# 1. Check container status
docker-compose ps mcp-server

# 2. Check logs for errors
docker-compose logs --tail=50 mcp-server

# 3. Restart container if needed
docker-compose restart mcp-server

# 4. Verify backend connectivity from MCP
docker-compose exec mcp-server curl http://backend:8000/health

Prevention:

  • Implement health checks before orchestration
  • Use circuit breaker pattern for repeated failures
  • Set reasonable timeouts (30s default, not 5min)

Failure Mode 2: Partial Results from Parallel Fan-Out

Symptoms:

  • Some tools succeed, others fail
  • Incomplete data for aggregation
  • Missing expected fields in results

Example Scenario:

# 3 out of 4 checks succeeded - what do we do?
results = await asyncio.gather(*checks, return_exceptions=True)
# [
#   {"data": {...}},           # Success
#   TimeoutError(),             # Failed
#   {"data": {...}},           # Success
#   ConnectionError()           # Failed
# ]

Solution Pattern:

async def robust_parallel_execution(
    tools: List[tuple[str, Callable]],
    min_success_ratio: float = 0.75
) -> Dict[str, Any]:
    """
    Execute tools in parallel with partial failure tolerance.

    Args:
        tools: List of (name, async_callable) tuples
        min_success_ratio: Minimum fraction that must succeed

    Returns:
        Aggregated results with success indicators

    Raises:
        OrchestraionError: If too many tools fail
    """
    tasks = [tool[1]() for tool in tools]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    failures = []

    for (name, _), result in zip(tools, results):
        if isinstance(result, Exception):
            failures.append({"tool": name, "error": str(result)})
        else:
            successes.append({"tool": name, "data": result})

    success_ratio = len(successes) / len(tools)

    report = {
        "total": len(tools),
        "succeeded": len(successes),
        "failed": len(failures),
        "success_ratio": success_ratio,
        "successes": successes,
        "failures": failures
    }

    if success_ratio < min_success_ratio:
        raise OrchestrationError(
            f"Only {success_ratio:.0%} of tools succeeded "
            f"(minimum: {min_success_ratio:.0%})"
        )

    return report

Failure Mode 3: Tool Output Schema Mismatch

Symptoms:

  • KeyError when accessing expected fields
  • Type errors when processing results
  • Unexpected None values

Example:

# Expected schema
expected = {"is_valid": bool, "violations": list}

# Actual response (missing field)
actual = {"is_valid": True}  # Where's violations?

# Code crashes
violations = result["violations"]  # KeyError!

Solution Pattern:

from typing import Optional
from pydantic import BaseModel, Field

class ValidationResult(BaseModel):
    """Expected schema for validation tool."""
    is_valid: bool
    violations: list = Field(default_factory=list)
    timestamp: Optional[str] = None

def safe_extract(raw_result: Dict[str, Any]) -> ValidationResult:
    """
    Safely parse tool output with schema validation.

    Raises:
        ValidationError: If required fields missing or wrong type
    """
    try:
        return ValidationResult(**raw_result)
    except ValidationError as e:
        print(f"WARNING: Tool output schema mismatch: {e}")
        # Return safe default
        return ValidationResult(is_valid=False, violations=["Schema parse error"])

Failure Mode 4: Dependency Chain Breaks Mid-Execution

Symptoms:

  • Tool B needs output from Tool A, but Tool A failed
  • Cascading failures down the dependency chain
  • Incomplete state changes

Example:

# Sequential dependency: A → B → C
result_a = await tool_a()  # Succeeds
result_b = await tool_b(result_a["value"])  # FAILS - what about C?
result_c = await tool_c(result_b["value"])  # Can't run - result_b doesn't exist

Solution Pattern:

async def execute_dependency_chain(
    tools: List[tuple[str, Callable]],
    rollback_on_failure: bool = True
) -> Dict[str, Any]:
    """
    Execute tools with dependencies, rolling back on failure.

    Args:
        tools: List of (name, tool_func) in dependency order
        rollback_on_failure: Whether to undo previous steps on failure

    Returns:
        Chain execution report
    """
    results = {}
    executed_tools = []

    for tool_name, tool_func in tools:
        try:
            # Execute with previous results as context
            result = await tool_func(results)
            results[tool_name] = result
            executed_tools.append(tool_name)

        except Exception as e:
            # Chain broken - decide rollback
            report = {
                "status": "FAILED",
                "failed_at": tool_name,
                "completed": executed_tools,
                "error": str(e)
            }

            if rollback_on_failure:
                print(f"Rolling back {len(executed_tools)} completed steps...")
                for completed_tool in reversed(executed_tools):
                    try:
                        # Call rollback if available
                        rollback_func = getattr(
                            tool_func,
                            "rollback",
                            None
                        )
                        if rollback_func:
                            await rollback_func(results[completed_tool])
                    except Exception as rollback_error:
                        print(f"Rollback failed for {completed_tool}: {rollback_error}")

                report["rollback"] = "ATTEMPTED"

            return report

    return {
        "status": "COMPLETED",
        "completed": executed_tools,
        "results": results
    }

Integration Examples

How to call MCP tools from different contexts.

From FastAPI Endpoint

"""MCP orchestration from API route."""
from fastapi import APIRouter, HTTPException, BackgroundTasks
from datetime import date

router = APIRouter()

@router.post("/schedule/safety-check")
async def api_safety_check(
    start_date: date,
    end_date: date,
    background_tasks: BackgroundTasks
):
    """
    Trigger safety check via API.

    For long-running checks, use background task.
    """
    try:
        # Quick check - run synchronously
        if (end_date - start_date).days <= 7:
            report = await comprehensive_safety_check(start_date, end_date)
            return report

        # Long check - run in background
        else:
            task_id = str(uuid.uuid4())
            background_tasks.add_task(
                run_safety_check_background,
                task_id,
                start_date,
                end_date
            )
            return {
                "task_id": task_id,
                "status": "RUNNING",
                "message": "Check started in background"
            }

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Safety check failed: {str(e)}"
        )

From Celery Task

"""MCP orchestration from Celery background task."""
from celery import shared_task

@shared_task(bind=True, max_retries=3)
def celery_emergency_coverage(
    self,
    absent_faculty_id: str,
    absence_start: str,
    absence_end: str
):
    """
    Handle emergency coverage in background.

    Retries up to 3 times on transient failures.
    """
    try:
        # Celery tasks aren't async - use asyncio.run
        result = asyncio.run(
            handle_emergency_absence(
                absent_faculty_id,
                date.fromisoformat(absence_start),
                date.fromisoformat(absence_end)
            )
        )

        return {
            "status": "COMPLETED",
            "strategy": result["selected_strategy"],
            "execution_plan": result["execution_steps"]
        }

    except Exception as e:
        # Retry on transient errors
        if "timeout" in str(e).lower() or "connection" in str(e).lower():
            self.retry(exc=e, countdown=30)  # Retry after 30s
        else:
            # Permanent error - don't retry
            return {
                "status": "FAILED",
                "error": str(e)
            }

From CLI Script

"""MCP orchestration from command-line script."""
import asyncio
import sys
from datetime import date

async def main():
    """CLI entrypoint for safety check."""
    if len(sys.argv) != 3:
        print("Usage: safety_check.py START_DATE END_DATE")
        print("Example: safety_check.py 2025-01-15 2025-02-14")
        sys.exit(1)

    start = date.fromisoformat(sys.argv[1])
    end = date.fromisoformat(sys.argv[2])

    print(f"Running safety check: {start} to {end}")
    print("-" * 60)

    report = await comprehensive_safety_check(start, end)

    print(f"\nStatus: {report['status']}")
    print(f"Recommendation: {report['recommendation']}")

    if report['blocking_issues']:
        print("\nBlocking Issues:")
        for issue in report['blocking_issues']:
            print(f"  ❌ {issue}")

    if report['warnings']:
        print("\nWarnings:")
        for warning in report['warnings']:
            print(f"  ⚠️  {warning}")

    print("\nNext Steps:")
    for step in report['next_steps']:
        print(f"  • {step}")

    # Exit code based on status
    sys.exit(0 if report['status'] == "PASS" else 1)

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

  1. Always check tool prerequisites before execution
  2. Use background tasks for long-running operations (>30s)
  3. Poll task status instead of blocking on Celery tasks
  4. Implement timeouts for all MCP calls (default: 30s)
  5. Log all tool inputs/outputs for debugging
  6. Cache tool results when appropriate (compliance summary, static fallbacks)
  7. Parallelize independent tools to reduce latency
  8. Handle partial failures gracefully in fan-out patterns

Troubleshooting

MCP Server Not Responding

# Check container status
docker-compose ps mcp-server

# View logs
docker-compose logs -f mcp-server

# Restart server
docker-compose restart mcp-server

Backend API Unreachable from MCP

# Test connectivity from MCP container
docker-compose exec mcp-server curl -s http://backend:8000/health

# Check network
docker network inspect autonomous-assignment-program-manager_default

Tool Returns Unexpected Result

  1. Check tool signature in Reference/mcp-tool-index.md
  2. Verify input schema matches expected format
  3. Check backend API logs: docker-compose logs backend
  4. Review error patterns in Reference/tool-error-patterns.md

Real-World Scenario: Multi-Skill Integration

Complete end-to-end example showing MCP orchestration integrated with other skills.

Scenario: Pre-Deployment Safety Validation

User Request: "Validate Block 10 schedule before deploying to production"

Orchestration Flow:

MCP_ORCHESTRATION (this skill)
    ↓
    Invokes: comprehensive_safety_check() using MCP tools
    ↓
    ├─→ constraint-preflight (verify all constraints registered)
    ├─→ schedule-validator (ACGME compliance verification)
    └─→ safe-schedule-generation (ensure backup exists)
    ↓
    If PASS: production-incident-responder (deployment monitoring)
    If FAIL: systematic-debugger (investigate failures)

Implementation:

async def validate_block_10_deployment():
    """
    Complete pre-deployment validation orchestration.

    Integrates multiple skills and MCP tools.
    """
    print("Step 1: MCP_ORCHESTRATION - Running comprehensive safety check...")

    # Use MCP tools for parallel checks
    safety_report = await comprehensive_safety_check(
        start_date=date(2025, 2, 3),
        end_date=date(2025, 3, 2)
    )

    if safety_report["status"] == "FAIL":
        print("\n❌ Safety check FAILED")
        print("Step 2: Invoking systematic-debugger skill...")

        # Systematic debugging of failures
        for issue in safety_report["blocking_issues"]:
            print(f"  Investigating: {issue}")
            # Debugger skill would explore root causes

        return {
            "deployment_approved": False,
            "reason": "Safety check failed",
            "next_action": "Fix issues identified by systematic-debugger"
        }

    # Additional validation via other skills
    print("\n✓ MCP safety checks passed")
    print("Step 2: Running constraint-preflight...")

    # Ensure all constraints are properly registered
    preflight_ok = await run_constraint_preflight()

    if not preflight_ok:
        return {
            "deployment_approved": False,
            "reason": "Constraint preflight failed",
            "next_action": "Review constraint registration"
        }

    print("Step 3: Running schedule-validator...")

    # Detailed ACGME compliance check
    validation_result = await run_schedule_validator(
        start_date=date(2025, 2, 3),
        end_date=date(2025, 3, 2)
    )

    if not validation_result["acgme_compliant"]:
        return {
            "deployment_approved": False,
            "reason": "ACGME violations detected",
            "violations": validation_result["violations"],
            "next_action": "Fix compliance violations"
        }

    print("Step 4: Ensuring database backup exists...")

    # Safe schedule generation - verify backup
    backup_status = await check_schedule_backup()

    if not backup_status["backup_exists"]:
        print("  Creating backup before deployment...")
        await create_schedule_backup()

    # All checks passed - approve deployment
    print("\n✅ All validation passed - DEPLOYMENT APPROVED")

    return {
        "deployment_approved": True,
        "safety_report": safety_report,
        "acgme_compliant": True,
        "backup_id": backup_status.get("backup_id"),
        "next_action": "Proceed to production deployment"
    }

Output Example:

Step 1: MCP_ORCHESTRATION - Running comprehensive safety check...
  ✓ Schedule validation: PASS
  ✓ Conflict detection: PASS (0 conflicts)
  ✓ Utilization check: PASS (78.3% < 80%)
  ⚠ Contingency analysis: 1 N-1 failure detected

✓ MCP safety checks passed
Step 2: Running constraint-preflight...
  ✓ All 47 constraints registered
  ✓ No orphaned constraints detected

Step 3: Running schedule-validator...
  ✓ 80-hour rule: PASS (all weeks compliant)
  ✓ 1-in-7 rule: PASS (all residents have 1 day off per week)
  ✓ Supervision ratios: PASS

Step 4: Ensuring database backup exists...
  Backup exists: backup_20250202_143022

✅ All validation passed - DEPLOYMENT APPROVED

Next Action: Proceed to production deployment

Related Skills

  • constraint-preflight: Verify constraints before schedule generation (integrates with validation workflow)
  • safe-schedule-generation: Database backup before write operations (ensures rollback capability)
  • production-incident-responder: Crisis response using MCP tools (handles deployment failures)
  • systematic-debugger: Root cause analysis of tool failures (investigates orchestration errors)
  • schedule-validator: ACGME compliance verification (complements MCP validation tools)
  • acgme-compliance: Regulatory expertise (validates MCP tool compliance checks)

Version

  • Created: 2025-12-26
  • MCP Server Version: 0.1.0
  • Total Tools: 36
  • Backend API: FastAPI 0.109.0

For detailed tool signatures and schemas, see Reference/mcp-tool-index.md For error handling procedures, see Workflows/error-handling.md For composition patterns, see Reference/composition-examples.md

Repository

Euda1mon1a
Euda1mon1a
Author
Euda1mon1a/Autonomous-Assignment-Program-Manager/.claude/skills/MCP_ORCHESTRATION
2
Stars
0
Forks
Updated4d ago
Added1w ago