Episode 4 — Generative AI Engineering / 4.9 — Combining Streaming with Structured Data

4.9.a — Streaming Conversational Text

In one sentence: Streaming conversational text delivers the LLM's response to the user token by token in real time, creating a responsive experience — but knowing when to stream free-form text versus returning structured data is a critical design decision that shapes your entire application architecture.

Navigation: <- 4.9 Overview | 4.9.b — Returning Structured JSON After Generation ->


1. What Is Streaming Conversational Text?

When an LLM generates a response, it produces tokens one at a time. Streaming means sending each token to the client as soon as it is generated, rather than waiting for the entire response to complete. For conversational text — explanatory paragraphs, recommendations, summaries, creative writing — streaming creates the familiar "typing" effect users expect from modern AI chat interfaces.

Without streaming:
  User sends message → [Wait 3-8 seconds] → Entire response appears at once
  
With streaming:
  User sends message → [50ms] → "Based" → "on" → "your" → "data" → "," → ...
  User sees text appearing word-by-word in real time

The difference is dramatic. A 500-token response at 50 tokens/second takes 10 seconds to generate. Without streaming, the user stares at a spinner for 10 seconds. With streaming, they start reading within milliseconds. The perceived latency drops from 10 seconds to near-zero, even though the total generation time is identical.
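The arithmetic above can be sketched as a tiny helper. All inputs here (token count, throughput, TTFT) are illustrative numbers, not measurements:

```javascript
// Illustrative helper for the latency arithmetic above. Actual generation
// time depends on response length; perceived latency with streaming depends
// only on time to first token (TTFT).
function latencyProfile({ tokens, tokensPerSecond, ttftMs }) {
  const actualMs = (tokens / tokensPerSecond) * 1000;
  return {
    actualMs,                               // total generation time
    perceivedWithoutStreamingMs: actualMs,  // user waits for the whole response
    perceivedWithStreamingMs: ttftMs        // user starts reading at first token
  };
}

// 500 tokens at 50 tokens/second, with a 200ms time to first token
const profile = latencyProfile({ tokens: 500, tokensPerSecond: 50, ttftMs: 200 });
// profile.actualMs → 10000, profile.perceivedWithStreamingMs → 200
```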


2. When to Stream Text vs Return Structured Data

This is the foundational design decision of section 4.9. Not every LLM response should be streamed as free-form text. Some responses need to be structured JSON consumed by code. Many responses need both.

Decision matrix

Scenario                               | Stream text? | Structured JSON? | Pattern
Chatbot response to user               | Yes          | No               | Pure stream
Extract data from a document           | No           | Yes              | Pure structured
Explain analysis + save results        | Yes          | Yes              | Dual-purpose (4.9.b)
Generate UI components + log metadata  | Yes          | Yes              | Separated outputs (4.9.c)
Background data pipeline (no user)     | No           | Yes              | Pure structured
Real-time dashboard narration          | Yes          | Yes              | Event-based (4.9.c)

The key question

"Who consumes this response — a human, a machine, or both?"

  • Human only -> Stream conversational text. Optimize for reading experience.
  • Machine only -> Return structured JSON. No streaming needed (or stream for progress).
  • Both -> You need a combination pattern. That is what 4.9.b and 4.9.c teach.
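The routing question can be made concrete with a small sketch. The flags and pattern names below are hypothetical, not a standard API:

```javascript
// Hypothetical routing helper for the "who consumes this?" decision.
// Pattern names mirror the matrix above and are illustrative only.
function choosePattern({ humanReads, machineConsumes }) {
  if (humanReads && machineConsumes) return 'combined';   // see 4.9.b / 4.9.c
  if (humanReads) return 'pure-stream';                   // optimize for reading
  if (machineConsumes) return 'pure-structured';          // no streaming needed
  return 'none';
}

choosePattern({ humanReads: true, machineConsumes: false });  // → 'pure-stream'
choosePattern({ humanReads: true, machineConsumes: true });   // → 'combined'
```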

3. Basic Streaming Implementation (Foundation)

Before combining streaming with structured data, let's establish the streaming foundation. This builds directly on what you learned in 4.8.

3.1 OpenAI streaming with the SDK

import OpenAI from 'openai';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

async function streamConversationalText(userMessage) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: 'You are a helpful assistant. Explain things clearly and conversationally.'
      },
      { role: 'user', content: userMessage }
    ],
    stream: true,
    temperature: 0.7
  });

  let fullText = '';

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      process.stdout.write(content); // Display in real time
      fullText += content;           // Accumulate for later use
    }
  }

  console.log('\n--- Stream complete ---');
  return fullText;
}

// Usage
const response = await streamConversationalText(
  'Explain how photosynthesis works in simple terms'
);
// response now contains the full text for logging, storage, etc.

3.2 Anthropic streaming with the SDK

import Anthropic from '@anthropic-ai/sdk';

const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

async function streamWithClaude(userMessage) {
  let fullText = '';

  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: userMessage }]
  });

  stream.on('text', (text) => {
    process.stdout.write(text);
    fullText += text;
  });

  const finalMessage = await stream.finalMessage();

  console.log('\n--- Stream complete ---');
  console.log('Stop reason:', finalMessage.stop_reason);
  console.log('Total tokens:', finalMessage.usage.input_tokens + finalMessage.usage.output_tokens);

  return fullText;
}

3.3 Key pattern: accumulate while streaming

Notice both examples follow the same pattern:

Stream tokens to the user  →  AND  →  Accumulate tokens into a string
         (UX)                              (for system use later)

This accumulate-while-streaming pattern is the foundation for everything in 4.9. You are always doing two things at once: serving the human in real time and building a complete response for the system.
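The pattern generalizes beyond any one SDK. A minimal provider-agnostic sketch, assuming only an async iterable of text chunks:

```javascript
// Provider-agnostic accumulate-while-streaming sketch: forward each chunk
// to `onToken` for the user, collect everything for the system.
async function accumulate(chunks, onToken) {
  const parts = [];
  for await (const text of chunks) {
    onToken(text);     // serve the human in real time
    parts.push(text);  // build the complete response
  }
  return parts.join('');
}

// Works with any chunk source, e.g. a plain async generator standing in
// for an SDK stream:
async function* fakeStream() {
  yield 'Hello, ';
  yield 'world!';
}

const seen = [];
const full = await accumulate(fakeStream(), (t) => seen.push(t));
// full → 'Hello, world!'
```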


4. User Experience Considerations for Streamed Text

Streaming is not just a technical feature — it is a UX feature. How you stream affects how users perceive your application's speed, reliability, and intelligence.

4.1 Perceived latency vs actual latency

Actual latency (total generation time):    10 seconds
Perceived latency WITHOUT streaming:       10 seconds (user waits for everything)
Perceived latency WITH streaming:          ~200ms (time to first token)

Key metric: TIME TO FIRST TOKEN (TTFT)
  - OpenAI GPT-4o:     ~200-500ms TTFT
  - Anthropic Claude:   ~300-700ms TTFT
  - Local models:       ~50-200ms TTFT

Users perceive responsiveness based on TTFT, not total generation time.

4.2 When streaming hurts UX

Streaming is not always better. There are cases where you should not stream:

Scenario                            | Why not stream
Very short responses (< 50 tokens)  | Streaming adds visual noise for a response that would appear in < 1 second anyway
JSON/structured data for display    | Partial JSON is not readable. Show a spinner, then render the complete structure
Actions with confirmation           | "I've deleted your account" streaming word-by-word is anxiety-inducing. Show the result atomically
Batch processing                    | No user watching. Accumulate and return
Error messages                      | Show the complete error. Streaming "Sorry, I..." creates uncertainty
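These rules can be encoded as a simple heuristic. The flags and the 50-token threshold below are illustrative, not fixed guidance:

```javascript
// Illustrative heuristic encoding the table above. All parameters are
// hypothetical; tune the thresholds for your own application.
function shouldStreamResponse({ estimatedTokens, isStructured, isError, hasUserWatching }) {
  if (!hasUserWatching) return false;      // batch processing: accumulate and return
  if (isError) return false;               // show errors atomically
  if (isStructured) return false;          // partial JSON is unreadable
  if (estimatedTokens < 50) return false;  // too short to benefit from streaming
  return true;
}

shouldStreamResponse({
  estimatedTokens: 500,
  isStructured: false,
  isError: false,
  hasUserWatching: true
}); // → true
```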

4.3 Progressive rendering patterns

For longer responses, consider enhancing the streaming experience:

async function streamWithProgressiveRendering(userMessage, callbacks) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: userMessage }],
    stream: true
  });

  let fullText = '';
  let currentParagraph = '';
  let paragraphCount = 0;
  const reportedHeaders = new Set(); // Headers already sent to onHeader

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (!content) continue;

    fullText += content;
    currentParagraph += content;

    // Stream individual tokens for real-time feel
    callbacks.onToken(content);

    // Detect paragraph boundaries for higher-level rendering
    if (content.includes('\n\n')) {
      paragraphCount++;
      callbacks.onParagraph(currentParagraph.trim(), paragraphCount);
      currentParagraph = '';
    }

    // Detect markdown headers for table-of-contents building.
    // Track already-reported headers so each fires exactly once
    // (a single match against fullText would re-fire the first header forever).
    if (content.includes('\n')) {
      for (const match of fullText.matchAll(/^#{1,3}\s+(.+)$/gm)) {
        if (!reportedHeaders.has(match[1])) {
          reportedHeaders.add(match[1]);
          callbacks.onHeader(match[1]);
        }
      }
    }
  }
  }

  // Final paragraph (no trailing newline)
  if (currentParagraph.trim()) {
    paragraphCount++;
    callbacks.onParagraph(currentParagraph.trim(), paragraphCount);
  }

  callbacks.onComplete(fullText);
  return fullText;
}

// Usage with progressive callbacks
await streamWithProgressiveRendering('Explain quantum computing', {
  onToken: (token) => process.stdout.write(token),
  onParagraph: (para, index) => {
    console.log(`\n[Paragraph ${index} complete: ${para.length} chars]`);
  },
  onHeader: (header) => {
    console.log(`\n[Section detected: "${header}"]`);
  },
  onComplete: (full) => {
    console.log(`\n\n[Done. Total length: ${full.length} chars]`);
  }
});

5. Streaming Partial Messages in Multi-Turn Conversations

In a chat application, you need to manage streaming within the context of a conversation. Each assistant message is streamed in real time and then added to the conversation history for the next turn.

5.1 Conversation manager with streaming

class StreamingConversation {
  constructor(openai, systemPrompt) {
    this.openai = openai;
    this.messages = [{ role: 'system', content: systemPrompt }];
  }

  async sendMessage(userMessage, onToken) {
    // Add user message to history
    this.messages.push({ role: 'user', content: userMessage });

    // Stream the response
    const stream = await this.openai.chat.completions.create({
      model: 'gpt-4o',
      messages: this.messages,
      stream: true,
      temperature: 0.7
    });

    let assistantMessage = '';

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        onToken(content);
        assistantMessage += content;
      }
    }

    // Add completed assistant message to history
    this.messages.push({ role: 'assistant', content: assistantMessage });

    return assistantMessage;
  }

  getHistory() {
    return this.messages;
  }

  getTokenEstimate() {
    // Rough estimate: 4 chars per token
    const totalChars = this.messages.reduce((sum, m) => sum + m.content.length, 0);
    return Math.ceil(totalChars / 4);
  }
}

// Usage
const conversation = new StreamingConversation(openai, 'You are a helpful coding tutor.');

// Turn 1
const reply1 = await conversation.sendMessage(
  'What is a closure in JavaScript?',
  (token) => process.stdout.write(token)
);
console.log('\n');

// Turn 2 (has context from turn 1)
const reply2 = await conversation.sendMessage(
  'Can you show me an example?',
  (token) => process.stdout.write(token)
);
console.log('\n');

console.log('Estimated tokens used:', conversation.getTokenEstimate());

5.2 Handling stream interruptions

Users may navigate away, close the tab, or click "stop generating." You need to handle aborted streams gracefully:

async function streamWithAbortSupport(userMessage, signal) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: userMessage }],
    stream: true
  });

  let fullText = '';
  let aborted = false;

  try {
    for await (const chunk of stream) {
      // Check if the user requested cancellation
      if (signal?.aborted) {
        aborted = true;
        // Close the stream to stop generation (saves tokens/cost)
        stream.controller.abort();
        break;
      }

      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        process.stdout.write(content);
        fullText += content;
      }
    }
  } catch (error) {
    if (error.name === 'AbortError') {
      aborted = true;
    } else {
      throw error;
    }
  }

  return {
    text: fullText,
    completed: !aborted,
    tokenEstimate: Math.ceil(fullText.length / 4)
  };
}

// Usage with AbortController
const controller = new AbortController();

// Simulate user clicking "stop" after 2 seconds
setTimeout(() => controller.abort(), 2000);

const result = await streamWithAbortSupport(
  'Write a detailed essay about climate change',
  controller.signal
);

console.log('\nCompleted:', result.completed);
console.log('Partial text length:', result.text.length);

6. Streaming in Web Applications (Server-Sent Events)

In a real web application, you stream from your backend to the frontend using Server-Sent Events (SSE) — the standard mechanism for server-to-client streaming over HTTP.

6.1 Express.js backend with SSE

import express from 'express';
import OpenAI from 'openai';

const app = express();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

app.use(express.json());

app.post('/api/chat/stream', async (req, res) => {
  const { message, conversationHistory = [] } = req.body;

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering

  const messages = [
    { role: 'system', content: 'You are a helpful assistant.' },
    ...conversationHistory,
    { role: 'user', content: message }
  ];

  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages,
      stream: true
    });

    let fullText = '';

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        fullText += content;
        // Send each token as an SSE event
        res.write(`data: ${JSON.stringify({ type: 'token', content })}\n\n`);
      }
    }

    // Send completion event with the full text
    res.write(`data: ${JSON.stringify({
      type: 'done',
      fullText,
      tokenEstimate: Math.ceil(fullText.length / 4)
    })}\n\n`);

    res.end();
  } catch (error) {
    res.write(`data: ${JSON.stringify({ type: 'error', message: error.message })}\n\n`);
    res.end();
  }
});

app.listen(3000, () => console.log('Server running on port 3000'));

6.2 Frontend consuming the SSE stream

async function streamChat(message, onToken, onComplete, onError) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });

  if (!response.ok) {
    onError(new Error(`HTTP ${response.status}`));
    return;
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Parse SSE events from buffer
    const lines = buffer.split('\n');
    buffer = lines.pop() || ''; // Keep incomplete line in buffer

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;

      let data;
      try {
        data = JSON.parse(line.slice(6));
      } catch {
        continue; // Skip malformed events rather than crashing the stream
      }
      switch (data.type) {
        case 'token':
          onToken(data.content);
          break;
        case 'done':
          onComplete(data.fullText, data.tokenEstimate);
          break;
        case 'error':
          onError(new Error(data.message));
          break;
      }
    }
  }
}

// Usage in a UI
const chatOutput = document.getElementById('chat-output');

streamChat(
  'Explain the water cycle',
  (token) => {
    // Append each token in real time
    chatOutput.textContent += token;
  },
  (fullText, tokens) => {
    console.log(`Complete. ${tokens} tokens used.`);
  },
  (error) => {
    chatOutput.textContent += `\n\nError: ${error.message}`;
  }
);
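The buffering-and-parsing step above becomes easy to unit-test if you pull it into a small helper. This sketch mirrors the framing used in the handler (`data: <json>` lines, incomplete trailing line carried over):

```javascript
// Minimal SSE chunk parser matching the framing used above. Returns the
// parsed event payloads plus the leftover partial line for the next chunk.
function parseSSEChunk(buffer, chunkText) {
  const lines = (buffer + chunkText).split('\n');
  const rest = lines.pop() || ''; // keep the incomplete line for next time
  const events = [];
  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    try {
      events.push(JSON.parse(line.slice(6)));
    } catch {
      // ignore malformed events
    }
  }
  return { events, rest };
}

const { events, rest } = parseSSEChunk(
  '',
  'data: {"type":"token","content":"Hi"}\n\ndata: {"ty'
);
// events → [{ type: 'token', content: 'Hi' }], rest → 'data: {"ty'
```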

7. Streaming Text with Metadata Collection

A common pattern is streaming the human-readable text while simultaneously collecting metadata about the response — timing, token counts, content classification, etc. This is the bridge to 4.9.b, where the "metadata" becomes full structured JSON.

async function streamWithMetadata(userMessage) {
  const startTime = Date.now();
  let firstTokenTime = null;

  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: userMessage }],
    stream: true,
    stream_options: { include_usage: true } // Request token usage in stream
  });

  let fullText = '';
  let chunkCount = 0;
  let usage = null;

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;

    if (content) {
      if (!firstTokenTime) {
        firstTokenTime = Date.now();
      }
      chunkCount++;
      fullText += content;
      process.stdout.write(content);
    }

    // The final chunk includes usage data when stream_options.include_usage is true
    if (chunk.usage) {
      usage = chunk.usage;
    }
  }

  const endTime = Date.now();

  // Metadata collected alongside the streamed text
  const metadata = {
    text: fullText,
    timing: {
      totalMs: endTime - startTime,
      timeToFirstTokenMs: firstTokenTime ? firstTokenTime - startTime : null,
      tokensPerSecond: usage
        ? (usage.completion_tokens / ((endTime - startTime) / 1000)).toFixed(1)
        : null
    },
    tokens: usage || {
      estimated_input: Math.ceil(userMessage.length / 4),
      estimated_output: Math.ceil(fullText.length / 4)
    },
    chunks: chunkCount,
    textLength: fullText.length,
    wordCount: fullText.split(/\s+/).length,
    hasCodeBlock: fullText.includes('```'),
    hasList: /^\s*(?:[-*+]|\d+\.)\s/m.test(fullText)
  };

  console.log('\n\n--- Metadata ---');
  console.log(JSON.stringify(metadata, null, 2));

  return metadata;
}

// Usage
const result = await streamWithMetadata('Explain 3 sorting algorithms with code examples');
// result.text = full streamed text
// result.timing = performance data
// result.tokens = token usage

8. Error Handling During Streaming

Streams can fail mid-generation. Network drops, rate limits, and server errors can interrupt a partially delivered response. Robust error handling is critical.

async function resilientStream(userMessage, options = {}) {
  const { maxRetries = 2, onToken, onError, onRetry } = options;
  let attempt = 0;
  let accumulatedText = '';

  while (attempt <= maxRetries) {
    try {
      const stream = await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true
      });

      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          accumulatedText += content;
          onToken?.(content);
        }
      }

      // Success — stream completed
      return { text: accumulatedText, attempts: attempt + 1, completed: true };
    } catch (error) {
      attempt++;
      onError?.(error, attempt);

      if (attempt > maxRetries) {
        // All retries exhausted — return partial text
        return {
          text: accumulatedText,
          attempts: attempt,
          completed: false,
          error: error.message
        };
      }

      // Rate limit: respect Retry-After header
      if (error.status === 429) {
        const retryAfter = parseInt(error.headers?.['retry-after'] || '5', 10);
        onRetry?.(retryAfter, attempt);
        await new Promise((resolve) => setTimeout(resolve, retryAfter * 1000));
      } else {
        // Other errors: brief backoff
        const backoff = Math.min(1000 * Math.pow(2, attempt), 10000);
        onRetry?.(backoff / 1000, attempt);
        await new Promise((resolve) => setTimeout(resolve, backoff));
      }
    }
  }
}

// Usage
const result = await resilientStream('Explain recursion', {
  onToken: (token) => process.stdout.write(token),
  onError: (err, attempt) => console.error(`\n[Error on attempt ${attempt}: ${err.message}]`),
  onRetry: (seconds, attempt) => console.log(`\n[Retrying in ${seconds}s (attempt ${attempt})]`)
});

if (!result.completed) {
  console.log('\n[Stream interrupted. Partial response delivered.]');
  console.log(`[Accumulated ${result.text.length} characters before failure]`);
}

9. Performance Considerations

9.1 Buffering strategies

Sending every single token to the frontend creates many tiny network packets. In high-traffic applications, consider buffering:

function createTokenBuffer(flushCallback, intervalMs = 50) {
  let buffer = '';
  let timer = null;

  return {
    add(token) {
      buffer += token;

      // Flush immediately on newlines (natural break points)
      if (token.includes('\n')) {
        this.flush();
        return;
      }

      // Otherwise, batch tokens and flush on interval
      if (!timer) {
        timer = setTimeout(() => this.flush(), intervalMs);
      }
    },

    flush() {
      if (buffer) {
        flushCallback(buffer);
        buffer = '';
      }
      if (timer) {
        clearTimeout(timer);
        timer = null;
      }
    },

    destroy() {
      this.flush();
    }
  };
}

// Usage: buffer tokens and flush every 50ms or on newlines
const tokenBuffer = createTokenBuffer((batch) => {
  res.write(`data: ${JSON.stringify({ type: 'tokens', content: batch })}\n\n`);
}, 50);

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) tokenBuffer.add(content);
}
tokenBuffer.destroy();

9.2 Memory management for long responses

// BAD: String concatenation in a tight loop creates many intermediate strings
let fullText = '';
for await (const chunk of stream) {
  fullText += chunk.choices[0]?.delta?.content || ''; // New string each iteration
}

// BETTER: Use an array and join at the end
const parts = [];
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) parts.push(content);
}
const fullText = parts.join(''); // Single concatenation

10. Key Takeaways

  1. Streaming conversational text delivers tokens in real time, reducing perceived latency from seconds to milliseconds (time to first token).
  2. The core question is "who consumes this response?" — humans need streamed text, machines need structured JSON, and many features need both.
  3. Accumulate while streaming — always build the full response alongside the stream so it is available for logging, history, and downstream processing.
  4. Streaming is a UX decision — short responses, error messages, and structured data displays should often not be streamed.
  5. Error handling is non-negotiable — streams can break mid-response. Handle partial text, implement retries, and degrade gracefully.

Explain-It Challenge

  1. A product manager asks "Why can't we just wait for the full response and show it all at once? It's simpler." Explain the UX argument for streaming with specific numbers.
  2. You are building a chat app. The user sends a message and then closes their laptop before the stream finishes. What happens on the server side? What should happen?
  3. Your streaming chat works perfectly locally but users on slow mobile networks see text appearing in large "jumps" instead of smooth word-by-word flow. Why, and how do you fix it?

Navigation: <- 4.9 Overview | 4.9.b — Returning Structured JSON After Generation ->