secrets-rotation
Implement automated secrets rotation for API keys, credentials, certificates, and encryption keys. Use when managing secrets lifecycle, compliance requirements, or security hardening.
$ インストール
git clone https://github.com/aj-geddes/useful-ai-prompts /tmp/useful-ai-prompts && cp -r /tmp/useful-ai-prompts/skills/secrets-rotation ~/.claude/skills/useful-ai-prompts// tip: Run this command in your terminal to install the skill
SKILL.md
name: secrets-rotation description: Implement automated secrets rotation for API keys, credentials, certificates, and encryption keys. Use when managing secrets lifecycle, compliance requirements, or security hardening.
Secrets Rotation
Overview
Implement automated secrets rotation strategy for credentials, API keys, certificates, and encryption keys with zero-downtime deployment and comprehensive audit logging.
When to Use
- API key management
- Database credentials
- TLS/SSL certificates
- Encryption key rotation
- Compliance requirements
- Security incident response
- Service account management
Implementation Examples
1. Node.js Secrets Manager with Rotation
// secrets-manager.js
const AWS = require('aws-sdk');
const crypto = require('crypto');
class SecretsManager {
constructor() {
this.secretsManager = new AWS.SecretsManager({
region: process.env.AWS_REGION
});
this.rotationSchedule = new Map();
}
/**
* Generate new secret value
*/
generateSecret(type = 'api_key', length = 32) {
switch (type) {
case 'api_key':
return crypto.randomBytes(length).toString('hex');
case 'password':
// Generate strong password
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
let password = '';
for (let i = 0; i < length; i++) {
password += chars.charAt(crypto.randomInt(chars.length));
}
return password;
case 'jwt_secret':
return crypto.randomBytes(64).toString('base64');
default:
return crypto.randomBytes(length).toString('base64');
}
}
/**
* Store secret in AWS Secrets Manager
*/
async createSecret(name, value, description = '') {
const params = {
Name: name,
SecretString: JSON.stringify(value),
Description: description
};
try {
const result = await this.secretsManager.createSecret(params).promise();
return result;
} catch (error) {
if (error.code === 'ResourceExistsException') {
// Update existing secret
return this.updateSecret(name, value);
}
throw error;
}
}
/**
* Retrieve secret
*/
async getSecret(name) {
const params = { SecretId: name };
try {
const data = await this.secretsManager.getSecretValue(params).promise();
if ('SecretString' in data) {
return JSON.parse(data.SecretString);
}
// Binary secret
const buff = Buffer.from(data.SecretBinary, 'base64');
return buff.toString('ascii');
} catch (error) {
console.error(`Error retrieving secret ${name}:`, error);
throw error;
}
}
/**
* Update secret value
*/
async updateSecret(name, value) {
const params = {
SecretId: name,
SecretString: JSON.stringify(value)
};
return this.secretsManager.updateSecret(params).promise();
}
/**
* Rotate secret with zero downtime
*/
async rotateSecret(name, type = 'api_key') {
console.log(`Starting rotation for secret: ${name}`);
try {
// Step 1: Generate new secret
const newValue = this.generateSecret(type);
// Step 2: Store new version
const currentSecret = await this.getSecret(name);
// Keep old value temporarily for graceful transition
const secretWithRotation = {
current: newValue,
previous: currentSecret.current || currentSecret,
rotatedAt: new Date().toISOString()
};
await this.updateSecret(name, secretWithRotation);
console.log(`New secret version created for: ${name}`);
// Step 3: Wait for applications to pick up new secret
await this.waitForPropagation(5000);
// Step 4: Verify new secret works
const verificationPassed = await this.verifySecret(name, newValue);
if (!verificationPassed) {
throw new Error('Secret verification failed');
}
// Step 5: Remove previous version after grace period
setTimeout(async () => {
await this.updateSecret(name, {
current: newValue,
rotatedAt: new Date().toISOString()
});
console.log(`Rotation completed for: ${name}`);
}, 300000); // 5 minutes grace period
return {
success: true,
secretName: name,
rotatedAt: new Date().toISOString()
};
} catch (error) {
console.error(`Rotation failed for ${name}:`, error);
// Rollback on failure
await this.rollbackRotation(name);
throw error;
}
}
/**
* Schedule automatic rotation
*/
async scheduleRotation(name, intervalDays = 90) {
const intervalMs = intervalDays * 24 * 60 * 60 * 1000;
const rotationJob = setInterval(async () => {
try {
await this.rotateSecret(name);
console.log(`Scheduled rotation completed for: ${name}`);
} catch (error) {
console.error(`Scheduled rotation failed for ${name}:`, error);
// Alert operations team
this.sendAlert(name, error);
}
}, intervalMs);
this.rotationSchedule.set(name, rotationJob);
// AWS Secrets Manager automatic rotation
const params = {
SecretId: name,
RotationLambdaARN: process.env.ROTATION_LAMBDA_ARN,
RotationRules: {
AutomaticallyAfterDays: intervalDays
}
};
await this.secretsManager.rotateSecret(params).promise();
}
/**
* Rotate database credentials
*/
async rotateDatabaseCredentials(secretName) {
const credentials = await this.getSecret(secretName);
// Generate new password
const newPassword = this.generateSecret('password', 20);
// Update database user password
const connection = await this.connectToDatabase(credentials);
await connection.query(
'ALTER USER ? IDENTIFIED BY ?',
[credentials.username, newPassword]
);
// Update secret
await this.updateSecret(secretName, {
username: credentials.username,
password: newPassword,
host: credentials.host,
database: credentials.database,
rotatedAt: new Date().toISOString()
});
await connection.end();
return { success: true };
}
/**
* Rotate TLS certificate
*/
async rotateTLSCertificate(domain) {
// Use Let's Encrypt or internal CA
const certbot = require('certbot');
try {
// Request new certificate
const newCert = await certbot.certonly({
domains: [domain],
email: process.env.ADMIN_EMAIL,
agreeTos: true,
renewByDefault: true
});
// Store in secrets manager
await this.createSecret(`tls-cert-${domain}`, {
certificate: newCert.certificate,
privateKey: newCert.privateKey,
chain: newCert.chain,
issuedAt: new Date().toISOString(),
expiresAt: newCert.expiresAt
});
// Update load balancer/web server
await this.updateServerCertificate(domain, newCert);
console.log(`TLS certificate rotated for: ${domain}`);
return { success: true };
} catch (error) {
console.error('Certificate rotation failed:', error);
throw error;
}
}
async waitForPropagation(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async verifySecret(name, value) {
// Implement verification logic
// Test API call, database connection, etc.
return true;
}
async rollbackRotation(name) {
// Restore previous version
console.log(`Rolling back rotation for: ${name}`);
}
async sendAlert(secretName, error) {
// Send to monitoring system
console.error(`ALERT: Rotation failed for ${secretName}`, error);
}
async connectToDatabase(credentials) {
// Database connection logic
return null;
}
async updateServerCertificate(domain, cert) {
// Update server configuration
return null;
}
}
// Usage
const secretsManager = new SecretsManager();
// Rotate API key
async function rotateAPIKey() {
await secretsManager.rotateSecret('api-key-external-service', 'api_key');
}
// Schedule automatic rotation
async function setupRotationSchedule() {
await secretsManager.scheduleRotation('database-credentials', 90);
await secretsManager.scheduleRotation('api-keys', 30);
}
// Rotate database credentials
async function rotateDatabaseCreds() {
await secretsManager.rotateDatabaseCredentials('rds-production');
}
module.exports = SecretsManager;
2. Python Secrets Rotation with Vault
# secrets_rotation.py
import hvac
import secrets
import string
from datetime import datetime, timedelta
from typing import Dict, Any
import psycopg2
import boto3
class SecretsRotation:
def __init__(self, vault_url: str, vault_token: str):
self.vault_client = hvac.Client(url=vault_url, token=vault_token)
self.ssm = boto3.client('ssm')
def generate_secret(self, secret_type: str = 'api_key', length: int = 32) -> str:
"""Generate new secret value"""
if secret_type == 'api_key':
return secrets.token_urlsafe(length)
elif secret_type == 'password':
# Strong password with all character types
chars = string.ascii_letters + string.digits + string.punctuation
return ''.join(secrets.choice(chars) for _ in range(length))
elif secret_type == 'jwt_secret':
return secrets.token_urlsafe(64)
else:
return secrets.token_bytes(length).hex()
def rotate_secret(self, path: str, secret_type: str = 'api_key') -> Dict[str, Any]:
"""Rotate secret with zero downtime"""
print(f"Starting rotation for: {path}")
try:
# Read current secret
current_secret = self.vault_client.secrets.kv.v2.read_secret(path=path)
current_data = current_secret['data']['data']
# Generate new value
new_value = self.generate_secret(secret_type)
# Store with both old and new values
rotation_data = {
'current': new_value,
'previous': current_data.get('current', current_data.get('value')),
'rotated_at': datetime.utcnow().isoformat()
}
self.vault_client.secrets.kv.v2.create_or_update_secret(
path=path,
secret=rotation_data
)
print(f"Secret rotated successfully: {path}")
return {
'success': True,
'path': path,
'rotated_at': rotation_data['rotated_at']
}
except Exception as e:
print(f"Rotation failed for {path}: {e}")
raise
def rotate_database_password(self, secret_path: str) -> Dict[str, Any]:
"""Rotate database credentials"""
# Get current credentials
secret = self.vault_client.secrets.kv.v2.read_secret(path=secret_path)
creds = secret['data']['data']
# Generate new password
new_password = self.generate_secret('password', 20)
# Connect to database
conn = psycopg2.connect(
host=creds['host'],
database=creds['database'],
user=creds['username'],
password=creds['password']
)
cursor = conn.cursor()
try:
# Update password in database
cursor.execute(
f"ALTER USER {creds['username']} WITH PASSWORD %s",
(new_password,)
)
conn.commit()
# Update secret in Vault
updated_creds = {
**creds,
'password': new_password,
'rotated_at': datetime.utcnow().isoformat()
}
self.vault_client.secrets.kv.v2.create_or_update_secret(
path=secret_path,
secret=updated_creds
)
print(f"Database credentials rotated: {secret_path}")
return {'success': True}
finally:
cursor.close()
conn.close()
def schedule_rotation(self, path: str, interval_days: int = 90):
"""Schedule automatic rotation using AWS Lambda"""
# Create rotation schedule in AWS Secrets Manager
# or use cron job
schedule_expression = f"rate({interval_days} days)"
# This would trigger a Lambda function
print(f"Rotation scheduled for {path}: every {interval_days} days")
def rotate_encryption_keys(self, key_id: str):
"""Rotate encryption keys"""
kms = boto3.client('kms')
# Enable automatic key rotation
kms.enable_key_rotation(KeyId=key_id)
print(f"Automatic rotation enabled for KMS key: {key_id}")
def audit_rotation_history(self, path: str) -> list:
"""Get rotation history"""
versions = self.vault_client.secrets.kv.v2.read_secret_metadata(path=path)
history = []
for version, metadata in versions['data']['versions'].items():
history.append({
'version': version,
'created_time': metadata['created_time'],
'deleted': metadata.get('deletion_time') is not None
})
return sorted(history, key=lambda x: x['created_time'], reverse=True)
# Usage
if __name__ == '__main__':
rotation = SecretsRotation(
vault_url='http://localhost:8200',
vault_token='your-token'
)
# Rotate API key
rotation.rotate_secret('api-keys/external-service', 'api_key')
# Rotate database credentials
rotation.rotate_database_password('database/production')
# Schedule rotations
rotation.schedule_rotation('api-keys/external-service', 30)
rotation.schedule_rotation('database/production', 90)
# View history
history = rotation.audit_rotation_history('api-keys/external-service')
print(f"Rotation history: {history}")
3. Kubernetes Secrets Rotation
# secrets-rotation-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: secrets-rotation
namespace: production
spec:
schedule: "0 2 * * 0" # Weekly at 2 AM Sunday
jobTemplate:
spec:
template:
spec:
serviceAccountName: secrets-rotator
containers:
- name: rotate
image: secrets-rotator:latest
env:
- name: VAULT_ADDR
value: "http://vault:8200"
- name: VAULT_TOKEN
valueFrom:
secretKeyRef:
name: vault-token
key: token
command:
- /bin/sh
- -c
- |
# Rotate secrets
python /app/rotate_secrets.py \
--secret database-password \
--secret api-keys \
--secret tls-certificates
restartPolicy: OnFailure
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: secrets-rotator
namespace: production
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: secrets-rotator
namespace: production
rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: secrets-rotator
namespace: production
subjects:
- kind: ServiceAccount
name: secrets-rotator
roleRef:
kind: Role
name: secrets-rotator
apiGroup: rbac.authorization.k8s.io
Best Practices
✅ DO
- Automate rotation
- Use grace periods
- Verify new secrets
- Maintain rotation audit trail
- Implement rollback procedures
- Monitor rotation failures
- Use managed services (AWS Secrets Manager)
- Test rotation procedures
❌ DON'T
- Hardcode secrets
- Share secrets
- Skip verification
- Rotate without grace period
- Ignore rotation failures
- Store secrets in version control
Rotation Schedule
- API Keys: 30-90 days
- Database Passwords: 90 days
- TLS Certificates: Before expiry
- Encryption Keys: 1 year
- Service Account Tokens: 90 days
Zero-Downtime Strategy
- Generate new secret
- Store with versioning
- Grace period (both versions valid)
- Verification
- Deprecate old version
- Remove after grace period
Resources
Repository

aj-geddes
Author
aj-geddes/useful-ai-prompts/skills/secrets-rotation
25
Stars
1
Forks
Updated5d ago
Added1w ago