
streams

Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines

$ Install

git clone https://github.com/pluginagentmarketplace/custom-plugin-nodejs /tmp/custom-plugin-nodejs && cp -r /tmp/custom-plugin-nodejs/skills/streams ~/.claude/skills/custom-plugin-nodejs

// tip: Run this command in your terminal to install the skill


name: streams
description: Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines
version: "2.1.0"
sasmp_version: "1.3.0"
bonded_agent: 03-async-programming
bond_type: PRIMARY_BOND

Node.js Streams Skill

Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.

Quick Start

Streams come in four types:

  1. Readable - Source of data (file, HTTP request)
  2. Writable - Destination (file, HTTP response)
  3. Transform - Modify data in transit
  4. Duplex - Both readable and writable (see the sketch after this list)
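
The first three types each get a dedicated example below; Duplex does not, so here is a minimal sketch (EchoChannel is an illustrative class, not part of any library):

const { Duplex } = require('stream');

// A minimal Duplex: whatever is written to the writable side is handed
// straight to the readable side (an in-memory echo channel).
class EchoChannel extends Duplex {
  _write(chunk, encoding, callback) {
    this.push(chunk); // forward to the readable side (readable backpressure ignored for brevity)
    callback();
  }

  _final(callback) {
    this.push(null); // end the readable side once the writable side ends
    callback();
  }

  _read() {
    // Data is pushed from _write, so there is nothing to do on demand
  }
}

const channel = new EchoChannel();
channel.on('data', (chunk) => console.log('Echoed:', chunk.toString()));
channel.write('ping');
channel.end();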

Core Concepts

Readable Stream

const fs = require('fs');

// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Event-based consumption
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});

Writable Stream

const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal end

// Handle backpressure (data and continueWriting() are placeholders)
const ok = writeStream.write(data);
if (!ok) {
  // Wait for the drain event before writing more
  writeStream.once('drain', () => {
    continueWriting();
  });
}

Transform Stream

const { Transform } = require('stream');

// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Usage
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));

Learning Path

Beginner (1-2 weeks)

  • ✅ Understand stream types
  • ✅ Read/write file streams
  • ✅ Basic pipe operations
  • ✅ Handle stream events

Intermediate (3-4 weeks)

  • ✅ Transform streams
  • ✅ Backpressure handling
  • ✅ Object mode streams
  • ✅ Pipeline utility

Advanced (5-6 weeks)

  • ✅ Custom stream implementation
  • ✅ Async iterators
  • ✅ Web Streams API (see the interop sketch after this list)
  • ✅ Performance optimization
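
The Web Streams API item has no example elsewhere in this skill, so here is a minimal interop sketch (assumes Node 17+; 'data.txt' is a placeholder path):

const { Readable } = require('stream');
const fs = require('fs');

// Bridge a Node.js Readable into a WHATWG ReadableStream
const webStream = Readable.toWeb(fs.createReadStream('data.txt'));

// Consume with the Web Streams reader API
const reader = webStream.getReader();
let result = await reader.read();
while (!result.done) {
  console.log(`Received ${result.value.length} bytes`);
  result = await reader.read();
}

// The reverse direction also exists: Readable.fromWeb(someWebReadableStream)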

Pipeline (Recommended)

const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}

// With transforms (csvParser, transformRow, and jsonStringify are placeholder
// transform streams; top-level await requires an ES module or an async wrapper)
await pipeline(
  fs.createReadStream('data.csv'),
  csvParser(),
  transformRow(),
  jsonStringify(),
  fs.createWriteStream('data.json')
);

Pipeline with Error Handling

const { pipeline } = require('stream'); // callback-based variant (promise version shown above)

// source, transform1, transform2, and destination are placeholder streams
pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

HTTP Streaming

const http = require('http');
const fs = require('fs');

// Stream file as HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';
  const stat = fs.statSync(filePath);

  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size
  });

  // Stream instead of loading entire file
  fs.createReadStream(filePath).pipe(res);
}).listen(3000);

// Stream an HTTP request body to disk
http.createServer((req, res) => {
  const writeStream = fs.createWriteStream('./upload.bin');
  req.pipe(writeStream);

  // Respond once the file is fully flushed, not merely when the request ends
  writeStream.on('finish', () => {
    res.end('Upload complete');
  });
}).listen(3001);

Object Mode Streams

const { Transform } = require('stream');

// Parse incoming text chunks into objects.
// Assumes each chunk is one complete JSON document (e.g. produced by an
// upstream line-splitting stream); raw file chunks are split arbitrarily.
const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk);
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});

Async Iterators

const { Readable } = require('stream');

// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}

const stream = Readable.from(generateData(), { objectMode: true });

// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}

Backpressure Handling

const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // Check if writable can accept more data
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Pause reading until writable is ready
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

// Or use pipeline (handles automatically)
pipeline(readable, writable, (err) => {
  if (err) console.error('Error:', err);
});

Custom Readable Stream

const { Readable } = require('stream');

class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  // Note: `db` is assumed to be an already-connected client exposing a
  // Mongoose-style query cursor; adapt the cursor calls to your driver.
  async _read() {
    if (!this.cursor) {
      this.cursor = await db.collection('items').find(this.query).cursor();
    }

    const doc = await this.cursor.next();
    if (doc) {
      this.push(doc);
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
  console.log(item);
}

Unit Test Template

const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];

    const upperCase = new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });

    await pipeline(
      input,
      upperCase,
      async function* (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );

    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});

Troubleshooting

Problem                 | Cause              | Solution
------------------------|--------------------|----------------------------------
Memory grows infinitely | No backpressure    | Use pipeline or handle drain
Data loss               | Errors not caught  | Use pipeline with error callback
Slow processing         | Small chunk size   | Increase highWaterMark
Stream hangs            | Missing end() call | Call writable.end()

When to Use

Use streams when:

  • Processing large files (GB+), as shown in the sketch after this list
  • Real-time data processing
  • Memory-constrained environments
  • Building data pipelines
  • HTTP request/response handling
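
For the large-file case, a minimal sketch that counts lines in a multi-gigabyte log without loading it into memory (the countLines name and './huge.log' path are placeholders):

const fs = require('fs');
const readline = require('readline');

// Only one chunk of the file is held in memory at any time
async function countLines(path) {
  const rl = readline.createInterface({
    input: fs.createReadStream(path),
    crlfDelay: Infinity // treat \r\n as a single line break
  });

  let count = 0;
  for await (const line of rl) {
    count++;
  }
  return count;
}

countLines('./huge.log').then((n) => console.log(`${n} lines`));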

Related Skills

  • Async Programming (async patterns)
  • Performance Optimization (memory efficiency)
  • Express REST API (streaming responses)

Resources