Episode 6 — Scaling, Reliability, Microservices & Web3 / 6.2 — Building and Orchestrating Microservices

6.2.d — Event-Driven Architecture

In one sentence: Instead of services calling each other directly (synchronous coupling), services publish events to a message queue and other services consume those events asynchronously — enabling loose coupling, independent scaling, and resilience to temporary failures.

Navigation: <- 6.2.c Retry, Timeout & Circuit Breaker | 6.2.e — Event Payloads & Idempotency ->


1. Why Events Matter in Microservices

The synchronous coupling problem

SYNCHRONOUS (request-response):

  Order Service                    Notification Service
       │                                  │
       ├── POST /notifications ──────────→│  (HTTP call)
       │   (blocks waiting...)            │  (processes...)
       │←── 200 OK ───────────────────────│
       │
       │   If Notification Service is down:
       │   Order Service FAILS or WAITS
       │   Customer can't place an order because email is broken!

A customer should not be blocked from placing an order just because the email service is temporarily down. The order and the notification are separate concerns.

The event-driven solution

EVENT-DRIVEN (publish-subscribe):

  Order Service           Message Queue           Notification Service
       │                      │                          │
       ├── publish ──────────→│                          │
       │  "order.placed"      │ (stored in queue)        │
       │                      │                          │
       │  (returns immediately│                          │
       │   — not blocking)    │                          │
       │                      ├── deliver ──────────────→│
       │                      │   "order.placed"         │ (processes)
       │                      │                          │
       │   If Notification Service is down:              │
       │   Message stays in queue                        │
       │   Delivered when service comes back             │
       │   Order was NOT affected!                       │

2. Synchronous vs Asynchronous Communication

Aspect          Synchronous (HTTP)                    Asynchronous (Events)
--------------  ------------------------------------  ----------------------------------------
Coupling        Tight — caller waits for response     Loose — publisher doesn't know consumers
Failure impact  Caller fails if callee is down        Messages queue up; processed when ready
Latency         Adds to response time                 Does not affect response time
Scaling         Caller must handle callee's capacity  Queue absorbs traffic spikes
Debugging       Simple — follow the HTTP chain        Harder — follow events across services
Use when        Need immediate response (get user,    Fire-and-forget (notify, log, sync)
                validate)

Rule of thumb: Use synchronous calls when you need the response to continue. Use events when the downstream action can happen eventually.
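That rule of thumb can be sketched in a few lines of Node.js. Everything here is an illustrative stub — `chargePayment` and the in-memory "broker" stand in for a real payment gateway and RabbitMQ, which are built properly later in this section:

```javascript
// Sketch of the rule of thumb. chargePayment and the in-memory "broker"
// are illustrative stubs, not real integrations.
async function chargePayment(amount) {
  return { approved: amount > 0, id: 'pay-1' };     // stub payment gateway
}

const brokerQueue = [];                              // stub message broker
async function publishEvent(routingKey, event) {
  brokerQueue.push({ routingKey, event });           // fire-and-forget
}

async function placeOrder(order) {
  // Synchronous: we NEED the payment result before we can continue.
  const payment = await chargePayment(order.total);
  if (!payment.approved) throw new Error('payment declined');

  // Asynchronous: the confirmation email can happen eventually.
  // Hand the event to the broker and move on — do not await delivery.
  publishEvent('order.placed', { orderId: order.id })
    .catch((err) => console.error('publish failed:', err.message));

  return { ...order, status: 'placed', paymentId: payment.id };
}
```

The awaited call sits on the critical path; the published event does not — its failure is logged, not propagated.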


3. Pub/Sub Pattern

The Publish/Subscribe pattern decouples producers from consumers:

Producer (Publisher):
  "I have something to announce. I don't care who listens."

Consumer (Subscriber):
  "I'm interested in certain types of announcements."

Message Broker (Queue):
  "I'll make sure messages get from producers to interested consumers."
                    ┌──────────────────────┐
                    │    Message Broker    │
  ┌───────────┐     │  (RabbitMQ / Kafka)  │     ┌──────────────┐
  │  Order    │────→│                      │────→│ Notification │
  │  Service  │     │  order.placed        │     │ Service      │
  └───────────┘     │    ├── queue-A ──────│     └──────────────┘
                    │    ├── queue-B ──────│────→┌──────────────┐
                    │    └── queue-C ──────│     │ Analytics    │
                    └──────────────────────┘     │ Service      │
                                                 └──────────────┘
                                                 ┌──────────────┐
                                            ────→│ Inventory    │
                                                 │ Service      │
                                                 └──────────────┘

  One event, three consumers — publisher knows none of them.

4. RabbitMQ Fundamentals

RabbitMQ is one of the most popular message brokers. It implements AMQP (the Advanced Message Queuing Protocol).

Core concepts

┌─────────────────────────────────────────────────────────────────┐
│                     RabbitMQ Architecture                        │
│                                                                  │
│  Producer ──→ Exchange ──→ Binding ──→ Queue ──→ Consumer        │
│                                                                  │
│  EXCHANGE:  Receives messages from producers                     │
│             Routes to queues based on rules (bindings)           │
│             Does NOT store messages                              │
│                                                                  │
│  BINDING:   Rule that connects an exchange to a queue            │
│             Includes a routing key pattern                       │
│                                                                  │
│  QUEUE:     Stores messages until consumed                       │
│             FIFO (first in, first out)                           │
│             Durable = survives broker restart                    │
│                                                                  │
│  CONSUMER:  Subscribes to a queue                                │
│             Acknowledges messages after processing               │
└─────────────────────────────────────────────────────────────────┘
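To make those roles concrete, here is a toy in-memory model of the pipeline in plain JavaScript. It routes like a direct exchange (exact binding-key match) and is purely for intuition — nothing here is amqplib:

```javascript
// Toy in-memory model of Producer → Exchange → Binding → Queue → Consumer.
// Routes like a direct exchange: exact binding-key match. For intuition only.
function createBroker() {
  const bindings = [];                 // BINDING: { key, queue } rules
  const queues = {};                   // QUEUE: name -> FIFO array of messages
  return {
    bindQueue(queueName, key) {
      queues[queueName] = queues[queueName] || [];
      bindings.push({ key, queue: queueName });
    },
    publish(routingKey, message) {
      // EXCHANGE: routes to every matching queue, stores nothing itself
      for (const b of bindings) {
        if (b.key === routingKey) queues[b.queue].push(message);
      }
    },
    consume(queueName) {
      // CONSUMER: takes the oldest message (FIFO)
      return (queues[queueName] || []).shift();
    },
  };
}
```

Note that `publish` copies the message into every matching queue — exactly why one event can reach three consumers without the publisher knowing any of them.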

Exchange types

Exchange Type  Routing Logic                                            Use Case
-------------  -------------------------------------------------------  -------------------------------------------
Fanout         Sends to ALL bound queues (ignores routing key)          Broadcast: "notify all interested services"
Direct         Sends to queues where binding key = routing key exactly  Point-to-point: "send to order-queue only"
Topic          Sends to queues where binding key matches routing key    Flexible: order.* matches order.placed,
               pattern (* / #)                                          order.shipped
Headers        Routes based on message headers, not routing key         Rare; complex routing requirements

FANOUT example:
  Exchange "events" → ALL bound queues
  publish("events", "", message)
    → notification-queue  ✓
    → analytics-queue     ✓
    → audit-queue         ✓

DIRECT example:
  Exchange "orders" with routing key "order.placed"
  publish("orders", "order.placed", message)
    → notification-queue (bound to "order.placed")  ✓
    → shipping-queue (bound to "order.shipped")      ✗

TOPIC example:
  Exchange "events" with routing key "order.placed"
  → notification-queue (bound to "order.*")    ✓  (* = one word)
  → audit-queue (bound to "#")                 ✓  (# = zero or more words)
  → shipping-queue (bound to "order.shipped")  ✗

5. Setting Up RabbitMQ with Docker

# Add to docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"       # AMQP protocol
      - "15672:15672"     # Management UI
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:

# Start RabbitMQ
docker-compose up -d rabbitmq

# Access management UI
# Open http://localhost:15672
# Login: guest / guest

The management UI lets you see exchanges, queues, message rates, and connection counts — essential for debugging.


6. Publishing Events from Node.js

npm install amqplib

6.1 Event Publisher Module

// shared/events/publisher.js
const amqp = require('amqplib');

let connection = null;
let channel = null;

async function connect(url = 'amqp://guest:guest@localhost:5672') {
  try {
    connection = await amqp.connect(url);
    channel = await connection.createChannel();

    // Handle connection errors
    connection.on('error', (err) => {
      console.error('[publisher] Connection error:', err.message);
      connection = null;
      channel = null;
    });

    connection.on('close', () => {
      console.log('[publisher] Connection closed. Reconnecting...');
      connection = null;
      channel = null;
      setTimeout(() => connect(url), 5000); // Reconnect after 5s
    });

    console.log('[publisher] Connected to RabbitMQ');
    return channel;
  } catch (err) {
    console.error('[publisher] Failed to connect:', err.message);
    setTimeout(() => connect(url), 5000);
  }
}

async function publishEvent(exchange, routingKey, event) {
  if (!channel) {
    throw new Error('RabbitMQ channel not available');
  }

  // Ensure exchange exists (idempotent)
  await channel.assertExchange(exchange, 'topic', { durable: true });

  const message = {
    ...event,
    metadata: {
      eventId: event.metadata?.eventId || generateId(),
      timestamp: new Date().toISOString(),
      source: event.metadata?.source || 'unknown',
      version: event.metadata?.version || '1.0',
    },
  };

  const buffer = Buffer.from(JSON.stringify(message));

  channel.publish(exchange, routingKey, buffer, {
    persistent: true,       // Survives broker restart
    contentType: 'application/json',
    messageId: message.metadata.eventId,
    timestamp: Date.now(),
  });

  console.log(`[publisher] Published ${routingKey}: ${message.metadata.eventId}`);
  return message.metadata.eventId;
}

function generateId() {
  // Timestamp plus random suffix — fine for a demo; prefer a UUID in production
  return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}

async function close() {
  if (channel) await channel.close();
  if (connection) await connection.close();
}

module.exports = { connect, publishEvent, close };

6.2 Publishing from Order Service

// services/order-service/src/index.js (updated)
const express = require('express');
const { connect, publishEvent } = require('../../../shared/events/publisher');

const app = express();
app.use(express.json());
const PORT = process.env.PORT || 4002;

const orders = [];

// Connect to RabbitMQ on startup
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
connect(RABBITMQ_URL);

app.post('/orders', async (req, res) => {
  const { userId, product, quantity } = req.body;

  const order = {
    id: String(orders.length + 1),
    userId,
    product,
    quantity,
    status: 'pending',
    createdAt: new Date().toISOString(),
  };

  orders.push(order);

  // Publish event AFTER saving order
  try {
    await publishEvent('platform-events', 'order.placed', {
      type: 'order.placed',
      data: {
        orderId: order.id,
        userId: order.userId,
        product: order.product,
        quantity: order.quantity,
        total: order.quantity * 29.99,
      },
      metadata: {
        source: 'order-service',
      },
    });
  } catch (err) {
    // Event publishing failure should NOT fail the order
    console.error('Failed to publish order.placed event:', err.message);
    // Log for retry / manual processing
  }

  res.status(201).json({ data: order });
});

app.listen(PORT, () => {
  console.log(`[order-service] running on port ${PORT}`);
});

7. Consuming Events

7.1 Event Consumer Module

// shared/events/consumer.js
const amqp = require('amqplib');

async function startConsumer(config) {
  const {
    url = 'amqp://guest:guest@localhost:5672',
    exchange,
    exchangeType = 'topic',
    queue,
    bindingKey,
    handler,
    prefetch = 10,
  } = config;

  try {
    const connection = await amqp.connect(url);
    const channel = await connection.createChannel();

    // Limit unacknowledged messages (backpressure)
    await channel.prefetch(prefetch);

    // Declare exchange and queue
    await channel.assertExchange(exchange, exchangeType, { durable: true });
    await channel.assertQueue(queue, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': `${exchange}.dlx`,  // Dead letter exchange
        'x-dead-letter-routing-key': `${queue}.dead`,
      },
    });
    await channel.bindQueue(queue, exchange, bindingKey);

    // Set up dead letter exchange and queue
    await channel.assertExchange(`${exchange}.dlx`, 'direct', { durable: true });
    await channel.assertQueue(`${queue}.dead`, { durable: true });
    await channel.bindQueue(`${queue}.dead`, `${exchange}.dlx`, `${queue}.dead`);

    console.log(`[consumer] Listening on queue "${queue}" (binding: ${bindingKey})`);

    // Start consuming
    channel.consume(queue, async (msg) => {
      if (!msg) return;

      // Malformed JSON can never succeed — dead-letter it immediately
      let content;
      try {
        content = JSON.parse(msg.content.toString());
      } catch {
        channel.nack(msg, false, false);
        return;
      }
      const eventId = content.metadata?.eventId || 'unknown';

      try {
        console.log(`[consumer] Processing ${content.type} (${eventId})`);
        await handler(content);
        channel.ack(msg);  // Message processed successfully
        console.log(`[consumer] Acknowledged ${eventId}`);
      } catch (err) {
        console.error(`[consumer] Error processing ${eventId}:`, err.message);

        // Retry up to a limit. Note: nack with requeue=true does NOT let us
        // modify headers, so the retry count would never increment. Instead,
        // republish with an incremented count and ack the original delivery.
        const retryCount = msg.properties.headers?.['x-retry-count'] || 0;
        if (retryCount < 3) {
          channel.sendToQueue(queue, msg.content, {
            persistent: true,
            contentType: msg.properties.contentType,
            messageId: msg.properties.messageId,
            headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
          });
          channel.ack(msg);
          console.log(`[consumer] Requeued ${eventId} (retry ${retryCount + 1})`);
        } else {
          // Max retries exceeded — send to dead letter queue
          channel.nack(msg, false, false);
          console.log(`[consumer] Sent ${eventId} to dead letter queue`);
        }
      }
    });

    connection.on('error', (err) => {
      console.error('[consumer] Connection error:', err.message);
    });

    return { connection, channel };
  } catch (err) {
    console.error('[consumer] Failed to start:', err.message);
    setTimeout(() => startConsumer(config), 5000);
  }
}

module.exports = { startConsumer };

7.2 Notification Service as Consumer

// services/notification-service/src/index.js (updated with event consumer)
const express = require('express');
const { startConsumer } = require('../../../shared/events/consumer');

const app = express();
app.use(express.json());
const PORT = process.env.PORT || 4003;
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';

const notifications = [];

// HTTP endpoints (still available for direct calls)
app.get('/health', (req, res) => {
  res.json({ service: 'notification-service', status: 'healthy' });
});

app.get('/notifications', (req, res) => {
  res.json({ data: notifications });
});

// Start consuming events
startConsumer({
  url: RABBITMQ_URL,
  exchange: 'platform-events',
  exchangeType: 'topic',
  queue: 'notification-queue',
  bindingKey: 'order.*',        // Listen to all order events
  handler: async (event) => {
    switch (event.type) {
      case 'order.placed':
        await handleOrderPlaced(event.data);
        break;
      case 'order.shipped':
        await handleOrderShipped(event.data);
        break;
      default:
        console.log(`[notification] Unhandled event type: ${event.type}`);
    }
  },
});

async function handleOrderPlaced(data) {
  const notification = {
    id: String(notifications.length + 1),
    userId: data.userId,
    type: 'email',
    message: `Your order #${data.orderId} for ${data.product} has been placed!`,
    sentAt: new Date().toISOString(),
  };
  notifications.push(notification);
  console.log(`[notification] Sent order confirmation to user ${data.userId}`);
}

async function handleOrderShipped(data) {
  const notification = {
    id: String(notifications.length + 1),
    userId: data.userId,
    type: 'email',
    message: `Your order #${data.orderId} has been shipped! Tracking: ${data.trackingNumber}`,
    sentAt: new Date().toISOString(),
  };
  notifications.push(notification);
  console.log(`[notification] Sent shipping notification to user ${data.userId}`);
}

app.listen(PORT, () => {
  console.log(`[notification-service] running on port ${PORT}`);
});

8. Exchange Types in Practice

Fanout — Broadcast to all

// Every bound queue gets the message
await channel.assertExchange('broadcasts', 'fanout', { durable: true });

// All three queues receive every message published to "broadcasts"
await channel.bindQueue('notification-queue', 'broadcasts', '');
await channel.bindQueue('analytics-queue', 'broadcasts', '');
await channel.bindQueue('audit-queue', 'broadcasts', '');

channel.publish('broadcasts', '', Buffer.from(JSON.stringify(event)));
// Routing key is ignored with fanout

Direct — Exact routing key match

await channel.assertExchange('tasks', 'direct', { durable: true });

// Only receives messages with routing key "email"
await channel.bindQueue('email-queue', 'tasks', 'email');
// Only receives messages with routing key "sms"
await channel.bindQueue('sms-queue', 'tasks', 'sms');

channel.publish('tasks', 'email', Buffer.from(JSON.stringify(event)));
// Only email-queue receives this

Topic — Pattern matching

await channel.assertExchange('platform-events', 'topic', { durable: true });

// * matches exactly one word
await channel.bindQueue('order-notifications', 'platform-events', 'order.*');
// Matches: order.placed, order.shipped, order.cancelled

// # matches zero or more words
await channel.bindQueue('audit-log', 'platform-events', '#');
// Matches: EVERYTHING (order.placed, user.created, payment.failed, etc.)

await channel.bindQueue('user-events', 'platform-events', 'user.*');
// Matches: user.created, user.updated, user.deleted
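The * and # rules can be captured in a small matcher. This is an illustrative re-implementation of topic-binding semantics for intuition — it is not RabbitMQ's actual routing code:

```javascript
// Illustrative matcher mimicking topic-exchange binding semantics:
// * matches exactly one word, # matches zero or more words.
function topicMatches(bindingKey, routingKey) {
  const pattern = bindingKey
    .split('.')
    .map((word) => (word === '#' ? '§' : word === '*' ? '[^.]+' : word))
    .join('\\.')
    // '#' may match zero words, so it absorbs an adjacent dot
    .replace(/§\\\./g, '(?:[^.]+\\.)*')
    .replace(/\\\.§/g, '(?:\\.[^.]+)*')
    .replace(/§/g, '.*');
  return new RegExp(`^${pattern}$`).test(routingKey);
}
```

For example, `topicMatches('order.*', 'order.placed')` is true but `topicMatches('order.*', 'order.placed.eu')` is false, because `*` refuses to cross a dot.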

9. Dead Letter Queues

When a message cannot be processed after multiple retries, it goes to a dead letter queue (DLQ) instead of being lost forever.

Normal flow:
  Exchange → Queue → Consumer → ack → (message removed)

Dead letter flow:
  Exchange → Queue → Consumer → nack (no requeue) → DLQ
                                                      │
                                                      ▼
                                               Manual review
                                               or automated retry
// Queue declaration with dead letter configuration
await channel.assertQueue('order-queue', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'platform-events.dlx',
    'x-dead-letter-routing-key': 'order-queue.dead',
    'x-message-ttl': 300000,  // Optional: messages expire after 5 min
  },
});

Monitoring the DLQ

// Simple DLQ monitor
async function checkDeadLetterQueue(channel, dlqName) {
  const queueInfo = await channel.checkQueue(dlqName);
  if (queueInfo.messageCount > 0) {
    console.warn(
      `[DLQ ALERT] ${dlqName} has ${queueInfo.messageCount} dead letters!`
    );
    // Send alert to monitoring (PagerDuty, Slack, etc.)
  }
  return queueInfo.messageCount;
}
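Alerting is half the job; the other half is getting dead letters back into circulation once the underlying bug is fixed. Here is a sketch of a replay helper, assuming an open amqplib channel and the queue layout above (`replayDeadLetters` is a hypothetical name, not a library function):

```javascript
// Sketch: drain a DLQ back onto its original queue after a fix ships.
// Resets x-retry-count so replayed messages get a fresh set of retries.
async function replayDeadLetters(channel, dlqName, targetQueue) {
  let replayed = 0;
  for (;;) {
    const msg = await channel.get(dlqName, { noAck: false });
    if (!msg) break;                         // channel.get returns false when empty
    channel.sendToQueue(targetQueue, msg.content, {
      persistent: true,
      contentType: msg.properties.contentType,
      headers: { ...msg.properties.headers, 'x-retry-count': 0 },
    });
    channel.ack(msg);                        // remove from the DLQ only after resend
    replayed++;
  }
  return replayed;
}
```

Acking only after `sendToQueue` means a crash mid-replay leaves messages in the DLQ rather than losing them.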

10. Complete Docker Compose with RabbitMQ

# docker-compose.yml (complete with RabbitMQ)
version: '3.8'

services:
  api-gateway:
    build: ./api-gateway
    ports:
      - "3000:3000"
    environment:
      - PORT=3000
      - USER_SERVICE_URL=http://user-service:4001
      - ORDER_SERVICE_URL=http://order-service:4002
      - NOTIF_SERVICE_URL=http://notification-service:4003
    depends_on:
      - user-service
      - order-service
      - notification-service

  user-service:
    build: ./services/user-service
    expose:
      - "4001"
    environment:
      - PORT=4001
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672

  order-service:
    build: ./services/order-service
    expose:
      - "4002"
    environment:
      - PORT=4002
      - USER_SERVICE_URL=http://user-service:4001
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
    depends_on:
      rabbitmq:
        condition: service_healthy

  notification-service:
    build: ./services/notification-service
    expose:
      - "4003"
    environment:
      - PORT=4003
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
    depends_on:
      rabbitmq:
        condition: service_healthy

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:

11. Key Takeaways

  1. Use events for fire-and-forget operations — notifications, analytics, logging, syncing data. Use HTTP for request-response where you need an immediate answer.
  2. RabbitMQ exchanges route messages to queues via bindings. Topic exchanges are the most flexible for microservices.
  3. Always acknowledge messages after successful processing. Use nack with requeue for retries, nack without requeue for dead-lettering.
  4. Dead letter queues catch failed messages — monitor them and have a process for reprocessing or manual review.
  5. Publish events AFTER persisting data — never publish before the database write completes; you could announce something that never happened.
  6. Event publishing failures should not break the primary operation — if you can't publish "order.placed", the order itself should still succeed.

Explain-It Challenge

  1. Your notification service is down for 2 hours during maintenance. 10,000 "order.placed" events are published. What happens to them? Walk through the flow when the service comes back.
  2. Why do we use a topic exchange instead of a fanout exchange for platform-events?
  3. A developer asks "Why not just use REST webhooks instead of RabbitMQ?" What are the trade-offs?

Navigation: <- 6.2.c Retry, Timeout & Circuit Breaker | 6.2.e — Event Payloads & Idempotency ->