MCP_ORCHESTRATION
Tool discovery, routing, chaining, error handling, and composition for the 34+ MCP scheduling tools. Use when orchestrating complex multi-tool workflows, handling MCP errors, or discovering available capabilities.
$ Installer
git clone https://github.com/Euda1mon1a/Autonomous-Assignment-Program-Manager /tmp/Autonomous-Assignment-Program-Manager && cp -r /tmp/Autonomous-Assignment-Program-Manager/.claude/skills/MCP_ORCHESTRATION ~/.claude/skills/Autonomous-Assignment-Program-Manager/
Tip: Run this command in your terminal to install the skill.
name: MCP_ORCHESTRATION
description: Tool discovery, routing, chaining, error handling, and composition for the 34+ MCP scheduling tools. Use when orchestrating complex multi-tool workflows, handling MCP errors, or discovering available capabilities.
model_tier: haiku
parallel_hints:
  can_parallel_with: [code-review, test-writer, security-audit]
  must_serialize_with: [database-migration]
  preferred_batch_size: 5
MCP Orchestration Skill
Expert orchestration of Model Context Protocol (MCP) tools for medical residency scheduling. Handles tool discovery, intelligent routing, error recovery, and complex multi-tool composition.
When This Skill Activates
- Multi-step workflows requiring 2+ MCP tools
- Error recovery from failed MCP calls
- Tool capability discovery needed
- Complex scheduling operations requiring orchestration
- Debugging MCP integration issues
- Performance optimization of tool chains
Overview
The MCP server exposes 36 specialized tools across 6 categories:
| Category | Tools | Purpose |
|---|---|---|
| Core Scheduling | 5 | Validation, generation, conflict detection, swaps |
| Resilience Framework | 13 | Utilization, contingency, defense levels, homeostasis |
| Background Tasks | 4 | Celery task management (start, status, cancel, list) |
| Deployment | 7 | Validation, security, smoke tests, rollback |
| Empirical Testing | 5 | Benchmarking solvers, constraints, modules |
| Resources | 2 | Schedule status, compliance summary |
Total: 36 tools available for orchestration.
Key Orchestration Phases
Phase 1: Discovery
- Identify available tools matching task requirements
- Check tool availability and health
- Verify prerequisites (DB connection, API availability)
- Map inputs/outputs between dependent tools
Phase 2: Planning
- Create execution DAG (directed acyclic graph)
- Identify parallel vs sequential dependencies
- Plan error handling checkpoints
- Estimate execution time and resource usage
Phase 3: Execution
- Execute tools in dependency order
- Handle transient errors with retry logic
- Propagate results through tool chain
- Monitor progress and resource utilization
Phase 4: Recovery
- Detect permanent vs transient failures
- Execute fallback strategies
- Rollback partial state changes if needed
- Log errors for human escalation (see the sketch below)
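The four phases can be collapsed into a thin orchestration loop. The sketch below is illustrative only: the Phase 2 plan is represented as a plain list of async steps in dependency order, and the helper names (`run_planned_steps`, `TRANSIENT_ERRORS`) are not part of the MCP server; the retry/escalation behavior mirrors Phases 3 and 4.
"""Minimal sketch of Phases 3-4: execute planned steps, retry transient errors, escalate the rest."""
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

TRANSIENT_ERRORS = (asyncio.TimeoutError, ConnectionError)  # retried; everything else fails fast

async def run_planned_steps(
    steps: List[Callable[[Dict[str, Any]], Awaitable[Any]]],  # Phase 2 output: dependency order
    max_retries: int = 3,
) -> Dict[str, Any]:
    results: Dict[str, Any] = {}
    for step in steps:
        for attempt in range(1, max_retries + 1):
            try:
                results[step.__name__] = await step(results)  # propagate prior results (Phase 3)
                break
            except TRANSIENT_ERRORS:
                if attempt == max_retries:
                    return {"status": "FAILED", "failed_at": step.__name__, "results": results}
                await asyncio.sleep(2 ** attempt)  # exponential backoff before retrying
            except Exception as exc:  # permanent error: stop and escalate (Phase 4)
                return {"status": "ESCALATE", "failed_at": step.__name__, "error": str(exc)}
    return {"status": "COMPLETED", "results": results}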
Orchestration Patterns
Pattern 1: Sequential Chain
Tool A → Tool B → Tool C
Each tool depends on previous tool's output.
Example: Schedule Generation Pipeline
validate_deployment → generate_schedule → validate_schedule → run_smoke_tests
Pattern 2: Parallel Fan-Out
        ┌─→ Tool B
Tool A ─┼─→ Tool C
        └─→ Tool D
Multiple tools execute concurrently on same input.
Example: Comprehensive Schedule Analysis
                  ┌─→ validate_schedule
schedule_status ──┼─→ detect_conflicts
                  └─→ check_utilization_threshold
Pattern 3: Map-Reduce
Tool A → [Tool B, Tool B, Tool B] → Tool C
Parallel execution followed by aggregation.
Example: Multi-Person Swap Analysis
For each faculty:
analyze_swap_candidates → aggregate_results → rank_by_score
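A minimal map-reduce sketch for the swap analysis above. It reuses the `analyze_swap_candidates_mcp` wrapper that appears in the examples later in this document and assumes each candidate dict carries a `score` field; adjust the field name to the real schema in Reference/mcp-tool-index.md.
import asyncio
from typing import Any, Dict, List

async def rank_swap_candidates(faculty_ids: List[str], assignment_id: str) -> List[Dict[str, Any]]:
    """Map: analyze every faculty member in parallel. Reduce: merge and rank the candidates."""
    results = await asyncio.gather(
        *(analyze_swap_candidates_mcp(requester_id=fid, assignment_id=assignment_id)
          for fid in faculty_ids),
        return_exceptions=True,  # tolerate per-faculty failures instead of aborting the whole map
    )
    candidates: List[Dict[str, Any]] = []
    for result in results:
        if isinstance(result, Exception):
            continue  # skip failed analyses; see "Partial Results from Parallel Fan-Out" below
        candidates.extend(result.get("candidates", []))
    # Reduce: rank by an assumed "score" field
    return sorted(candidates, key=lambda c: c.get("score", 0), reverse=True)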
Pattern 4: Conditional Routing
Tool A → Decision → Tool B (if condition)
                  → Tool C (else)
Example: Deployment Workflow
validate_deployment → (if valid) → promote_to_production
                    → (else)     → rollback_deployment
Key Files
| File | Purpose |
|---|---|
Workflows/tool-discovery.md | MCP endpoint scanning and capability mapping |
Workflows/error-handling.md | Retry logic, fallback strategies, escalation |
Workflows/tool-composition.md | DAG patterns, parallel execution, result synthesis |
Reference/mcp-tool-index.md | Complete tool catalog with I/O schemas |
Reference/tool-error-patterns.md | Known failure modes and workarounds |
Reference/composition-examples.md | Real-world multi-tool chains |
Output
This skill produces:
- Execution Plans: DAG of tool dependencies with timing estimates
- Error Reports: Categorized failures with recovery recommendations
- Performance Metrics: Latency, throughput, resource usage
- Capability Maps: Which tools can satisfy which requirements
Error Handling Strategy
See Workflows/error-handling.md for the complete strategy; a minimal degradation sketch follows the list below. Key principles:
- Retry Transient Errors: Network timeouts, rate limits, DB locks
- Fail Fast on Permanent Errors: Invalid inputs, missing resources
- Graceful Degradation: Use cached data or reduced functionality
- Human Escalation: Alert on unrecoverable errors
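A minimal sketch of the graceful-degradation principle, using only an in-memory cache and the standard library; a real implementation would use whatever caching layer the project already has, and the `call_with_degradation` name is purely illustrative.
import asyncio
import time
from typing import Any, Dict, Tuple

_CACHE: Dict[str, Tuple[float, Any]] = {}  # tool name -> (timestamp, last good result)

async def call_with_degradation(name: str, coro, max_age_seconds: int = 900) -> Dict[str, Any]:
    """Prefer live data; fall back to a recent cached value on transient failure."""
    try:
        value = await asyncio.wait_for(coro, timeout=30)
        _CACHE[name] = (time.time(), value)
        return {"data": value, "stale": False}
    except (asyncio.TimeoutError, ConnectionError) as exc:
        cached = _CACHE.get(name)
        if cached and time.time() - cached[0] <= max_age_seconds:
            return {"data": cached[1], "stale": True, "error": str(exc)}
        raise  # no usable cache: surface the error for human escalation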
Integration with MCP Server
The MCP server runs in Docker container mcp-server and exposes tools via:
- STDIO Transport: For Claude Desktop integration
- HTTP Transport (dev mode): Port 8080 for debugging
Health Check
docker-compose logs -f mcp-server
docker-compose exec mcp-server python -c \
"from scheduler_mcp.server import mcp; print(f'Tools: {len(mcp.tools)}')"
API Connectivity Test
docker-compose exec mcp-server curl -s http://backend:8000/health
Common Workflows
1. Schedule Safety Check
Goal: Comprehensive validation before deployment
Parallel:
- validate_schedule(date_range)
- detect_conflicts(date_range)
- check_utilization_threshold()
- run_contingency_analysis_resilience(N-1, N-2)
Aggregate results → Generate safety report
2. Emergency Coverage Response
Goal: Handle faculty absence with minimal disruption
1. run_contingency_analysis(scenario="faculty_absence", person_ids=[...])
2. For each resolution_option:
analyze_swap_candidates(requester_id, assignment_id)
3. execute_sacrifice_hierarchy(target_level="yellow", simulate=True)
4. get_static_fallbacks() → Identify pre-computed schedules
3. Deployment Pipeline
Goal: Safe production deployment (a code sketch follows the steps below)
1. validate_deployment(env="staging", git_ref="main")
2. run_security_scan(git_ref="main")
3. If all passed:
run_smoke_tests(env="staging", suite="full")
4. If smoke tests passed:
promote_to_production(staging_version, approval_token)
5. Monitor: get_deployment_status(deployment_id)
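A sketch of this pipeline as code. The `*_mcp` wrappers and the `passed`, `version`, and `deployment_id` fields are assumptions drawn from the step list above, not confirmed signatures; check Reference/mcp-tool-index.md for the real schemas.
from typing import Any, Dict

async def deployment_pipeline(git_ref: str, approval_token: str) -> Dict[str, Any]:
    """Sequential chain with conditional gates (Pattern 4); field names are illustrative."""
    validation = await validate_deployment_mcp(env="staging", git_ref=git_ref)
    security = await run_security_scan_mcp(git_ref=git_ref)
    if not (validation.get("passed") and security.get("passed")):
        return {"status": "BLOCKED", "stage": "validation", "details": [validation, security]}

    smoke = await run_smoke_tests_mcp(env="staging", suite="full")
    if not smoke.get("passed"):
        return {"status": "BLOCKED", "stage": "smoke_tests", "details": smoke}

    promotion = await promote_to_production_mcp(
        staging_version=validation.get("version"), approval_token=approval_token
    )
    # Monitoring via get_deployment_status(deployment_id) would follow, as in step 5
    return {"status": "PROMOTED", "deployment_id": promotion.get("deployment_id")}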
4. Performance Optimization
Goal: Identify and remove low-value code
Parallel:
- benchmark_solvers(scenario_count=20)
- benchmark_constraints(test_schedules="historical")
- benchmark_resilience(modules=all)
- module_usage_analysis(entry_points=["main", "api", "scheduling"])
Aggregate → Generate cut list → ablation_study(module_path)
Concrete Implementation Examples
The workflows above are high-level. This section provides detailed, runnable code examples.
Example 1: Parallel Safety Check with Error Handling
"""Complete safety check implementation with error handling."""
import asyncio
from typing import Dict, List, Any
from datetime import date, datetime
async def comprehensive_safety_check(
start_date: date,
end_date: date,
timeout_seconds: int = 60
) -> Dict[str, Any]:
"""
Orchestrate parallel safety checks with robust error handling.
Args:
start_date: Schedule validation start date
end_date: Schedule validation end date
timeout_seconds: Maximum time to wait for all checks
Returns:
Safety report with pass/fail status and recommendations
"""
# Phase 1: Define all checks with timeout wrapper
async def safe_call(coro, check_name: str):
"""Wrapper that handles timeouts and exceptions."""
try:
result = await asyncio.wait_for(coro, timeout=30)
return {"success": True, "data": result, "check": check_name}
except asyncio.TimeoutError:
return {"success": False, "error": "Timeout after 30s", "check": check_name}
except Exception as e:
return {"success": False, "error": str(e), "check": check_name}
# Phase 2: Execute checks in parallel
checks = [
safe_call(validate_schedule_mcp(start_date, end_date), "validation"),
safe_call(detect_conflicts_mcp(start_date, end_date), "conflicts"),
safe_call(check_utilization_threshold_mcp(), "utilization"),
safe_call(run_contingency_analysis_mcp(["N-1", "N-2"]), "contingency")
]
results = await asyncio.gather(*checks)
# Phase 3: Aggregate and analyze results
report = {
"timestamp": datetime.utcnow().isoformat(),
"date_range": f"{start_date} to {end_date}",
"checks_run": len(results),
"checks_passed": sum(1 for r in results if r["success"]),
"details": {},
"blocking_issues": [],
"warnings": [],
"status": "PASS"
}
for result in results:
check_name = result["check"]
if result["success"]:
report["details"][check_name] = result["data"]
# Check-specific logic
if check_name == "validation" and not result["data"].get("is_valid"):
report["blocking_issues"].append("ACGME validation failed")
report["status"] = "FAIL"
if check_name == "conflicts" and result["data"].get("conflict_count", 0) > 0:
report["blocking_issues"].append(
f"{result['data']['conflict_count']} conflicts detected"
)
report["status"] = "FAIL"
if check_name == "utilization" and result["data"].get("exceeds_threshold"):
report["warnings"].append("Utilization exceeds 80% threshold")
else:
# Check failed - add to warnings (non-blocking)
report["warnings"].append(f"{check_name} check failed: {result['error']}")
report["details"][check_name] = {"error": result["error"]}
# Phase 4: Generate recommendations
if report["status"] == "FAIL":
report["recommendation"] = "DO NOT DEPLOY"
report["next_steps"] = ["Fix blocking issues", "Re-run safety check"]
elif report["warnings"]:
report["recommendation"] = "DEPLOY WITH CAUTION"
report["next_steps"] = ["Review warnings", "Monitor after deployment"]
else:
report["recommendation"] = "SAFE TO DEPLOY"
report["next_steps"] = ["Proceed with deployment"]
return report
Example 2: Emergency Coverage with Tiered Strategies
"""Multi-tier emergency coverage orchestration."""
async def handle_emergency_absence(
absent_faculty_id: str,
absence_start: date,
absence_end: date
) -> Dict[str, Any]:
"""
Handle faculty absence using tiered fallback strategies.
Strategy Tiers (in order):
1. Swap-based coverage (least disruptive)
2. Sacrifice hierarchy (controlled degradation)
3. Static fallback schedule (pre-approved backup)
4. Manual escalation (all automation failed)
Returns:
Resolution plan with selected strategy
"""
response = {
"absent_faculty_id": absent_faculty_id,
"strategies_attempted": [],
"selected_strategy": None,
"execution_steps": []
}
# TIER 1: Try swap-based coverage
print("Tier 1: Attempting swap matching...")
try:
# Get affected assignments
contingency = await run_contingency_analysis_mcp(
scenario="faculty_absence",
person_ids=[absent_faculty_id],
start_date=absence_start,
end_date=absence_end
)
affected = contingency.get("affected_assignments", [])
swap_success_count = 0
# Try to find swaps for each affected assignment
for assignment_id in affected[:10]: # Limit to first 10
try:
candidates = await analyze_swap_candidates_mcp(
requester_id=absent_faculty_id,
assignment_id=assignment_id
)
if candidates.get("candidates"):
swap_success_count += 1
except Exception as e:
print(f" Swap analysis failed for {assignment_id}: {e}")
        # If we can cover >= 80% of the attempted assignments via swaps, use this strategy
        coverage_ratio = swap_success_count / len(affected[:10]) if affected else 0
response["strategies_attempted"].append({
"tier": 1,
"strategy": "swap_based",
"coverage": f"{coverage_ratio:.0%}",
"viable": coverage_ratio >= 0.8
})
if coverage_ratio >= 0.8:
response["selected_strategy"] = "swap_based_coverage"
response["execution_steps"] = [
f"Notify {swap_success_count} potential swap partners",
"Execute approved swaps",
"Monitor remaining gaps"
]
return response
except Exception as e:
response["strategies_attempted"].append({
"tier": 1,
"strategy": "swap_based",
"error": str(e)
})
# TIER 2: Try sacrifice hierarchy
print("Tier 2: Evaluating sacrifice hierarchy...")
try:
sacrifice = await execute_sacrifice_hierarchy_mcp(
target_level="yellow",
simulate=True
)
has_violations = bool(sacrifice.get("acgme_violations"))
response["strategies_attempted"].append({
"tier": 2,
"strategy": "sacrifice_hierarchy",
"acgme_compliant": not has_violations,
"viable": not has_violations
})
if not has_violations:
response["selected_strategy"] = "controlled_degradation"
response["execution_steps"] = [
"Activate YELLOW defense level",
f"Suspend {len(sacrifice.get('suspended_services', []))} services",
"Notify personnel",
"Monitor compliance"
]
return response
except Exception as e:
response["strategies_attempted"].append({
"tier": 2,
"strategy": "sacrifice_hierarchy",
"error": str(e)
})
# TIER 3: Try static fallback
print("Tier 3: Checking static fallbacks...")
try:
fallbacks = await get_static_fallbacks_mcp()
for fallback in fallbacks.get("fallback_schedules", []):
if (fallback["start_date"] <= absence_start.isoformat() and
fallback["end_date"] >= absence_end.isoformat()):
response["strategies_attempted"].append({
"tier": 3,
"strategy": "static_fallback",
"fallback_name": fallback["name"],
"viable": True
})
response["selected_strategy"] = "static_fallback"
response["execution_steps"] = [
f"Activate fallback: {fallback['name']}",
"Notify all personnel",
"Update assignments",
"Monitor for 24h"
]
return response
response["strategies_attempted"].append({
"tier": 3,
"strategy": "static_fallback",
"viable": False,
"reason": "No suitable fallback found"
})
except Exception as e:
response["strategies_attempted"].append({
"tier": 3,
"strategy": "static_fallback",
"error": str(e)
})
# TIER 4: Manual escalation
print("Tier 4: All automation failed - escalating")
response["selected_strategy"] = "manual_escalation"
response["execution_steps"] = [
"Escalate to Program Director",
"Provide impact analysis",
"Request manual coverage assignment"
]
return response
Common Failure Modes and Solutions
Real-world orchestration failures and how to handle them.
Failure Mode 1: MCP Server Unresponsive
Symptoms:
- All MCP tool calls timeout
- Connection refused errors
- No response from health check
Detection:
async def check_mcp_health() -> bool:
"""Verify MCP server is responsive."""
try:
# Simple health check with short timeout
result = await asyncio.wait_for(
schedule_status_mcp(),
timeout=5
)
return True
except asyncio.TimeoutError:
print("ERROR: MCP server not responding")
return False
except Exception as e:
print(f"ERROR: MCP health check failed: {e}")
return False
Recovery Steps:
# 1. Check container status
docker-compose ps mcp-server
# 2. Check logs for errors
docker-compose logs --tail=50 mcp-server
# 3. Restart container if needed
docker-compose restart mcp-server
# 4. Verify backend connectivity from MCP
docker-compose exec mcp-server curl http://backend:8000/health
Prevention:
- Implement health checks before orchestration
- Use circuit breaker pattern for repeated failures (see the sketch below)
- Set reasonable timeouts (30s default, not 5min)
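A minimal circuit-breaker sketch using only the standard library: after `failure_threshold` consecutive failures the breaker opens and rejects calls immediately until `reset_seconds` have elapsed. The class name and thresholds are illustrative, not part of the MCP server.
import time
from typing import Optional

class CircuitBreaker:
    """Stop calling an unhealthy MCP server; allow a trial call after a cooldown."""

    def __init__(self, failure_threshold: int = 3, reset_seconds: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_seconds = reset_seconds
        self.failure_count = 0
        self.opened_at: Optional[float] = None

    async def call(self, coro):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_seconds:
                raise RuntimeError("Circuit open: MCP server marked unhealthy")
            self.opened_at = None   # cooldown elapsed: half-open, allow one trial call
            self.failure_count = 0
        try:
            result = await coro
            self.failure_count = 0  # success closes the circuit
            return result
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
Usage would look like `result = await breaker.call(schedule_status_mcp())`, so repeated outages fail fast instead of stacking 30-second timeouts.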
Failure Mode 2: Partial Results from Parallel Fan-Out
Symptoms:
- Some tools succeed, others fail
- Incomplete data for aggregation
- Missing expected fields in results
Example Scenario:
# 3 out of 4 checks succeeded - what do we do?
results = await asyncio.gather(*checks, return_exceptions=True)
# [
# {"data": {...}}, # Success
# TimeoutError(), # Failed
# {"data": {...}}, # Success
# ConnectionError() # Failed
# ]
Solution Pattern:
async def robust_parallel_execution(
tools: List[tuple[str, Callable]],
min_success_ratio: float = 0.75
) -> Dict[str, Any]:
"""
Execute tools in parallel with partial failure tolerance.
Args:
tools: List of (name, async_callable) tuples
min_success_ratio: Minimum fraction that must succeed
Returns:
Aggregated results with success indicators
Raises:
        OrchestrationError: If too many tools fail
"""
tasks = [tool[1]() for tool in tools]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for (name, _), result in zip(tools, results):
if isinstance(result, Exception):
failures.append({"tool": name, "error": str(result)})
else:
successes.append({"tool": name, "data": result})
success_ratio = len(successes) / len(tools)
report = {
"total": len(tools),
"succeeded": len(successes),
"failed": len(failures),
"success_ratio": success_ratio,
"successes": successes,
"failures": failures
}
if success_ratio < min_success_ratio:
raise OrchestrationError(
f"Only {success_ratio:.0%} of tools succeeded "
f"(minimum: {min_success_ratio:.0%})"
)
return report
Failure Mode 3: Tool Output Schema Mismatch
Symptoms:
- KeyError when accessing expected fields
- Type errors when processing results
- Unexpected None values
Example:
# Expected schema
expected = {"is_valid": bool, "violations": list}
# Actual response (missing field)
actual = {"is_valid": True} # Where's violations?
# Code crashes
violations = result["violations"] # KeyError!
Solution Pattern:
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, ValidationError
class ValidationResult(BaseModel):
"""Expected schema for validation tool."""
is_valid: bool
violations: list = Field(default_factory=list)
timestamp: Optional[str] = None
def safe_extract(raw_result: Dict[str, Any]) -> ValidationResult:
"""
Safely parse tool output with schema validation.
Raises:
ValidationError: If required fields missing or wrong type
"""
try:
return ValidationResult(**raw_result)
except ValidationError as e:
print(f"WARNING: Tool output schema mismatch: {e}")
# Return safe default
return ValidationResult(is_valid=False, violations=["Schema parse error"])
Failure Mode 4: Dependency Chain Breaks Mid-Execution
Symptoms:
- Tool B needs output from Tool A, but Tool A failed
- Cascading failures down the dependency chain
- Incomplete state changes
Example:
# Sequential dependency: A β B β C
result_a = await tool_a() # Succeeds
result_b = await tool_b(result_a["value"]) # FAILS - what about C?
result_c = await tool_c(result_b["value"]) # Can't run - result_b doesn't exist
Solution Pattern:
async def execute_dependency_chain(
tools: List[tuple[str, Callable]],
rollback_on_failure: bool = True
) -> Dict[str, Any]:
"""
Execute tools with dependencies, rolling back on failure.
Args:
tools: List of (name, tool_func) in dependency order
rollback_on_failure: Whether to undo previous steps on failure
Returns:
Chain execution report
"""
results = {}
executed_tools = []
for tool_name, tool_func in tools:
try:
# Execute with previous results as context
result = await tool_func(results)
results[tool_name] = result
executed_tools.append(tool_name)
except Exception as e:
# Chain broken - decide rollback
report = {
"status": "FAILED",
"failed_at": tool_name,
"completed": executed_tools,
"error": str(e)
}
if rollback_on_failure:
print(f"Rolling back {len(executed_tools)} completed steps...")
                tool_map = dict(tools)  # look up each completed tool's own function
                for completed_tool in reversed(executed_tools):
                    try:
                        # Call the completed tool's rollback hook if it defines one
                        rollback_func = getattr(tool_map[completed_tool], "rollback", None)
                        if rollback_func:
                            await rollback_func(results[completed_tool])
except Exception as rollback_error:
print(f"Rollback failed for {completed_tool}: {rollback_error}")
report["rollback"] = "ATTEMPTED"
return report
return {
"status": "COMPLETED",
"completed": executed_tools,
"results": results
}
Integration Examples
How to call MCP tools from different contexts.
From FastAPI Endpoint
"""MCP orchestration from API route."""
import uuid

from fastapi import APIRouter, HTTPException, BackgroundTasks
from datetime import date
router = APIRouter()
@router.post("/schedule/safety-check")
async def api_safety_check(
start_date: date,
end_date: date,
background_tasks: BackgroundTasks
):
"""
Trigger safety check via API.
For long-running checks, use background task.
"""
try:
# Quick check - run synchronously
if (end_date - start_date).days <= 7:
report = await comprehensive_safety_check(start_date, end_date)
return report
# Long check - run in background
else:
task_id = str(uuid.uuid4())
background_tasks.add_task(
run_safety_check_background,
task_id,
start_date,
end_date
)
return {
"task_id": task_id,
"status": "RUNNING",
"message": "Check started in background"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Safety check failed: {str(e)}"
)
From Celery Task
"""MCP orchestration from Celery background task."""
import asyncio
from datetime import date

from celery import shared_task
@shared_task(bind=True, max_retries=3)
def celery_emergency_coverage(
self,
absent_faculty_id: str,
absence_start: str,
absence_end: str
):
"""
Handle emergency coverage in background.
Retries up to 3 times on transient failures.
"""
try:
# Celery tasks aren't async - use asyncio.run
result = asyncio.run(
handle_emergency_absence(
absent_faculty_id,
date.fromisoformat(absence_start),
date.fromisoformat(absence_end)
)
)
return {
"status": "COMPLETED",
"strategy": result["selected_strategy"],
"execution_plan": result["execution_steps"]
}
except Exception as e:
# Retry on transient errors
if "timeout" in str(e).lower() or "connection" in str(e).lower():
            raise self.retry(exc=e, countdown=30)  # Retry after 30s
else:
# Permanent error - don't retry
return {
"status": "FAILED",
"error": str(e)
}
From CLI Script
"""MCP orchestration from command-line script."""
import asyncio
import sys
from datetime import date
async def main():
"""CLI entrypoint for safety check."""
if len(sys.argv) != 3:
print("Usage: safety_check.py START_DATE END_DATE")
print("Example: safety_check.py 2025-01-15 2025-02-14")
sys.exit(1)
start = date.fromisoformat(sys.argv[1])
end = date.fromisoformat(sys.argv[2])
print(f"Running safety check: {start} to {end}")
print("-" * 60)
report = await comprehensive_safety_check(start, end)
print(f"\nStatus: {report['status']}")
print(f"Recommendation: {report['recommendation']}")
if report['blocking_issues']:
print("\nBlocking Issues:")
for issue in report['blocking_issues']:
            print(f"  ✗ {issue}")
if report['warnings']:
print("\nWarnings:")
for warning in report['warnings']:
            print(f"  ⚠️  {warning}")
print("\nNext Steps:")
for step in report['next_steps']:
        print(f"  • {step}")
# Exit code based on status
sys.exit(0 if report['status'] == "PASS" else 1)
if __name__ == "__main__":
asyncio.run(main())
Best Practices
- Always check tool prerequisites before execution
- Use background tasks for long-running operations (>30s)
- Poll task status instead of blocking on Celery tasks (see the sketch after this list)
- Implement timeouts for all MCP calls (default: 30s)
- Log all tool inputs/outputs for debugging
- Cache tool results when appropriate (compliance summary, static fallbacks)
- Parallelize independent tools to reduce latency
- Handle partial failures gracefully in fan-out patterns
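A sketch of the polling practice, assuming a hypothetical `get_task_status_mcp(task_id)` wrapper for the Background Tasks category and Celery-style state strings; the real tool name and schema are in Reference/mcp-tool-index.md.
import asyncio
from typing import Any, Dict

async def wait_for_task(task_id: str, poll_seconds: int = 10, timeout_seconds: int = 600) -> Dict[str, Any]:
    """Poll a background task's status instead of blocking on the Celery result."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while loop.time() < deadline:
        status = await get_task_status_mcp(task_id=task_id)  # hypothetical wrapper name
        if status.get("state") in {"SUCCESS", "FAILURE", "REVOKED"}:
            return status
        await asyncio.sleep(poll_seconds)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout_seconds}s")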
Troubleshooting
MCP Server Not Responding
# Check container status
docker-compose ps mcp-server
# View logs
docker-compose logs -f mcp-server
# Restart server
docker-compose restart mcp-server
Backend API Unreachable from MCP
# Test connectivity from MCP container
docker-compose exec mcp-server curl -s http://backend:8000/health
# Check network
docker network inspect autonomous-assignment-program-manager_default
Tool Returns Unexpected Result
- Check tool signature in Reference/mcp-tool-index.md
- Verify input schema matches expected format
- Check backend API logs: docker-compose logs backend
- Review error patterns in Reference/tool-error-patterns.md
Real-World Scenario: Multi-Skill Integration
Complete end-to-end example showing MCP orchestration integrated with other skills.
Scenario: Pre-Deployment Safety Validation
User Request: "Validate Block 10 schedule before deploying to production"
Orchestration Flow:
MCP_ORCHESTRATION (this skill)
    ↓
Invokes: comprehensive_safety_check() using MCP tools
    ↓
    ├── constraint-preflight (verify all constraints registered)
    ├── schedule-validator (ACGME compliance verification)
    └── safe-schedule-generation (ensure backup exists)
    ↓
If PASS: production-incident-responder (deployment monitoring)
If FAIL: systematic-debugger (investigate failures)
Implementation:
async def validate_block_10_deployment():
"""
Complete pre-deployment validation orchestration.
Integrates multiple skills and MCP tools.
"""
print("Step 1: MCP_ORCHESTRATION - Running comprehensive safety check...")
# Use MCP tools for parallel checks
safety_report = await comprehensive_safety_check(
start_date=date(2025, 2, 3),
end_date=date(2025, 3, 2)
)
if safety_report["status"] == "FAIL":
        print("\n❌ Safety check FAILED")
print("Step 2: Invoking systematic-debugger skill...")
# Systematic debugging of failures
for issue in safety_report["blocking_issues"]:
print(f" Investigating: {issue}")
# Debugger skill would explore root causes
return {
"deployment_approved": False,
"reason": "Safety check failed",
"next_action": "Fix issues identified by systematic-debugger"
}
# Additional validation via other skills
    print("\n✓ MCP safety checks passed")
print("Step 2: Running constraint-preflight...")
# Ensure all constraints are properly registered
preflight_ok = await run_constraint_preflight()
if not preflight_ok:
return {
"deployment_approved": False,
"reason": "Constraint preflight failed",
"next_action": "Review constraint registration"
}
print("Step 3: Running schedule-validator...")
# Detailed ACGME compliance check
validation_result = await run_schedule_validator(
start_date=date(2025, 2, 3),
end_date=date(2025, 3, 2)
)
if not validation_result["acgme_compliant"]:
return {
"deployment_approved": False,
"reason": "ACGME violations detected",
"violations": validation_result["violations"],
"next_action": "Fix compliance violations"
}
print("Step 4: Ensuring database backup exists...")
# Safe schedule generation - verify backup
backup_status = await check_schedule_backup()
if not backup_status["backup_exists"]:
print(" Creating backup before deployment...")
await create_schedule_backup()
# All checks passed - approve deployment
    print("\n✅ All validation passed - DEPLOYMENT APPROVED")
return {
"deployment_approved": True,
"safety_report": safety_report,
"acgme_compliant": True,
"backup_id": backup_status.get("backup_id"),
"next_action": "Proceed to production deployment"
}
Output Example:
Step 1: MCP_ORCHESTRATION - Running comprehensive safety check...
  ✓ Schedule validation: PASS
  ✓ Conflict detection: PASS (0 conflicts)
  ✓ Utilization check: PASS (78.3% < 80%)
  ⚠ Contingency analysis: 1 N-1 failure detected
✓ MCP safety checks passed
Step 2: Running constraint-preflight...
  ✓ All 47 constraints registered
  ✓ No orphaned constraints detected
Step 3: Running schedule-validator...
  ✓ 80-hour rule: PASS (all weeks compliant)
  ✓ 1-in-7 rule: PASS (all residents have 1 day off per week)
  ✓ Supervision ratios: PASS
Step 4: Ensuring database backup exists...
  Backup exists: backup_20250202_143022
✅ All validation passed - DEPLOYMENT APPROVED
Next Action: Proceed to production deployment
Related Skills
- constraint-preflight: Verify constraints before schedule generation (integrates with validation workflow)
- safe-schedule-generation: Database backup before write operations (ensures rollback capability)
- production-incident-responder: Crisis response using MCP tools (handles deployment failures)
- systematic-debugger: Root cause analysis of tool failures (investigates orchestration errors)
- schedule-validator: ACGME compliance verification (complements MCP validation tools)
- acgme-compliance: Regulatory expertise (validates MCP tool compliance checks)
Version
- Created: 2025-12-26
- MCP Server Version: 0.1.0
- Total Tools: 36
- Backend API: FastAPI 0.109.0
For detailed tool signatures and schemas, see Reference/mcp-tool-index.md
For error handling procedures, see Workflows/error-handling.md
For composition patterns, see Reference/composition-examples.md
Repository
https://github.com/Euda1mon1a/Autonomous-Assignment-Program-Manager