Episode 4 — Generative AI Engineering / 4.8 — Streaming Responses

4.8.a — Streaming Tokens

In one sentence: Streaming delivers LLM-generated tokens one at a time as they are produced, using Server-Sent Events (SSE) over HTTP, so your application can display partial results immediately instead of waiting for the entire response to complete.

Navigation: <- 4.8 Overview | 4.8.b — Progressive Rendering ->


1. The Fundamental Problem: Waiting for Full Responses

When you make a standard (non-streaming) API call to an LLM, the entire response is generated server-side before anything is returned to you. For a 500-token response at ~50 tokens/second, that means 10 seconds of dead silence followed by a wall of text.

NON-STREAMING (buffered):

  User sends prompt ──────────────────────────────────────> Response arrives
  |                                                         |
  |  <──── 10 seconds of nothing (model is generating) ──> |
  |                                                         |
  t=0s                                                    t=10s
  User sees: [loading spinner...] then BOOM → full response


STREAMING (incremental):

  User sends prompt ─> first token ─> ... ─> last token
  |                    |                      |
  t=0s               t=0.2s                 t=10s
  User sees: each word appearing as the model "types"

The total time is the same in both cases — the model still takes ~10 seconds to generate 500 tokens. The difference is perceived latency: streaming shows the first token in ~200ms, while buffered shows nothing for 10 seconds.


2. What Is Streaming in the LLM Context?

Streaming means the API pushes each generated token (or small batch of tokens) to your client as soon as it is produced, rather than accumulating the full response and sending it once.

Key concepts

  • Token-by-token delivery: each token is sent the moment the model generates it
  • Server-Sent Events (SSE): the HTTP protocol used to push events from server to client
  • Chunked transfer: HTTP Transfer-Encoding: chunked allows data to be sent in pieces
  • Partial response: the incomplete response accumulated so far from received chunks
  • Stream termination: a special signal ([DONE] in OpenAI, message_stop in Anthropic) indicating the stream is complete

How it works under the hood

1. Client sends POST request with { "stream": true }
2. Server responds with Content-Type: text/event-stream
3. Connection stays open (HTTP keep-alive)
4. Server pushes SSE events as tokens are generated:

   data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"Hello"}}]}

   data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":" there"}}]}

   data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"!"}}]}

   data: [DONE]

5. Client processes each event as it arrives
6. Connection closes after [DONE]

3. Server-Sent Events (SSE) Deep Dive

SSE is a web standard (originally from the W3C, now maintained in the WHATWG HTML Living Standard) for one-way server-to-client communication over HTTP. Unlike WebSockets (bidirectional), SSE is simpler and well suited to streaming LLM responses because the data flows in one direction: server to client.

SSE format

Each SSE message follows this format:

event: <event-type>\n
data: <payload>\n
\n

The double newline (\n\n) separates events. Most LLM APIs only use the data: field:

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
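
These parsing rules (split events on the blank line, strip the data: prefix) can be sketched as a small standalone function. parseSSE is an illustrative name, and the simplification ignores event:, id:, retry:, and comment fields:

```javascript
// Minimal SSE parser sketch: splits a raw text buffer into events on the
// blank-line boundary and extracts the payload of each `data:` field.
// Simplified — real parsers also handle `event:`, `id:`, `retry:`, comments.
function parseSSE(raw) {
  return raw
    .split('\n\n')                        // events end with a blank line
    .map(block => block
      .split('\n')
      .filter(line => line.startsWith('data: '))
      .map(line => line.slice(6))         // strip the 'data: ' prefix
      .join('\n'))                        // multi-line data fields are joined
    .filter(payload => payload.length > 0);
}
```

For example, parseSSE('data: {"x":1}\n\ndata: [DONE]\n\n') yields the two payloads '{"x":1}' and '[DONE]'.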

SSE vs WebSockets vs Long Polling

  Feature             SSE                       WebSockets                Long Polling
  Direction           Server -> Client only     Bidirectional             Client -> Server (repeated)
  Protocol            HTTP                      WS (upgraded HTTP)        HTTP
  Reconnection        Built-in auto-reconnect   Manual reconnect logic    Each request is separate
  Complexity          Low                       Medium                    Low but inefficient
  LLM streaming       Standard choice           Overkill for most cases   Not suitable
  Proxy/CDN support   Excellent                 Can be problematic        Excellent

SSE is the standard for LLM streaming because you only need one-way data flow (model -> client) and it works seamlessly through proxies, load balancers, and CDNs.


4. The stream Parameter in LLM APIs

OpenAI API

Setting stream: true changes the response format from a single ChatCompletion object to a series of ChatCompletionChunk objects:

// NON-STREAMING — returns a single ChatCompletion
const response = await openai.chat.completions.create({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Explain streaming in 3 sentences.' }],
  // stream defaults to false
});
console.log(response.choices[0].message.content); // Full response at once

// STREAMING — returns an async iterable of ChatCompletionChunk
const stream = await openai.chat.completions.create({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Explain streaming in 3 sentences.' }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || '';
  process.stdout.write(content); // Print each token as it arrives
}

Anthropic API

Anthropic uses a similar pattern but with a different event structure:

// NON-STREAMING — returns a single Message
const message = await anthropic.messages.create({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Explain streaming in 3 sentences.' }],
  // stream defaults to false
});
console.log(message.content[0].text); // Full response at once

// STREAMING — returns a stream of events
const stream = await anthropic.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Explain streaming in 3 sentences.' }],
});

for await (const event of stream) {
  if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
    process.stdout.write(event.delta.text);
  }
}

5. Chunk Structure: OpenAI vs Anthropic

Understanding the structure of each chunk is essential for parsing streaming responses correctly.

OpenAI chunk structure

// First chunk (includes role)
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1677858242,
  "model": "gpt-4o-2024-08-06",
  "choices": [{
    "index": 0,
    "delta": {
      "role": "assistant",
      "content": ""
    },
    "finish_reason": null
  }]
}

// Content chunks (the actual tokens)
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1677858242,
  "model": "gpt-4o-2024-08-06",
  "choices": [{
    "index": 0,
    "delta": {
      "content": "Streaming"   // <-- the token
    },
    "finish_reason": null
  }]
}

// Final chunk (stream complete)
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1677858242,
  "model": "gpt-4o-2024-08-06",
  "choices": [{
    "index": 0,
    "delta": {},
    "finish_reason": "stop"    // <-- done
  }]
}

// Usage chunk (only when stream_options: { include_usage: true } is set)
{
  "id": "chatcmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1677858242,
  "model": "gpt-4o-2024-08-06",
  "choices": [],               // <-- empty on the usage chunk
  "usage": {
    "prompt_tokens": 12,
    "completion_tokens": 45,
    "total_tokens": 57
  }
}

Key details:

  • Content lives in choices[0].delta.content (not message.content)
  • delta contains only the new content for this chunk, not the accumulated text
  • finish_reason is null during streaming, "stop" when complete
  • Usage data arrives only when stream_options: { include_usage: true } is set, in an extra final chunk whose choices array is empty
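
Because each delta carries only the new tokens, reconstructing the full message is a simple fold over the chunks. A sketch over plain objects shaped like the examples above (rebuildMessage is an illustrative name):

```javascript
// Sketch: rebuild the full message text from OpenAI-style chunk objects.
// Each delta holds only the new tokens, so in-order concatenation suffices.
function rebuildMessage(chunks) {
  let text = '';
  let finishReason = null;
  for (const chunk of chunks) {
    const choice = chunk.choices[0];
    if (!choice) continue;                       // e.g. a usage-only chunk
    if (choice.delta?.content) text += choice.delta.content;
    if (choice.finish_reason) finishReason = choice.finish_reason;
  }
  return { text, finishReason };
}
```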

Anthropic event structure

Anthropic uses a typed event system with multiple event types:

// 1. message_start — metadata about the response
{ "type": "message_start", "message": { "id": "msg_abc", "model": "claude-sonnet-4-20250514", "usage": { "input_tokens": 25 } } }

// 2. content_block_start — beginning of a content block
{ "type": "content_block_start", "index": 0, "content_block": { "type": "text", "text": "" } }

// 3. content_block_delta — the actual tokens (repeated many times)
{ "type": "content_block_delta", "index": 0, "delta": { "type": "text_delta", "text": "Streaming" } }

// 4. content_block_stop — end of this content block
{ "type": "content_block_stop", "index": 0 }

// 5. message_delta — final usage info
{ "type": "message_delta", "delta": { "stop_reason": "end_turn" }, "usage": { "output_tokens": 45 } }

// 6. message_stop — stream complete
{ "type": "message_stop" }

Key details:

  • Text content lives in content_block_delta events with delta.type === "text_delta"
  • Anthropic sends structured lifecycle events (start, delta, stop) for each content block
  • Usage info is split: input tokens in message_start, output tokens in message_delta

6. Reading Chunks in Node.js

Method 1: for await...of with the SDK (Recommended)

The official SDKs return async iterables, making for await...of the cleanest approach:

import OpenAI from 'openai';

const openai = new OpenAI(); // Uses OPENAI_API_KEY env var

async function streamChat(userMessage) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      { role: 'system', content: 'You are a helpful assistant.' },
      { role: 'user', content: userMessage }
    ],
    stream: true,
  });

  let fullResponse = '';

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      fullResponse += content;
      process.stdout.write(content); // Real-time output
    }
  }

  console.log('\n--- Stream complete ---');
  console.log('Full response:', fullResponse);
  return fullResponse;
}

streamChat('What are 3 benefits of streaming?');

Method 2: Raw HTTP with fetch (Understanding the protocol)

If you need to understand what happens beneath the SDK:

async function streamWithFetch(userMessage) {
  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: userMessage }],
      stream: true,
    }),
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let fullResponse = '';
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Decode the raw bytes into text
    buffer += decoder.decode(value, { stream: true });

    // Process complete lines; each SSE data field is a single 'data: ...' line
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Keep incomplete line in buffer

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed || !trimmed.startsWith('data: ')) continue;

      const data = trimmed.slice(6); // Remove 'data: ' prefix
      if (data === '[DONE]') {
        console.log('\n--- Stream complete ---');
        return fullResponse;
      }

      try {
        const parsed = JSON.parse(data);
        const content = parsed.choices[0]?.delta?.content;
        if (content) {
          fullResponse += content;
          process.stdout.write(content);
        }
      } catch (e) {
        // Skip non-JSON lines (e.g., empty lines, comments)
      }
    }
  }

  return fullResponse;
}

Method 3: Node.js EventSource (browser-compatible approach)

// In browsers or with an EventSource polyfill in Node.js
// Note: EventSource only supports GET, so it's not directly usable
// with LLM APIs that require POST. Use fetch + ReadableStream instead.

// For browser-based SSE from YOUR backend (which proxies to the LLM):
const eventSource = new EventSource('/api/chat?message=Hello');

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const chunk = JSON.parse(event.data);
  const content = chunk.choices[0]?.delta?.content || '';
  document.getElementById('output').textContent += content;
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};

7. Async Iterators and the Streaming Pattern

Async iterators are the JavaScript mechanism that makes for await...of work. Understanding them helps when you need custom stream processing.

How async iterators work

// An async iterable implements Symbol.asyncIterator
// The SDK's stream object is an async iterable

// This:
for await (const chunk of stream) {
  console.log(chunk);
}

// Is syntactic sugar for this:
const iterator = stream[Symbol.asyncIterator]();
while (true) {
  const { value, done } = await iterator.next();
  if (done) break;
  console.log(value);
}
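
A practical corollary: breaking out of a for await...of loop calls the iterator's return() method, which runs the generator's cleanup code; this is how a stream can release its underlying resources when you stop consuming early. A self-contained sketch (names are illustrative):

```javascript
// Breaking out of for await...of triggers the generator's finally block,
// giving the producer a chance to clean up (close connections, clear timers).
let cleanedUp = false;

async function* endlessTokens() {
  let n = 0;
  try {
    while (true) yield `token${n++}`;
  } finally {
    cleanedUp = true; // runs when the consumer breaks or throws
  }
}

async function takeThree() {
  const seen = [];
  for await (const token of endlessTokens()) {
    seen.push(token);
    if (seen.length === 3) break; // invokes the generator's return()
  }
  return seen;
}
```

After await takeThree(), cleanedUp is true even though the generator never finished on its own.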

Building a custom async iterable for testing

// Useful for testing streaming UIs without hitting a real API
async function* mockStream(text, delayMs = 50) {
  const words = text.split(' ');
  for (const word of words) {
    await new Promise(resolve => setTimeout(resolve, delayMs));
    yield {
      choices: [{
        delta: { content: word + ' ' },
        finish_reason: null,
      }],
    };
  }
  // Final chunk
  yield {
    choices: [{
      delta: {},
      finish_reason: 'stop',
    }],
  };
}

// Usage — identical to real SDK streaming
async function testWithMock() {
  const stream = mockStream('This is a simulated streaming response from an LLM.');

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) process.stdout.write(content);
  }
}

Transforming streams with async generators

// Filter, transform, or aggregate streaming chunks
async function* filterStream(stream) {
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      // Example: uppercase every token
      yield content.toUpperCase();
    }
  }
}

// Accumulate and yield complete sentences
async function* sentenceStream(stream) {
  let buffer = '';
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    buffer += content;

    // Yield complete sentences
    const sentenceEnd = buffer.search(/[.!?]\s/);
    if (sentenceEnd !== -1) {
      yield buffer.slice(0, sentenceEnd + 1);
      buffer = buffer.slice(sentenceEnd + 1).trimStart();
    }
  }
  // Yield any remaining text
  if (buffer.trim()) yield buffer.trim();
}

8. OpenAI Stream Helper

The OpenAI Node.js SDK's streaming mode supports helper options and patterns for common needs: usage reporting, cancellation, and collecting the full response:

Basic stream with event handlers

import OpenAI from 'openai';

const openai = new OpenAI();

async function streamWithHelpers() {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: 'Write a haiku about streaming.' }],
    stream: true,
    stream_options: { include_usage: true }, // Get token counts
  });

  let fullContent = '';
  let usage = null;

  for await (const chunk of stream) {
    // Collect content
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      fullContent += content;
      process.stdout.write(content);
    }

    // Collect usage (arrives in the last chunk)
    if (chunk.usage) {
      usage = chunk.usage;
    }
  }

  console.log('\n\nToken usage:', usage);
  // { prompt_tokens: 14, completion_tokens: 22, total_tokens: 36 }

  return { content: fullContent, usage };
}

Streaming with abort controller (cancellation)

async function streamWithCancellation(userMessage, timeoutMs = 30000) {
  const controller = new AbortController();

  // Auto-cancel after timeout
  const timeout = setTimeout(() => controller.abort(), timeoutMs);

  try {
    const stream = await openai.chat.completions.create(
      {
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      },
      {
        signal: controller.signal, // Pass abort signal to the request
      }
    );

    let fullResponse = '';

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        fullResponse += content;

        // Example: cancel if response contains unwanted content
        if (fullResponse.includes('UNSAFE_PATTERN')) {
          controller.abort();
          break;
        }
      }
    }

    return fullResponse;
  } catch (error) {
    if (error.name === 'AbortError') {
      console.log('Stream was cancelled');
      return null;
    }
    throw error;
  } finally {
    clearTimeout(timeout);
  }
}

Converting a stream to a complete response

// Sometimes you want streaming for UX but also need the full response
async function streamAndCollect(messages) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages,
    stream: true,
    stream_options: { include_usage: true },
  });

  const chunks = [];
  let fullContent = '';
  let usage = null;

  for await (const chunk of stream) {
    chunks.push(chunk);
    const content = chunk.choices[0]?.delta?.content;
    if (content) fullContent += content;
    if (chunk.usage) usage = chunk.usage;
  }

  // Reconstruct a ChatCompletion-like object
  return {
    content: fullContent,
    usage,
    finishReason: chunks.at(-2)?.choices[0]?.finish_reason || 'stop',
    chunks, // Keep raw chunks for debugging
  };
}

9. Anthropic Stream Helper

Anthropic's Node.js SDK provides a dedicated .stream() method with typed event handling:

Basic streaming

import Anthropic from '@anthropic-ai/sdk';

const anthropic = new Anthropic(); // Uses ANTHROPIC_API_KEY env var

async function streamWithAnthropic() {
  const stream = await anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Write a haiku about streaming.' }],
  });

  let fullContent = '';

  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      fullContent += event.delta.text;
      process.stdout.write(event.delta.text);
    }
  }

  // After the stream completes, get the full message
  const finalMessage = await stream.finalMessage();
  console.log('\n\nUsage:', finalMessage.usage);
  // { input_tokens: 14, output_tokens: 22 }

  return fullContent;
}

Event-based streaming with typed handlers

async function streamWithEventHandlers() {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Explain async iterators in JS.' }],
  });

  // Register typed event handlers
  stream.on('text', (text) => {
    process.stdout.write(text); // Each text delta
  });

  stream.on('message', (message) => {
    console.log('\n\nFull message:', message); // Complete Message object
  });

  stream.on('error', (error) => {
    console.error('Stream error:', error);
  });

  // Wait for the stream to complete
  const finalMessage = await stream.finalMessage();
  return finalMessage.content[0].text;
}

Anthropic streaming with system prompt and multi-turn

async function multiTurnStreaming(conversationHistory) {
  const stream = await anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 2048,
    system: 'You are an expert JavaScript teacher. Be concise and use code examples.',
    messages: conversationHistory,
  });

  let assistantResponse = '';

  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      assistantResponse += event.delta.text;
      process.stdout.write(event.delta.text);
    }
  }

  // Get token usage for cost tracking
  const finalMessage = await stream.finalMessage();
  const { input_tokens, output_tokens } = finalMessage.usage;

  console.log(`\nTokens: ${input_tokens} in, ${output_tokens} out`);

  // Add assistant response to history for next turn
  conversationHistory.push({ role: 'assistant', content: assistantResponse });

  return {
    content: assistantResponse,
    usage: { input_tokens, output_tokens },
  };
}

// Usage
const history = [
  { role: 'user', content: 'What is an async iterator?' }
];
await multiTurnStreaming(history);

// Follow-up
history.push({ role: 'user', content: 'Show me a code example.' });
await multiTurnStreaming(history);

10. Complete End-to-End Example: Streaming Chat Server

Here is a complete Express.js server that streams LLM responses to a browser client via SSE:

// server.js
import express from 'express';
import OpenAI from 'openai';

const app = express();
const openai = new OpenAI();

app.use(express.json());
app.use(express.static('public')); // Serve frontend files

// Streaming endpoint
app.post('/api/chat', async (req, res) => {
  const { messages } = req.body;

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering

  // Handle client disconnect
  let isClientConnected = true;
  req.on('close', () => {
    isClientConnected = false;
  });

  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        ...messages,
      ],
      stream: true,
      stream_options: { include_usage: true },
    });

    for await (const chunk of stream) {
      if (!isClientConnected) break; // Stop if client disconnected

      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        // Send as SSE event
        res.write(`data: ${JSON.stringify({ type: 'token', content })}\n\n`);
      }

      // Send usage info from final chunk
      if (chunk.usage) {
        res.write(`data: ${JSON.stringify({ type: 'usage', usage: chunk.usage })}\n\n`);
      }
    }

    // Signal stream completion
    res.write(`data: ${JSON.stringify({ type: 'done' })}\n\n`);
    res.end();

  } catch (error) {
    console.error('Streaming error:', error);

    if (isClientConnected) {
      res.write(`data: ${JSON.stringify({ type: 'error', message: error.message })}\n\n`);
      res.end();
    }
  }
});

app.listen(3000, () => {
  console.log('Server running on http://localhost:3000');
});

// public/client.js — Browser client
async function sendMessage(userMessage) {
  const outputElement = document.getElementById('output');
  outputElement.textContent = ''; // Clear previous output

  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages: [{ role: 'user', content: userMessage }],
    }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n\n');
    buffer = lines.pop(); // Keep incomplete event in buffer

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = JSON.parse(line.slice(6));

      switch (data.type) {
        case 'token':
          outputElement.textContent += data.content;
          break;
        case 'usage':
          console.log('Token usage:', data.usage);
          break;
        case 'done':
          console.log('Stream complete');
          break;
        case 'error':
          outputElement.textContent += `\n[Error: ${data.message}]`;
          break;
      }
    }
  }
}
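
One production wrinkle the X-Accel-Buffering header hints at: intermediate proxies may buffer or close idle SSE connections. A common mitigation is a heartbeat of SSE comment lines, since lines beginning with : are valid SSE that clients ignore. A hypothetical helper (startHeartbeat is an illustrative name):

```javascript
// Hypothetical keep-alive helper: periodically writes an SSE comment line
// (': ping') so proxies see traffic and don't close an idle stream.
// Lines beginning with ':' are ignored by SSE clients.
function startHeartbeat(res, intervalMs = 15000) {
  const timer = setInterval(() => res.write(': ping\n\n'), intervalMs);
  return () => clearInterval(timer); // call this when the stream ends
}
```

Start it right after setting the SSE headers and call the returned stop function before res.end().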

11. Streaming with Token Counting and Cost Tracking

In production, you need to track token usage even with streaming:

async function streamWithCostTracking(messages, model = 'gpt-4o') {
  // Pricing per 1M tokens (example rates — check current pricing)
  const pricing = {
    'gpt-4o': { input: 2.50, output: 10.00 },
    'gpt-4o-mini': { input: 0.15, output: 0.60 },
    'claude-sonnet-4-20250514': { input: 3.00, output: 15.00 },
  };

  const startTime = Date.now();
  let firstTokenTime = null;

  const stream = await openai.chat.completions.create({
    model,
    messages,
    stream: true,
    stream_options: { include_usage: true },
  });

  let fullContent = '';
  let usage = null;

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      if (!firstTokenTime) {
        firstTokenTime = Date.now();
      }
      fullContent += content;
      process.stdout.write(content);
    }
    if (chunk.usage) usage = chunk.usage;
  }

  const endTime = Date.now();
  const rates = pricing[model] || { input: 0, output: 0 };

  const metrics = {
    content: fullContent,
    model,
    timeToFirstToken: firstTokenTime ? firstTokenTime - startTime : null,
    totalTime: endTime - startTime,
    tokensPerSecond: usage
      ? (usage.completion_tokens / ((endTime - startTime) / 1000)).toFixed(1)
      : null,
    usage,
    cost: usage
      ? {
          input: ((usage.prompt_tokens / 1_000_000) * rates.input).toFixed(6),
          output: ((usage.completion_tokens / 1_000_000) * rates.output).toFixed(6),
          total: (
            (usage.prompt_tokens / 1_000_000) * rates.input +
            (usage.completion_tokens / 1_000_000) * rates.output
          ).toFixed(6),
        }
      : null,
  };

  console.log('\n\nMetrics:', JSON.stringify(metrics, null, 2));
  return metrics;
}

12. Error Handling in Streams

Streams can fail mid-way. Unlike buffered responses where you either get the full response or an error, streaming errors can occur after you have already received partial data.

async function robustStream(messages, maxRetries = 3) {
  let attempt = 0;

  while (attempt < maxRetries) {
    attempt++;

    try {
      const stream = await openai.chat.completions.create({
        model: 'gpt-4o',
        messages,
        stream: true,
      });

      let fullContent = '';

      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          fullContent += content;
          process.stdout.write(content);
        }
      }

      return { content: fullContent, attempts: attempt };

    } catch (error) {
      console.error(`\nStream attempt ${attempt} failed:`, error.message);

      if (attempt === maxRetries) {
        throw new Error(
          `Stream failed after ${maxRetries} attempts: ${error.message}`
        );
      }

      // Exponential backoff before retry
      const backoffMs = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
      console.log(`Retrying in ${backoffMs}ms...`);
      await new Promise(resolve => setTimeout(resolve, backoffMs));
    }
  }
}

Common streaming error scenarios

  • Connection reset: network interruption mid-stream. Retry with the full prompt (the partial output is not recoverable).
  • Timeout: the model takes too long between chunks. Set a per-chunk timeout, then abort and retry.
  • Rate limit (429): too many concurrent streams. Queue requests and back off exponentially.
  • Server error (500/502): API infrastructure issue. Retry with backoff.
  • Content filter: the response is flagged mid-stream. Surfaces as finish_reason: "content_filter"; handle it gracefully.
  • Context overflow: the input is too large. Reduce input tokens or summarize the history.
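
The per-chunk timeout mentioned above can be implemented by racing each read against a timer. A sketch that wraps any async iterable of chunks (withChunkTimeout and the error message are illustrative):

```javascript
// Sketch: wrap an async iterable so that waiting longer than timeoutMs
// between chunks rejects, letting the caller abort and retry.
async function* withChunkTimeout(stream, timeoutMs) {
  const iterator = stream[Symbol.asyncIterator]();
  while (true) {
    let timer;
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error('chunk timeout')), timeoutMs);
    });
    try {
      const { value, done } = await Promise.race([iterator.next(), timeout]);
      if (done) return;
      yield value;
    } finally {
      clearTimeout(timer); // always clear, even on early break or timeout
    }
  }
}
```

The consumer's for await...of loop is unchanged; a stall between chunks now surfaces as a thrown error that the retry logic above can catch.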

13. Key Takeaways

  1. Streaming delivers tokens incrementally via Server-Sent Events (SSE), transforming a 10-second wait into 200ms time-to-first-token.
  2. Set stream: true in the API call — the response becomes an async iterable of chunk objects instead of a single response.
  3. Token content lives in delta, not message: chunk.choices[0].delta.content for OpenAI, event.delta.text for Anthropic.
  4. Use for await...of to consume streams in Node.js — it is the cleanest and most idiomatic pattern.
  5. Both OpenAI and Anthropic provide stream helpers — use them instead of parsing raw SSE manually.
  6. Handle errors mid-stream — partial data + error is a real scenario that buffered responses never have.
  7. Track metrics — time-to-first-token, tokens/second, and cost even in streaming mode.

Explain-It Challenge

  1. A colleague asks "why not just use WebSockets for LLM streaming?" Explain why SSE is preferred.
  2. You receive a bug report: "The streaming response drops the last few words sometimes." What is likely happening and how would you fix it?
  3. Your Node.js streaming endpoint works locally but tokens arrive in large batches (not one-by-one) in production. What infrastructure layer might be buffering, and how do you fix it?

Navigation: <- 4.8 Overview | 4.8.b — Progressive Rendering ->