
migration-planning

Plan ETL/ELT pipelines, data migrations, change data capture, and rollback strategies.

allowed_tools: Read, Write, Glob, Grep, Task

Installer

git clone https://github.com/melodic-software/claude-code-plugins /tmp/claude-code-plugins && cp -r /tmp/claude-code-plugins/plugins/data-architecture/skills/migration-planning ~/.claude/skills/claude-code-plugins

Tip: Run this command in your terminal to install the skill.


name: migration-planning
description: Plan ETL/ELT pipelines, data migrations, change data capture, and rollback strategies.
allowed-tools: Read, Write, Glob, Grep, Task

Migration Planning

When to Use This Skill

Use this skill when:

  • Migration planning tasks - Working on ETL/ELT pipelines, data migrations, change data capture, or rollback strategies
  • Planning or design - You need guidance on migration planning approaches
  • Best practices - You want to follow established patterns and standards

Overview

Migration planning defines how data moves between systems, including initial loads, incremental updates, transformations, and recovery strategies.

ETL vs ELT

Pattern Comparison

| Aspect | ETL | ELT |
|--------|-----|-----|
| Transform Location | Staging server | Target database |
| Best For | Limited target capacity | Cloud data warehouses |
| Data Volume | Small to medium | Large scale |
| Latency | Higher | Lower |
| Flexibility | Defined upfront | Schema-on-read |
| Examples | SSIS, Informatica | dbt, Snowflake, Databricks |

Architecture Patterns

ETL PATTERN
┌─────────┐    ┌──────────────────┐    ┌──────────┐
│ Source  │───►│   ETL Server     │───►│  Target  │
│ Systems │    │ Extract→Transform│    │    DW    │
└─────────┘    │ →Load            │    └──────────┘
               └──────────────────┘

ELT PATTERN
┌─────────┐    ┌──────────┐    ┌────────────────────┐
│ Source  │───►│  Staging │───►│    Target DW       │
│ Systems │    │  (Raw)   │    │ Transform in place │
└─────────┘    └──────────┘    └────────────────────┘
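
In the ELT pattern, transformation logic runs as SQL inside the target warehouse after the raw load. A minimal sketch, assuming hypothetical staging.raw_customers and curated.Customers tables in the target:

-- ELT: transform previously staged raw data in place, inside the target warehouse
INSERT INTO curated.Customers (customer_id, customer_name, email)
SELECT
    CAST(src.customer_id AS INT),
    LTRIM(RTRIM(src.customer_name)),
    LOWER(src.email)
FROM staging.raw_customers AS src
WHERE src.customer_id IS NOT NULL;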

Load Strategies

Full vs Incremental

| Strategy | Description | Use Case |
|----------|-------------|----------|
| Full Load | Complete refresh | Small tables, dimension tables |
| Incremental | Only changes | Large tables, transaction data |
| CDC | Real-time changes | Near real-time requirements |
| Snapshot | Point-in-time | Audit, historical comparison |

Incremental Load Approaches

| Approach | Detection Method | Pros | Cons |
|----------|------------------|------|------|
| Timestamp | modified_at column | Simple | Clock skew, deletes missed |
| Rowversion | Database version column | Accurate | SQL Server specific |
| Trigger-based | Audit tables | All changes | Performance overhead |
| Log-based CDC | Transaction log | Real-time | Complex setup |
| Hash comparison | Row hash | Database agnostic | Processing overhead |
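
As an illustration of the timestamp approach in the table above, the sketch below assumes a hypothetical etl.LoadWatermarks control table and a modified_at column on the source table; note that hard deletes are not detected this way.

-- Timestamp (watermark) incremental extract; etl.LoadWatermarks is a hypothetical control table
DECLARE @LastWatermark DATETIME2 =
    (SELECT LastModifiedAt FROM etl.LoadWatermarks WHERE TableName = 'Customers');
DECLARE @NewWatermark DATETIME2 = SYSUTCDATETIME();

-- Extract only rows changed since the previous successful run
SELECT customer_id, customer_name, email, modified_at
FROM dbo.Customers
WHERE modified_at > @LastWatermark
  AND modified_at <= @NewWatermark;

-- Advance the watermark only after the extract has been loaded successfully
UPDATE etl.LoadWatermarks
SET LastModifiedAt = @NewWatermark
WHERE TableName = 'Customers';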

Migration Plan Template

# Data Migration Plan: CRM to Data Warehouse

## Executive Summary
- Source: Salesforce CRM
- Target: Azure Synapse Analytics
- Data Volume: 50M records initial, 100K daily incremental
- Timeline: 6 weeks

## Scope

### In Scope
| Object | Records | Load Type | Priority |
|--------|---------|-----------|----------|
| Accounts | 5M | Full then CDC | P1 |
| Contacts | 20M | Full then CDC | P1 |
| Opportunities | 15M | Full then Incremental | P1 |
| Activities | 10M | Incremental only | P2 |

### Out of Scope
- Attachments/Files
- Custom objects (phase 2)

## Migration Approach

### Phase 1: Initial Load
1. Extract full data to Azure Data Lake (Parquet)
2. Apply transformations via Synapse Spark pools
3. Load to staging tables
4. Validate record counts and checksums
5. Promote to production tables

### Phase 2: Incremental Sync
1. Enable CDC on source
2. Stream changes via Azure Data Factory
3. Apply upsert logic to target (see the MERGE sketch after this plan)
4. Run hourly (can reduce to 15 min)

## Data Mapping
See Appendix A for full source-to-target mapping

## Validation Strategy
- Record count comparison
- Checksum validation on key columns
- Business rule validation queries
- Sample record verification

## Rollback Plan
1. Preserve backup of target tables before migration
2. Maintain ability to revert to backup for 7 days
3. If issues found, truncate and restore from backup
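
Phase 2 applies upsert logic to the target (step 3 above). A minimal T-SQL sketch, assuming hypothetical staging.stg_customers and dbo.Customers tables keyed on customer_id:

-- Upsert the incremental batch from staging into the target table
MERGE dbo.Customers AS tgt
USING staging.stg_customers AS src
    ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN
    UPDATE SET
        tgt.customer_name = src.customer_name,
        tgt.email = src.email,
        tgt.modified_at = src.modified_at
WHEN NOT MATCHED BY TARGET THEN
    INSERT (customer_id, customer_name, email, modified_at)
    VALUES (src.customer_id, src.customer_name, src.email, src.modified_at);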

Change Data Capture (CDC)

CDC Implementation Options

| Option | Technology | Latency | Complexity |
|--------|------------|---------|------------|
| SQL Server CDC | Built-in feature | Seconds | Low |
| Debezium | Kafka Connect | Sub-second | Medium |
| AWS DMS | Managed service | Seconds | Low |
| Azure Data Factory | Managed service | Minutes | Low |

SQL Server CDC Setup

-- Enable CDC on database
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Customers',
    @role_name = NULL,
    @supports_net_changes = 1;  -- requires a primary key or unique index on the table

-- Query changes
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_Customers');
DECLARE @to_lsn binary(10) = sys.fn_cdc_get_max_lsn();

SELECT
    __$operation,  -- net changes: 1=Delete, 2=Insert, 4=Update
    customer_id,
    customer_name,
    email,
    __$start_lsn
FROM cdc.fn_cdc_get_net_changes_dbo_Customers(@from_lsn, @to_lsn, 'all')
ORDER BY __$start_lsn;

C# CDC Consumer

public class CdcProcessor
{
    private readonly IDbConnection _connection;
    private readonly IMessagePublisher _publisher;

    public async Task ProcessChanges(
        string captureInstance,
        byte[] fromLsn,
        CancellationToken ct)
    {
        // GetMaxLsn, GetNetChanges, and SaveCheckpoint are assumed helper methods that wrap
        // sys.fn_cdc_get_max_lsn, cdc.fn_cdc_get_net_changes_<capture_instance>, and a checkpoint table.
        var toLsn = await GetMaxLsn(ct);

        // Each change row is returned as a column/value dictionary, because __$operation
        // is not a valid C# member name.
        var changes = await GetNetChanges(captureInstance, fromLsn, toLsn, ct);

        foreach (var change in changes)
        {
            // __$operation for net changes: 1=Delete, 2=Insert, 4=Update
            var operation = (int)change["__$operation"] switch
            {
                1 => ChangeOperation.Delete,
                2 => ChangeOperation.Insert,
                4 => ChangeOperation.Update,
                _ => ChangeOperation.Unknown
            };

            await _publisher.PublishAsync(new ChangeEvent
            {
                Operation = operation,
                EntityType = captureInstance,
                EntityId = change["customer_id"],
                Data = change,
                Timestamp = DateTime.UtcNow
            }, ct);
        }

        await SaveCheckpoint(captureInstance, toLsn, ct);
    }
}
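
SaveCheckpoint above persists the last processed LSN so the next run can pass it back in as fromLsn. A minimal sketch of a checkpoint table such a consumer might use (hypothetical schema):

-- Hypothetical checkpoint table backing the SaveCheckpoint helper
CREATE TABLE etl.CdcCheckpoints (
    CaptureInstance SYSNAME NOT NULL PRIMARY KEY,
    LastProcessedLsn BINARY(10) NOT NULL,
    ProcessedAtUtc DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);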

Bulk Load Patterns

SQL Server Bulk Insert

public class BulkLoader
{
    public async Task BulkInsert<T>(
        IEnumerable<T> records,
        string tableName,
        SqlConnection connection,
        CancellationToken ct)
    {
        using var bulkCopy = new SqlBulkCopy(connection)
        {
            DestinationTableName = tableName,
            BatchSize = 10000,
            BulkCopyTimeout = 600
        };

        // Map properties to columns
        var properties = typeof(T).GetProperties();
        foreach (var prop in properties)
        {
            bulkCopy.ColumnMappings.Add(prop.Name, prop.Name);
        }

        // ObjectDataReader<T> is a custom IDataReader adapter over the records (not part of the BCL)
        using var reader = new ObjectDataReader<T>(records);
        await bulkCopy.WriteToServerAsync(reader, ct);
    }
}

// Alternative: using the EFCore.BulkExtensions library
await _context.BulkInsertAsync(customers, options =>
{
    options.BatchSize = 10000;
    options.SetOutputIdentity = true;
    options.UseTempDB = true;
});

Pipeline Orchestration

Pipeline Definition Template

# Pipeline: Daily Customer Sync

## Schedule
- Frequency: Daily at 02:00 UTC
- Timeout: 2 hours
- Retry: 3 attempts with exponential backoff

## Steps

### Step 1: Extract
- Source: CRM API
- Method: Incremental (last 24 hours)
- Output: customers_extract_{date}.parquet

### Step 2: Validate Extract
- Check record count > 0
- Validate schema matches expected
- Log statistics

### Step 3: Transform
- Apply data quality rules
- Standardize address format
- Lookup reference data

### Step 4: Stage
- Load to staging.stg_customers
- Truncate and reload

### Step 5: Merge
- Upsert to dbo.dim_customer (SCD Type 2; see the SQL sketch after this template)
- Track new, updated, unchanged counts

### Step 6: Validate Load
- Compare source vs target counts
- Run quality checks
- Alert if discrepancies

## Dependencies
- Ref data pipeline (must complete first)
- DW infrastructure available

## Notifications
- Success: Slack #data-ops
- Failure: PagerDuty, Email to data-team@
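
Step 5 merges into dbo.dim_customer as a Type 2 slowly changing dimension: changed rows are expired and re-inserted as new versions rather than overwritten. A minimal sketch, assuming hypothetical customer_id/customer_name/email attributes and IsCurrent/ValidFrom/ValidTo tracking columns:

DECLARE @LoadDate DATETIME2 = SYSUTCDATETIME();

-- Expire the current version of any customer whose tracked attributes changed
-- (NULL-safe comparisons omitted for brevity)
UPDATE dim
SET dim.IsCurrent = 0,
    dim.ValidTo = @LoadDate
FROM dbo.dim_customer AS dim
JOIN staging.stg_customers AS stg
    ON stg.customer_id = dim.customer_id
WHERE dim.IsCurrent = 1
  AND (dim.customer_name <> stg.customer_name OR dim.email <> stg.email);

-- Insert a new current version for changed customers and for customers not yet in the dimension
INSERT INTO dbo.dim_customer (customer_id, customer_name, email, ValidFrom, ValidTo, IsCurrent)
SELECT stg.customer_id, stg.customer_name, stg.email, @LoadDate, '9999-12-31', 1
FROM staging.stg_customers AS stg
LEFT JOIN dbo.dim_customer AS dim
    ON dim.customer_id = stg.customer_id
   AND dim.IsCurrent = 1
WHERE dim.customer_id IS NULL;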

Azure Data Factory Pipeline

{
  "name": "CustomerSync",
  "properties": {
    "activities": [
      {
        "name": "ExtractFromCRM",
        "type": "Copy",
        "inputs": [{ "referenceName": "CRM_Customer", "type": "DatasetReference" }],
        "outputs": [{ "referenceName": "ADLS_CustomerRaw", "type": "DatasetReference" }]
      },
      {
        "name": "TransformWithDataFlow",
        "type": "ExecuteDataFlow",
        "dependsOn": [{ "activity": "ExtractFromCRM", "dependencyConditions": ["Succeeded"] }],
        "dataFlow": { "referenceName": "CustomerTransform", "type": "DataFlowReference" }
      },
      {
        "name": "LoadToSynapse",
        "type": "Copy",
        "dependsOn": [{ "activity": "TransformWithDataFlow", "dependencyConditions": ["Succeeded"] }],
        "inputs": [{ "referenceName": "ADLS_CustomerClean", "type": "DatasetReference" }],
        "outputs": [{ "referenceName": "Synapse_DimCustomer", "type": "DatasetReference" }]
      }
    ]
  }
}

Rollback Strategies

Rollback Approaches

| Strategy | Recovery Time | Data Loss Risk | Complexity |
|----------|---------------|----------------|------------|
| Backup/Restore | Hours | Low | Low |
| Point-in-time | Minutes | None | Medium |
| Dual-write | Seconds | None | High |
| Temporal tables | Immediate | None | Medium |

Rollback Implementation

-- Pre-migration backup
SELECT * INTO dbo.Customers_Backup_20241220
FROM dbo.Customers;

-- Create temporal table for automatic history
ALTER TABLE dbo.Customers
ADD
    ValidFrom DATETIME2 GENERATED ALWAYS AS ROW START DEFAULT SYSUTCDATETIME(),
    ValidTo DATETIME2 GENERATED ALWAYS AS ROW END DEFAULT CAST('9999-12-31 23:59:59' AS DATETIME2),
    PERIOD FOR SYSTEM_TIME (ValidFrom, ValidTo);

ALTER TABLE dbo.Customers
SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.Customers_History));

-- Query the table as of a point in time (use the result set to restore affected rows)
SELECT * FROM dbo.Customers
FOR SYSTEM_TIME AS OF '2024-12-20 01:00:00';
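
The migration plan's rollback step ("truncate and restore from backup") can be implemented against the backup table created above; a minimal sketch, with hypothetical column names:

-- Restore the pre-migration state from the backup table
BEGIN TRANSACTION;

DELETE FROM dbo.Customers;  -- DELETE rather than TRUNCATE if FKs or system versioning are in play

INSERT INTO dbo.Customers (customer_id, customer_name, email)
SELECT customer_id, customer_name, email
FROM dbo.Customers_Backup_20241220;

COMMIT TRANSACTION;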

Validation Framework

Validation Checks

public class MigrationValidator
{
    public async Task<ValidationResult> Validate(MigrationContext context, CancellationToken ct)
    {
        var results = new List<ValidationCheck>();

        // Row count comparison
        var sourceCount = await context.Source.CountAsync(ct);
        var targetCount = await context.Target.CountAsync(ct);
        results.Add(new ValidationCheck
        {
            Name = "RowCount",
            Passed = sourceCount == targetCount,
            SourceValue = sourceCount.ToString(),
            TargetValue = targetCount.ToString()
        });

        // Checksum validation
        var sourceChecksum = await CalculateChecksum(context.Source, context.KeyColumns, ct);
        var targetChecksum = await CalculateChecksum(context.Target, context.KeyColumns, ct);
        results.Add(new ValidationCheck
        {
            Name = "Checksum",
            Passed = sourceChecksum == targetChecksum,
            SourceValue = sourceChecksum,
            TargetValue = targetChecksum
        });

        // Sample record validation
        var sampleResults = await ValidateSampleRecords(context, 100, ct);
        results.AddRange(sampleResults);

        return new ValidationResult
        {
            AllPassed = results.All(r => r.Passed),
            Checks = results
        };
    }
}
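
CalculateChecksum above is an assumed helper; one way to compute a comparable table-level checksum on both source and target is to aggregate a per-row hash in SQL, for example:

-- Run the same query against source and target, then compare the two result sets
SELECT
    COUNT_BIG(*) AS TotalRows,
    CHECKSUM_AGG(CHECKSUM(customer_id, customer_name, email)) AS TableChecksum
FROM dbo.Customers;
-- CHECKSUM is fast but collision-prone; HASHBYTES over the key columns is a stronger alternative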

Validation Checklist

  • Source and target systems identified
  • Data volumes estimated
  • Load strategy selected (full, incremental, CDC)
  • Source-to-target mappings documented
  • Transformation rules defined
  • Validation approach planned
  • Rollback strategy defined
  • Performance testing planned
  • Cutover plan documented

Integration Points

Inputs from:

  • er-modeling skill → Source schema
  • schema-design skill → Target schema
  • data-lineage skill → Transformation specs

Outputs to:

  • ETL/ELT tools → Pipeline configurations
  • data-quality-planning skill → Validation rules
  • Operations → Monitoring and alerting

Repository

Author: melodic-software
melodic-software/claude-code-plugins/plugins/data-architecture/skills/migration-planning