api-integration-specialist
Expert guidance for designing, integrating, and maintaining third-party APIs with best practices for authentication, error handling, rate limiting, security, and data transformation. Use when integrating external APIs, troubleshooting API issues, implementing OAuth flows, handling webhooks, or building API wrappers and clients.
$ 安裝
git clone https://github.com/majiayu000/claude-skill-registry /tmp/claude-skill-registry && cp -r /tmp/claude-skill-registry/skills/data/api-integration-specialist ~/.claude/skills/claude-skill-registry// tip: Run this command in your terminal to install the skill
name: api-integration-specialist description: Expert guidance for designing, integrating, and maintaining third-party APIs with best practices for authentication, error handling, rate limiting, security, and data transformation. Use when integrating external APIs, troubleshooting API issues, implementing OAuth flows, handling webhooks, or building API wrappers and clients. license: Apache-2.0 metadata: author: API Integration Team category: integration-patterns version: 1.0.0
API Integration Specialist
Expert assistant for reliable, secure, and efficient API integrations.
Core Integration Workflow
1. Discovery and Planning
Before integrating any API:
- Review official documentation thoroughly
- Identify required endpoints and authentication methods
- Map out data flow and transformation needs
- Check rate limits and quotas
- Plan error handling strategy
- Document dependencies
2. Authentication Setup
Choose and implement appropriate authentication:
OAuth 2.0 (Recommended for user-delegated access):
1. Register application with provider
2. Obtain client_id and client_secret
3. Implement authorization flow:
- Authorization Code Flow (web apps)
- PKCE Flow (mobile/SPA)
- Client Credentials (server-to-server)
4. Handle token refresh automatically
5. Store tokens securely (never in code)
API Keys (Simple public APIs):
- Store in environment variables
- Rotate keys regularly
- Use different keys per environment
- Monitor usage and revoke if compromised
Bearer Tokens (JWT-based):
- Validate token signature
- Check expiration (exp claim)
- Verify issuer and audience
- Implement token refresh logic
3. Request Construction
Build reliable API requests:
Headers:
- Content-Type: application/json (for JSON APIs)
- Authorization: Bearer {token} or API-Key {key}
- Accept: application/json
- User-Agent: YourApp/version
- X-Request-ID: {unique-id} (for tracking)
Query Parameters:
- URL encode all values
- Use consistent naming (camelCase or snake_case)
- Validate before sending
- Document required vs optional params
Request Body:
- Validate data structure before sending
- Use schema validation (JSON Schema, Zod, Joi)
- Handle nested objects correctly
- Respect payload size limits
4. Response Handling
Process API responses systematically:
Status Code Patterns:
2xx Success:
200 OK - Request successful, data returned
201 Created - Resource created successfully
202 Accepted - Request accepted, processing async
204 No Content - Success with no response body
4xx Client Errors:
400 Bad Request - Invalid input data
401 Unauthorized - Missing/invalid auth
403 Forbidden - Valid auth but no permission
404 Not Found - Resource doesn't exist
422 Unprocessable Entity - Validation failed
429 Too Many Requests - Rate limit exceeded
5xx Server Errors:
500 Internal Server Error - Server issue
502 Bad Gateway - Upstream server error
503 Service Unavailable - Temporary outage
504 Gateway Timeout - Request timeout
Response Validation:
# Always validate response structure
def validate_response(response):
# Check status code first
if response.status_code >= 500:
raise ServerError("API server error")
# Validate JSON structure
try:
data = response.json()
except ValueError:
raise ParseError("Invalid JSON response")
# Validate expected fields
if 'data' not in data:
raise ValidationError("Missing data field")
return data['data']
Error Handling Patterns
Retry Strategy with Exponential Backoff
import time
import random
def retry_with_backoff(func, max_retries=3, base_delay=1):
"""
Retry failed requests with exponential backoff
"""
for attempt in range(max_retries):
try:
return func()
except (ConnectionError, TimeoutError, ServerError) as e:
if attempt == max_retries - 1:
raise
# Calculate backoff: base_delay * 2^attempt + jitter
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {delay:.2f} seconds...")
time.sleep(delay)
Circuit Breaker Pattern
class CircuitBreaker:
"""
Prevent cascading failures by breaking circuit after threshold
"""
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise CircuitOpenError("Circuit breaker is OPEN")
try:
result = func()
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
Error Response Format
Always return structured errors:
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid email format",
"details": [
{
"field": "email",
"issue": "Must be valid email address"
}
],
"correlation_id": "req-abc-123",
"timestamp": "2025-11-03T10:30:00Z",
"documentation_url": "https://docs.api.com/errors/validation"
}
}
Rate Limiting and Throttling
Respect Rate Limits
import time
from collections import deque
class RateLimiter:
"""
Token bucket algorithm for rate limiting
"""
def __init__(self, max_requests, time_window):
self.max_requests = max_requests
self.time_window = time_window
self.requests = deque()
def allow_request(self):
now = time.time()
# Remove expired requests
while self.requests and self.requests[0] < now - self.time_window:
self.requests.popleft()
# Check if we can make request
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
return False
def wait_time(self):
"""Calculate wait time until next request allowed"""
if not self.requests:
return 0
oldest = self.requests[0]
wait = (oldest + self.time_window) - time.time()
return max(0, wait)
Handle 429 Responses
def handle_rate_limit(response):
"""
Handle rate limit using Retry-After header
"""
if response.status_code == 429:
# Check for Retry-After header
retry_after = response.headers.get('Retry-After')
if retry_after:
# Can be seconds or HTTP date
try:
wait_seconds = int(retry_after)
except ValueError:
# Parse HTTP date
retry_time = parse_http_date(retry_after)
wait_seconds = (retry_time - datetime.now()).total_seconds()
print(f"Rate limited. Waiting {wait_seconds} seconds...")
time.sleep(wait_seconds)
return True
# No Retry-After, use exponential backoff
return False
return False
Security Best Practices
Secret Management
NEVER hardcode secrets:
# ❌ WRONG - Never do this
API_KEY = "sk-1234567890abcdef"
API_SECRET = "secret_abc123"
# ✅ CORRECT - Use environment variables
import os
API_KEY = os.getenv('API_KEY')
API_SECRET = os.getenv('API_SECRET')
# Validate secrets are present
if not API_KEY or not API_SECRET:
raise ValueError("Missing required API credentials")
Input Validation
Always validate and sanitize:
def validate_api_input(data):
"""
Validate data before sending to API
"""
# Check required fields
required = ['email', 'name']
for field in required:
if field not in data:
raise ValidationError(f"Missing required field: {field}")
# Sanitize strings
data['name'] = data['name'].strip()
data['email'] = data['email'].lower().strip()
# Validate email format
if not is_valid_email(data['email']):
raise ValidationError("Invalid email format")
# Remove unexpected fields
allowed_fields = ['email', 'name', 'age']
data = {k: v for k, v in data.items() if k in allowed_fields}
return data
HTTPS Only
# Always use HTTPS
BASE_URL = "https://api.example.com" # ✅ Secure
BASE_URL = "http://api.example.com" # ❌ Insecure
# Verify SSL certificates
import requests
response = requests.get(url, verify=True) # ✅ Verify SSL
response = requests.get(url, verify=False) # ❌ Dangerous
Data Transformation
Request Transformation
def transform_for_api(internal_data):
"""
Transform internal data format to API format
"""
return {
'firstName': internal_data['first_name'],
'lastName': internal_data['last_name'],
'emailAddress': internal_data['email'],
'phoneNumber': format_phone(internal_data['phone']),
'dateOfBirth': internal_data['dob'].isoformat(),
'metadata': {
'source': 'internal_system',
'version': '1.0'
}
}
Response Transformation
def transform_from_api(api_response):
"""
Transform API response to internal format
"""
return {
'id': api_response['userId'],
'first_name': api_response['firstName'],
'last_name': api_response['lastName'],
'email': api_response['emailAddress'],
'phone': parse_phone(api_response['phoneNumber']),
'dob': datetime.fromisoformat(api_response['dateOfBirth']),
'created_at': datetime.fromisoformat(api_response['createdAt'])
}
Pagination Patterns
Offset-Based Pagination
def fetch_all_pages_offset(base_url, limit=100):
"""
Fetch all pages using offset pagination
"""
all_data = []
offset = 0
while True:
response = requests.get(
base_url,
params={'limit': limit, 'offset': offset}
)
response.raise_for_status()
data = response.json()
items = data['items']
if not items:
break
all_data.extend(items)
offset += limit
# Check if we got all data
if len(items) < limit:
break
return all_data
Cursor-Based Pagination
def fetch_all_pages_cursor(base_url):
"""
Fetch all pages using cursor pagination
"""
all_data = []
cursor = None
while True:
params = {'cursor': cursor} if cursor else {}
response = requests.get(base_url, params=params)
response.raise_for_status()
data = response.json()
all_data.extend(data['items'])
# Check for next page
cursor = data.get('next_cursor')
if not cursor:
break
return all_data
Webhook Integration
Webhook Security
import hmac
import hashlib
def verify_webhook_signature(payload, signature, secret):
"""
Verify webhook came from trusted source
"""
# Compute expected signature
expected = hmac.new(
secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
# Compare signatures (timing-safe)
return hmac.compare_digest(signature, expected)
# Usage in webhook handler
@app.route('/webhooks/payment', methods=['POST'])
def handle_payment_webhook():
signature = request.headers.get('X-Webhook-Signature')
payload = request.get_data(as_text=True)
if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
return jsonify({'error': 'Invalid signature'}), 401
# Process webhook
data = request.json
process_payment_event(data)
return jsonify({'status': 'received'}), 200
Testing Strategy
Unit Tests
def test_api_client():
"""Test API client with mocked responses"""
with requests_mock.Mocker() as m:
# Mock successful response
m.get(
'https://api.example.com/users/123',
json={'id': 123, 'name': 'John'},
status_code=200
)
client = APIClient()
user = client.get_user(123)
assert user['id'] == 123
assert user['name'] == 'John'
def test_error_handling():
"""Test error handling"""
with requests_mock.Mocker() as m:
# Mock error response
m.get(
'https://api.example.com/users/999',
status_code=404,
json={'error': 'User not found'}
)
client = APIClient()
with pytest.raises(NotFoundError):
client.get_user(999)
Integration Tests
Run against sandbox/test environments:
def test_real_api_integration():
"""Test against sandbox environment"""
client = APIClient(
api_key=TEST_API_KEY,
base_url='https://sandbox.api.example.com'
)
# Test create
user = client.create_user({
'name': 'Test User',
'email': 'test@example.com'
})
assert user['id'] is not None
# Test read
fetched = client.get_user(user['id'])
assert fetched['name'] == 'Test User'
# Test cleanup
client.delete_user(user['id'])
Monitoring and Logging
Request Logging
import logging
def log_api_request(method, url, status_code, duration, error=None):
"""Log API request with relevant details"""
log_data = {
'method': method,
'url': url,
'status_code': status_code,
'duration_ms': duration * 1000,
'timestamp': datetime.utcnow().isoformat()
}
if error:
log_data['error'] = str(error)
logging.error(f"API request failed: {log_data}")
else:
logging.info(f"API request completed: {log_data}")
Metrics to Track
Monitor these key metrics:
- Request success rate (%)
- Average response time (ms)
- Error rate by status code
- Rate limit hit frequency
- Circuit breaker state changes
- Retry attempts
- Timeout occurrences
Common Integration Patterns
1. Idempotency
Use idempotency keys for retries:
import uuid
def create_payment(amount, idempotency_key=None):
"""Create payment with idempotency support"""
if not idempotency_key:
idempotency_key = str(uuid.uuid4())
response = requests.post(
f'{BASE_URL}/payments',
json={'amount': amount},
headers={
'Authorization': f'Bearer {API_KEY}',
'Idempotency-Key': idempotency_key
}
)
return response.json()
2. Batch Operations
Batch requests when supported:
def batch_create_users(users, batch_size=100):
"""Create users in batches"""
results = []
for i in range(0, len(users), batch_size):
batch = users[i:i + batch_size]
response = requests.post(
f'{BASE_URL}/users/batch',
json={'users': batch}
)
response.raise_for_status()
results.extend(response.json()['created'])
return results
3. Caching
Implement smart caching:
from functools import lru_cache
from datetime import datetime, timedelta
class CachedAPIClient:
def __init__(self):
self.cache = {}
self.cache_ttl = timedelta(minutes=5)
def get_user(self, user_id):
cache_key = f'user:{user_id}'
# Check cache
if cache_key in self.cache:
data, timestamp = self.cache[cache_key]
if datetime.now() - timestamp < self.cache_ttl:
return data
# Fetch from API
response = requests.get(f'{BASE_URL}/users/{user_id}')
response.raise_for_status()
data = response.json()
# Update cache
self.cache[cache_key] = (data, datetime.now())
return data
Resources
For detailed guidance, see:
- references/ERROR_CODES.md - Complete error code reference
- references/AUTHENTICATION_PATTERNS.md - Auth implementation details
- references/RATE_LIMITING.md - Advanced rate limiting strategies
- scripts/validate_api_spec.py - Validate OpenAPI specifications
- scripts/test_api_health.py - Health check automation
Validation Tools
Run validation before deployment:
# Validate API specification
python scripts/validate_api_spec.py openapi.yaml
# Test API health
python scripts/test_api_health.py --base-url https://api.example.com
Quick Reference
Authentication Priority:
- OAuth 2.0 (user delegation)
- JWT Bearer tokens
- API Keys (public APIs only)
Error Handling:
- Implement retry with exponential backoff
- Use circuit breaker for cascading failures
- Log all errors with correlation IDs
- Return structured error responses
Security Checklist:
- ✅ Use HTTPS only
- ✅ Secrets in environment variables
- ✅ Validate all inputs
- ✅ Verify SSL certificates
- ✅ Implement rate limiting
- ✅ Use webhook signatures
Testing Levels:
- Unit tests (mocked responses)
- Integration tests (sandbox)
- Load tests (performance)
- Security tests (penetration)
This skill provides comprehensive API integration guidance. For advanced topics and detailed examples, refer to the references directory.
Repository
