idempotency-handling
Implement idempotency keys and handling to ensure operations can be safely retried without duplicate effects. Use when building payment systems, APIs with retries, or distributed transactions.
$ Install
git clone https://github.com/aj-geddes/useful-ai-prompts /tmp/useful-ai-prompts && cp -r /tmp/useful-ai-prompts/skills/idempotency-handling ~/.claude/skills/useful-ai-prompts
# tip: Run this command in your terminal to install the skill
SKILL.md
name: idempotency-handling
description: Implement idempotency keys and handling to ensure operations can be safely retried without duplicate effects. Use when building payment systems, APIs with retries, or distributed transactions.
Idempotency Handling
Overview
Implement idempotency to ensure operations produce the same result regardless of how many times they're executed.
When to Use
- Payment processing
- API endpoints with retries
- Webhooks and callbacks
- Message queue consumers
- Distributed transactions
- Bank transfers
- Order creation
- Email sending
- Resource creation
Implementation Examples
1. Express Idempotency Middleware
import express from 'express';
import Redis from 'ioredis';
import crypto from 'crypto';
/**
 * Record kept for one idempotency key.
 *
 * `key`, `status` and `createdAt` are always present; the remaining fields
 * are filled in depending on how the request ended.
 */
interface IdempotentRequest {
  /** The idempotency key this record belongs to. */
  key: string;

  /** Lifecycle state of the request. */
  status: 'processing' | 'completed' | 'failed';

  /** Creation time in epoch milliseconds. */
  createdAt: number;

  /** Completion time in epoch milliseconds, once the request has ended. */
  completedAt?: number;

  /** Response payload saved for a completed request. */
  response?: any;

  /** Error message saved for a failed request. */
  error?: string;
}
/**
 * Redis-backed store for idempotency records.
 *
 * Every record is stored as a JSON string under `idempotency:<key>` and is
 * written with a TTL of `ttl` seconds.
 */
class IdempotencyService {
  private redis: Redis;
  private ttl = 86400; // 24 hours

  /**
   * @param redisUrl Connection URL handed to the Redis client.
   */
  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  /**
   * Loads the record stored for `key`.
   *
   * @returns The parsed record, or `null` when nothing is stored.
   */
  async getRequest(key: string): Promise<IdempotentRequest | null> {
    const data = await this.redis.get(`idempotency:${key}`);
    return data ? JSON.parse(data) : null;
  }

  /**
   * Unconditionally writes `request` for `key` with a fresh TTL.
   */
  async setRequest(
    key: string,
    request: IdempotentRequest
  ): Promise<void> {
    await this.redis.setex(
      `idempotency:${key}`,
      this.ttl,
      JSON.stringify(request)
    );
  }

  /**
   * Atomically claims `key` by writing a `processing` record.
   *
   * @returns `true` when this caller created the record, `false` when a
   *   record for the key already existed.
   */
  async startProcessing(key: string): Promise<boolean> {
    const request: IdempotentRequest = {
      key,
      status: 'processing',
      createdAt: Date.now()
    };
    // Use SET NX to ensure only one request processes
    const result = await this.redis.set(
      `idempotency:${key}`,
      JSON.stringify(request),
      'EX',
      this.ttl,
      'NX'
    );
    return result === 'OK';
  }

  /**
   * Marks `key` as completed and stores `response` for later replays.
   */
  async completeRequest(
    key: string,
    response: any
  ): Promise<void> {
    const now = Date.now();
    const request: IdempotentRequest = {
      key,
      status: 'completed',
      response,
      createdAt: await this.originalCreatedAt(key, now),
      completedAt: now
    };
    await this.setRequest(key, request);
  }

  /**
   * Marks `key` as failed and stores the error message.
   */
  async failRequest(
    key: string,
    error: string
  ): Promise<void> {
    const now = Date.now();
    const request: IdempotentRequest = {
      key,
      status: 'failed',
      error,
      createdAt: await this.originalCreatedAt(key, now),
      completedAt: now
    };
    await this.setRequest(key, request);
  }

  /**
   * Returns the `createdAt` of the record already stored for `key`, so that
   * finishing a request does not overwrite the time it started. Falls back to
   * `fallback` when no record exists (for example, it already expired).
   */
  private async originalCreatedAt(
    key: string,
    fallback: number
  ): Promise<number> {
    const existing = await this.getRequest(key);
    return existing?.createdAt ?? fallback;
  }
}
/**
 * Builds an Express middleware that enforces idempotency on mutating requests.
 *
 * Behavior for POST, PUT, PATCH and DELETE:
 * - 400 when the `Idempotency-Key` header is missing.
 * - 409 when a request with the same key is still processing.
 * - 200 with the stored body when the key already completed.
 * - 500 with the stored error when the key already failed.
 * - Otherwise the key is claimed and the request continues; the body passed
 *   to `res.json` is stored for replay.
 *
 * A request that ends without calling `res.json` (handler threw, client
 * disconnected, non-JSON response) is recorded as failed so the key does not
 * stay in `processing` until its TTL expires.
 *
 * NOTE(review): the response status code is not stored, so replays always
 * answer 200 — confirm this is acceptable for handlers that send 201/4xx.
 *
 * @param idempotency Store used to read and write idempotency records.
 * @returns The Express middleware function.
 */
function idempotencyMiddleware(idempotency: IdempotencyService) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Only apply to POST, PUT, PATCH, DELETE
    if (!['POST', 'PUT', 'PATCH', 'DELETE'].includes(req.method)) {
      return next();
    }

    // Header values are typed string | string[] | undefined; accept only a
    // single non-empty string instead of asserting the type.
    const headerValue = req.headers['idempotency-key'];
    const idempotencyKey = typeof headerValue === 'string' ? headerValue : '';
    if (!idempotencyKey) {
      return res.status(400).json({
        error: 'Idempotency-Key header required'
      });
    }

    try {
      // Check for existing request
      const existing = await idempotency.getRequest(idempotencyKey);
      if (existing) {
        if (existing.status === 'processing') {
          return res.status(409).json({
            error: 'Request already processing',
            message: 'Please wait and retry'
          });
        }
        if (existing.status === 'completed') {
          return res.status(200).json(existing.response);
        }
        if (existing.status === 'failed') {
          return res.status(500).json({
            error: 'Previous request failed',
            message: existing.error
          });
        }
      }

      // Start processing
      const canProcess = await idempotency.startProcessing(idempotencyKey);
      if (!canProcess) {
        return res.status(409).json({
          error: 'Request already processing'
        });
      }
    } catch (err) {
      // Express versions before 5 do not observe the promise returned by an
      // async middleware, so store failures must be forwarded explicitly.
      return next(err);
    }

    // Capture response
    let responseCaptured = false;
    const originalSend = res.json.bind(res);
    res.json = (body: any) => {
      responseCaptured = true;
      // Save response for future requests
      idempotency.completeRequest(idempotencyKey, body).catch(console.error);
      return originalSend(body);
    };

    // Handle errors: wrapping `next` here cannot see errors raised by later
    // handlers (they receive their own `next`), so watch the response instead.
    res.once('close', () => {
      if (!responseCaptured) {
        idempotency
          .failRequest(
            idempotencyKey,
            `Request ended with status ${res.statusCode} without a JSON response`
          )
          .catch(console.error);
      }
    });

    next();
  };
}
// Usage
const app = express();
// NOTE(review): this client is not used by anything visible in this example;
// IdempotencyService opens its own connection from the URL below.
const redis = new Redis('redis://localhost:6379');
const idempotency = new IdempotencyService('redis://localhost:6379');

// Parse JSON bodies first, then enforce idempotency on every mutating route.
app.use(express.json());
app.use(idempotencyMiddleware(idempotency));

app.post('/api/payments', async (req, res, next) => {
  try {
    const { amount, userId } = req.body;
    // Process payment
    const payment = await processPayment(amount, userId);
    res.json(payment);
  } catch (err) {
    // Forward failures to Express; before Express 5 a rejected async handler
    // is otherwise an unhandled rejection and the client never gets a reply.
    next(err);
  }
});
/**
 * Placeholder payment processor used by the example route.
 *
 * @param amount Amount echoed back in the result.
 * @param userId User identifier echoed back in the result.
 * @returns An object with a freshly generated UUID `id`, the given `amount`
 *   and `userId`, and `status` set to `'completed'`.
 */
async function processPayment(amount: number, userId: string) {
  // Payment processing logic
  const paymentId = crypto.randomUUID();
  const receipt = {
    id: paymentId,
    amount: amount,
    userId: userId,
    status: 'completed'
  };
  return receipt;
}
// Start the HTTP server on port 3000.
app.listen(3000);
2. Database-Based Idempotency
import { Pool } from 'pg';
/**
 * One row of the `idempotency_keys` table, using the table's column names.
 */
interface IdempotencyRecord {
  /** Idempotency key (primary key of the table). */
  key: string;

  /** Lifecycle state stored in the `status` column. */
  status: string;

  /** Body of the request that first used this key. */
  request_body: any;

  /** Body stored when the request completed, if any. */
  response_body?: any;

  /** Error text stored when the request failed, if any. */
  error_message?: string;

  /** When the row was created. */
  created_at: Date;

  /** When the request completed or failed, if it has. */
  completed_at?: Date;
}
/**
 * PostgreSQL-backed idempotency store using the `idempotency_keys` table.
 *
 * The table is created on construction; every public method waits for that
 * setup to finish (and surfaces its error) before touching the table.
 */
class DatabaseIdempotency {
  /** Settles when table setup has finished; rejects if setup failed. */
  private readonly ready: Promise<void>;

  /**
   * @param db Connection pool used for all queries.
   */
  constructor(private db: Pool) {
    this.ready = this.createTable();
    // Callers observe a setup failure by awaiting `ready` inside each method;
    // this handler only prevents an unhandled rejection before the first call.
    this.ready.catch(() => undefined);
  }

  /**
   * Serializes a value to JSON with object keys sorted at every level, so two
   * structurally equal values produce the same string regardless of key order.
   */
  private static canonicalJson(value: unknown): string {
    return JSON.stringify(value ?? null, (_key, val) => {
      if (val !== null && typeof val === 'object' && !Array.isArray(val)) {
        const source = val as Record<string, unknown>;
        const sorted: Record<string, unknown> = {};
        for (const name of Object.keys(source).sort()) {
          sorted[name] = source[name];
        }
        return sorted;
      }
      return val;
    });
  }

  /** Creates the table and the expiry index when they do not exist yet. */
  private async createTable(): Promise<void> {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS idempotency_keys (
        key VARCHAR(255) PRIMARY KEY,
        request_body JSONB NOT NULL,
        response_body JSONB,
        status VARCHAR(50) NOT NULL,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        completed_at TIMESTAMP,
        expires_at TIMESTAMP NOT NULL
      );
      CREATE INDEX IF NOT EXISTS idx_idempotency_expires
        ON idempotency_keys (expires_at);
    `);
  }

  /**
   * Looks up the record stored for `key`.
   *
   * @returns The stored record, or `null` when the key is unknown.
   * @throws Error when the key exists but was used with a different body.
   */
  async checkIdempotency(
    key: string,
    requestBody: any
  ): Promise<IdempotencyRecord | null> {
    await this.ready;
    const result = await this.db.query(
      'SELECT * FROM idempotency_keys WHERE key = $1',
      [key]
    );
    if (result.rows.length === 0) {
      return null;
    }
    const record = result.rows[0];
    // Check if request body matches. JSONB does not preserve object key
    // order, so compare canonical (key-sorted) forms rather than raw
    // JSON.stringify output.
    const stored = DatabaseIdempotency.canonicalJson(record.request_body);
    const incoming = DatabaseIdempotency.canonicalJson(requestBody);
    if (stored !== incoming) {
      throw new Error('Request body mismatch for idempotency key');
    }
    return record;
  }

  /**
   * Claims `key` by inserting a `processing` row that expires in 24 hours.
   *
   * @returns `true` when the row was inserted, `false` when the key already
   *   exists (unique violation).
   */
  async startProcessing(
    key: string,
    requestBody: any
  ): Promise<boolean> {
    await this.ready;
    try {
      const expiresAt = new Date(Date.now() + 86400 * 1000); // 24 hours
      // Serialize explicitly: the driver would otherwise send a JS array as a
      // Postgres array literal and a JS string as raw text, neither of which
      // is valid input for a JSONB column.
      await this.db.query(`
        INSERT INTO idempotency_keys (key, request_body, status, expires_at)
        VALUES ($1, $2, 'processing', $3)
      `, [key, JSON.stringify(requestBody ?? null), expiresAt]);
      return true;
    } catch (error: any) {
      if (error.code === '23505') { // Unique violation
        return false;
      }
      throw error;
    }
  }

  /**
   * Marks `key` as completed and stores the response body.
   */
  async completeRequest(
    key: string,
    responseBody: any
  ): Promise<void> {
    await this.ready;
    // An undefined body stays SQL NULL; anything else is stored as JSON text.
    const serialized =
      responseBody === undefined ? null : JSON.stringify(responseBody);
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        response_body = $1,
        status = 'completed',
        completed_at = NOW()
      WHERE key = $2
    `, [serialized, key]);
  }

  /**
   * Marks `key` as failed and stores the error message.
   */
  async failRequest(
    key: string,
    errorMessage: string
  ): Promise<void> {
    await this.ready;
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        error_message = $1,
        status = 'failed',
        completed_at = NOW()
      WHERE key = $2
    `, [errorMessage, key]);
  }

  /**
   * Deletes rows whose `expires_at` is in the past.
   *
   * @returns Number of rows deleted.
   */
  async cleanup(): Promise<number> {
    await this.ready;
    const result = await this.db.query(`
      DELETE FROM idempotency_keys
      WHERE expires_at < NOW()
    `);
    return result.rowCount || 0;
  }
}
3. Stripe-Style Idempotency
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import psycopg2
class IdempotencyManager:
    """Runs a handler at most once per idempotency key.

    State is kept in the ``idempotency_requests`` table through the DB-API
    connection passed to the constructor. Records older than ``ttl_days`` are
    treated as expired: they are ignored on lookup and removed when the key is
    claimed again.
    """

    def __init__(self, db_connection):
        """Store the connection and set the record lifetime to one day."""
        self.db = db_connection
        self.ttl_days = 1

    def process_request(
        self,
        idempotency_key: str,
        request_data: Dict[str, Any],
        process_fn: callable
    ) -> Dict[str, Any]:
        """
        Process request with idempotency guarantee.

        Args:
            idempotency_key: Unique key for this request
            request_data: Request payload
            process_fn: Function to process the request

        Returns:
            Response data (the stored response when the key already completed)

        Raises:
            ConflictError: The key is currently being processed.
            ProcessingError: A previous attempt with this key failed.
            ValueError: The key was previously used with different data.
        """
        # Check for existing request
        existing = self.get_existing_request(
            idempotency_key,
            request_data
        )
        if existing:
            if existing['status'] == 'processing':
                raise ConflictError('Request already processing')
            if existing['status'] == 'completed':
                return existing['response']
            if existing['status'] == 'failed':
                raise ProcessingError(existing['error'])

        # Start processing
        if not self.start_processing(idempotency_key, request_data):
            raise ConflictError('Request already processing')

        try:
            # Process request
            result = process_fn(request_data)
            # Store result
            self.complete_request(idempotency_key, result)
            return result
        except Exception as e:
            # Store error
            self.fail_request(idempotency_key, str(e))
            raise

    def get_existing_request(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get existing idempotent request.

        Returns ``None`` when no unexpired record exists for ``key``.
        Raises ``ValueError`` when the stored request hash differs from the
        hash of ``request_data``.
        """
        cutoff = datetime.now() - timedelta(days=self.ttl_days)
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                SELECT status, response, error, request_hash
                FROM idempotency_requests
                WHERE idempotency_key = %s
                AND created_at > %s
            """, (key, cutoff))
            row = cursor.fetchone()
        finally:
            # Release the cursor even when the query raises.
            cursor.close()

        if not row:
            return None

        # Verify request data matches
        request_hash = self.hash_request(request_data)
        if row[3] != request_hash:
            raise ValueError(
                'Request data does not match idempotency key'
            )

        return {
            'status': row[0],
            'response': self._decode_response(row[1]),
            'error': row[2]
        }

    @staticmethod
    def _decode_response(stored):
        """Turn a stored response back into the object that was saved.

        ``complete_request`` writes the response with ``json.dumps``. When the
        driver hands the column back as text, decode it so a replay returns
        the same structure as the first call; values that are already decoded
        (or are not valid JSON) are returned unchanged.
        """
        if isinstance(stored, str):
            try:
                return json.loads(stored)
            except ValueError:
                return stored
        return stored

    def start_processing(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> bool:
        """Mark request as processing.

        Returns ``True`` when the key was claimed, ``False`` when an unexpired
        record for the key already exists.
        """
        cursor = self.db.cursor()
        request_hash = self.hash_request(request_data)
        expired_before = datetime.now() - timedelta(days=self.ttl_days)
        try:
            # An expired row is invisible to get_existing_request but would
            # still make the INSERT below fail, blocking the key forever.
            cursor.execute("""
                DELETE FROM idempotency_requests
                WHERE idempotency_key = %s
                AND created_at <= %s
            """, (key, expired_before))
            cursor.execute("""
                INSERT INTO idempotency_requests
                (idempotency_key, request_hash, status, created_at)
                VALUES (%s, %s, 'processing', NOW())
            """, (key, request_hash))
            self.db.commit()
            return True
        except psycopg2.IntegrityError:
            self.db.rollback()
            return False
        finally:
            cursor.close()

    def complete_request(
        self,
        key: str,
        response: Dict[str, Any]
    ):
        """Mark request as completed and store the JSON-encoded response."""
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                UPDATE idempotency_requests
                SET
                status = 'completed',
                response = %s,
                completed_at = NOW()
                WHERE idempotency_key = %s
            """, (json.dumps(response), key))
            self.db.commit()
        finally:
            cursor.close()

    def fail_request(self, key: str, error: str):
        """Mark request as failed and store the error text."""
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                UPDATE idempotency_requests
                SET
                status = 'failed',
                error = %s,
                completed_at = NOW()
                WHERE idempotency_key = %s
            """, (error, key))
            self.db.commit()
        finally:
            cursor.close()

    def hash_request(self, data: Dict[str, Any]) -> str:
        """Create hash of request data.

        Returns the SHA-256 hex digest of the data serialized as JSON with
        sorted keys, so key order does not affect the hash.
        """
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()
class ConflictError(Exception):
    """Raised when a request with the same idempotency key is already processing."""
class ProcessingError(Exception):
    """Raised when the stored record for an idempotency key is marked as failed."""
# Usage
def process_payment(data):
    """Return the example payment result for ``data``.

    Only ``data['amount']`` is read; the payment id and status are fixed
    placeholder values.
    """
    # Process payment logic
    receipt = dict(
        payment_id='pay_123',
        amount=data['amount'],
        status='completed',
    )
    return receipt
# In your API handler
# ``db_connection`` must already exist in the surrounding handler code.
idempotency = IdempotencyManager(db_connection)

try:
    # Arguments are, in order: the idempotency key, the request payload and
    # the function that performs the work.
    result = idempotency.process_request(
        'key_abc123',
        {'amount': 100, 'currency': 'USD'},
        process_payment,
    )
except ConflictError as conflict:
    print(f"Conflict: {conflict}")
except ProcessingError as failure:
    print(f"Processing error: {failure}")
else:
    print(result)
4. Message Queue Idempotency
/**
 * Envelope of a queue message handled by the message processor.
 */
interface Message {
  /** Identifier used to detect duplicate deliveries. */
  id: string;

  /** Numeric timestamp carried by the message. */
  timestamp: number;

  /** Message payload. */
  data: any;
}
/**
 * Processes queue messages at most once per message id.
 *
 * The `processed_messages` table is the source of truth; an in-memory set of
 * completed ids is only a shortcut that avoids a database round trip for
 * known duplicates.
 */
class IdempotentMessageProcessor {
  private processedMessages = new Set<string>();
  private db: Pool;
  /** Settles once the initial preload of completed ids has finished. */
  private readonly ready: Promise<void>;

  /**
   * @param db Connection pool used for all queries.
   */
  constructor(db: Pool) {
    this.db = db;
    // A failed preload only costs the shortcut: duplicates are still caught
    // by the insert in markAsProcessing, so log instead of failing.
    this.ready = this.loadProcessedMessages().catch((error) => {
      console.error('Failed to preload processed message IDs:', error);
    });
  }

  /** Fills the in-memory set with ids completed in the last 24 hours. */
  private async loadProcessedMessages(): Promise<void> {
    // Load recent processed message IDs. Only completed rows count: a row
    // still marked processing or failed has not been handled successfully.
    const result = await this.db.query(`
      SELECT message_id
      FROM processed_messages
      WHERE processed_at > NOW() - INTERVAL '24 hours'
        AND status = 'completed'
    `);
    result.rows.forEach((row: { message_id: string }) => {
      this.processedMessages.add(row.message_id);
    });
  }

  /**
   * Handles `message` unless it was already completed or is currently being
   * processed elsewhere.
   *
   * @throws Rethrows whatever the message handler throws, after recording the
   *   failure so that a later redelivery can retry the message.
   */
  async processMessage(message: Message): Promise<void> {
    // Wait for the preload so the duplicate check below sees the loaded ids.
    await this.ready;

    // Check if already processed
    if (this.processedMessages.has(message.id)) {
      console.log(`Message ${message.id} already processed, skipping`);
      return;
    }

    // Mark as processing (atomic operation)
    const wasInserted = await this.markAsProcessing(message.id);
    if (!wasInserted) {
      console.log(`Message ${message.id} already being processed`);
      return;
    }

    try {
      // Process message
      await this.handleMessage(message);
      // Mark as completed
      await this.markAsCompleted(message.id);
      this.processedMessages.add(message.id);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);
      await this.markAsFailed(message.id, (error as Error).message);
      throw error;
    }
  }

  /**
   * Atomically claims `messageId`.
   *
   * A new id is inserted as `processing`; an id whose previous attempt
   * `failed` is re-claimed so that a redelivered message is retried instead of
   * being skipped forever.
   *
   * @returns `true` when this caller owns the message, `false` when it is
   *   already processing or completed.
   */
  private async markAsProcessing(messageId: string): Promise<boolean> {
    try {
      const result = await this.db.query(`
        INSERT INTO processed_messages (message_id, status, processed_at)
        VALUES ($1, 'processing', NOW())
        ON CONFLICT (message_id) DO UPDATE
        SET status = 'processing',
            processed_at = NOW(),
            error = NULL,
            completed_at = NULL
        WHERE processed_messages.status = 'failed'
      `, [messageId]);
      // Zero affected rows means the id exists and is not in the failed state.
      return (result.rowCount ?? 0) > 0;
    } catch (error: any) {
      if (error.code === '23505') {
        return false;
      }
      throw error;
    }
  }

  /** Marks `messageId` as completed. */
  private async markAsCompleted(messageId: string): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'completed', completed_at = NOW()
      WHERE message_id = $1
    `, [messageId]);
  }

  /** Marks `messageId` as failed and stores the error message. */
  private async markAsFailed(
    messageId: string,
    error: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'failed', error = $2, completed_at = NOW()
      WHERE message_id = $1
    `, [messageId, error]);
  }

  /** Placeholder for the real message handling; currently only logs. */
  private async handleMessage(message: Message): Promise<void> {
    // Actual message processing logic
    console.log('Processing message:', message);
  }
}
Best Practices
✅ DO
- Require idempotency keys for mutations
- Store request and response together
- Set appropriate TTL for idempotency records
- Validate request body matches stored request
- Handle concurrent requests gracefully
- Return same response for duplicate requests
- Clean up old idempotency records
- Use database constraints for atomicity
❌ DON'T
- Apply idempotency to GET requests
- Store idempotency data forever
- Skip validation of request body
- Use non-unique idempotency keys
- Process same request concurrently
- Change response for duplicate requests
Schema Design
-- One row per idempotency key: the request that first used the key, its
-- outcome, and when the row expires.
CREATE TABLE idempotency_keys (
-- Idempotency key; the primary key rejects a second insert of the same key.
key VARCHAR(255) PRIMARY KEY,
-- 64-character request hash column.
request_hash VARCHAR(64) NOT NULL,
request_body JSONB NOT NULL,
-- NULL until a response has been stored.
response_body JSONB,
-- Restricted by the CHECK constraint to the three lifecycle states.
status VARCHAR(20) NOT NULL CHECK (status IN ('processing', 'completed', 'failed')),
error_message TEXT,
created_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP,
-- Required expiry time for the row.
expires_at TIMESTAMP NOT NULL
);
-- Index on the expiry column (presumably for deleting expired rows by expires_at).
CREATE INDEX idx_idempotency_expires ON idempotency_keys (expires_at);
-- Index on the status column.
CREATE INDEX idx_idempotency_status ON idempotency_keys (status);
Resources
Repository

aj-geddes
Author
aj-geddes/useful-ai-prompts/skills/idempotency-handling
25
Stars
1
Forks
Updated 3d ago
Added 5d ago