Episode 6 — Scaling, Reliability, Microservices & Web3 / 6.2 — Building and Orchestrating Microservices
6.2.d — Event-Driven Architecture
In one sentence: Instead of services calling each other directly (synchronous coupling), services publish events to a message queue and other services consume those events asynchronously — enabling loose coupling, independent scaling, and resilience to temporary failures.
Navigation: <- 6.2.c Retry, Timeout & Circuit Breaker | 6.2.e — Event Payloads & Idempotency ->
1. Why Events Matter in Microservices
The synchronous coupling problem
SYNCHRONOUS (request-response):
Order Service Notification Service
│ │
├── POST /notifications ──────────→│ (HTTP call)
│ (blocks waiting...) │ (processes...)
│←── 200 OK ──────────────────────│
│
│ If Notification Service is down:
│ Order Service FAILS or WAITS
│ Customer can't place an order because email is broken!
A customer should not be blocked from placing an order just because the email service is temporarily down. The order and the notification are separate concerns.
The event-driven solution
EVENT-DRIVEN (publish-subscribe):
Order Service Message Queue Notification Service
│ │ │
├── publish ──────────→│ │
│ "order.placed" │ (stored in queue) │
│ │ │
│ (returns immediately│ │
│ — not blocking) │ │
│ ├── deliver ──────────────→│
│ │ "order.placed" │ (processes)
│ │ │
│ If Notification Service is down: │
│ Message stays in queue │
│ Delivered when service comes back │
│ Order was NOT affected! │
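The contrast above can be simulated without any HTTP or broker at all. This is a toy sketch (plain functions, with an array standing in for the queue; none of these names come from the real services):

```javascript
// Synchronous style: placing the order depends on the downstream call
function placeOrderSync(order, notify) {
  notify(order); // blocks on the downstream "service"; throws if it is down
  return { ...order, status: 'placed' };
}

// Event-driven style: placing the order only enqueues an event
function placeOrderAsync(order, queue) {
  queue.push({ type: 'order.placed', data: order }); // returns immediately
  return { ...order, status: 'placed' };
}

// The notification "service" is down in both scenarios
const downService = () => { throw new Error('notification service is down'); };
const queue = [];

// Synchronous: the outage breaks order placement
let syncFailed = false;
try {
  placeOrderSync({ id: '1' }, downService);
} catch {
  syncFailed = true;
}

// Event-driven: the order still succeeds; the event waits in the queue
const order = placeOrderAsync({ id: '2' }, queue);
// ...later, the service recovers and drains the queue:
const delivered = queue.splice(0).map((e) => e.type);
```

The failure never reaches the customer in the second version: it is absorbed by the queue.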
2. Synchronous vs Asynchronous Communication
| Aspect | Synchronous (HTTP) | Asynchronous (Events) |
|---|---|---|
| Coupling | Tight — caller waits for response | Loose — publisher doesn't know consumers |
| Failure impact | Caller fails if callee is down | Messages queue up; processed when ready |
| Latency | Adds to response time | Does not affect response time |
| Scaling | Caller must handle callee's capacity | Queue absorbs traffic spikes |
| Debugging | Simple — follow the HTTP chain | Harder — follow events across services |
| Use when | Need immediate response (get user, validate) | Fire-and-forget (notify, log, sync) |
Rule of thumb: Use synchronous calls when you need the response to continue. Use events when the downstream action can happen eventually.
3. Pub/Sub Pattern
The Publish/Subscribe pattern decouples producers from consumers:
Producer (Publisher):
"I have something to announce. I don't care who listens."
Consumer (Subscriber):
"I'm interested in certain types of announcements."
Message Broker (Queue):
"I'll make sure messages get from producers to interested consumers."
┌─────────────────────┐
│ Message Broker │
┌──────────┐ │ (RabbitMQ / Kafka) │ ┌──────────────┐
│ Order │────→│ │────→│ Notification │
│ Service │ │ order.placed ──→ │ │ Service │
└──────────┘ │ ├── queue-A ──→ │ └──────────────┘
│ ├── queue-B ──→ │ ┌──────────────┐
│ └── queue-C ──→ │────→│ Analytics │
│ │ │ Service │
└─────────────────────┘ └──────────────┘
┌──────────────┐
────→│ Inventory │
│ Service │
└──────────────┘
One event, three consumers — publisher knows none of them.
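This contract is small enough to sketch in-process. TinyBroker below is an illustrative stand-in, not a real broker; it only exists to show that the publisher never references its consumers:

```javascript
// A toy in-memory broker: producers publish to a topic, consumers
// subscribe to it, and neither side knows about the other.
class TinyBroker {
  constructor() {
    this.subscribers = new Map(); // topic -> list of handlers
  }
  subscribe(topic, handler) {
    if (!this.subscribers.has(topic)) this.subscribers.set(topic, []);
    this.subscribers.get(topic).push(handler);
  }
  publish(topic, event) {
    const handlers = this.subscribers.get(topic) || [];
    handlers.forEach((handle) => handle(event));
    return handlers.length; // how many consumers received the event
  }
}

// One event, three consumers; the publisher never names them
const broker = new TinyBroker();
const received = [];
broker.subscribe('order.placed', (e) => received.push(['notification', e.orderId]));
broker.subscribe('order.placed', (e) => received.push(['analytics', e.orderId]));
broker.subscribe('order.placed', (e) => received.push(['inventory', e.orderId]));
const deliveredCount = broker.publish('order.placed', { orderId: '42' });
```

Adding a fourth consumer is a new `subscribe` call; the publishing code does not change at all.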
4. RabbitMQ Fundamentals
RabbitMQ is one of the most popular message brokers. It implements AMQP (the Advanced Message Queuing Protocol).
Core concepts
┌─────────────────────────────────────────────────────────────────┐
│ RabbitMQ Architecture │
│ │
│ Producer ──→ Exchange ──→ Binding ──→ Queue ──→ Consumer │
│ │
│ EXCHANGE: Receives messages from producers │
│ Routes to queues based on rules (bindings) │
│ Does NOT store messages │
│ │
│ BINDING: Rule that connects an exchange to a queue │
│ Includes a routing key pattern │
│ │
│ QUEUE: Stores messages until consumed │
│ FIFO (first in, first out) │
│ Durable = survives broker restart │
│ │
│ CONSUMER: Subscribes to a queue │
│ Acknowledges messages after processing │
└─────────────────────────────────────────────────────────────────┘
Exchange types
| Exchange Type | Routing Logic | Use Case |
|---|---|---|
| Fanout | Sends to ALL bound queues (ignores routing key) | Broadcast: "notify all interested services" |
| Direct | Sends to queues where binding key = routing key exactly | Point-to-point: "send to order-queue only" |
| Topic | Sends to queues where binding key matches routing key pattern (* / #) | Flexible: order.* matches order.placed, order.shipped |
| Headers | Routes based on message headers, not routing key | Rare; complex routing requirements |
FANOUT example:
Exchange "events" → ALL bound queues
publish("events", "", message)
→ notification-queue ✓
→ analytics-queue ✓
→ audit-queue ✓
DIRECT example:
Exchange "orders" with routing key "order.placed"
publish("orders", "order.placed", message)
→ notification-queue (bound to "order.placed") ✓
→ shipping-queue (bound to "order.shipped") ✗
TOPIC example:
Exchange "events" with routing key "order.placed"
→ notification-queue (bound to "order.*") ✓ (* = one word)
→ audit-queue (bound to "#") ✓ (# = zero or more words)
→ shipping-queue (bound to "order.shipped") ✗
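The `*` and `#` rules can be captured in a short helper. This matcher is for illustration only: it mimics AMQP topic-binding semantics word by word, not how RabbitMQ is implemented internally.

```javascript
// Illustrative matcher for AMQP-style topic bindings:
//   *  matches exactly one word
//   #  matches zero or more words
function matchesTopic(bindingKey, routingKey) {
  const b = bindingKey.split('.');
  const r = routingKey.split('.');
  function match(bi, ri) {
    if (bi === b.length) return ri === r.length; // pattern fully consumed
    if (b[bi] === '#') {
      // '#' may consume zero or more words: try every possible span
      for (let skip = ri; skip <= r.length; skip++) {
        if (match(bi + 1, skip)) return true;
      }
      return false;
    }
    if (ri === r.length) return false; // words exhausted, pattern is not
    if (b[bi] === '*' || b[bi] === r[ri]) return match(bi + 1, ri + 1);
    return false;
  }
  return match(0, 0);
}
```

With this helper, `matchesTopic('order.*', 'order.placed')` is true while `matchesTopic('order.*', 'order.placed.eu')` is false, matching the examples above.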
5. Setting Up RabbitMQ with Docker
# Add to docker-compose.yml
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP protocol
- "15672:15672" # Management UI
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 5s
retries: 5
volumes:
rabbitmq_data:
# Start RabbitMQ
docker-compose up -d rabbitmq
# Access management UI
# Open http://localhost:15672
# Login: guest / guest
The management UI lets you see exchanges, queues, message rates, and connection counts — essential for debugging.
6. Publishing Events from Node.js
npm install amqplib
6.1 Event Publisher Module
// shared/events/publisher.js
const amqp = require('amqplib');
let connection = null;
let channel = null;
async function connect(url = 'amqp://guest:guest@localhost:5672') {
try {
connection = await amqp.connect(url);
channel = await connection.createChannel();
// Handle connection errors
connection.on('error', (err) => {
console.error('[publisher] Connection error:', err.message);
connection = null;
channel = null;
});
connection.on('close', () => {
console.log('[publisher] Connection closed. Reconnecting...');
connection = null;
channel = null;
setTimeout(() => connect(url), 5000); // Reconnect after 5s
});
console.log('[publisher] Connected to RabbitMQ');
return channel;
} catch (err) {
console.error('[publisher] Failed to connect:', err.message);
setTimeout(() => connect(url), 5000);
}
}
async function publishEvent(exchange, routingKey, event) {
if (!channel) {
throw new Error('RabbitMQ channel not available');
}
// Ensure exchange exists (idempotent)
await channel.assertExchange(exchange, 'topic', { durable: true });
const message = {
...event,
metadata: {
eventId: event.metadata?.eventId || generateId(),
timestamp: new Date().toISOString(),
source: event.metadata?.source || 'unknown',
version: event.metadata?.version || '1.0',
},
};
const buffer = Buffer.from(JSON.stringify(message));
channel.publish(exchange, routingKey, buffer, {
persistent: true, // Survives broker restart
contentType: 'application/json',
messageId: message.metadata.eventId,
timestamp: Date.now(),
});
console.log(`[publisher] Published ${routingKey}: ${message.metadata.eventId}`);
return message.metadata.eventId;
}
function generateId() {
return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
async function close() {
if (channel) await channel.close();
if (connection) await connection.close();
}
module.exports = { connect, publishEvent, close };
6.2 Publishing from Order Service
// services/order-service/src/index.js (updated)
const express = require('express');
const { connect, publishEvent } = require('../../../shared/events/publisher');
const app = express();
app.use(express.json());
const PORT = process.env.PORT || 4002;
const orders = [];
// Connect to RabbitMQ on startup
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
connect(RABBITMQ_URL);
app.post('/orders', async (req, res) => {
const { userId, product, quantity } = req.body;
const order = {
id: String(orders.length + 1),
userId,
product,
quantity,
status: 'pending',
createdAt: new Date().toISOString(),
};
orders.push(order);
// Publish event AFTER saving order
try {
await publishEvent('platform-events', 'order.placed', {
type: 'order.placed',
data: {
orderId: order.id,
userId: order.userId,
product: order.product,
quantity: order.quantity,
total: order.quantity * 29.99,
},
metadata: {
source: 'order-service',
},
});
} catch (err) {
// Event publishing failure should NOT fail the order
console.error('Failed to publish order.placed event:', err.message);
// Log for retry / manual processing
}
res.status(201).json({ data: order });
});
app.listen(PORT, () => {
console.log(`[order-service] running on port ${PORT}`);
});
7. Consuming Events
7.1 Event Consumer Module
// shared/events/consumer.js
const amqp = require('amqplib');
async function startConsumer(config) {
const {
url = 'amqp://guest:guest@localhost:5672',
exchange,
exchangeType = 'topic',
queue,
bindingKey,
handler,
prefetch = 10,
} = config;
try {
const connection = await amqp.connect(url);
const channel = await connection.createChannel();
// Limit unacknowledged messages (backpressure)
await channel.prefetch(prefetch);
// Declare exchange and queue
await channel.assertExchange(exchange, exchangeType, { durable: true });
await channel.assertQueue(queue, {
durable: true,
arguments: {
'x-dead-letter-exchange': `${exchange}.dlx`, // Dead letter exchange
'x-dead-letter-routing-key': `${queue}.dead`,
},
});
await channel.bindQueue(queue, exchange, bindingKey);
// Set up dead letter exchange and queue
await channel.assertExchange(`${exchange}.dlx`, 'direct', { durable: true });
await channel.assertQueue(`${queue}.dead`, { durable: true });
await channel.bindQueue(`${queue}.dead`, `${exchange}.dlx`, `${queue}.dead`);
console.log(`[consumer] Listening on queue "${queue}" (binding: ${bindingKey})`);
// Start consuming
channel.consume(queue, async (msg) => {
  if (!msg) return;
  let content;
  let eventId = 'unknown';
  try {
    // Parse inside the try so malformed JSON is retried/dead-lettered
    // instead of crashing the consumer callback
    content = JSON.parse(msg.content.toString());
    eventId = content.metadata?.eventId || eventId;
    console.log(`[consumer] Processing ${content.type} (${eventId})`);
    await handler(content);
    channel.ack(msg); // Message processed successfully
    console.log(`[consumer] Acknowledged ${eventId}`);
  } catch (err) {
    console.error(`[consumer] Error processing ${eventId}:`, err.message);
    const retryCount = msg.properties.headers?.['x-retry-count'] || 0;
    if (retryCount < 3) {
      // nack(requeue: true) redelivers the message unchanged, so the
      // retry header would never increment. Instead, republish with an
      // incremented count and ack the original delivery.
      channel.sendToQueue(queue, msg.content, {
        ...msg.properties,
        headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
      });
      channel.ack(msg);
      console.log(`[consumer] Requeued ${eventId} (retry ${retryCount + 1})`);
    } else {
      // Max retries exceeded: nack without requeue routes to the DLQ
      channel.nack(msg, false, false);
      console.log(`[consumer] Sent ${eventId} to dead letter queue`);
    }
  }
});
connection.on('error', (err) => {
  console.error('[consumer] Connection error:', err.message);
});
connection.on('close', () => {
  console.log('[consumer] Connection closed. Reconnecting...');
  setTimeout(() => startConsumer(config), 5000);
});
return { connection, channel };
} catch (err) {
console.error('[consumer] Failed to start:', err.message);
setTimeout(() => startConsumer(config), 5000);
}
}
module.exports = { startConsumer };
7.2 Notification Service as Consumer
// services/notification-service/src/index.js (updated with event consumer)
const express = require('express');
const { startConsumer } = require('../../../shared/events/consumer');
const app = express();
app.use(express.json());
const PORT = process.env.PORT || 4003;
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
const notifications = [];
// HTTP endpoints (still available for direct calls)
app.get('/health', (req, res) => {
res.json({ service: 'notification-service', status: 'healthy' });
});
app.get('/notifications', (req, res) => {
res.json({ data: notifications });
});
// Start consuming events
startConsumer({
url: RABBITMQ_URL,
exchange: 'platform-events',
exchangeType: 'topic',
queue: 'notification-queue',
bindingKey: 'order.*', // Listen to all order events
handler: async (event) => {
switch (event.type) {
case 'order.placed':
await handleOrderPlaced(event.data);
break;
case 'order.shipped':
await handleOrderShipped(event.data);
break;
default:
console.log(`[notification] Unhandled event type: ${event.type}`);
}
},
});
async function handleOrderPlaced(data) {
const notification = {
id: String(notifications.length + 1),
userId: data.userId,
type: 'email',
message: `Your order #${data.orderId} for ${data.product} has been placed!`,
sentAt: new Date().toISOString(),
};
notifications.push(notification);
console.log(`[notification] Sent order confirmation to user ${data.userId}`);
}
async function handleOrderShipped(data) {
const notification = {
id: String(notifications.length + 1),
userId: data.userId,
type: 'email',
message: `Your order #${data.orderId} has been shipped! Tracking: ${data.trackingNumber}`,
sentAt: new Date().toISOString(),
};
notifications.push(notification);
console.log(`[notification] Sent shipping notification to user ${data.userId}`);
}
app.listen(PORT, () => {
console.log(`[notification-service] running on port ${PORT}`);
});
8. Exchange Types in Practice
Fanout — Broadcast to all
// Every bound queue gets the message
await channel.assertExchange('broadcasts', 'fanout', { durable: true });
// All three queues receive every message published to "broadcasts"
await channel.bindQueue('notification-queue', 'broadcasts', '');
await channel.bindQueue('analytics-queue', 'broadcasts', '');
await channel.bindQueue('audit-queue', 'broadcasts', '');
channel.publish('broadcasts', '', Buffer.from(JSON.stringify(event)));
// Routing key is ignored with fanout
Direct — Exact routing key match
await channel.assertExchange('tasks', 'direct', { durable: true });
// Only receives messages with routing key "email"
await channel.bindQueue('email-queue', 'tasks', 'email');
// Only receives messages with routing key "sms"
await channel.bindQueue('sms-queue', 'tasks', 'sms');
channel.publish('tasks', 'email', Buffer.from(JSON.stringify(event)));
// Only email-queue receives this
Topic — Pattern matching
await channel.assertExchange('platform-events', 'topic', { durable: true });
// * matches exactly one word
await channel.bindQueue('order-notifications', 'platform-events', 'order.*');
// Matches: order.placed, order.shipped, order.cancelled
// # matches zero or more words
await channel.bindQueue('audit-log', 'platform-events', '#');
// Matches: EVERYTHING (order.placed, user.created, payment.failed, etc.)
await channel.bindQueue('user-events', 'platform-events', 'user.*');
// Matches: user.created, user.updated, user.deleted
9. Dead Letter Queues
When a message cannot be processed after multiple retries, it goes to a dead letter queue (DLQ) instead of being lost forever.
Normal flow:
Exchange → Queue → Consumer → ack → (message removed)
Dead letter flow:
Exchange → Queue → Consumer → nack (no requeue) → DLQ
│
▼
Manual review
or automated retry
// Queue declaration with dead letter configuration
await channel.assertQueue('order-queue', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'platform-events.dlx',
'x-dead-letter-routing-key': 'order-queue.dead',
'x-message-ttl': 300000, // Optional: messages expire after 5 min
},
});
Monitoring the DLQ
// Simple DLQ monitor
async function checkDeadLetterQueue(channel, dlqName) {
const queueInfo = await channel.checkQueue(dlqName);
if (queueInfo.messageCount > 0) {
console.warn(
`[DLQ ALERT] ${dlqName} has ${queueInfo.messageCount} dead letters!`
);
// Send alert to monitoring (PagerDuty, Slack, etc.)
}
return queueInfo.messageCount;
}
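The retry-then-dead-letter lifecycle can also be simulated in memory to make the accounting concrete. `deliver` below is a hypothetical helper for illustration, not part of amqplib:

```javascript
// In-memory simulation (not RabbitMQ) of the retry-then-dead-letter flow:
// a message that keeps failing is retried up to maxRetries times, then
// moved to a dead letter list instead of being retried forever or lost.
function deliver(message, handler, { maxRetries = 3 } = {}) {
  const deadLetters = [];
  let attempts = 0;
  while (true) {
    attempts++;
    try {
      handler(message);
      return { status: 'acked', attempts, deadLetters };
    } catch (err) {
      if (attempts > maxRetries) {
        // Equivalent of nack without requeue: hand off to the DLQ
        deadLetters.push({ message, error: err.message });
        return { status: 'dead-lettered', attempts, deadLetters };
      }
      // Equivalent of nack with requeue: the message is delivered again
    }
  }
}
```

With `maxRetries: 3`, a permanently failing handler sees one initial attempt plus three retries before the message is dead-lettered.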
10. Complete Docker Compose with RabbitMQ
# docker-compose.yml (complete with RabbitMQ)
version: '3.8'
services:
api-gateway:
build: ./api-gateway
ports:
- "3000:3000"
environment:
- PORT=3000
- USER_SERVICE_URL=http://user-service:4001
- ORDER_SERVICE_URL=http://order-service:4002
- NOTIF_SERVICE_URL=http://notification-service:4003
depends_on:
- user-service
- order-service
- notification-service
user-service:
build: ./services/user-service
expose:
- "4001"
environment:
- PORT=4001
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
order-service:
build: ./services/order-service
expose:
- "4002"
environment:
- PORT=4002
- USER_SERVICE_URL=http://user-service:4001
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
depends_on:
rabbitmq:
condition: service_healthy
notification-service:
build: ./services/notification-service
expose:
- "4003"
environment:
- PORT=4003
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
depends_on:
rabbitmq:
condition: service_healthy
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 5s
retries: 5
volumes:
rabbitmq_data:
11. Key Takeaways
- Use events for fire-and-forget operations — notifications, analytics, logging, syncing data. Use HTTP for request-response where you need an immediate answer.
- RabbitMQ exchanges route messages to queues via bindings. Topic exchanges are the most flexible for microservices.
- Always acknowledge messages after successful processing. Use nack with requeue for retries, nack without requeue for dead-lettering.
- Dead letter queues catch failed messages — monitor them and have a process for reprocessing or manual review.
- Publish events AFTER persisting data — never publish before the database write completes; you could announce something that never happened.
- Event publishing failures should not break the primary operation — if you can't publish "order.placed", the order itself should still succeed.
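The last two takeaways amount to one ordering rule, sketched here with stand-in dependencies (saveOrder, publishEvent, and log are hypothetical injection points, not real APIs):

```javascript
// Persist first, publish second, and never fail the request because
// publishing failed. Dependencies are injected so the ordering is explicit.
async function createOrder(input, deps) {
  const order = await deps.saveOrder(input);        // 1. persist first
  try {
    await deps.publishEvent('order.placed', order); // 2. then announce it
  } catch (err) {
    // 3. the order still succeeds; record the failure for later retry
    deps.log(`publish failed for order ${order.id}: ${err.message}`);
  }
  return order;
}
```

If the two steps were swapped, a broker outage could announce an order that was never saved, or a database failure could leave consumers acting on a phantom event.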
Explain-It Challenge
- Your notification service is down for 2 hours during maintenance. 10,000 "order.placed" events are published. What happens to them? Walk through the flow when the service comes back.
- Why do we use a topic exchange instead of a fanout exchange for platform-events?
- A developer asks "Why not just use REST webhooks instead of RabbitMQ?" What are the trade-offs?
Navigation: <- 6.2.c Retry, Timeout & Circuit Breaker | 6.2.e — Event Payloads & Idempotency ->