Episode 4 — Generative AI Engineering / 4.9 — Combining Streaming with Structured Data

4.9.c — Separating UI from System Outputs

In one sentence: Production AI architectures need a clean separation between the UI-facing channel (streamed text for human consumption) and the system-facing channel (structured JSON for databases, APIs, and downstream services) — this section covers event-based architectures, WebSocket patterns, and production-grade strategies for serving both channels reliably.

Navigation: <- 4.9.b — Returning Structured JSON After Generation | 4.9 Overview


1. The Dual-Channel Architecture Problem

In a real production system, the LLM's output typically needs to serve multiple consumers simultaneously:

                          ┌────────────────────────┐
                          │      LLM Response      │
                          └───────────┬────────────┘
                                      │
                      ┌───────────────┼───────────────┐
                      │               │               │
                ┌─────▼──────┐  ┌─────▼──────┐  ┌─────▼──────┐
                │  UI Layer  │  │ Data Layer │  │ Event Bus  │
                │  (stream   │  │  (JSON to  │  │  (notify   │
                │   text to  │  │  database, │  │   other    │
                │   browser) │  │  analytics)│  │  services) │
                └────────────┘  └────────────┘  └────────────┘
                    Human           System          System
                   Channel          Channel         Channel

This is not a simple "parse the response" problem. It is an architecture problem. How you separate these channels affects:

  • Reliability — if the UI stream fails, does the system still get its data?
  • Latency — does the user wait for the structured extraction?
  • Consistency — does the structured data match what the user saw?
  • Scalability — can you add new system consumers without changing the streaming logic?
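One lightweight way to make the separation concrete is a shared event envelope that every consumer can route on. The field names below are illustrative, not a fixed convention:

```javascript
// UI channel: many small, ordered deltas meant for human eyes.
const uiEvent = {
  channel: 'ui',
  type: 'token',
  requestId: 'req-001',
  content: 'Revenue grew 12% '
};

// System channel: one complete, machine-consumable record.
const systemEvent = {
  channel: 'system',
  type: 'structured',
  requestId: 'req-001',
  data: { sentiment: 'positive', topics: ['revenue'] }
};

// Consumers route on `channel` and never need to know about the other side.
function routeEvent(event) {
  return event.channel === 'ui' ? 'render' : 'persist';
}
```

Because the `requestId` is carried on both envelopes, a system consumer can later reconcile its structured record with the text the user actually saw.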

2. When to Use One Call vs Two Calls

The decision between a single LLM call (with post-processing) and two separate calls is nuanced:

Decision framework

                               Does the structured data need to
                               match the conversational text exactly?
                                          │
                              ┌────YES────┴────NO────┐
                              │                      │
                    Is latency critical?    Can the two calls share
                              │            the same context?
                    ┌───YES───┴───NO───┐         │
                    │                  │    ┌──YES─┴──NO──┐
                    │                  │    │             │
              Single call +      Two calls  Two calls    Two calls
              post-processing    sequential  parallel     parallel
              (delimiter or      (extract    (same       (different
               parse)            from text)   prompt)     prompts)

Detailed comparison

Factor                        | Single call + parse          | Two calls sequential              | Two calls parallel
Data consistency with UI text | Highest (same generation)    | High (extracted from text)        | Low (independent)
Latency to user               | Best                         | Worst (user may wait)             | Good
Latency to system             | Delayed (must finish stream) | Delayed (must finish + extract)   | Good (parallel)
Cost                          | Lowest (1 call)              | Highest (2 calls, shared context) | Medium (2 calls, no shared context)
Reliability of JSON           | Medium                       | Highest                           | High
Architecture simplicity       | Medium                       | Simplest                          | Medium
Scalability                   | Limited                      | Good                              | Best
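The "single call + parse" column relies on a delimiter convention that the decision tree mentions but later sections do not show. A minimal sketch: the prompt asks the model to append machine-readable JSON after a marker, and the client splits the finished text. The marker string is an arbitrary choice, not a standard:

```javascript
// Marker the prompt instructs the model to emit before its JSON block.
const DELIMITER = '\n---JSON---\n';

function splitDualOutput(fullText) {
  const idx = fullText.indexOf(DELIMITER);
  if (idx === -1) {
    // Model omitted the block: UI text is still usable, data is absent.
    return { text: fullText, data: null };
  }
  const text = fullText.slice(0, idx);
  const rawJson = fullText.slice(idx + DELIMITER.length);
  try {
    return { text, data: JSON.parse(rawJson) };
  } catch {
    // Malformed JSON: degrade gracefully rather than failing the UI channel.
    return { text, data: null };
  }
}
```

During streaming you would buffer tokens once a prefix of the delimiter appears, so the raw JSON never flashes in front of the user.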

3. Event-Based Architecture for Dual-Purpose Responses

An event-driven architecture cleanly separates concerns. The LLM interaction emits events that different consumers subscribe to independently.

3.1 Event emitter pattern

import { EventEmitter } from 'events';
import OpenAI from 'openai';

class AIResponsePipeline extends EventEmitter {
  constructor(openai, options = {}) {
    super();
    this.openai = openai;
    this.model = options.model || 'gpt-4o';
  }

  async process(userMessage, systemPrompt) {
    const requestId = crypto.randomUUID();

    this.emit('request:start', { requestId, userMessage, timestamp: Date.now() });

    try {
      // Phase 1: Stream conversational text
      const conversationalText = await this.streamPhase(requestId, userMessage, systemPrompt);

      // Phase 2: Extract structured data
      const structuredData = await this.structurePhase(requestId, userMessage, conversationalText);

      // Phase 3: Emit completion
      this.emit('response:complete', {
        requestId,
        text: conversationalText,
        data: structuredData,
        timestamp: Date.now()
      });

      return { text: conversationalText, data: structuredData };
    } catch (error) {
      this.emit('response:error', { requestId, error, timestamp: Date.now() });
      throw error;
    }
  }

  async streamPhase(requestId, userMessage, systemPrompt) {
    this.emit('stream:start', { requestId });

    const stream = await this.openai.chat.completions.create({
      model: this.model,
      messages: [
        { role: 'system', content: systemPrompt },
        { role: 'user', content: userMessage }
      ],
      stream: true,
      stream_options: { include_usage: true }
    });

    let fullText = '';
    let tokenCount = 0;

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        tokenCount++;
        fullText += content;
        this.emit('stream:token', { requestId, content, tokenIndex: tokenCount });
      }
      if (chunk.usage) {
        this.emit('stream:usage', { requestId, usage: chunk.usage });
      }
    }

    this.emit('stream:end', { requestId, fullText, tokenCount });
    return fullText;
  }

  async structurePhase(requestId, userMessage, conversationalText) {
    this.emit('structure:start', { requestId });

    const response = await this.openai.chat.completions.create({
      model: this.model,
      messages: [
        {
          role: 'system',
          content: 'Extract structured data from this AI response. Return valid JSON.'
        },
        { role: 'user', content: userMessage },
        { role: 'assistant', content: conversationalText }
      ],
      response_format: { type: 'json_object' },
      temperature: 0
    });

    const data = JSON.parse(response.choices[0].message.content);
    this.emit('structure:complete', { requestId, data });
    return data;
  }
}

// Usage: Different consumers subscribe to different events

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const pipeline = new AIResponsePipeline(openai);

// Consumer 1: UI (streams text to the browser)
pipeline.on('stream:token', ({ content }) => {
  process.stdout.write(content); // Or send via SSE/WebSocket
});

// Consumer 2: Analytics (tracks usage)
pipeline.on('stream:usage', ({ requestId, usage }) => {
  console.log(`\n[Analytics] Request ${requestId}: ${usage.total_tokens} tokens`);
});

// Consumer 3: Database (saves structured data)
pipeline.on('structure:complete', ({ requestId, data }) => {
  console.log(`\n[DB] Saving structured data for ${requestId}`);
  // await db.collection('ai_responses').insertOne({ requestId, data });
});

// Consumer 4: Monitoring (tracks performance)
const timings = {};
pipeline.on('request:start', ({ requestId, timestamp }) => {
  timings[requestId] = { start: timestamp };
});
pipeline.on('stream:token', ({ requestId }) => {
  if (timings[requestId] && !timings[requestId].firstToken) {
    timings[requestId].firstToken = Date.now();
  }
});
pipeline.on('response:complete', ({ requestId, timestamp }) => {
  const t = timings[requestId];
  console.log(`\n[Monitor] TTFT: ${t.firstToken - t.start}ms, Total: ${timestamp - t.start}ms`);
  delete timings[requestId];
});

// Consumer 5: Error alerting
pipeline.on('response:error', ({ requestId, error }) => {
  console.error(`\n[ALERT] Request ${requestId} failed: ${error.message}`);
  // await alertService.notify({ level: 'error', requestId, error });
});

// Execute
await pipeline.process(
  'Analyze our Q3 sales data and recommend actions',
  'You are a business analyst. Provide clear analysis and actionable recommendations.'
);

3.2 Benefits of event-based architecture

Benefit         | Explanation
Loose coupling  | UI, database, analytics, and monitoring are independent; adding a new consumer requires zero changes to the pipeline
Fault isolation | If the database write fails, the UI stream is unaffected
Testability     | Each consumer can be unit tested independently with mock events
Observability   | Events form a natural audit trail: every state transition is logged
Extensibility   | New features (caching, A/B testing, rate limiting) are just new event listeners
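To make the extensibility point concrete, here is a hypothetical caching consumer. It subscribes to the `response:complete` event already emitted by the pipeline; nothing in the pipeline class changes:

```javascript
import { EventEmitter } from 'node:events';

// Hypothetical cache keyed by requestId; any store with set/get would do.
const responseCache = new Map();

// Attach to anything that emits 'response:complete' with { requestId, text, data }.
function attachCache(pipeline) {
  pipeline.on('response:complete', ({ requestId, text, data }) => {
    responseCache.set(requestId, { text, data, cachedAt: Date.now() });
  });
  return pipeline;
}
```

Usage is one line at startup (`attachCache(pipeline)`); removing the feature is equally non-invasive.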

4. WebSocket Patterns for Real-Time + Structured

For applications that need bidirectional real-time communication (chat apps, collaborative tools, dashboards), WebSockets provide a more flexible transport than SSE.

4.1 WebSocket server with dual channels

import { WebSocketServer } from 'ws';
import OpenAI from 'openai';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  console.log('Client connected');

  ws.on('message', async (raw) => {
    const message = JSON.parse(raw.toString());

    if (message.type === 'chat') {
      await handleChatMessage(ws, message);
    }
  });
});

async function handleChatMessage(ws, message) {
  const requestId = crypto.randomUUID();

  // Notify client that processing has started
  ws.send(JSON.stringify({
    type: 'status',
    requestId,
    status: 'processing'
  }));

  try {
    // Phase 1: Stream text tokens over WebSocket
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        { role: 'system', content: message.systemPrompt || 'You are a helpful assistant.' },
        ...(message.history || []),
        { role: 'user', content: message.content }
      ],
      stream: true,
      stream_options: { include_usage: true }
    });

    let fullText = '';
    let usage = null;

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        fullText += content;

        // UI channel: stream tokens
        ws.send(JSON.stringify({
          type: 'stream:token',
          requestId,
          content
        }));
      }
      if (chunk.usage) {
        usage = chunk.usage;
      }
    }

    // UI channel: stream complete
    ws.send(JSON.stringify({
      type: 'stream:complete',
      requestId,
      fullText
    }));

    // Phase 2: Extract structured data (non-blocking for UI)
    const structuredResponse = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        {
          role: 'system',
          content: `Extract structured insights from this conversation. Return JSON with:
{ "topics": ["string"], "sentiment": "positive|negative|neutral", "actionItems": ["string"], "summary": "string" }`
        },
        { role: 'user', content: message.content },
        { role: 'assistant', content: fullText }
      ],
      response_format: { type: 'json_object' },
      temperature: 0
    });

    const structuredData = JSON.parse(structuredResponse.choices[0].message.content);

    // System channel: structured data
    ws.send(JSON.stringify({
      type: 'structured:data',
      requestId,
      data: structuredData,
      usage
    }));

  } catch (error) {
    ws.send(JSON.stringify({
      type: 'error',
      requestId,
      message: error.message
    }));
  }
}

console.log('WebSocket server running on ws://localhost:8080');

4.2 WebSocket client with channel separation

class AIWebSocketClient {
  constructor(url) {
    this.ws = new WebSocket(url);
    this.handlers = new Map();
    this.pendingRequests = new Map();

    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      this.dispatch(message);
    };
  }

  // Register handlers for different message types
  on(type, handler) {
    if (!this.handlers.has(type)) {
      this.handlers.set(type, []);
    }
    this.handlers.get(type).push(handler);
  }

  dispatch(message) {
    const handlers = this.handlers.get(message.type) || [];
    for (const handler of handlers) {
      handler(message);
    }

    // Also dispatch to request-specific callbacks
    const requestCallbacks = this.pendingRequests.get(message.requestId);
    if (requestCallbacks?.[message.type]) {
      requestCallbacks[message.type](message);
    }
  }

  // Send a chat message and get callbacks for both channels
  chat(content, options = {}) {
    const requestId = crypto.randomUUID();

    return new Promise((resolve, reject) => {
      let fullText = '';
      let structuredData = null;
      let streamComplete = false;
      let structureComplete = false;

      const checkDone = () => {
        if (streamComplete && structureComplete) {
          this.pendingRequests.delete(requestId);
          resolve({ text: fullText, data: structuredData });
        }
      };

      this.pendingRequests.set(requestId, {
        'stream:token': (msg) => {
          options.onToken?.(msg.content);
          fullText += msg.content;
        },
        'stream:complete': (msg) => {
          streamComplete = true;
          options.onStreamComplete?.(msg.fullText);
          checkDone();
        },
        'structured:data': (msg) => {
          structuredData = msg.data;
          structureComplete = true;
          options.onStructuredData?.(msg.data);
          checkDone();
        },
        'error': (msg) => {
          this.pendingRequests.delete(requestId);
          reject(new Error(msg.message));
        }
      });

      this.ws.send(JSON.stringify({
        type: 'chat',
        requestId,
        content,
        ...options
      }));
    });
  }
}

// Usage
const client = new AIWebSocketClient('ws://localhost:8080');

// Global handlers (for all messages)
client.on('stream:token', (msg) => {
  document.getElementById('chat-output').textContent += msg.content;
});

client.on('structured:data', (msg) => {
  // Update sidebar dashboard with structured insights
  document.getElementById('topics').textContent = msg.data.topics.join(', ');
  document.getElementById('sentiment').textContent = msg.data.sentiment;
});

// Per-request usage
const result = await client.chat('Analyze our customer feedback from last month', {
  onToken: (token) => console.log('Token:', token),
  onStreamComplete: (text) => console.log('Stream done:', text.length, 'chars'),
  onStructuredData: (data) => console.log('Structure:', data)
});

console.log('Full result:', result);

5. Server-Sent Events (SSE) with Typed Channels

SSE is simpler than WebSockets for the common case of server-to-client streaming. You can use SSE event types to separate channels:

5.1 Multi-channel SSE server

import express from 'express';
import OpenAI from 'openai';

const app = express();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

app.use(express.json());

app.post('/api/analyze', async (req, res) => {
  const { message, history = [] } = req.body;

  // SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');

  const requestId = crypto.randomUUID();

  // Helper: send typed SSE events
  const sendEvent = (eventType, data) => {
    res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`);
  };

  sendEvent('status', { requestId, status: 'streaming' });

  try {
    // Phase 1: Stream conversational text
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        { role: 'system', content: 'You are a business analyst. Provide clear analysis.' },
        ...history,
        { role: 'user', content: message }
      ],
      stream: true,
      stream_options: { include_usage: true }
    });

    let fullText = '';
    let usage = null;

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        fullText += content;
        sendEvent('text', { content }); // UI channel
      }
      if (chunk.usage) usage = chunk.usage;
    }

    sendEvent('text:complete', { fullText, usage }); // UI channel: done
    sendEvent('status', { requestId, status: 'extracting' });

    // Phase 2: Extract structured data
    const structuredResponse = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        {
          role: 'system',
          content: `Extract structured insights. Return JSON:
{
  "keyFindings": [{"finding": "string", "impact": "high|medium|low"}],
  "recommendations": [{"action": "string", "priority": 1-5}],
  "overallSentiment": "positive|negative|neutral|mixed",
  "confidence": 0.0-1.0
}`
        },
        { role: 'user', content: message },
        { role: 'assistant', content: fullText }
      ],
      response_format: { type: 'json_object' },
      temperature: 0
    });

    const structuredData = JSON.parse(structuredResponse.choices[0].message.content);

    sendEvent('structured', { data: structuredData }); // System channel
    sendEvent('status', { requestId, status: 'complete' });

  } catch (error) {
    sendEvent('error', { message: error.message, requestId });
  }

  res.end();
});

app.listen(3000);

5.2 Multi-channel SSE client

async function consumeMultiChannelSSE(message) {
  const response = await fetch('/api/analyze', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  // Channel-specific handlers
  const channels = {
    text: (data) => {
      // UI channel: append text in real time
      document.getElementById('analysis-output').textContent += data.content;
    },
    'text:complete': (data) => {
      console.log('Text streaming complete. Tokens:', data.usage?.total_tokens);
    },
    structured: (data) => {
      // System channel: render structured insights
      renderDashboard(data.data);
    },
    status: (data) => {
      document.getElementById('status').textContent = data.status;
    },
    error: (data) => {
      document.getElementById('error').textContent = data.message;
    }
  };

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Parse SSE events (with named event types)
    const events = parseSSEEvents(buffer);
    buffer = events.remaining;

    for (const event of events.parsed) {
      const handler = channels[event.type];
      if (handler) {
        handler(event.data);
      }
    }
  }
}

function parseSSEEvents(buffer) {
  const parsed = [];
  const blocks = buffer.split('\n\n');
  const remaining = blocks.pop() || ''; // Last block might be incomplete

  for (const block of blocks) {
    if (!block.trim()) continue;

    let eventType = 'message'; // Default SSE event type
    let data = '';

    for (const line of block.split('\n')) {
      if (line.startsWith('event: ')) {
        eventType = line.slice(7);
      } else if (line.startsWith('data: ')) {
        data += line.slice(6);
      }
    }

    if (data) {
      try {
        parsed.push({ type: eventType, data: JSON.parse(data) });
      } catch {
        parsed.push({ type: eventType, data });
      }
    }
  }

  return { parsed, remaining };
}

function renderDashboard(data) {
  // Render key findings
  const findingsList = document.getElementById('findings');
  findingsList.innerHTML = '';
  for (const finding of data.keyFindings) {
    const li = document.createElement('li');
    li.textContent = `[${finding.impact.toUpperCase()}] ${finding.finding}`;
    findingsList.appendChild(li);
  }

  // Render recommendations
  const recsList = document.getElementById('recommendations');
  recsList.innerHTML = '';
  for (const rec of data.recommendations.sort((a, b) => a.priority - b.priority)) {
    const li = document.createElement('li');
    li.textContent = `P${rec.priority}: ${rec.action}`;
    recsList.appendChild(li);
  }

  // Render sentiment
  document.getElementById('sentiment-badge').textContent = data.overallSentiment;
  document.getElementById('confidence').textContent = `${(data.confidence * 100).toFixed(0)}%`;
}

6. Production Pattern: Response Router

In larger systems, you may need to route different parts of an LLM response to different services. A response router abstracts this:

class ResponseRouter {
  constructor() {
    this.routes = [];
  }

  /**
   * Register a route: when a condition matches, send data to a handler.
   */
  route(name, condition, handler) {
    this.routes.push({ name, condition, handler });
    return this; // Allow chaining
  }

  /**
   * Process a completed AI response through all routes.
   */
  async process(response) {
    const { text, structuredData, metadata } = response;
    const results = {};

    const promises = this.routes
      .filter(route => route.condition({ text, structuredData, metadata }))
      .map(async (route) => {
        try {
          results[route.name] = await route.handler({ text, structuredData, metadata });
        } catch (error) {
          results[route.name] = { error: error.message };
        }
      });

    await Promise.all(promises);
    return results;
  }
}

// Configuration
const router = new ResponseRouter();

// Route 1: Always save to conversation history
router.route(
  'history',
  () => true,  // Always matches
  async ({ text, metadata }) => {
    // await db.conversations.insertOne({
    //   requestId: metadata.requestId,
    //   text,
    //   timestamp: Date.now()
    // });
    console.log('[History] Saved conversation');
    return { saved: true };
  }
);

// Route 2: Save structured data if extraction succeeded
router.route(
  'structured-storage',
  ({ structuredData }) => structuredData !== null,
  async ({ structuredData, metadata }) => {
    // await db.insights.insertOne({
    //   requestId: metadata.requestId,
    //   ...structuredData
    // });
    console.log('[Storage] Saved structured data');
    return { saved: true };
  }
);

// Route 3: Trigger alerts for high-priority findings
router.route(
  'alerts',
  ({ structuredData }) => {
    return structuredData?.recommendations?.some(r => r.priority === 1);
  },
  async ({ structuredData }) => {
    const urgent = structuredData.recommendations.filter(r => r.priority === 1);
    // await alertService.send({ level: 'urgent', items: urgent });
    console.log(`[Alert] ${urgent.length} urgent recommendation(s) detected`);
    return { alerted: true, count: urgent.length };
  }
);

// Route 4: Update dashboard metrics if sentiment data exists
router.route(
  'dashboard',
  ({ structuredData }) => structuredData?.overallSentiment != null,
  async ({ structuredData }) => {
    // await dashboardService.updateMetric('sentiment', structuredData.overallSentiment);
    console.log(`[Dashboard] Updated sentiment: ${structuredData.overallSentiment}`);
    return { updated: true };
  }
);

// Route 5: Log analytics for long responses
router.route(
  'analytics',
  ({ text }) => text.length > 500,
  async ({ text, metadata }) => {
    // await analytics.track('long_response', {
    //   requestId: metadata.requestId,
    //   length: text.length,
    //   wordCount: text.split(/\s+/).length
    // });
    console.log(`[Analytics] Long response logged (${text.length} chars)`);
    return { logged: true };
  }
);

// Execute the pipeline
async function fullPipeline(userMessage, onToken) {
  // Phase 1: Stream
  const text = await streamToUser(userMessage, onToken);

  // Phase 2: Structure
  const structuredData = await extractStructured(userMessage, text);

  // Phase 3: Route
  const routeResults = await router.process({
    text,
    structuredData,
    metadata: { requestId: crypto.randomUUID(), userMessage, timestamp: Date.now() }
  });

  console.log('\nRoute results:', routeResults);
  return { text, structuredData, routeResults };
}
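The `streamToUser` and `extractStructured` helpers that `fullPipeline` calls were not shown. A minimal sketch follows; the prompts and the `gpt-4o` model are illustrative, and the trailing `client` parameter (defaulting to the module-scope `openai` instance used throughout this section) is an addition here so the helpers can be exercised with a stubbed client:

```javascript
async function streamToUser(userMessage, onToken, client = openai) {
  const stream = await client.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      { role: 'system', content: 'You are a business analyst. Provide clear analysis.' },
      { role: 'user', content: userMessage }
    ],
    stream: true
  });

  let fullText = '';
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      fullText += content;
      onToken?.(content); // UI channel: forward each token as it arrives
    }
  }
  return fullText;
}

async function extractStructured(userMessage, conversationalText, client = openai) {
  const response = await client.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      { role: 'system', content: 'Extract structured insights from this response. Return valid JSON.' },
      { role: 'user', content: userMessage },
      { role: 'assistant', content: conversationalText }
    ],
    response_format: { type: 'json_object' },
    temperature: 0
  });
  return JSON.parse(response.choices[0].message.content);
}
```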

7. Middleware Pattern for Request/Response Processing

For Express-based applications, middleware provides a clean way to separate streaming concerns:

import express from 'express';
import OpenAI from 'openai';

const app = express();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

app.use(express.json());

// Middleware 1: Attach AI context
function aiContext(req, res, next) {
  req.ai = {
    requestId: crypto.randomUUID(),
    startTime: Date.now(),
    model: 'gpt-4o'
  };
  next();
}

// Middleware 2: Stream the text response to the client
function streamResponse(systemPrompt) {
  return async (req, res, next) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    const stream = await openai.chat.completions.create({
      model: req.ai.model,
      messages: [
        { role: 'system', content: systemPrompt },
        { role: 'user', content: req.body.message }
      ],
      stream: true
    });

    let fullText = '';

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        fullText += content;
        res.write(`event: text\ndata: ${JSON.stringify({ content })}\n\n`);
      }
    }

    req.ai.fullText = fullText;
    req.ai.streamEndTime = Date.now();

    res.write(`event: text:complete\ndata: ${JSON.stringify({
      fullText,
      streamDurationMs: req.ai.streamEndTime - req.ai.startTime
    })}\n\n`);

    next(); // Continue to structured extraction
  };
}

// Middleware 3: Extract structured data and send on system channel
function extractStructured(extractionPrompt) {
  return async (req, res, next) => {
    try {
      const response = await openai.chat.completions.create({
        model: req.ai.model,
        messages: [
          { role: 'system', content: extractionPrompt },
          { role: 'user', content: req.body.message },
          { role: 'assistant', content: req.ai.fullText }
        ],
        response_format: { type: 'json_object' },
        temperature: 0
      });

      req.ai.structuredData = JSON.parse(response.choices[0].message.content);

      res.write(`event: structured\ndata: ${JSON.stringify({
        data: req.ai.structuredData
      })}\n\n`);

      next();
    } catch (error) {
      res.write(`event: structured:error\ndata: ${JSON.stringify({
        error: error.message
      })}\n\n`);
      next();
    }
  };
}

// Middleware 4: Log and finalize
function finalize(req, res) {
  const totalTime = Date.now() - req.ai.startTime;

  res.write(`event: meta\ndata: ${JSON.stringify({
    requestId: req.ai.requestId,
    totalDurationMs: totalTime,
    streamDurationMs: req.ai.streamEndTime - req.ai.startTime,
    hasStructuredData: req.ai.structuredData != null
  })}\n\n`);

  // Log for monitoring
  console.log(`[${req.ai.requestId}] Complete in ${totalTime}ms`);

  res.end();
}

// Compose the pipeline
app.post('/api/analyze',
  aiContext,
  streamResponse('You are a business analyst. Provide clear, actionable analysis.'),
  extractStructured(`Extract structured insights. Return JSON:
{
  "keyFindings": [{"finding": "string", "impact": "high|medium|low"}],
  "recommendations": [{"action": "string", "priority": 1-5}],
  "sentiment": "positive|negative|neutral|mixed"
}`),
  finalize
);

app.listen(3000, () => console.log('Server on port 3000'));

8. Handling Concurrent Users

In production, many users stream simultaneously. Each stream must be isolated:

class ConnectionManager {
  constructor() {
    this.connections = new Map(); // requestId -> connection info
    this.metrics = {
      activeStreams: 0,
      totalRequests: 0,
      peakConcurrent: 0
    };
  }

  register(requestId, res) {
    this.connections.set(requestId, {
      res,
      startTime: Date.now(),
      bytesStreamed: 0,
      tokensStreamed: 0
    });

    this.metrics.activeStreams++;
    this.metrics.totalRequests++;
    this.metrics.peakConcurrent = Math.max(
      this.metrics.peakConcurrent,
      this.metrics.activeStreams
    );

    // Handle client disconnect
    res.on('close', () => {
      this.unregister(requestId);
    });
  }

  streamToken(requestId, content) {
    const conn = this.connections.get(requestId);
    if (!conn) return false; // Client disconnected

    try {
      conn.res.write(`event: text\ndata: ${JSON.stringify({ content })}\n\n`);
      conn.bytesStreamed += content.length;
      conn.tokensStreamed++;
      return true;
    } catch {
      this.unregister(requestId);
      return false; // Write failed (client gone)
    }
  }

  sendStructured(requestId, data) {
    const conn = this.connections.get(requestId);
    if (!conn) return false;

    try {
      conn.res.write(`event: structured\ndata: ${JSON.stringify({ data })}\n\n`);
      return true;
    } catch {
      this.unregister(requestId);
      return false;
    }
  }

  complete(requestId) {
    const conn = this.connections.get(requestId);
    if (!conn) return;

    const duration = Date.now() - conn.startTime;
    conn.res.write(`event: meta\ndata: ${JSON.stringify({
      requestId,
      durationMs: duration,
      tokensStreamed: conn.tokensStreamed
    })}\n\n`);

    conn.res.end();
    this.unregister(requestId);
  }

  unregister(requestId) {
    if (this.connections.has(requestId)) {
      this.connections.delete(requestId);
      this.metrics.activeStreams--;
    }
  }

  getMetrics() {
    return {
      ...this.metrics,
      connectionDetails: Array.from(this.connections.entries()).map(([id, conn]) => ({
        requestId: id,
        durationMs: Date.now() - conn.startTime,
        tokensStreamed: conn.tokensStreamed
      }))
    };
  }
}

const connManager = new ConnectionManager();

// Metrics endpoint
// app.get('/api/metrics', (req, res) => {
//   res.json(connManager.getMetrics());
// });

9. Testing Dual-Channel Architectures

Testing is critical for systems that serve two channels. Each channel needs independent verification:

// Test utilities for dual-channel responses
class DualChannelTestHarness {
  constructor() {
    this.textEvents = [];
    this.structuredEvents = [];
    this.statusEvents = [];
    this.errors = [];
  }

  createMockResponse() {
    const self = this;

    return {
      setHeader: () => {},
      write(data) {
        // Parse SSE format
        const lines = data.split('\n');
        let eventType = 'message';
        let eventData = null;

        for (const line of lines) {
          if (line.startsWith('event: ')) eventType = line.slice(7);
          if (line.startsWith('data: ')) {
            try {
              eventData = JSON.parse(line.slice(6));
            } catch {
              eventData = line.slice(6);
            }
          }
        }

        if (eventData) {
          switch (eventType) {
            case 'text':
              self.textEvents.push(eventData);
              break;
            case 'structured':
              self.structuredEvents.push(eventData);
              break;
            case 'status':
              self.statusEvents.push(eventData);
              break;
            case 'error':
              self.errors.push(eventData);
              break;
          }
        }
      },
      end() {},
      on() {}
    };
  }

  // Assertions
  assertTextStreamComplete() {
    const fullText = this.textEvents.map(e => e.content).join('');
    if (!fullText.length) throw new Error('No text was streamed');
    return fullText;
  }

  assertStructuredDataReceived() {
    if (this.structuredEvents.length === 0) {
      throw new Error('No structured data was emitted');
    }
    return this.structuredEvents[0].data;
  }

  assertStructuredDataMatchesSchema(schema) {
    const data = this.assertStructuredDataReceived();
    for (const field of schema.required || []) {
      if (!(field in data)) {
        throw new Error(`Missing required field: ${field}`);
      }
    }
    return data;
  }

  assertNoErrors() {
    if (this.errors.length > 0) {
      throw new Error(`Errors emitted: ${JSON.stringify(this.errors)}`);
    }
  }

  assertChannelOrder() {
    // The two-phase pattern emits all text before the structured payload.
    // Events land in separate arrays with no timestamps, so we can only
    // check the weaker invariant: if structured data arrived, at least
    // some text must have arrived too.
    if (this.structuredEvents.length > 0 && this.textEvents.length === 0) {
      throw new Error('Structured data arrived before any text');
    }
    return true;
  }
}

// Example test
async function testDualChannel() {
  const harness = new DualChannelTestHarness();
  const mockRes = harness.createMockResponse();

  // Run the handler with mock response
  // await analyzeHandler({ body: { message: 'Test message' }, ai: {} }, mockRes, () => {});

  // Verify both channels
  harness.assertNoErrors();
  const text = harness.assertTextStreamComplete();
  console.log('Text streamed:', text.length, 'chars');

  const data = harness.assertStructuredDataMatchesSchema({
    required: ['keyFindings', 'recommendations', 'sentiment']
  });
  console.log('Structured data valid:', Object.keys(data));

  harness.assertChannelOrder();
  console.log('Channel ordering correct');

  console.log('All tests passed.');
}
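
Because the real handler call above is commented out, nothing actually feeds the harness. The sketch below drives the same two-channel parsing with simulated SSE frames; the frame contents and the compact `parseFrames` helper are illustrative stand-ins for the harness's `write` method, not part of the harness itself:

```javascript
// Minimal stand-in for the harness's write(): parse simulated SSE
// frames and collect events per channel.
function parseFrames(frames) {
  const channels = { text: [], structured: [], error: [] };
  for (const frame of frames) {
    let eventType = 'message';
    let eventData = null;
    for (const line of frame.split('\n')) {
      if (line.startsWith('event: ')) eventType = line.slice(7);
      if (line.startsWith('data: ')) eventData = JSON.parse(line.slice(6));
    }
    if (eventData && channels[eventType]) channels[eventType].push(eventData);
  }
  return channels;
}

// Simulated wire traffic: two text chunks, then one structured payload.
const frames = [
  'event: text\ndata: {"content":"Revenue grew "}\n\n',
  'event: text\ndata: {"content":"12% quarter over quarter."}\n\n',
  'event: structured\ndata: {"data":{"sentiment":"positive","keyFindings":["revenue growth"]}}\n\n'
];

const channels = parseFrames(frames);
const fullText = channels.text.map(e => e.content).join('');
console.assert(fullText === 'Revenue grew 12% quarter over quarter.');
console.assert(channels.structured[0].data.sentiment === 'positive');
console.assert(channels.error.length === 0);
```

The same assertions map one-to-one onto the harness methods: joining `text` events mirrors `assertTextStreamComplete`, and checking the first `structured` payload mirrors `assertStructuredDataReceived`.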

10. Architecture Decision Guide

Choosing the right pattern for your application

┌─────────────────────────────────────────────────────────────────┐
│                  ARCHITECTURE DECISION TREE                      │
│                                                                  │
│  How many concurrent users?                                      │
│      │                                                           │
│      ├── < 100 ──→ Simple SSE + sequential extraction            │
│      │                                                           │
│      ├── 100-10K ──→ SSE + event-based pipeline + connection     │
│      │               management                                  │
│      │                                                           │
│      └── > 10K ──→ WebSocket + message queue + worker pool       │
│                                                                  │
│  Do you need bidirectional communication?                        │
│      │                                                           │
│      ├── No  ──→ SSE (simpler, auto-reconnect, HTTP/2 compat)   │
│      │                                                           │
│      └── Yes ──→ WebSocket (user can cancel, send new context)   │
│                                                                  │
│  How critical is structured data reliability?                    │
│      │                                                           │
│      ├── Nice to have ──→ Single call + delimiter/parse          │
│      │                                                           │
│      ├── Important ──→ Two calls, fallback on parse failure      │
│      │                                                           │
│      └── Mission critical ──→ Two calls + validation + retry     │
│                                + human review queue               │
└─────────────────────────────────────────────────────────────────┘
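
The "mission critical" branch of the tree can be sketched as follows. This is a minimal illustration, not a production implementation: `callLLM`, `validate`, and the `'human-review-queue'` routing label are hypothetical names standing in for your provider call, schema validator, and escalation path:

```javascript
// Hypothetical sketch: structured extraction with validation, retry,
// and a human-review fallback so no record is dropped silently.
async function extractWithRetry(callLLM, validate, { maxAttempts = 3 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const raw = await callLLM(attempt);   // second, system-facing LLM call
      const data = JSON.parse(raw);         // throws on malformed JSON
      validate(data);                       // throws on schema violations
      return { ok: true, data };
    } catch (err) {
      lastError = err;                      // retry on parse/validation failure
    }
  }
  // Mission-critical path: exhausted retries escalate, never discard.
  return { ok: false, error: lastError, routeTo: 'human-review-queue' };
}
```

Because the retry loop lives entirely on the system channel, the user-facing stream has typically already completed by the time a retry fires, so retries add cost and latency only to the data layer.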

Summary table

Pattern                        Use case                               Complexity   Reliability   Cost
Simple SSE stream              Chat-only, no structured data needed   Low          High          Low
SSE + delimiter extraction     Add structured data cheaply            Medium       Medium        Low
SSE + two-phase (sequential)   Reliable structured extraction         Medium       High          Medium
SSE + two-phase (parallel)     Speed + structured data                Medium       High          Medium
Event-based pipeline           Multiple system consumers              High         Very high     Medium
WebSocket + channels           Bidirectional, real-time dashboard     High         High          Medium
Message queue + workers        High-scale, async processing           Very high    Very high     Higher

11. Key Takeaways

  1. Separate the human channel from the machine channel — UI-facing streamed text and system-facing structured JSON are fundamentally different concerns that need different architectures.
  2. Event-based architectures provide the cleanest separation. Emit events for tokens, completion, structured data, and errors. Let consumers subscribe independently.
  3. SSE is sufficient for most applications — WebSockets are only needed when you require bidirectional communication (user cancellation, context updates, real-time collaboration).
  4. Connection management is critical at scale — track active streams, handle disconnects, and monitor concurrent usage to prevent resource exhaustion.
  5. Test both channels independently — verify that text streams correctly AND that structured data matches the expected schema. Test that failures in one channel don't break the other.
  6. Start simple, evolve as needed — begin with SSE + sequential extraction. Add event-based routing when you have multiple system consumers. Move to WebSockets only if bidirectional communication is required.

Explain-It Challenge

  1. Your team has been using a simple "stream then parse" pattern. Product wants to add: (a) a sidebar that shows key metrics from the AI response, (b) automatic Slack notifications for high-priority findings, and (c) analytics tracking. How do you evolve the architecture?
  2. A user reports that the chat text appeared instantly but the "insights panel" (populated from structured data) took an additional 3 seconds to load. Explain why this happens and propose two different solutions.
  3. Your application serves 50,000 concurrent users. Each user interaction creates two LLM calls (stream + structure). The LLM provider rate-limits you at 10,000 requests per minute. Design a queuing/prioritization strategy that prioritizes the streaming call (user-facing) over the structured extraction call (system-facing).

Navigation: <- 4.9.b — Returning Structured JSON After Generation | 4.9 Overview