Episode 4 — Generative AI Engineering / 4.9 — Combining Streaming with Structured Data
4.9.c — Separating UI from System Outputs
In one sentence: Production AI architectures need a clean separation between the UI-facing channel (streamed text for human consumption) and the system-facing channel (structured JSON for databases, APIs, and downstream services) — this section covers event-based architectures, WebSocket patterns, and production-grade strategies for serving both channels reliably.
Navigation: <- 4.9.b — Returning Structured JSON After Generation | 4.9 Overview
1. The Dual-Channel Architecture Problem
In a real production system, the LLM's output typically needs to serve multiple consumers simultaneously:
           ┌──────────────────────┐
           │     LLM Response     │
           └──────────┬───────────┘
                      │
      ┌───────────────┼───────────────┐
      │               │               │
┌─────▼─────┐   ┌─────▼─────┐   ┌─────▼─────┐
│ UI Layer  │   │ Data Layer│   │ Event Bus │
│ (stream   │   │ (JSON to  │   │ (notify   │
│  text to  │   │ database, │   │  other    │
│  browser) │   │ analytics)│   │  services)│
└───────────┘   └───────────┘   └───────────┘
    Human          System          System
   Channel        Channel         Channel
This is not a simple "parse the response" problem. It is an architecture problem. How you separate these channels affects:
- Reliability — if the UI stream fails, does the system still get its data?
- Latency — does the user wait for the structured extraction?
- Consistency — does the structured data match what the user saw?
- Scalability — can you add new system consumers without changing the streaming logic?
2. When to Use One Call vs Two Calls
The decision between a single LLM call (with post-processing) and two separate calls is nuanced:
Decision framework
         Does the structured data need to
      match the conversational text exactly?
                         │
           ┌─────YES─────┴─────NO─────┐
           │                          │
 Is latency critical?      Can the two calls share
           │                  the same context?
    ┌─YES──┴──NO─┐                    │
    │            │             ┌─YES──┴──NO─┐
    │            │             │            │
Single call  Two calls     Two calls    Two calls
+ post-      sequential    parallel     parallel
processing   (extract      (same        (different
(delimiter   from text)    prompt)      prompts)
or parse)
Detailed comparison
| Factor | Single call + parse | Two calls sequential | Two calls parallel |
|---|---|---|---|
| Data consistency with UI text | Highest (same generation) | High (extracted from text) | Low (independent) |
| Latency to user | Best | Worst (user may wait) | Good |
| Latency to system | Delayed (must finish stream) | Delayed (must finish + extract) | Good (parallel) |
| Cost | Lowest (1 call) | Highest (2 calls, shared context) | Medium (2 calls, no shared context) |
| Reliability of JSON | Medium | Highest | High |
| Architecture simplicity | Medium | Simplest | Medium |
| Scalability | Limited | Good | Best |
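The two-calls-parallel branch is straightforward to sketch with `Promise.all`: fire the streaming call and the extraction call at the same time and let each channel resolve independently. In this hedged sketch, `streamText` and `extractData` are hypothetical stand-ins for the two LLM requests:

```javascript
// Sketch: run the UI call and the system call concurrently.
// streamText and extractData are placeholders for the two LLM requests.
async function dualChannelParallel(streamText, extractData, userMessage) {
  const [text, data] = await Promise.all([
    streamText(userMessage),   // human channel: conversational text
    extractData(userMessage),  // system channel: structured JSON
  ]);
  return { text, data };
}

// Demo with stub implementations
dualChannelParallel(
  async (msg) => `Analysis of: ${msg}`,
  async (msg) => ({ topics: [msg], sentiment: 'neutral' }),
  'Q3 sales'
).then(({ text, data }) => console.log(text, data.topics));
```

Note the trade-off from the table above: because the two calls run from the same prompt but generate independently, the structured data may not match the conversational text word-for-word.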
3. Event-Based Architecture for Dual-Purpose Responses
An event-driven architecture cleanly separates concerns. The LLM interaction emits events that different consumers subscribe to independently.
3.1 Event emitter pattern
import { EventEmitter } from 'node:events';
import crypto from 'node:crypto'; // randomUUID; also available as a global in Node 19+
import OpenAI from 'openai';
class AIResponsePipeline extends EventEmitter {
constructor(openai, options = {}) {
super();
this.openai = openai;
this.model = options.model || 'gpt-4o';
}
async process(userMessage, systemPrompt) {
const requestId = crypto.randomUUID();
this.emit('request:start', { requestId, userMessage, timestamp: Date.now() });
try {
// Phase 1: Stream conversational text
const conversationalText = await this.streamPhase(requestId, userMessage, systemPrompt);
// Phase 2: Extract structured data
const structuredData = await this.structurePhase(requestId, userMessage, conversationalText);
// Phase 3: Emit completion
this.emit('response:complete', {
requestId,
text: conversationalText,
data: structuredData,
timestamp: Date.now()
});
return { text: conversationalText, data: structuredData };
} catch (error) {
this.emit('response:error', { requestId, error, timestamp: Date.now() });
throw error;
}
}
async streamPhase(requestId, userMessage, systemPrompt) {
this.emit('stream:start', { requestId });
const stream = await this.openai.chat.completions.create({
model: this.model,
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userMessage }
],
stream: true,
stream_options: { include_usage: true }
});
let fullText = '';
let tokenCount = 0;
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
tokenCount++;
fullText += content;
this.emit('stream:token', { requestId, content, tokenIndex: tokenCount });
}
if (chunk.usage) {
this.emit('stream:usage', { requestId, usage: chunk.usage });
}
}
this.emit('stream:end', { requestId, fullText, tokenCount });
return fullText;
}
async structurePhase(requestId, userMessage, conversationalText) {
this.emit('structure:start', { requestId });
const response = await this.openai.chat.completions.create({
model: this.model,
messages: [
{
role: 'system',
content: 'Extract structured data from this AI response. Return valid JSON.'
},
{ role: 'user', content: userMessage },
{ role: 'assistant', content: conversationalText }
],
response_format: { type: 'json_object' },
temperature: 0
});
const data = JSON.parse(response.choices[0].message.content);
this.emit('structure:complete', { requestId, data });
return data;
}
}
// Usage: Different consumers subscribe to different events
const pipeline = new AIResponsePipeline(openai);
// Consumer 1: UI (streams text to the browser)
pipeline.on('stream:token', ({ content }) => {
process.stdout.write(content); // Or send via SSE/WebSocket
});
// Consumer 2: Analytics (tracks usage)
pipeline.on('stream:usage', ({ requestId, usage }) => {
console.log(`\n[Analytics] Request ${requestId}: ${usage.total_tokens} tokens`);
});
// Consumer 3: Database (saves structured data)
pipeline.on('structure:complete', ({ requestId, data }) => {
console.log(`\n[DB] Saving structured data for ${requestId}`);
// await db.collection('ai_responses').insertOne({ requestId, data });
});
// Consumer 4: Monitoring (tracks performance)
const timings = {};
pipeline.on('request:start', ({ requestId, timestamp }) => {
timings[requestId] = { start: timestamp };
});
pipeline.on('stream:token', ({ requestId }) => {
if (timings[requestId] && !timings[requestId].firstToken) {
timings[requestId].firstToken = Date.now();
}
});
pipeline.on('response:complete', ({ requestId, timestamp }) => {
const t = timings[requestId];
console.log(`\n[Monitor] TTFT: ${t.firstToken - t.start}ms, Total: ${timestamp - t.start}ms`);
delete timings[requestId];
});
// Consumer 5: Error alerting
pipeline.on('response:error', ({ requestId, error }) => {
console.error(`\n[ALERT] Request ${requestId} failed: ${error.message}`);
// await alertService.notify({ level: 'error', requestId, error });
});
// Execute
await pipeline.process(
'Analyze our Q3 sales data and recommend actions',
'You are a business analyst. Provide clear analysis and actionable recommendations.'
);
3.2 Benefits of event-based architecture
| Benefit | Explanation |
|---|---|
| Loose coupling | UI, database, analytics, and monitoring are independent. Adding a new consumer requires zero changes to the pipeline |
| Fault isolation | If the database write fails, the UI stream is unaffected |
| Testability | Each consumer can be unit tested independently with mock events |
| Observability | Events form a natural audit trail — every state transition is logged |
| Extensibility | New features (caching, A/B testing, rate limiting) are just new event listeners |
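One caveat behind the fault-isolation claim: Node's `EventEmitter` invokes listeners synchronously, so a consumer that throws inside its handler propagates the error back into `emit()` and can take the stream down with it. A small wrapper keeps consumers isolated (`safeOn` here is a hypothetical helper, not part of the pipeline above):

```javascript
import { EventEmitter } from 'node:events';

// Hypothetical helper: register a listener that cannot break other consumers.
// Errors are logged instead of propagating back into emit().
function safeOn(emitter, event, handler) {
  emitter.on(event, (...args) => {
    try {
      handler(...args);
    } catch (err) {
      console.error(`[consumer:${event}] failed:`, err.message);
    }
  });
}

const pipeline = new EventEmitter();
safeOn(pipeline, 'structure:complete', () => { throw new Error('db down'); });
safeOn(pipeline, 'structure:complete', ({ requestId }) =>
  console.log(`analytics recorded ${requestId}`)
);
pipeline.emit('structure:complete', { requestId: 'r1' });
// The failing "database" consumer does not stop the analytics consumer
```

For async handlers, the same idea applies by attaching a `.catch()` to the returned promise instead of a try/catch.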
4. WebSocket Patterns for Real-Time + Structured
For applications that need bidirectional real-time communication (chat apps, collaborative tools, dashboards), WebSockets provide a more flexible transport than SSE.
4.1 WebSocket server with dual channels
import { WebSocketServer } from 'ws';
import OpenAI from 'openai';
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
ws.on('message', async (raw) => {
const message = JSON.parse(raw.toString());
if (message.type === 'chat') {
await handleChatMessage(ws, message);
}
});
});
async function handleChatMessage(ws, message) {
const requestId = crypto.randomUUID();
// Notify client that processing has started
ws.send(JSON.stringify({
type: 'status',
requestId,
status: 'processing'
}));
try {
// Phase 1: Stream text tokens over WebSocket
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [
{ role: 'system', content: message.systemPrompt || 'You are a helpful assistant.' },
...(message.history || []),
{ role: 'user', content: message.content }
],
stream: true,
stream_options: { include_usage: true }
});
let fullText = '';
let usage = null;
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
fullText += content;
// UI channel: stream tokens
ws.send(JSON.stringify({
type: 'stream:token',
requestId,
content
}));
}
if (chunk.usage) {
usage = chunk.usage;
}
}
// UI channel: stream complete
ws.send(JSON.stringify({
type: 'stream:complete',
requestId,
fullText
}));
// Phase 2: Extract structured data (non-blocking for UI)
const structuredResponse = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [
{
role: 'system',
content: `Extract structured insights from this conversation. Return JSON with:
{ "topics": ["string"], "sentiment": "positive|negative|neutral", "actionItems": ["string"], "summary": "string" }`
},
{ role: 'user', content: message.content },
{ role: 'assistant', content: fullText }
],
response_format: { type: 'json_object' },
temperature: 0
});
const structuredData = JSON.parse(structuredResponse.choices[0].message.content);
// System channel: structured data
ws.send(JSON.stringify({
type: 'structured:data',
requestId,
data: structuredData,
usage
}));
} catch (error) {
ws.send(JSON.stringify({
type: 'error',
requestId,
message: error.message
}));
}
}
console.log('WebSocket server running on ws://localhost:8080');
4.2 WebSocket client with channel separation
class AIWebSocketClient {
constructor(url) {
this.ws = new WebSocket(url);
this.handlers = new Map();
this.pendingRequests = new Map();
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.dispatch(message);
};
}
// Register handlers for different message types
on(type, handler) {
if (!this.handlers.has(type)) {
this.handlers.set(type, []);
}
this.handlers.get(type).push(handler);
}
dispatch(message) {
const handlers = this.handlers.get(message.type) || [];
for (const handler of handlers) {
handler(message);
}
// Also dispatch to request-specific callbacks
const requestCallbacks = this.pendingRequests.get(message.requestId);
if (requestCallbacks?.[message.type]) {
requestCallbacks[message.type](message);
}
}
// Send a chat message and get callbacks for both channels
chat(content, options = {}) {
const requestId = crypto.randomUUID();
return new Promise((resolve, reject) => {
let fullText = '';
let structuredData = null;
let streamComplete = false;
let structureComplete = false;
const checkDone = () => {
if (streamComplete && structureComplete) {
this.pendingRequests.delete(requestId);
resolve({ text: fullText, data: structuredData });
}
};
this.pendingRequests.set(requestId, {
'stream:token': (msg) => {
options.onToken?.(msg.content);
fullText += msg.content;
},
'stream:complete': (msg) => {
streamComplete = true;
options.onStreamComplete?.(msg.fullText);
checkDone();
},
'structured:data': (msg) => {
structuredData = msg.data;
structureComplete = true;
options.onStructuredData?.(msg.data);
checkDone();
},
'error': (msg) => {
this.pendingRequests.delete(requestId);
reject(new Error(msg.message));
}
});
this.ws.send(JSON.stringify({
type: 'chat',
requestId,
content,
...options
}));
});
}
}
// Usage
const client = new AIWebSocketClient('ws://localhost:8080');
// Global handlers (for all messages)
client.on('stream:token', (msg) => {
document.getElementById('chat-output').textContent += msg.content;
});
client.on('structured:data', (msg) => {
// Update sidebar dashboard with structured insights
document.getElementById('topics').textContent = msg.data.topics.join(', ');
document.getElementById('sentiment').textContent = msg.data.sentiment;
});
// Per-request usage
const result = await client.chat('Analyze our customer feedback from last month', {
onToken: (token) => console.log('Token:', token),
onStreamComplete: (text) => console.log('Stream done:', text.length, 'chars'),
onStructuredData: (data) => console.log('Structure:', data)
});
console.log('Full result:', result);
5. Server-Sent Events (SSE) with Typed Channels
SSE is simpler than WebSockets for the common case of server-to-client streaming. You can use SSE event types to separate channels:
5.1 Multi-channel SSE server
import express from 'express';
import OpenAI from 'openai';
const app = express();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
app.use(express.json());
app.post('/api/analyze', async (req, res) => {
const { message, history = [] } = req.body;
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
const requestId = crypto.randomUUID();
// Helper: send typed SSE events
const sendEvent = (eventType, data) => {
res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`);
};
sendEvent('status', { requestId, status: 'streaming' });
try {
// Phase 1: Stream conversational text
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [
{ role: 'system', content: 'You are a business analyst. Provide clear analysis.' },
...history,
{ role: 'user', content: message }
],
stream: true,
stream_options: { include_usage: true }
});
let fullText = '';
let usage = null;
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
fullText += content;
sendEvent('text', { content }); // UI channel
}
if (chunk.usage) usage = chunk.usage;
}
sendEvent('text:complete', { fullText, usage }); // UI channel: done
sendEvent('status', { requestId, status: 'extracting' });
// Phase 2: Extract structured data
const structuredResponse = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [
{
role: 'system',
content: `Extract structured insights. Return JSON:
{
"keyFindings": [{"finding": "string", "impact": "high|medium|low"}],
"recommendations": [{"action": "string", "priority": 1-5}],
"overallSentiment": "positive|negative|neutral|mixed",
"confidence": 0.0-1.0
}`
},
{ role: 'user', content: message },
{ role: 'assistant', content: fullText }
],
response_format: { type: 'json_object' },
temperature: 0
});
const structuredData = JSON.parse(structuredResponse.choices[0].message.content);
sendEvent('structured', { data: structuredData }); // System channel
sendEvent('status', { requestId, status: 'complete' });
} catch (error) {
sendEvent('error', { message: error.message, requestId });
}
res.end();
});
app.listen(3000);
5.2 Multi-channel SSE client
async function consumeMultiChannelSSE(message) {
const response = await fetch('/api/analyze', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
// Channel-specific handlers
const channels = {
text: (data) => {
// UI channel: append text in real time
document.getElementById('analysis-output').textContent += data.content;
},
'text:complete': (data) => {
console.log('Text streaming complete. Tokens:', data.usage?.total_tokens);
},
structured: (data) => {
// System channel: render structured insights
renderDashboard(data.data);
},
status: (data) => {
document.getElementById('status').textContent = data.status;
},
error: (data) => {
document.getElementById('error').textContent = data.message;
}
};
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE events (with named event types)
const events = parseSSEEvents(buffer);
buffer = events.remaining;
for (const event of events.parsed) {
const handler = channels[event.type];
if (handler) {
handler(event.data);
}
}
}
}
function parseSSEEvents(buffer) {
const parsed = [];
const blocks = buffer.split('\n\n');
const remaining = blocks.pop() || ''; // Last block might be incomplete
for (const block of blocks) {
if (!block.trim()) continue;
let eventType = 'message'; // Default SSE event type
let data = '';
for (const line of block.split('\n')) {
if (line.startsWith('event: ')) {
eventType = line.slice(7);
} else if (line.startsWith('data: ')) {
// Per the SSE spec, multiple data lines in one event are joined with newlines
data += (data ? '\n' : '') + line.slice(6);
}
}
if (data) {
try {
parsed.push({ type: eventType, data: JSON.parse(data) });
} catch {
parsed.push({ type: eventType, data });
}
}
}
return { parsed, remaining };
}
function renderDashboard(data) {
// Render key findings
const findingsList = document.getElementById('findings');
findingsList.innerHTML = '';
for (const finding of data.keyFindings) {
const li = document.createElement('li');
li.textContent = `[${finding.impact.toUpperCase()}] ${finding.finding}`;
findingsList.appendChild(li);
}
// Render recommendations
const recsList = document.getElementById('recommendations');
recsList.innerHTML = '';
for (const rec of data.recommendations.sort((a, b) => a.priority - b.priority)) {
const li = document.createElement('li');
li.textContent = `P${rec.priority}: ${rec.action}`;
recsList.appendChild(li);
}
// Render sentiment
document.getElementById('sentiment-badge').textContent = data.overallSentiment;
document.getElementById('confidence').textContent = `${(data.confidence * 100).toFixed(0)}%`;
}
6. Production Pattern: Response Router
In larger systems, you may need to route different parts of an LLM response to different services. A response router abstracts this:
class ResponseRouter {
constructor() {
this.routes = [];
}
/**
* Register a route: when a condition matches, send data to a handler.
*/
route(name, condition, handler) {
this.routes.push({ name, condition, handler });
return this; // Allow chaining
}
/**
* Process a completed AI response through all routes.
*/
async process(response) {
const { text, structuredData, metadata } = response;
const results = {};
const promises = this.routes
.filter(route => route.condition({ text, structuredData, metadata }))
.map(async (route) => {
try {
results[route.name] = await route.handler({ text, structuredData, metadata });
} catch (error) {
results[route.name] = { error: error.message };
}
});
await Promise.all(promises);
return results;
}
}
// Configuration
const router = new ResponseRouter();
// Route 1: Always save to conversation history
router.route(
'history',
() => true, // Always matches
async ({ text, metadata }) => {
// await db.conversations.insertOne({
// requestId: metadata.requestId,
// text,
// timestamp: Date.now()
// });
console.log('[History] Saved conversation');
return { saved: true };
}
);
// Route 2: Save structured data if extraction succeeded
router.route(
'structured-storage',
({ structuredData }) => structuredData !== null,
async ({ structuredData, metadata }) => {
// await db.insights.insertOne({
// requestId: metadata.requestId,
// ...structuredData
// });
console.log('[Storage] Saved structured data');
return { saved: true };
}
);
// Route 3: Trigger alerts for high-priority findings
router.route(
'alerts',
({ structuredData }) => {
return structuredData?.recommendations?.some(r => r.priority === 1);
},
async ({ structuredData }) => {
const urgent = structuredData.recommendations.filter(r => r.priority === 1);
// await alertService.send({ level: 'urgent', items: urgent });
console.log(`[Alert] ${urgent.length} urgent recommendation(s) detected`);
return { alerted: true, count: urgent.length };
}
);
// Route 4: Update dashboard metrics if sentiment data exists
router.route(
'dashboard',
({ structuredData }) => structuredData?.overallSentiment != null,
async ({ structuredData }) => {
// await dashboardService.updateMetric('sentiment', structuredData.overallSentiment);
console.log(`[Dashboard] Updated sentiment: ${structuredData.overallSentiment}`);
return { updated: true };
}
);
// Route 5: Log analytics for long responses
router.route(
'analytics',
({ text }) => text.length > 500,
async ({ text, metadata }) => {
// await analytics.track('long_response', {
// requestId: metadata.requestId,
// length: text.length,
// wordCount: text.split(/\s+/).length
// });
console.log(`[Analytics] Long response logged (${text.length} chars)`);
return { logged: true };
}
);
// Execute the pipeline (streamToUser and extractStructured stand in for the
// stream and extraction phases shown in the patterns above)
async function fullPipeline(userMessage, onToken) {
// Phase 1: Stream
const text = await streamToUser(userMessage, onToken);
// Phase 2: Structure
const structuredData = await extractStructured(userMessage, text);
// Phase 3: Route
const routeResults = await router.process({
text,
structuredData,
metadata: { requestId: crypto.randomUUID(), userMessage, timestamp: Date.now() }
});
console.log('\nRoute results:', routeResults);
return { text, structuredData, routeResults };
}
7. Middleware Pattern for Request/Response Processing
For Express-based applications, middleware provides a clean way to separate streaming concerns:
import express from 'express';
import OpenAI from 'openai';
const app = express();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
app.use(express.json());
// Middleware 1: Attach AI context
function aiContext(req, res, next) {
req.ai = {
requestId: crypto.randomUUID(),
startTime: Date.now(),
model: 'gpt-4o'
};
next();
}
// Middleware 2: Stream the text response to the client
function streamResponse(systemPrompt) {
return async (req, res, next) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
try {
const stream = await openai.chat.completions.create({
model: req.ai.model,
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: req.body.message }
],
stream: true
});
let fullText = '';
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
fullText += content;
res.write(`event: text\ndata: ${JSON.stringify({ content })}\n\n`);
}
}
req.ai.fullText = fullText;
req.ai.streamEndTime = Date.now();
res.write(`event: text:complete\ndata: ${JSON.stringify({
fullText,
streamDurationMs: req.ai.streamEndTime - req.ai.startTime
})}\n\n`);
next(); // Continue to structured extraction
} catch (error) {
// Express 4 does not catch rejections from async middleware, so handle here
res.write(`event: error\ndata: ${JSON.stringify({ message: error.message })}\n\n`);
res.end();
}
};
}
// Middleware 3: Extract structured data and send on system channel
function extractStructured(extractionPrompt) {
return async (req, res, next) => {
try {
const response = await openai.chat.completions.create({
model: req.ai.model,
messages: [
{ role: 'system', content: extractionPrompt },
{ role: 'user', content: req.body.message },
{ role: 'assistant', content: req.ai.fullText }
],
response_format: { type: 'json_object' },
temperature: 0
});
req.ai.structuredData = JSON.parse(response.choices[0].message.content);
res.write(`event: structured\ndata: ${JSON.stringify({
data: req.ai.structuredData
})}\n\n`);
next();
} catch (error) {
res.write(`event: structured:error\ndata: ${JSON.stringify({
error: error.message
})}\n\n`);
next();
}
};
}
// Middleware 4: Log and finalize
function finalize(req, res) {
const totalTime = Date.now() - req.ai.startTime;
res.write(`event: meta\ndata: ${JSON.stringify({
requestId: req.ai.requestId,
totalDurationMs: totalTime,
streamDurationMs: req.ai.streamEndTime - req.ai.startTime,
hasStructuredData: req.ai.structuredData != null
})}\n\n`);
// Log for monitoring
console.log(`[${req.ai.requestId}] Complete in ${totalTime}ms`);
res.end();
}
// Compose the pipeline
app.post('/api/analyze',
aiContext,
streamResponse('You are a business analyst. Provide clear, actionable analysis.'),
extractStructured(`Extract structured insights. Return JSON:
{
"keyFindings": [{"finding": "string", "impact": "high|medium|low"}],
"recommendations": [{"action": "string", "priority": 1-5}],
"sentiment": "positive|negative|neutral|mixed"
}`),
finalize
);
app.listen(3000, () => console.log('Server on port 3000'));
8. Handling Concurrent Users
In production, many users stream simultaneously. Each stream must be isolated:
class ConnectionManager {
constructor() {
this.connections = new Map(); // requestId -> connection info
this.metrics = {
activeStreams: 0,
totalRequests: 0,
peakConcurrent: 0
};
}
register(requestId, res) {
this.connections.set(requestId, {
res,
startTime: Date.now(),
bytesStreamed: 0,
tokensStreamed: 0
});
this.metrics.activeStreams++;
this.metrics.totalRequests++;
this.metrics.peakConcurrent = Math.max(
this.metrics.peakConcurrent,
this.metrics.activeStreams
);
// Handle client disconnect
res.on('close', () => {
this.unregister(requestId);
});
}
streamToken(requestId, content) {
const conn = this.connections.get(requestId);
if (!conn) return false; // Client disconnected
try {
conn.res.write(`event: text\ndata: ${JSON.stringify({ content })}\n\n`);
conn.bytesStreamed += content.length;
conn.tokensStreamed++;
return true;
} catch {
this.unregister(requestId);
return false; // Write failed (client gone)
}
}
sendStructured(requestId, data) {
const conn = this.connections.get(requestId);
if (!conn) return false;
try {
conn.res.write(`event: structured\ndata: ${JSON.stringify({ data })}\n\n`);
return true;
} catch {
this.unregister(requestId);
return false;
}
}
complete(requestId) {
const conn = this.connections.get(requestId);
if (!conn) return;
const duration = Date.now() - conn.startTime;
conn.res.write(`event: meta\ndata: ${JSON.stringify({
requestId,
durationMs: duration,
tokensStreamed: conn.tokensStreamed
})}\n\n`);
conn.res.end();
this.unregister(requestId);
}
unregister(requestId) {
if (this.connections.has(requestId)) {
this.connections.delete(requestId);
this.metrics.activeStreams--;
}
}
getMetrics() {
return {
...this.metrics,
connectionDetails: Array.from(this.connections.entries()).map(([id, conn]) => ({
requestId: id,
durationMs: Date.now() - conn.startTime,
tokensStreamed: conn.tokensStreamed
}))
};
}
}
const connManager = new ConnectionManager();
// Metrics endpoint
// app.get('/api/metrics', (req, res) => {
// res.json(connManager.getMetrics());
// });
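A natural extension of the manager is admission control: reject new streams once a concurrency ceiling is hit rather than letting resource exhaustion fail them mid-stream. This is a hedged sketch — the threshold and the 503 response are illustrative choices, and the demo stubs below only mimic ConnectionManager's metrics interface:

```javascript
// Hypothetical guard: admit a stream only while under the concurrency cap.
function admitStream(manager, res, requestId, maxConcurrent = 500) {
  if (manager.getMetrics().activeStreams >= maxConcurrent) {
    res.statusCode = 503;
    res.setHeader('Retry-After', '5'); // hint the client to back off
    res.end('Too many concurrent streams');
    return false;
  }
  manager.register(requestId, res);
  return true;
}

// Demo with stubs for the manager and response
const stubManager = {
  active: 0,
  getMetrics() { return { activeStreams: this.active }; },
  register() { this.active++; },
};
const stubRes = { statusCode: 200, setHeader() {}, end() {}, on() {} };
console.log(admitStream(stubManager, stubRes, 'r1', 1)); // → true
console.log(admitStream(stubManager, stubRes, 'r2', 1)); // → false
```

Rejecting early keeps the user experience predictable: a fast 503 with a retry hint is easier for clients to handle than a stream that stalls partway through.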
9. Testing Dual-Channel Architectures
Testing is critical for systems that serve two channels. Each channel needs independent verification:
// Test utilities for dual-channel responses
class DualChannelTestHarness {
constructor() {
this.textEvents = [];
this.structuredEvents = [];
this.statusEvents = [];
this.errors = [];
}
createMockResponse() {
const self = this;
return {
setHeader: () => {},
write(data) {
// Parse SSE format
const lines = data.split('\n');
let eventType = 'message';
let eventData = null;
for (const line of lines) {
if (line.startsWith('event: ')) eventType = line.slice(7);
if (line.startsWith('data: ')) {
try {
eventData = JSON.parse(line.slice(6));
} catch {
eventData = line.slice(6);
}
}
}
if (eventData) {
switch (eventType) {
case 'text':
self.textEvents.push(eventData);
break;
case 'structured':
self.structuredEvents.push(eventData);
break;
case 'status':
self.statusEvents.push(eventData);
break;
case 'error':
self.errors.push(eventData);
break;
}
}
},
end() {},
on() {}
};
}
// Assertions
assertTextStreamComplete() {
const fullText = this.textEvents.map(e => e.content).join('');
if (!fullText.length) throw new Error('No text was streamed');
return fullText;
}
assertStructuredDataReceived() {
if (this.structuredEvents.length === 0) {
throw new Error('No structured data was emitted');
}
return this.structuredEvents[0].data;
}
assertStructuredDataMatchesSchema(schema) {
const data = this.assertStructuredDataReceived();
for (const field of schema.required || []) {
if (!(field in data)) {
throw new Error(`Missing required field: ${field}`);
}
}
return data;
}
assertNoErrors() {
if (this.errors.length > 0) {
throw new Error(`Errors emitted: ${JSON.stringify(this.errors)}`);
}
}
assertChannelOrder() {
// Text events should arrive before structured events
// (This validates the two-phase pattern)
const lastTextIndex = this.textEvents.length - 1;
if (this.structuredEvents.length > 0 && lastTextIndex < 0) {
throw new Error('Structured data arrived before any text');
}
return true;
}
}
// Example test
async function testDualChannel() {
const harness = new DualChannelTestHarness();
const mockRes = harness.createMockResponse();
// Run the handler with mock response
// await analyzeHandler({ body: { message: 'Test message' }, ai: {} }, mockRes, () => {});
// Verify both channels
harness.assertNoErrors();
const text = harness.assertTextStreamComplete();
console.log('Text streamed:', text.length, 'chars');
const data = harness.assertStructuredDataMatchesSchema({
required: ['keyFindings', 'recommendations', 'sentiment']
});
console.log('Structured data valid:', Object.keys(data));
harness.assertChannelOrder();
console.log('Channel ordering correct');
console.log('All tests passed.');
}
10. Architecture Decision Guide
Choosing the right pattern for your application
┌─────────────────────────────────────────────────────────────────┐
│ ARCHITECTURE DECISION TREE │
│ │
│ How many concurrent users? │
│ │ │
│ ├── < 100 ──→ Simple SSE + sequential extraction │
│ │ │
│ ├── 100-10K ──→ SSE + event-based pipeline + connection │
│ │ management │
│ │ │
│ └── > 10K ──→ WebSocket + message queue + worker pool │
│ │
│ Do you need bidirectional communication? │
│ │ │
│ ├── No ──→ SSE (simpler, auto-reconnect, HTTP/2 compat) │
│ │ │
│ └── Yes ──→ WebSocket (user can cancel, send new context) │
│ │
│ How critical is structured data reliability? │
│ │ │
│ ├── Nice to have ──→ Single call + delimiter/parse │
│ │ │
│ ├── Important ──→ Two calls, fallback on parse failure │
│ │ │
│ └── Mission critical ──→ Two calls + validation + retry │
│ + human review queue │
└─────────────────────────────────────────────────────────────────┘
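The same tree can be sketched as code. The thresholds and return labels below mirror the diagram rather than prescribing exact numbers, and merging the two transport questions into one function is an interpretation, not a rule:

```javascript
// Hedged sketch encoding the decision tree above.
function chooseTransport({ concurrentUsers, bidirectional }) {
  if (bidirectional) return 'websocket';
  if (concurrentUsers > 10_000) return 'websocket + message queue + workers';
  if (concurrentUsers >= 100) return 'sse + event pipeline + connection mgmt';
  return 'sse + sequential extraction';
}

function chooseExtraction(reliability) {
  switch (reliability) {
    case 'mission-critical': return 'two calls + validation + retry + review queue';
    case 'important':        return 'two calls + fallback on parse failure';
    default:                 return 'single call + delimiter/parse';
  }
}

console.log(chooseTransport({ concurrentUsers: 50, bidirectional: false }));
// → 'sse + sequential extraction'
console.log(chooseExtraction('important'));
// → 'two calls + fallback on parse failure'
```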
Summary table
| Pattern | Use case | Complexity | Reliability | Cost |
|---|---|---|---|---|
| Simple SSE stream | Chat-only, no structured data needed | Low | High | Low |
| SSE + delimiter extraction | Add structured data cheaply | Medium | Medium | Low |
| SSE + two-phase (sequential) | Reliable structured extraction | Medium | High | Medium |
| SSE + two-phase (parallel) | Speed + structured data | Medium | High | Medium |
| Event-based pipeline | Multiple system consumers | High | Very high | Medium |
| WebSocket + channels | Bidirectional, real-time dashboard | High | High | Medium |
| Message queue + workers | High-scale, async processing | Very high | Very high | Higher |
11. Key Takeaways
- Separate the human channel from the machine channel — UI-facing streamed text and system-facing structured JSON are fundamentally different concerns that need different architectures.
- Event-based architectures provide the cleanest separation. Emit events for tokens, completion, structured data, and errors. Let consumers subscribe independently.
- SSE is sufficient for most applications — WebSockets are only needed when you require bidirectional communication (user cancellation, context updates, real-time collaboration).
- Connection management is critical at scale — track active streams, handle disconnects, and monitor concurrent usage to prevent resource exhaustion.
- Test both channels independently — verify that text streams correctly AND that structured data matches the expected schema. Test that failures in one channel don't break the other.
- Start simple, evolve as needed — begin with SSE + sequential extraction. Add event-based routing when you have multiple system consumers. Move to WebSockets only if bidirectional communication is required.
Explain-It Challenge
- Your team has been using a simple "stream then parse" pattern. Product wants to add: (a) a sidebar that shows key metrics from the AI response, (b) automatic Slack notifications for high-priority findings, and (c) analytics tracking. How do you evolve the architecture?
- A user reports that the chat text appeared instantly but the "insights panel" (populated from structured data) took an additional 3 seconds to load. Explain why this happens and propose two different solutions.
- Your application serves 50,000 concurrent users. Each user interaction creates two LLM calls (stream + structure). The LLM provider rate-limits you at 10,000 requests per minute. Design a queuing/prioritization strategy that prioritizes the streaming call (user-facing) over the structured extraction call (system-facing).
Navigation: <- 4.9.b — Returning Structured JSON After Generation | 4.9 Overview