Data pipeline visualization
Architecture / Dec 14, 2025 / 18 min read

Building an Event-Driven Pipeline Without Losing Your Mind

How I restructured a monolithic data processor into an event-driven system using Node.js streams, Redis Pub/Sub, and a willingness to throw away code that was not working.

Begin reading
Chapter 01

The monolith was not scaling, and neither was my patience.

It started, as these things always do, with a single file. processor.js was 2,400 lines of sequential logic that ingested events from an API, transformed them, deduplicated them against a PostgreSQL table, enriched them with user context, and wrote them back to the database. All synchronously. All in one process.

For a year, it worked fine. Our event volume was modest -- maybe 10,000 events per hour -- and the VPS had enough headroom to handle the load with room to spare. But then a new client onboarded with 50x the volume, and the whole thing fell apart in about twelve minutes.

The symptoms were textbook: memory usage spiking to 95%, response times climbing from milliseconds to seconds, and eventually the dreaded FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed that tells you Node.js has given up trying to garbage collect its way out of your mess.

"The first step to fixing a system is admitting that you built it wrong in the first place."

Server infrastructure The old single-process architecture, visualized as a traffic jam.

The core problem was not the code quality. Each individual function was well-tested and did its job. The problem was coupling. Every stage of the pipeline was tightly bound to the next. If the enrichment step slowed down, everything upstream backed up. There was no buffering, no backpressure, no way to scale individual stages independently.

I needed to decompose this monolith into independent stages that could be scaled, monitored, and failed independently. I needed an event-driven architecture.

Chapter 02

Drawing the pipeline on a whiteboard before writing a single line.

The shape of the new system

Before touching the keyboard, I spent two days sketching. The old system had five implicit stages. The new system would make them explicit, connected by Redis streams acting as durable message queues.

Each stage would be a standalone Node.js process that reads from one stream and writes to another. This gives us three immediate benefits:

pipeline.js
import { createStage } from './lib/stage.js';
import { validate } from './stages/validate.js';
import { enrich } from './stages/enrich.js';
import { deduplicate } from './stages/dedup.js';
import { persist } from './stages/persist.js';

// Each stage reads from one stream,
// processes the event, writes to the next.
const stages = [
  createStage('validate', {
    input:  'events:raw',
    output: 'events:validated',
    handler: validate,
  }),
  createStage('enrich', {
    input:  'events:validated',
    output: 'events:enriched',
    handler: enrich,
    concurrency: 4,
  }),
  createStage('deduplicate', {
    input:  'events:enriched',
    output: 'events:unique',
    handler: deduplicate,
  }),
  createStage('persist', {
    input:  'events:unique',
    output: null, // terminal stage
    handler: persist,
    batchSize: 100,
  }),
];

// Start all stages in parallel
await Promise.all(
  stages.map(s => s.start())
);

Clean imports, clean boundaries

Each stage is defined in its own file with a single export. The createStage factory wraps the handler with Redis stream reading, error handling, metrics collection, and graceful shutdown logic. The handler itself knows nothing about Redis.

Declarative stage definitions

The first stage reads from events:raw (populated by our API ingestion layer) and writes validated events to events:validated. The validation handler receives a raw event, validates it against a JSON schema, and returns either the validated event or throws.

This declarative approach means you can understand the entire pipeline topology just by reading this one file. No hidden pub/sub subscriptions, no magic.

Concurrency where it counts

The enrichment stage calls external APIs (user service, geo-IP lookup) and is inherently IO-bound. Setting concurrency: 4 lets it process four events simultaneously within a single worker process, dramatically improving throughput without additional infrastructure.

Deduplication with time windows

The dedup stage uses a Redis sorted set with event hashes and timestamps. Events seen within a configurable window (default: 5 minutes) are silently dropped. This catches both exact duplicates and near-duplicates caused by client retries.

Batch persistence

The persist stage is the terminal stage -- output: null means it does not write to another stream. Instead, it batches events and uses PostgreSQL's COPY protocol for bulk inserts. A batch size of 100 gives us the sweet spot between latency and throughput.

Parallel startup

All stages start concurrently. Each one creates its consumer group (if it does not already exist), registers itself, and begins polling for messages. If any stage fails to start, the promise rejects and we abort the entire process cleanly.

Data flow visualization Events flowing through the pipeline stages.
Chapter 04

Making it survive the real world.

Graceful shutdown

The most important code in any long-running process is the code that runs when it stops. In a containerized environment, you get a SIGTERM signal and a configurable grace period (usually 30 seconds) before a SIGKILL. Every in-flight event must be processed or returned to the stream before that deadline.

lib/stage.js — shutdown handler
function createShutdownHandler(stage) {
  let shuttingDown = false;

  return async () => {
    if (shuttingDown) return;
    shuttingDown = true;

    console.log(`[${stage.name}] Shutting down gracefully...`);

    // Stop accepting new messages
    await stage.stopPolling();

    // Wait for in-flight events (with timeout)
    const timeout = setTimeout(() => {
      console.error(`[${stage.name}] Forced shutdown after 25s`);
      process.exit(1);
    }, 25000);

    await stage.drainInFlight();
    clearTimeout(timeout);

    // Acknowledge processed, return unprocessed
    await stage.finalizeAcks();

    console.log(`[${stage.name}] Clean shutdown complete.`);
    process.exit(0);
  };
}

The key insight here is the two-phase shutdown: first we stop accepting new messages, then we drain everything that is already in flight. The 25-second timeout is 5 seconds less than the typical Kubernetes termination grace period, giving us a buffer before the hard kill.

Warning

Never call process.exit(0) without draining first. Unacknowledged messages in a Redis consumer group will be retried by other consumers, potentially causing duplicate processing if the event was partially handled.

Dead letter queues

Events that fail processing after three retries are moved to a dead letter stream (events:dead) with the original event, the error message, and a timestamp. A separate monitoring process watches this stream and sends alerts to Slack.

Health checks and metrics

Each stage exposes a tiny HTTP server on a unique port with two endpoints: /health returns a 200 if the stage is running and connected to Redis, and /metrics returns Prometheus-formatted counters for events processed, errors, and current in-flight count. These plug directly into our existing Grafana dashboards.

Tip

Redis Streams' XPENDING command is invaluable for debugging. It shows you exactly which events are in-flight, which consumer has them, and how long they have been processing. I check this first whenever something looks stuck.

The results

After two weeks of refactoring and a weekend of load testing, the new pipeline handled 500,000 events per hour on the same hardware that choked at 50,000. Memory usage stayed flat at 180MB per worker. The 99th percentile end-to-end latency dropped from 12 seconds to 340 milliseconds.

More importantly, when the enrichment API went down for twenty minutes during a Tuesday morning deploy, the pipeline did not crash. Messages buffered in Redis, enrichment workers retried with exponential backoff, and when the API came back, the backlog cleared in under three minutes. Nobody got paged. Nobody even noticed.

Result

10x throughput improvement, sub-second latency, and zero data loss during upstream failures. All on the same VPS, with the same monthly bill.

The best architecture is the one
you actually understand.

Event-driven systems are not magic. They are a set of trade-offs that happen to work really well when your problem is "too many things happening at once." The key is keeping each piece small enough to fit in your head.