How I restructured a monolithic data processor into an event-driven system using Node.js worker processes, Redis Streams, and a willingness to throw away code that was not working.
It started, as these things always do, with a single file. processor.js was 2,400 lines
of sequential logic that ingested events from an API, transformed them, deduplicated them against
a PostgreSQL table, enriched them with user context, and wrote them back to the database. All
synchronously. All in one process.
For a year, it worked fine. Our event volume was modest -- maybe 10,000 events per hour -- and the VPS handled it with room to spare. But then a new client onboarded with 50x the volume, and the whole thing fell apart in about twelve minutes.
The symptoms were textbook: memory usage spiking to 95%, response times climbing from milliseconds
to seconds, and eventually the dreaded FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed
that tells you Node.js has given up trying to garbage collect its way out of your mess.
"The first step to fixing a system is admitting that you built it wrong in the first place."
The core problem was not the code quality. Each individual function was well-tested and did its job. The problem was coupling. Every stage of the pipeline was tightly bound to the next. If the enrichment step slowed down, everything upstream backed up. There was no buffering, no backpressure, no way to scale individual stages independently.
I needed to decompose this monolith into independent stages that could be scaled, monitored, and failed independently. I needed an event-driven architecture.
Before touching the keyboard, I spent two days sketching. The old system had five implicit stages. The new system would make them explicit, connected by Redis streams acting as durable message queues.
Each stage would be a standalone Node.js process that reads from one stream and writes to another. This gives us three immediate benefits: each stage can be scaled on its own, a slow or failing stage no longer backs up everything upstream (messages simply buffer in its input stream), and each stage can be monitored and restarted independently.
import { createStage } from './lib/stage.js';
import { validate } from './stages/validate.js';
import { enrich } from './stages/enrich.js';
import { deduplicate } from './stages/dedup.js';
import { persist } from './stages/persist.js';

// Each stage reads from one stream,
// processes the event, writes to the next.
const stages = [
  createStage('validate', {
    input: 'events:raw',
    output: 'events:validated',
    handler: validate,
  }),
  createStage('enrich', {
    input: 'events:validated',
    output: 'events:enriched',
    handler: enrich,
    concurrency: 4,
  }),
  createStage('deduplicate', {
    input: 'events:enriched',
    output: 'events:unique',
    handler: deduplicate,
  }),
  createStage('persist', {
    input: 'events:unique',
    output: null, // terminal stage
    handler: persist,
    batchSize: 100,
  }),
];

// Start all stages in parallel
await Promise.all(
  stages.map(s => s.start())
);
Each stage is defined in its own file with a single export. The createStage
factory wraps the handler with Redis stream reading, error handling, metrics collection,
and graceful shutdown logic. The handler itself knows nothing about Redis.
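The factory itself is not reproduced here, but to make the shape concrete, here is a minimal sketch of what its core read-process-acknowledge loop could look like, assuming the ioredis client and a single 'data' field per stream entry. Retries, metrics, and shutdown are left out of the sketch.

import Redis from 'ioredis';

// Minimal sketch of createStage's internals -- hypothetical names;
// error handling, metrics, and graceful shutdown are omitted here.
export function createStage(name, { input, output, handler }) {
  const redis = new Redis();
  const group = `${name}-group`;

  async function consumeLoop(consumer) {
    for (;;) {
      // Block for up to 5s waiting for new entries on the input stream.
      const res = await redis.xreadgroup(
        'GROUP', group, consumer,
        'COUNT', 10, 'BLOCK', 5000,
        'STREAMS', input, '>'
      );
      if (!res) continue; // timed out, poll again

      for (const [, entries] of res) {
        for (const [id, fields] of entries) {
          const event = JSON.parse(fields[1]); // fields is ['data', '<json>']
          const result = await handler(event);

          // Forward to the next stage unless this one is terminal.
          if (output && result) {
            await redis.xadd(output, '*', 'data', JSON.stringify(result));
          }
          await redis.xack(input, group, id);
        }
      }
    }
  }

  async function start() {
    // Create the consumer group up front; tolerate "already exists".
    await redis.xgroup('CREATE', input, group, '$', 'MKSTREAM').catch(() => {});
    await consumeLoop(`${name}-consumer-1`);
  }

  return { name, start };
}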
The first stage reads from events:raw (populated by our API ingestion layer)
and writes validated events to events:validated. The validation handler receives
a raw event, validates it against a JSON schema, and either returns the validated event or throws.
This declarative approach means you can understand the entire pipeline topology just by reading this one file. No hidden pub/sub subscriptions, no magic.
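The handlers themselves stay small and Redis-free. As a rough sketch, assuming the Ajv library and a hypothetical eventSchema module, the validation handler might be nothing more than:

import Ajv from 'ajv';
import { eventSchema } from '../schemas/event.js'; // hypothetical schema module

const ajv = new Ajv();
const check = ajv.compile(eventSchema);

// Pure function: no Redis, no streams -- just event in, event out (or throw).
export async function validate(event) {
  if (!check(event)) {
    throw new Error(`Invalid event: ${ajv.errorsText(check.errors)}`);
  }
  return event;
}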
The enrichment stage calls external APIs (user service, geo-IP lookup) and is inherently
IO-bound. Setting concurrency: 4 lets it process four events simultaneously
within a single worker process, dramatically improving throughput without additional infrastructure.
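How that option is honored inside the worker is an implementation detail, but one simple way to get it (a sketch building on the loop above, not necessarily how the real createStage does it) is to run several copies of the consumer loop in the same process; each copy spends most of its time awaiting network I/O, so they interleave on a single thread:

// Variant of start() from the earlier sketch: `concurrency` and `name`
// come from the stage options, consumeLoop is the read-process-ack loop.
async function start() {
  const copies = Array.from({ length: concurrency ?? 1 }, (_, i) =>
    consumeLoop(`${name}-consumer-${i + 1}`) // distinct consumer name in the group
  );
  await Promise.all(copies);
}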
The dedup stage uses a Redis sorted set with event hashes and timestamps. Events seen within a configurable window (default: 5 minutes) are silently dropped. This catches both exact duplicates and near-duplicates caused by client retries.
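As a sketch of that sorted-set check (hashing with Node's built-in crypto module; the key name, the fields included in the hash, and the window default are assumptions):

import { createHash } from 'node:crypto';
import Redis from 'ioredis';

const redis = new Redis();
const WINDOW_MS = 5 * 60 * 1000; // configurable dedup window

export async function deduplicate(event) {
  // Hash only the fields that identify the event, so client retries with
  // different envelope metadata still collapse to the same hash (assumed fields).
  const hash = createHash('sha256')
    .update(JSON.stringify({ type: event.type, userId: event.userId, payload: event.payload }))
    .digest('hex');
  const now = Date.now();

  // Trim entries older than the window, then check for a recent duplicate.
  await redis.zremrangebyscore('events:seen', '-inf', now - WINDOW_MS);
  const seenAt = await redis.zscore('events:seen', hash);
  if (seenAt !== null) return null; // duplicate within window: drop silently

  await redis.zadd('events:seen', now, hash);
  return event;
}

Returning null tells the stage wrapper to acknowledge the entry without forwarding it, which is how the silent drop works in the sketches above.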
The persist stage is the terminal stage -- output: null means it does not
write to another stream. Instead, it batches events and uses PostgreSQL's
COPY protocol for bulk inserts. A batch size of 100 gives us the sweet
spot between latency and throughput.
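A sketch of the batched COPY write, assuming the pg and pg-copy-streams packages, a hypothetical events (id, user_id, payload) table, and that the stage wrapper hands the handler an array of events when batchSize is set (value escaping is trimmed for brevity):

import pg from 'pg';
import pgCopyStreams from 'pg-copy-streams';
import { pipeline } from 'node:stream/promises';
import { Readable } from 'node:stream';

const copyFrom = pgCopyStreams.from;
const pool = new pg.Pool();

// Persist a batch of events with a single COPY ... FROM STDIN round trip.
export async function persist(batch) {
  const client = await pool.connect();
  try {
    const copyStream = client.query(
      copyFrom('COPY events (id, user_id, payload) FROM STDIN')
    );
    // Tab-separated rows; real code must escape tabs, newlines, and backslashes.
    const rows = batch.map(
      e => `${e.id}\t${e.userId}\t${JSON.stringify(e.payload)}\n`
    );
    await pipeline(Readable.from(rows), copyStream);
  } finally {
    client.release();
  }
}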
All stages start concurrently. Each one creates its consumer group (if it does not already exist), registers itself, and begins polling for messages. If any stage fails to start, the promise rejects and we abort the entire process cleanly.
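Creating the group idempotently is the one fiddly bit: Redis answers XGROUP CREATE with a BUSYGROUP error when the group already exists, and that is the only error worth swallowing. A sketch:

// Run by each stage before it starts polling.
async function ensureGroup(redis, stream, group) {
  try {
    // MKSTREAM also creates the stream itself if it does not exist yet.
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
  } catch (err) {
    // "BUSYGROUP Consumer Group name already exists" is expected on restart.
    if (!String(err.message).includes('BUSYGROUP')) throw err;
  }
}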
The most important code in any long-running process is the code that runs when it stops. In a containerized environment, you get a SIGTERM signal and a configurable grace period (usually 30 seconds) before a SIGKILL. Every in-flight event must be processed or returned to the stream before that deadline.
function createShutdownHandler(stage) {
  let shuttingDown = false;

  return async () => {
    if (shuttingDown) return;
    shuttingDown = true;

    console.log(`[${stage.name}] Shutting down gracefully...`);

    // Stop accepting new messages
    await stage.stopPolling();

    // Wait for in-flight events (with timeout)
    const timeout = setTimeout(() => {
      console.error(`[${stage.name}] Forced shutdown after 25s`);
      process.exit(1);
    }, 25000);

    await stage.drainInFlight();
    clearTimeout(timeout);

    // Acknowledge processed, return unprocessed
    await stage.finalizeAcks();

    console.log(`[${stage.name}] Clean shutdown complete.`);
    process.exit(0);
  };
}
The key insight here is the two-phase shutdown: first we stop accepting new messages, then we drain everything that is already in flight. The 25-second timeout is 5 seconds less than the typical Kubernetes termination grace period, giving us a buffer before the hard kill.
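Wiring the handler up is one line per signal; giving SIGINT the same treatment makes a local Ctrl-C behave like a production SIGTERM:

const shutdown = createShutdownHandler(stage);
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);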
Never call process.exit(0) without draining first. Unacknowledged messages in a
Redis consumer group will be retried by other consumers, potentially causing duplicate processing
if the event was partially handled.
Events that fail processing after three retries are moved to a dead letter stream
(events:dead) with the original event, the error message, and a timestamp.
A separate monitoring process watches this stream and sends alerts to Slack.
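The write itself is just an XADD with a few extra fields. A sketch (the field names are my own, and the retry count lives wherever the stage tracks delivery attempts):

// Park an event that has exhausted its three retries.
async function sendToDeadLetter(redis, event, err) {
  await redis.xadd(
    'events:dead', '*',
    'event', JSON.stringify(event),
    'error', err.message,
    'failedAt', new Date().toISOString()
  );
}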
Each stage exposes a tiny HTTP server on a unique port with two endpoints:
/health returns a 200 if the stage is running and connected to Redis,
and /metrics returns Prometheus-formatted counters for events processed,
errors, and current in-flight count. These plug directly into our existing Grafana dashboards.
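A sketch of that little sidecar server, using nothing but node:http and hand-rolled Prometheus text format (the stage.metrics shape and the metric names are assumptions, not the real implementation):

import http from 'node:http';

export function startMetricsServer(stage, port) {
  const server = http.createServer((req, res) => {
    if (req.url === '/health') {
      const ok = stage.isRunning && stage.redisConnected; // assumed flags
      res.writeHead(ok ? 200 : 503).end(ok ? 'ok' : 'unhealthy');
    } else if (req.url === '/metrics') {
      // Prometheus text exposition format: one line per sample.
      const m = stage.metrics; // { processed, errors, inFlight } -- assumed shape
      res.writeHead(200, { 'Content-Type': 'text/plain; version=0.0.4' });
      res.end(
        `pipeline_events_processed_total{stage="${stage.name}"} ${m.processed}\n` +
        `pipeline_errors_total{stage="${stage.name}"} ${m.errors}\n` +
        `pipeline_in_flight{stage="${stage.name}"} ${m.inFlight}\n`
      );
    } else {
      res.writeHead(404).end();
    }
  });
  server.listen(port);
  return server;
}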
Redis Streams' XPENDING command is invaluable for debugging. It shows you
exactly which events are in-flight, which consumer has them, and how long they have been
processing. I check this first whenever something looks stuck.
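The same check is easy to script with the extended form of the command (the stream and group names here follow the naming used in the sketches above):

// List up to 10 pending entries for the enrich stage's consumer group.
const pending = await redis.xpending('events:validated', 'enrich-group', '-', '+', 10);

// Each entry is [id, consumer, idleTimeMs, deliveryCount].
for (const [id, consumer, idleMs, deliveries] of pending) {
  console.log(`${id} held by ${consumer} for ${idleMs}ms (delivery #${deliveries})`);
}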
After two weeks of refactoring and a weekend of load testing, the new pipeline handled 500,000 events per hour on the same hardware that choked at 50,000. Memory usage stayed flat at 180MB per worker. The 99th percentile end-to-end latency dropped from 12 seconds to 340 milliseconds.
More importantly, when the enrichment API went down for twenty minutes during a Tuesday morning deploy, the pipeline did not crash. Messages buffered in Redis, enrichment workers retried with exponential backoff, and when the API came back, the backlog cleared in under three minutes. Nobody got paged. Nobody even noticed.
10x throughput improvement, sub-second latency, and zero data loss during upstream failures. All on the same VPS, with the same monthly bill.
Event-driven systems are not magic. They are a set of trade-offs that happen to work really well when your problem is "too many things happening at once." The key is keeping each piece small enough to fit in your head.