Messaging

Why Messaging Systems Matter

Messaging systems enable asynchronous communication between services, decouple producers from consumers, and provide reliable message delivery with persistence, retry, and dead-letter handling. Production messaging requires connection management, graceful shutdown, error handling, and monitoring to prevent message loss and system overload.

Core Benefits:

  • Asynchronous processing: Offload time-consuming tasks (email, image processing)
  • Decoupling: Producers don’t wait for consumers (independent scaling)
  • Load leveling: Queue absorbs traffic spikes (prevents system overload)
  • Reliability: Message persistence survives crashes
  • Scalability: Distribute work across multiple consumers
  • Event-driven architecture: Services react to events (loosely coupled)

Problem: Direct HTTP calls block threads, tight coupling prevents independent deployment, and failures cascade across services.

Solution: Use message brokers (RabbitMQ, Kafka) and job queues (Bull) that provide durable message storage, automatic retry, dead-letter queues, and at-least-once delivery guarantees.

Standard Library First: EventEmitter

Node.js provides EventEmitter for in-process pub/sub without external dependencies.

Basic Event-Driven Pattern

EventEmitter enables communication between modules via events.

Pattern:

import { EventEmitter } from "events";
// => Built-in events module
// => In-process event bus

const eventBus = new EventEmitter();
// => Create event emitter instance
// => Central event bus for application

// Event listener (subscriber)
eventBus.on("user.created", (user) => {
  // => Register event listener
  // => Triggered when "user.created" event emitted
  console.log(`User created: ${user.name}`);
  // => Handle event (e.g., send welcome email)
});

eventBus.on("user.created", (user) => {
  // => Multiple listeners for same event
  // => All listeners triggered
  console.log(`Analytics: New user ${user.id}`);
  // => Track user signup in analytics
});

// Event emitter (publisher)
function createUser(name: string, email: string) {
  // => Business logic
  const user = {
    id: Math.random(),
    name,
    email,
    createdAt: new Date(),
  };

  // Save user to database...

  eventBus.emit("user.created", user);
  // => Emit event with user data
  // => All listeners notified immediately
  // => Synchronous execution (blocks until all listeners complete)

  return user;
}

// Usage
const user = createUser("Alice", "alice@example.com");
// => Triggers both listeners
// => Output:
// => User created: Alice
// => Analytics: New user 0.123456

// Error handling
eventBus.on("error", (error) => {
  // => "error" is a special event name
  // => Emitting "error" with no listener registered throws an exception
  // => Note: it does NOT catch exceptions thrown by other listeners;
  // => those propagate synchronously to the emit() caller, as shown below
  console.error("Event error:", error);
});

eventBus.on("user.created", (user) => {
  // => Listener that throws error
  throw new Error("Listener failed");
});

try {
  createUser("Bob", "bob@example.com");
  // => Error thrown during listener execution
} catch (error) {
  // => Error propagates to caller
  console.error("User creation failed:", error);
}

// Remove listener
const listener = (user: any) => {
  console.log(`User: ${user.name}`);
};

eventBus.on("user.created", listener);
// => Register listener

eventBus.off("user.created", listener);
// => Remove specific listener
// => Must be same function reference

eventBus.removeAllListeners("user.created");
// => Remove all listeners for event

// Once listener (triggered only once)
eventBus.once("app.started", () => {
  // => Listener removed after first trigger
  console.log("Application started");
});

eventBus.emit("app.started");
// => Triggers listener
eventBus.emit("app.started");
// => Does nothing (listener already removed)

Typed EventEmitter

Add TypeScript types for event safety.

Pattern:

import { EventEmitter } from "events";

// Define event types
interface Events {
  // => Map event names to payload types
  "user.created": { id: number; name: string; email: string };
  "user.updated": { id: number; updates: Partial<{ name: string; email: string }> };
  "user.deleted": { id: number };
  "order.placed": { orderId: string; userId: number; total: number };
}

class TypedEventEmitter extends EventEmitter {
  // => Type-safe EventEmitter wrapper

  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): this {
    // => Type-safe listener registration
    // => K: Event name (must be key of Events)
    // => payload: Type inferred from Events[K]
    return super.on(event, listener);
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): boolean {
    // => Type-safe event emission
    // => Ensures payload matches event type
    return super.emit(event, payload);
  }
}

const eventBus = new TypedEventEmitter();

// Type-safe listener
eventBus.on("user.created", (user) => {
  // => user type inferred as Events["user.created"]
  // => TypeScript knows user has id, name, email
  console.log(user.name);
  // => No type assertion needed
});

// Type-safe emitter
eventBus.emit("user.created", {
  id: 123,
  name: "Alice",
  email: "alice@example.com",
});
// => TypeScript validates payload structure

// Type error caught at compile time
/*
eventBus.emit("user.created", {
  id: 123,
  // name: missing (TypeScript error)
});
*/

Limitations for production:

  • In-process only: Events don’t cross process boundaries
  • Synchronous execution: Listeners block event emission (a deferral workaround is sketched after this list)
  • No persistence: Events lost on application crash
  • No delivery guarantee: If listener fails, event lost
  • No retry logic: Failed listeners not retried automatically
  • No distributed messaging: Cannot send events to other servers
  • Memory-based: Events held in memory (lost on restart)
  • No message ordering: Multiple emitters can interleave events
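
The synchronous-execution limitation can be softened without external dependencies by deferring listener work to the next event-loop iteration. A minimal sketch; the deferOn helper below is illustrative, not part of Node.js:

import { EventEmitter } from "events";

const eventBus = new EventEmitter();

eventBus.on("error", (err) => {
  // => Required: emitting "error" with no listener registered throws
  console.error("Deferred listener failed:", err);
});

// Hypothetical helper: run the listener body on the next event-loop
// iteration so emit() returns without waiting for it
function deferOn(emitter: EventEmitter, event: string, listener: (payload: any) => void) {
  emitter.on(event, (payload) => {
    setImmediate(() => {
      try {
        listener(payload);
        // => Runs after the synchronous emit() has already returned
      } catch (error) {
        emitter.emit("error", error);
        // => Failures routed to "error" instead of crashing the emitter
      }
    });
  });
}

deferOn(eventBus, "user.created", (user) => {
  console.log(`Deferred handler for ${user.name}`);
});

eventBus.emit("user.created", { name: "Alice" });
// => emit() returns immediately; the handler runs on the next tick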

When standard library suffices:

  • Single-process applications (no microservices)
  • Simple event notification (in-app events only)
  • Learning event-driven patterns
  • Minimal external dependencies required

Production Framework: RabbitMQ with amqplib

RabbitMQ provides a distributed message broker with queues, exchanges, and flexible routing.

Installation and Setup

npm install amqplib
# => Install AMQP client for RabbitMQ
# => Low-level library (manual connection management)

# Run RabbitMQ (Docker)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# => RabbitMQ server on port 5672
# => Management UI on http://localhost:15672 (guest/guest)

Basic queue pattern:

import amqp from "amqplib";
// => Import amqplib library

const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://localhost";
// => Connection string
// => Format: amqp://username:password@host:port

async function publishMessage(queue: string, message: any) {
  // => Publish message to queue
  const connection = await amqp.connect(RABBITMQ_URL);
  // => Connect to RabbitMQ server
  // => Creates TCP connection

  const channel = await connection.createChannel();
  // => Create channel (virtual connection)
  // => Channels are lightweight, multiple per connection

  await channel.assertQueue(queue, {
    // => Ensure queue exists
    // => Creates queue if doesn't exist
    durable: true,
    // => Queue survives broker restart
    // => Messages persisted to disk
  });

  channel.sendToQueue(
    queue,
    // => Queue name
    Buffer.from(JSON.stringify(message)),
    // => Message content (Buffer)
    // => JSON-encode object to string, then Buffer
    {
      persistent: true,
      // => Message survives broker restart
      // => Stored on disk
    },
  );
  // => Send message to queue
  // => Message stored in queue until consumed

  console.log(`Published message to ${queue}:`, message);

  await channel.close();
  // => Close channel
  await connection.close();
  // => Close connection
  // => CRITICAL: Always close connections
}

async function consumeMessages(queue: string, handler: (message: any) => Promise<void>) {
  // => Consume messages from queue
  // => handler: Async function to process message

  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, { durable: true });

  await channel.prefetch(1);
  // => Prefetch count: Number of unacknowledged messages
  // => 1 = Process one message at a time
  // => Fair dispatch (load balancing across consumers)

  console.log(`Waiting for messages in ${queue}...`);

  channel.consume(
    queue,
    // => Queue to consume from
    async (msg) => {
      // => Message handler callback
      // => msg: Message object or null

      if (!msg) {
        // => Channel closed by broker
        return;
      }

      try {
        const content = JSON.parse(msg.content.toString());
        // => Parse message content (Buffer → string → JSON object)
        // => Parsing inside try: a malformed message gets nacked
        // => instead of sitting unacknowledged forever
        console.log("Received message:", content);

        await handler(content);
        // => Process message with async handler

        channel.ack(msg);
        // => Acknowledge message
        // => Message removed from queue
        // => If not acked, message redelivered
      } catch (error) {
        console.error("Message processing failed:", error);
        channel.nack(msg, false, true);
        // => Negative acknowledgment
        // => First parameter: message
        // => Second parameter: multiple (false = this message only)
        // => Third parameter: requeue (true = return to queue)
        // => Message redelivered to another consumer
      }
    },
    {
      noAck: false,
      // => Manual acknowledgment required
      // => false = Must call ack() or nack()
      // => true = Auto-acknowledge (no delivery guarantee)
    },
  );
}

// Usage
await publishMessage("email-queue", {
  to: "alice@example.com",
  subject: "Welcome",
  body: "Hello Alice",
});
// => Message sent to queue

await consumeMessages("email-queue", async (message) => {
  // => Process email message
  console.log(`Sending email to ${message.to}`);
  // Send email via SMTP...
  await new Promise((resolve) => setTimeout(resolve, 1000));
  // => Simulate async email sending
  console.log("Email sent");
});
// => Consumer waits for messages

Pub/Sub Pattern with Exchanges

Exchanges route messages to multiple queues (fanout, topic, direct).

Pattern:

// Publisher
async function publishEvent(exchange: string, routingKey: string, event: any) {
  // => Publish event to exchange
  // => Exchange routes to queues based on routing key

  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertExchange(exchange, "topic", {
    // => Ensure exchange exists
    // => Type: "topic" (route by pattern)
    // => Other types: "fanout" (broadcast), "direct" (exact match)
    durable: true,
  });

  channel.publish(
    exchange,
    // => Exchange name
    routingKey,
    // => Routing key (e.g., "user.created", "order.shipped")
    Buffer.from(JSON.stringify(event)),
    // => Message content
    { persistent: true },
  );

  console.log(`Published event to ${exchange} (${routingKey}):`, event);

  await channel.close();
  await connection.close();
}

// Subscriber
async function subscribeToEvents(exchange: string, pattern: string, handler: (event: any) => Promise<void>) {
  // => Subscribe to events matching pattern
  // => pattern: Routing key pattern (e.g., "user.*", "order.#")

  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertExchange(exchange, "topic", { durable: true });

  const { queue } = await channel.assertQueue("", {
    // => Anonymous queue (random name)
    // => Deleted when consumer disconnects
    exclusive: true,
    // => Queue exclusive to this connection
  });

  await channel.bindQueue(queue, exchange, pattern);
  // => Bind queue to exchange with routing pattern
  // => Messages matching pattern routed to queue
  // => pattern: "user.*" matches "user.created", "user.updated"
  // => pattern: "order.#" matches "order.placed", "order.shipped.tracking"

  console.log(`Subscribed to ${exchange} with pattern ${pattern}`);

  channel.consume(
    queue,
    async (msg) => {
      if (!msg) return;

      const event = JSON.parse(msg.content.toString());
      console.log(`Received event (${msg.fields.routingKey}):`, event);

      try {
        await handler(event);
        channel.ack(msg);
      } catch (error) {
        console.error("Event processing failed:", error);
        channel.nack(msg, false, false);
        // => Don't requeue (event already processed elsewhere)
      }
    },
    { noAck: false },
  );
}

// Usage
await publishEvent("events", "user.created", {
  userId: 123,
  name: "Alice",
});
// => Publish event to "events" exchange with routing key "user.created"

await subscribeToEvents("events", "user.*", async (event) => {
  // => Subscribe to all user events (user.created, user.updated, user.deleted)
  console.log("User event:", event);
});

await subscribeToEvents("events", "user.created", async (event) => {
  // => Subscribe only to user.created events
  console.log("User created:", event);
});
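
The examples above use a topic exchange. A fanout exchange instead broadcasts every message to all bound queues, ignoring the routing key entirely. A brief sketch with the same amqplib setup (the "broadcast" exchange name is illustrative):

// Fanout publisher: every bound queue receives every message
async function broadcast(event: any) {
  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertExchange("broadcast", "fanout", { durable: true });
  // => Fanout exchange: routing keys are ignored

  channel.publish("broadcast", "", Buffer.from(JSON.stringify(event)), {
    persistent: true,
  });
  // => Empty routing key; fanout delivers to every bound queue

  await channel.close();
  await connection.close();
}

// Fanout subscriber: each subscriber gets its own copy
async function onBroadcast(handler: (event: any) => Promise<void>) {
  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertExchange("broadcast", "fanout", { durable: true });

  const { queue } = await channel.assertQueue("", { exclusive: true });
  // => Anonymous queue per subscriber (deleted on disconnect)

  await channel.bindQueue(queue, "broadcast", "");
  // => Binding key is also ignored for fanout exchanges

  channel.consume(queue, async (msg) => {
    if (!msg) return;
    await handler(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  });
}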

Production benefits:

  • Distributed messaging: Cross-process, cross-server communication
  • Message persistence: Messages survive broker restarts
  • Delivery guarantee: At-least-once delivery with ack/nack
  • Dead-letter queues: Failed messages routed to error queue
  • Flexible routing: Exchange types (fanout, topic, direct, headers)
  • Load balancing: Multiple consumers share queue workload

Trade-offs:

  • Manual connection management: Must handle connections, channels
  • Reconnection logic: Must implement retry on connection loss (a minimal retry loop is sketched after this list)
  • Complexity: AMQP protocol and RabbitMQ concepts (exchanges, bindings)
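
For reference, the reconnection boilerplate that raw amqplib leaves to you, as a rough sketch (the retry delay is arbitrary, and rewireChannelsAndConsumers is a hypothetical placeholder for re-creating channels, assertions, and consumers):

async function connectWithRetry(url: string, retryDelayMs = 5000): Promise<any> {
  for (;;) {
    try {
      const connection = await amqp.connect(url);

      connection.on("error", (err: Error) => {
        console.error("Connection error:", err.message);
        // => amqplib emits "close" after "error"
      });

      connection.on("close", () => {
        console.error("Connection closed, reconnecting...");
        connectWithRetry(url, retryDelayMs).then(rewireChannelsAndConsumers);
        // => rewireChannelsAndConsumers: placeholder for re-creating
        //    every channel, queue assertion, and consumer by hand
      });

      return connection;
    } catch (error) {
      console.error(`Connect failed, retrying in ${retryDelayMs}ms:`, error);
      await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
      // => Wait before the next attempt
    }
  }
}

This is exactly the logic the next section's amqp-connection-manager automates.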

When to use RabbitMQ:

  • Microservices architecture (inter-service communication)
  • Asynchronous processing (email, image processing)
  • Event-driven systems (pub/sub patterns)
  • Workload distribution (multiple consumers)

Production Framework: amqp-connection-manager

amqp-connection-manager provides automatic reconnection and channel management for RabbitMQ.

Installation and Setup

npm install amqp-connection-manager
# => Install connection manager wrapper
# => Automatic reconnection on failure

Connection management:

import amqp from "amqp-connection-manager";
// => Import connection manager
// => Wraps amqplib with automatic reconnection

const connection = amqp.connect([RABBITMQ_URL]);
// => Create managed connection
// => Automatically reconnects on failure
// => Array of URLs for failover

connection.on("connect", () => {
  // => Connection established event
  console.log("Connected to RabbitMQ");
});

connection.on("disconnect", (err) => {
  // => Connection lost event
  console.error("Disconnected from RabbitMQ:", err.err);
});

connection.on("connectFailed", (err) => {
  // => Connection failed event
  console.error("Connection failed:", err.err);
  // => Will retry automatically
});

// Create channel wrapper
const channelWrapper = connection.createChannel({
  // => Create managed channel
  // => Automatically recreated on connection loss
  json: true,
  // => Automatically JSON-encode/decode messages
  // => No manual Buffer conversion
  setup: async (channel) => {
    // => Channel setup function
    // => Called on channel creation and reconnection
    await channel.assertQueue("tasks", { durable: true });
    // => Ensure queue exists
    // => Re-declared after reconnection
  },
});

// Publish message
await channelWrapper.sendToQueue("tasks", {
  // => Send message to queue
  // => Automatically JSON-encoded
  task: "send-email",
  to: "alice@example.com",
});
// => Message sent, automatic retry on failure

// Consume messages
await channelWrapper.addSetup(async (channel) => {
  // => Add setup function (multiple allowed)
  await channel.consume("tasks", async (msg) => {
    if (!msg) return;

    const task = JSON.parse(msg.content.toString());
    // => Note: json: true applies to publishing only;
    // => consumed messages still arrive as Buffers
    console.log("Processing task:", task);

    try {
      // Process task...
      await new Promise((resolve) => setTimeout(resolve, 1000));

      channel.ack(msg);
      // => Acknowledge message
    } catch (error) {
      console.error("Task failed:", error);
      channel.nack(msg, false, true);
      // => Requeue failed message
    }
  });
});

// Graceful shutdown
process.on("SIGINT", async () => {
  // => Handle Ctrl+C
  await channelWrapper.close();
  // => Close channel gracefully
  await connection.close();
  // => Close connection
  console.log("Connection closed");
  process.exit(0);
});

Production benefits:

  • Automatic reconnection: Handles connection failures automatically
  • Channel recovery: Re-creates channels after reconnection
  • Queue/exchange re-declaration: Re-declares on reconnection
  • JSON encoding: Automatic JSON serialization/deserialization
  • Simpler API: Less boilerplate than raw amqplib

Trade-offs:

  • External dependency: Additional library (amqplib wrapper)
  • Less control: Abstracts reconnection logic

When to use amqp-connection-manager:

  • Production RabbitMQ applications (always - handles reconnection)
  • Long-running consumers (workers, services)
  • Network unreliability (automatic retry)

Production Framework: Kafka with kafkajs

Kafka is a distributed event-streaming platform for high-throughput messaging.

Installation and Setup

npm install kafkajs
# => Install Kafka client for Node.js

# Run Kafka (Docker, simplified)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  confluentinc/cp-kafka
# => Kafka broker on port 9092
# => Assumes a Zookeeper container reachable at zookeeper:2181
# => For local development, a docker-compose setup (or a KRaft-mode
#    image that needs no Zookeeper) is more practical

Basic producer/consumer:

import { Kafka } from "kafkajs";
// => Import kafkajs library

const kafka = new Kafka({
  // => Create Kafka client
  clientId: "my-app",
  // => Client identifier
  // => Used for logging and metrics
  brokers: ["localhost:9092"],
  // => Kafka broker addresses
  // => Multiple brokers for failover
});

// Producer
const producer = kafka.producer();
// => Create producer instance

await producer.connect();
// => Connect to Kafka cluster
console.log("Producer connected");

await producer.send({
  // => Send message to topic
  topic: "user-events",
  // => Topic name (created automatically)
  messages: [
    {
      key: "user-123",
      // => Message key (for partitioning)
      // => Same key always goes to same partition
      value: JSON.stringify({
        // => Message value (string or Buffer)
        userId: 123,
        event: "user.created",
        timestamp: Date.now(),
      }),
    },
  ],
});
console.log("Message sent to Kafka");

await producer.disconnect();
// => Disconnect producer

// Consumer
const consumer = kafka.consumer({
  // => Create consumer instance
  groupId: "my-consumer-group",
  // => Consumer group ID
  // => Consumers in same group share partitions
  // => Each message delivered to one consumer per group
});

await consumer.connect();
// => Connect to Kafka cluster
console.log("Consumer connected");

await consumer.subscribe({
  // => Subscribe to topics
  topic: "user-events",
  // => Topic to consume from
  fromBeginning: false,
  // => Start from latest messages
  // => true = Start from beginning of topic
});

await consumer.run({
  // => Start consuming messages
  eachMessage: async ({ topic, partition, message }) => {
    // => Message handler
    // => topic: Topic name
    // => partition: Partition number (0-N)
    // => message: Message object

    const event = JSON.parse(message.value?.toString() || "{}");
    // => Parse message value
    console.log(`Received message from ${topic} partition ${partition}:`, event);

    // Process message...
    // => Consumer automatically commits offset after processing
    // => Can configure manual commit for at-least-once delivery
  },
});

// Graceful shutdown
process.on("SIGINT", async () => {
  await consumer.disconnect();
  console.log("Consumer disconnected");
  process.exit(0);
});

Producer Patterns

Batch sending and partitioning strategies.

Pattern:

const producer = kafka.producer({
  // => Producer configuration
  allowAutoTopicCreation: true,
  // => Automatically create topics
  // => Production: Create topics manually with partitions/replication
  transactionalId: "my-transactional-producer",
  // => Enable transactions (exactly-once semantics)
  maxInFlightRequests: 1,
  // => Maximum concurrent requests
  // => Must be 1 when idempotence is enabled
  idempotent: true,
  // => Enable idempotent producer
  // => Broker deduplicates retried sends
});

await producer.connect();

// Batch send
await producer.sendBatch({
  // => Send multiple messages efficiently
  topicMessages: [
    {
      topic: "user-events",
      messages: [
        { key: "user-123", value: JSON.stringify({ event: "created" }) },
        { key: "user-124", value: JSON.stringify({ event: "created" }) },
        { key: "user-125", value: JSON.stringify({ event: "created" }) },
      ],
    },
    {
      topic: "order-events",
      messages: [{ key: "order-1", value: JSON.stringify({ event: "placed" }) }],
    },
  ],
});
// => Send to multiple topics in one request

// Partitioning strategy
await producer.send({
  topic: "user-events",
  messages: [
    {
      key: "user-123",
      // => Key determines partition (hash-based)
      // => Same key always to same partition (ordering guarantee)
      value: JSON.stringify({ event: "created" }),
      partition: 0,
      // => Explicit partition (overrides key-based partitioning)
      headers: {
        // => Message headers (metadata)
        "correlation-id": "abc-123",
        "content-type": "application/json",
      },
    },
  ],
});

// Transactional send (exactly-once semantics)
const transaction = await producer.transaction();
// => Start transaction

try {
  await transaction.send({
    topic: "user-events",
    messages: [{ key: "user-123", value: JSON.stringify({ event: "created" }) }],
  });

  await transaction.send({
    topic: "order-events",
    messages: [{ key: "order-1", value: JSON.stringify({ event: "placed" }) }],
  });

  await transaction.commit();
  // => Commit transaction
  // => Both messages delivered atomically
} catch (error) {
  await transaction.abort();
  // => Rollback transaction
  // => No messages delivered
  throw error;
}

Consumer Groups and Rebalancing

Multiple consumers share topic partitions.

Pattern:

// Consumer 1 (same group)
const consumer1 = kafka.consumer({ groupId: "email-workers" });
await consumer1.connect();
await consumer1.subscribe({ topic: "email-queue" });

await consumer1.run({
  eachMessage: async ({ message }) => {
    console.log("Consumer 1 processing:", message.value?.toString());
    // => Process message on consumer 1
  },
});

// Consumer 2 (same group)
const consumer2 = kafka.consumer({ groupId: "email-workers" });
await consumer2.connect();
await consumer2.subscribe({ topic: "email-queue" });

await consumer2.run({
  eachMessage: async ({ message }) => {
    console.log("Consumer 2 processing:", message.value?.toString());
    // => Process message on consumer 2
  },
});
// => Consumers in same group share partitions
// => Each message delivered to ONE consumer in group
// => Load balancing across consumers

// Consumer with manual commit (at-least-once delivery)
const consumer = kafka.consumer({
  groupId: "manual-commit-group",
});
// => Auto-commit is disabled below via run({ autoCommit: false })

await consumer.connect();
await consumer.subscribe({ topic: "important-events" });

await consumer.run({
  autoCommit: false,
  // => Disable automatic offset commit
  // => Must manually commit after processing
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value?.toString() || "{}");

    try {
      // Process message...
      await processEvent(event);

      // Commit offset after successful processing
      await consumer.commitOffsets([
        {
          topic,
          partition,
          offset: (Number(message.offset) + 1).toString(),
          // => Commit next offset (current + 1)
          // => Mark message as processed
        },
      ]);
    } catch (error) {
      console.error("Processing failed, message will be redelivered:", error);
      // => Don't commit offset
      // => Consumer restart will redeliver from last committed offset
    }
  },
});

Production benefits:

  • High throughput: Millions of messages per second
  • Horizontal scalability: Add partitions and consumers
  • Message persistence: Messages retained for configurable duration
  • Consumer groups: Load balancing and failover
  • Offset management: Resume from last processed message
  • Exactly-once semantics: Transactional guarantees

Trade-offs:

  • Operational complexity: Requires Zookeeper, topic configuration
  • Ordering guarantee: Only within partition (not across partitions)
  • Message size limits: Default 1MB max message size
  • No request/reply: Designed for event streaming, not RPC

When to use Kafka:

  • Event streaming (high-throughput event logs)
  • Data pipelines (ETL, real-time analytics)
  • Change data capture (database replication)
  • Microservices messaging (scale to millions of messages)

Production Framework: Bull (Redis Job Queues)

Bull provides Redis-based job queues with priority, retries, and scheduling.

Installation and Setup

npm install bull
# => Install Bull job queue library

npm install @types/bull
# => TypeScript type definitions (a stub for recent Bull versions,
#    which bundle their own types; only older releases need it)

# Requires Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine
# => Redis server on port 6379

Basic job queue:

import Bull from "bull";
// => Import Bull library

const emailQueue = new Bull("email", {
  // => Create job queue
  // => Queue name: "email"
  redis: {
    host: "localhost",
    port: 6379,
    // => Redis connection
  },
});
// => Jobs stored in Redis
// => Survives application restarts

// Add job to queue
await emailQueue.add({
  // => Add job with data
  to: "alice@example.com",
  subject: "Welcome",
  body: "Hello Alice",
});
// => Job added to queue
console.log("Email job added to queue");

// Process jobs
emailQueue.process(async (job) => {
  // => Job processor function
  // => job: Job object with data and methods
  console.log("Processing email job:", job.data);

  // Send email...
  await sendEmail(job.data.to, job.data.subject, job.data.body);

  return { sent: true };
  // => Return value stored in job result
});
// => Worker processes jobs from queue

// Job completion event
emailQueue.on("completed", (job, result) => {
  // => Job completed successfully
  console.log(`Job ${job.id} completed:`, result);
});

// Job failure event
emailQueue.on("failed", (job, error) => {
  // => Job failed
  console.log(`Job ${job?.id} failed:`, error.message);
});

Job Options and Retries

Configure job priority, delays, and retry behavior.

Pattern:

// Add job with options
await emailQueue.add(
  {
    to: "bob@example.com",
    subject: "Important",
    body: "Hello Bob",
  },
  {
    // => Job options
    priority: 1,
    // => 1 = highest priority; larger numbers = lower priority
    // => Jobs processed in priority order
    delay: 5000,
    // => Delay job execution by 5 seconds
    // => Job not processed until delay elapsed
    attempts: 3,
    // => Retry up to 3 times on failure
    backoff: {
      type: "exponential",
      // => Exponential backoff: 1s, 2s, 4s, 8s
      // => Alternative: "fixed" (constant delay)
      delay: 1000,
      // => Initial delay: 1 second
    },
    removeOnComplete: true,
    // => Remove job from Redis when completed
    // => Saves memory (jobs accumulate otherwise)
    removeOnFail: false,
    // => Keep failed jobs for debugging
    timeout: 30000,
    // => Job timeout: 30 seconds
    // => Job fails if exceeds timeout
  },
);

// Process with concurrency
emailQueue.process(5, async (job) => {
  // => Process up to 5 jobs concurrently
  // => Default: 1 (serial processing)
  console.log(`Processing job ${job.id}`);

  if (job.attemptsMade > 0) {
    // => Retry attempt
    console.log(`Retry attempt ${job.attemptsMade}`);
  }

  // Simulate processing
  await new Promise((resolve) => setTimeout(resolve, 2000));

  return { processed: true };
});

// Scheduled jobs (cron-like)
await emailQueue.add(
  { type: "newsletter" },
  {
    repeat: {
      cron: "0 9 * * *",
      // => Run daily at 9 AM
      // => Cron syntax: minute hour day month weekday
    },
  },
);
// => Repeating job added to queue

// Job progress tracking
emailQueue.process(async (job) => {
  // => Job with progress updates
  await job.progress(10);
  // => Update progress to 10%
  // => Visible in Bull UI

  // Process step 1...
  await new Promise((resolve) => setTimeout(resolve, 1000));

  await job.progress(50);
  // => Update progress to 50%

  // Process step 2...
  await new Promise((resolve) => setTimeout(resolve, 1000));

  await job.progress(100);
  // => Complete

  return { done: true };
});

// Progress event
emailQueue.on("progress", (job, progress) => {
  // => Job progress updated
  console.log(`Job ${job.id} progress: ${progress}%`);
});

Queue Monitoring and Management

Monitor queue health and manage jobs.

Pattern:

// Queue metrics
const jobCounts = await emailQueue.getJobCounts();
// => Get job counts by state
console.log(jobCounts);
/*
{
  waiting: 5,    // Waiting to be processed
  active: 2,     // Currently processing
  completed: 10, // Completed successfully
  failed: 1,     // Failed after retries
  delayed: 3,    // Scheduled for future
  paused: 0      // Queue paused
}
*/

// Get jobs by state
const waitingJobs = await emailQueue.getWaiting();
// => Get waiting jobs (Job[])
const activeJobs = await emailQueue.getActive();
// => Get active jobs
const failedJobs = await emailQueue.getFailed();
// => Get failed jobs

// Retry failed jobs
for (const job of failedJobs) {
  await job.retry();
  // => Retry failed job
  // => Job moved back to waiting state
}

// Remove jobs
await emailQueue.clean(24 * 60 * 60 * 1000);
// => Remove completed jobs older than 24 hours (completed is the default status)
// => Call clean(grace, "failed") to clean failed jobs too
// => Saves Redis memory

// Pause/resume queue
await emailQueue.pause();
// => Pause queue (no new jobs processed)
console.log("Queue paused");

await emailQueue.resume();
// => Resume queue
console.log("Queue resumed");

// Empty queue (remove all waiting jobs)
await emailQueue.empty();
// => Remove all waiting jobs
// => Does not affect active jobs
// => (In BullMQ, the equivalent method is drain())

// Close queue
await emailQueue.close();
// => Close queue gracefully
// => Wait for active jobs to complete

Production benefits:

  • Redis-backed persistence: Jobs survive application restarts
  • Automatic retries: Exponential backoff on failure
  • Priority queues: High-priority jobs processed first
  • Delayed jobs: Schedule jobs for future execution
  • Concurrency control: Limit concurrent job processing
  • Job progress: Track job completion progress
  • Dead-letter queue: Failed jobs separated for debugging

Trade-offs:

  • Redis dependency: Requires Redis server
  • Limited throughput: Lower than Kafka (thousands vs millions/second)
  • Memory-based: Redis memory limits queue size
  • Single Redis: Not horizontally scalable without Redis Cluster

When to use Bull:

  • Background job processing (email, image processing)
  • Scheduled tasks (cron jobs, recurring tasks)
  • Rate-limited API calls (external API quotas; see the limiter sketch after this list)
  • Simple message queues (lower throughput requirements)
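
For the rate-limited case, Bull has a built-in limiter configured at queue creation. A brief sketch (queue name and limits are illustrative):

const apiQueue = new Bull("external-api", {
  redis: { host: "localhost", port: 6379 },
  limiter: {
    max: 10,
    // => At most 10 jobs...
    duration: 1000,
    // => ...per 1000 ms window
  },
});
// => Excess jobs simply wait in the queue until the window allows them

apiQueue.process(async (job) => {
  // Call the rate-limited external API...
  console.log("Calling external API for job", job.id);
});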

Messaging Pattern Comparison

%% Color Palette: Blue #0173B2, Orange #DE8F05, Teal #029E73
%% All colors are color-blind friendly and meet WCAG AA contrast standards

flowchart TD
    A[Choose Messaging System] --> B{Use Case?}

    B -->|In-process events<br/>No persistence| C[EventEmitter]
    B -->|Distributed queues<br/>RPC patterns| D[RabbitMQ]
    B -->|Event streaming<br/>High throughput| E[Kafka]
    B -->|Background jobs<br/>Scheduling| F[Bull Redis]

    C --> C1[EventEmitter Features]
    D --> D1[RabbitMQ Features]
    E --> E1[Kafka Features]
    F --> F1[Bull Features]

    C1 --> C2[✓ Built-in<br/>✓ Synchronous<br/>✓ Simple API<br/>✗ In-process only<br/>✗ No persistence]

    D1 --> D2[✓ Distributed<br/>✓ Flexible routing<br/>✓ Dead-letter queues<br/>✓ RPC support<br/>✗ Operational complexity]

    E1 --> E2[✓ High throughput<br/>✓ Log retention<br/>✓ Replay events<br/>✓ Partitioning<br/>✗ Complex setup]

    F1 --> F2[✓ Redis-backed<br/>✓ Priority queues<br/>✓ Scheduling<br/>✓ Retries<br/>✗ Single Redis limit]

    C2 --> G[Use Cases]
    D2 --> G
    E2 --> G
    F2 --> G

    G --> G1[EventEmitter: Module communication<br/>In-app events]
    G --> G2[RabbitMQ: Microservices<br/>Task distribution<br/>Pub/Sub]
    G --> G3[Kafka: Event streaming<br/>Data pipelines<br/>Analytics]
    G --> G4[Bull: Background jobs<br/>Email/image processing<br/>Cron tasks]

    style A fill:#0173B2,stroke:#333,stroke-width:2px,color:#fff
    style D fill:#DE8F05,stroke:#333,stroke-width:2px,color:#fff
    style E fill:#029E73,stroke:#333,stroke-width:2px,color:#fff
    style F fill:#029E73,stroke:#333,stroke-width:2px,color:#fff

Production Best Practices

Dead-Letter Queue Pattern

Route failed messages to error queue for debugging.

Pattern:

// RabbitMQ dead-letter queue
async function setupDeadLetterQueue() {
  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  // Create dead-letter exchange and queue
  await channel.assertExchange("dlx", "direct", { durable: true });
  // => Dead-letter exchange (DLX)

  await channel.assertQueue("dead-letter-queue", { durable: true });
  // => Dead-letter queue (stores failed messages)

  await channel.bindQueue("dead-letter-queue", "dlx", "");
  // => Bind queue to exchange

  // Create main queue with DLX
  await channel.assertQueue("tasks", {
    durable: true,
    deadLetterExchange: "dlx",
    // => Route rejected messages to DLX
    // => Messages sent to DLX when:
    // => - Message rejected (nack with requeue=false)
    // => - Message TTL expired
    // => - Queue length exceeded
  });

  // Process main queue
  channel.consume("tasks", async (msg) => {
    if (!msg) return;

    const task = JSON.parse(msg.content.toString());

    try {
      await processTask(task);
      channel.ack(msg);
      // => Success, acknowledge
    } catch (error) {
      console.error("Task failed:", error);
      channel.nack(msg, false, false);
      // => Reject without requeue
      // => Message sent to dead-letter queue
    }
  });

  // Monitor dead-letter queue
  channel.consume("dead-letter-queue", async (msg) => {
    if (!msg) return;

    console.error("Dead letter message:", JSON.parse(msg.content.toString()));
    // => Log failed message for debugging
    // => Alert monitoring system
    channel.ack(msg);
  });
}

Graceful Shutdown

Close connections cleanly on application shutdown.

Pattern:

import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "my-app", brokers: ["localhost:9092"] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: "my-group" });

let isShuttingDown = false;

async function gracefulShutdown() {
  // => Graceful shutdown handler
  if (isShuttingDown) {
    // => Already shutting down
    return;
  }
  isShuttingDown = true;

  console.log("Shutting down gracefully...");

  // Stop accepting new messages
  await consumer.stop();
  // => Stop consuming (finish processing current message)

  // Wait for in-flight messages
  await new Promise((resolve) => setTimeout(resolve, 5000));
  // => Allow time for message processing to complete

  // Disconnect clients
  await consumer.disconnect();
  // => Disconnect consumer
  await producer.disconnect();
  // => Disconnect producer

  console.log("Shutdown complete");
  process.exit(0);
}

// Handle shutdown signals
process.on("SIGINT", gracefulShutdown);
// => Ctrl+C
process.on("SIGTERM", gracefulShutdown);
// => Kill signal from container orchestrator

Message Idempotency

Ensure message processing is idempotent (safe to process multiple times).

Pattern:

import { Redis } from "ioredis";

const redis = new Redis();

async function processMessageIdempotent(messageId: string, message: any) {
  // => Process message with idempotency check

  const done = await redis.get(`processed:${messageId}`);
  if (done) {
    // => Handled on a previous delivery, skip without reprocessing
    console.log(`Message ${messageId} already processed, skipping`);
    return;
  }

  const lockKey = `processing:${messageId}`;
  // => Lock key guards against two workers processing the same message

  const acquired = await redis.set(lockKey, "locked", "EX", 300, "NX");
  // => Try to acquire lock
  // => EX 300: Expire after 5 minutes (lock released even if worker crashes)
  // => NX: Set only if not exists
  // => Returns "OK" if acquired, null if another worker holds the lock

  if (!acquired) {
    // => Message currently being processed by another worker
    console.log(`Message ${messageId} is in flight elsewhere, skipping`);
    return;
  }

  try {
    // Process message...
    await processMessage(message);

    // Mark as processed
    await redis.set(`processed:${messageId}`, "true", "EX", 86400);
    // => Mark processed (24 hour TTL)
  } finally {
    await redis.del(lockKey);
    // => Release lock
  }
}

// Usage with RabbitMQ
channel.consume("tasks", async (msg) => {
  if (!msg) return;

  const messageId = msg.properties.messageId;
  // => Message ID from properties
  // => Set when publishing: { messageId: uuid() }

  const task = JSON.parse(msg.content.toString());

  try {
    await processMessageIdempotent(messageId, task);
    channel.ack(msg);
  } catch (error) {
    console.error("Processing failed:", error);
    channel.nack(msg, false, true);
    // => Requeue for retry
  }
});
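
The consumer above relies on the publisher setting messageId. A brief sketch of the publishing side, using randomUUID from Node's built-in crypto module (the publishTask helper is illustrative):

import { randomUUID } from "crypto";

// Assumes an open amqplib channel (see the queue examples earlier)
function publishTask(channel: any, task: any) {
  channel.sendToQueue("tasks", Buffer.from(JSON.stringify(task)), {
    persistent: true,
    messageId: randomUUID(),
    // => Unique ID carried in the message properties
    // => The consumer reads msg.properties.messageId for deduplication
  });
}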

Trade-offs and When to Use Each

EventEmitter

Use when:

  • Single-process applications (no microservices)
  • In-app event notification (module communication)
  • Learning event-driven patterns
  • Minimal dependencies required

Avoid when:

  • Multi-process or distributed systems
  • Message persistence required
  • Network failures need handling

RabbitMQ

Use when:

  • Microservices architecture (inter-service messaging)
  • Flexible routing (topic exchanges, fanout)
  • Request/reply patterns (RPC; a client sketch follows at the end of this subsection)
  • Dead-letter queues needed

Avoid when:

  • High throughput required (>100k messages/second)
  • Event streaming use case (use Kafka)
  • Simple background jobs (use Bull)
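
The request/reply (RPC) pattern pairs a reply queue with a correlation ID. A condensed client-side sketch with amqplib (timeouts and error handling omitted):

import { randomUUID } from "crypto";

async function rpcCall(channel: any, queue: string, request: any): Promise<any> {
  const { queue: replyQueue } = await channel.assertQueue("", { exclusive: true });
  // => Anonymous, exclusive reply queue for this client

  const correlationId = randomUUID();
  // => Ties the response back to this specific request

  return new Promise((resolve) => {
    channel.consume(
      replyQueue,
      (msg: any) => {
        if (msg && msg.properties.correlationId === correlationId) {
          resolve(JSON.parse(msg.content.toString()));
          // => Accept only the reply matching our correlation ID
        }
      },
      { noAck: true },
    );

    channel.sendToQueue(queue, Buffer.from(JSON.stringify(request)), {
      correlationId,
      replyTo: replyQueue,
      // => The server replies to replyTo, echoing the correlationId
    });
  });
}

The server side consumes the request queue, computes a result, and publishes it to msg.properties.replyTo with the same correlationId.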

Kafka

Use when:

  • Event streaming (millions of messages/second)
  • Data pipelines (ETL, analytics)
  • Event sourcing (replay events)
  • High throughput required

Avoid when:

  • Simple task queues (overkill)
  • Request/reply patterns (not designed for RPC)
  • Small message volume (<1000/second)

Bull

Use when:

  • Background job processing (email, image processing)
  • Scheduled tasks (cron jobs)
  • Priority queues (high-priority jobs first)
  • Simple message queues (moderate throughput)

Avoid when:

  • Distributed messaging (cross-server)
  • High throughput (>10k messages/second)
  • Need for complex routing (use RabbitMQ)

Common Pitfalls

Pitfall 1: No Message Acknowledgment

Problem: Messages lost on consumer failure.

Solution: Use manual acknowledgment.

// ❌ BAD: Auto-acknowledge
channel.consume(
  "tasks",
  async (msg) => {
    await processTask(msg);
    // => If processing fails, message lost
  },
  { noAck: true },
);

// ✅ GOOD: Manual acknowledgment
channel.consume(
  "tasks",
  async (msg) => {
    try {
      await processTask(msg);
      channel.ack(msg);
      // => Acknowledge after successful processing
    } catch (error) {
      channel.nack(msg, false, true);
      // => Negative acknowledge, requeue
    }
  },
  { noAck: false },
);

Pitfall 2: No Connection Error Handling

Problem: Application crashes on connection loss.

Solution: Use connection manager with automatic reconnection.

// ❌ BAD: Raw amqplib, no error handling
const connection = await amqp.connect(RABBITMQ_URL);
// => Unhandled "error"/"close" events crash the process on connection loss

// ✅ GOOD: amqp-connection-manager with automatic reconnection
const connection = amqp.connect([RABBITMQ_URL]);
connection.on("disconnect", (err) => {
  console.error("Disconnected:", err);
  // => Reconnects automatically
});

Pitfall 3: Processing Non-Idempotent Operations

Problem: Duplicate message processing causes issues (double charge, duplicate email).

Solution: Implement idempotency checks.

// ❌ BAD: Not idempotent
async function processPayment(orderId: string, amount: number) {
  await chargeCard(amount);
  // => If message redelivered, charges card twice
}

// ✅ GOOD: Idempotent with deduplication
async function processPayment(orderId: string, amount: number) {
  const processed = await redis.get(`payment:${orderId}`);
  if (processed) {
    // => Already processed, skip
    return;
  }

  await chargeCard(amount);
  await redis.set(`payment:${orderId}`, "true", "EX", 86400);
  // => Mark as processed
}

Pitfall 4: No Dead-Letter Queue

Problem: Failed messages lost or stuck in queue forever.

Solution: Configure dead-letter queue for failed messages.

// ❌ BAD: No dead-letter queue
await channel.assertQueue("tasks", { durable: true });
// => Failed messages requeued infinitely

// ✅ GOOD: Dead-letter queue configured
await channel.assertQueue("tasks", {
  durable: true,
  deadLetterExchange: "dlx",
  // => Failed messages routed to DLX
});

Summary

Messaging systems enable asynchronous, distributed communication between services. EventEmitter provides in-process events, RabbitMQ offers flexible routing with queues and exchanges, Kafka provides high-throughput event streaming, and Bull manages background jobs with Redis-backed persistence.

Progression path:

  1. Learn with EventEmitter: Understand event-driven patterns
  2. Queue with RabbitMQ: Distributed task queues and pub/sub
  3. Stream with Kafka: High-throughput event streaming (advanced)
  4. Schedule with Bull: Background jobs and cron tasks

Production checklist:

  • ✅ Manual message acknowledgment (prevent message loss)
  • ✅ Automatic reconnection (connection manager)
  • ✅ Dead-letter queues (failed message handling)
  • ✅ Graceful shutdown (finish processing before exit)
  • ✅ Idempotent message processing (deduplication)
  • ✅ Message persistence (durable queues)
  • ✅ Error handling and retry logic (transient failures)
  • ✅ Monitoring and alerting (queue depth, consumer lag; a queue-depth probe is sketched below)
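
For the monitoring item, queue depth on RabbitMQ can be polled with amqplib's checkQueue. A minimal sketch (threshold and interval are illustrative):

// Poll queue depth and alert when the backlog grows too large
async function monitorQueueDepth(channel: any, queue: string, threshold = 1000) {
  setInterval(async () => {
    const info = await channel.checkQueue(queue);
    // => Returns { queue, messageCount, consumerCount }
    console.log(`${queue}: ${info.messageCount} waiting, ${info.consumerCount} consumers`);

    if (info.messageCount > threshold) {
      console.warn(`ALERT: ${queue} backlog exceeds ${threshold}`);
      // => Hook in the alerting system of your choice here
    }
  }, 30000);
  // => Check every 30 seconds
}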

Recommended approach:

  • Microservices messaging: RabbitMQ with amqp-connection-manager
  • Background jobs: Bull with Redis
  • Event streaming: Kafka (high throughput)
  • In-app events: EventEmitter (simple cases)

Choose messaging system based on use case: RabbitMQ for flexible routing and RPC, Kafka for event streaming and high throughput, Bull for background job processing with scheduling.
