Episode 4 — Generative AI Engineering / 4.13 — Building a RAG Pipeline

4.13.d — Building a Document QA System

In one sentence: This is the complete end-to-end build — a Document QA system with an ingestion pipeline (load, chunk, embed, store), a query pipeline (retrieve, prompt, generate, validate), structured JSON output { answer, confidence, sources } validated with Zod, comprehensive error handling, and testing.

Navigation: <- 4.13.c Prompt Construction for RAG | 4.13 Exercise Questions ->


1. System Architecture

┌──────────────────────────────────────────────────────────────────────────────┐
│                    DOCUMENT QA SYSTEM — FULL ARCHITECTURE                     │
│                                                                              │
│  ┌─────────────────────────────┐      ┌────────────────────────────────────┐ │
│  │  INGESTION PIPELINE          │      │  QUERY PIPELINE                    │ │
│  │                              │      │                                    │ │
│  │  Load docs (PDF, MD, TXT)   │      │  User query                       │ │
│  │       │                      │      │       │                            │ │
│  │       ▼                      │      │       ▼                            │ │
│  │  Clean & parse              │      │  Embed query                      │ │
│  │       │                      │      │       │                            │ │
│  │       ▼                      │      │       ▼                            │ │
│  │  Chunk (500 tokens, 50      │      │  Vector DB search (top-k)         │ │
│  │   overlap)                   │      │       │                            │ │
│  │       │                      │      │       ▼                            │ │
│  │       ▼                      │      │  [Optional] Re-rank              │ │
│  │  Embed each chunk           │      │       │                            │ │
│  │       │                      │      │       ▼                            │ │
│  │       ▼                      │      │  Build prompt (system + context   │ │
│  │  Store in Vector DB         │      │   + query)                        │ │
│  │  (vector + metadata)         │      │       │                            │ │
│  │                              │      │       ▼                            │ │
│  └─────────────────────────────┘      │  LLM Generate (temperature 0)    │ │
│                                        │       │                            │ │
│                                        │       ▼                            │ │
│                                        │  Parse JSON                       │ │
│                                        │       │                            │ │
│                                        │       ▼                            │ │
│                                        │  Zod Validate                     │ │
│                                        │       │                            │ │
│                                        │       ▼                            │ │
│                                        │  Return { answer, confidence,     │ │
│                                        │          sources }                │ │
│                                        └────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘

2. Project Setup

// package.json dependencies
// npm install openai zod
// (For a real project you'd also install your vector DB client, e.g., @pinecone-database/pinecone)

// For this example, we'll use an in-memory vector store to keep things self-contained.
// In production, replace with Pinecone, Chroma, pgvector, etc.

3. Zod Schemas — Define the Contract

Every piece of data flowing through the system gets a schema. This is the "trust but verify" principle applied to AI output.

import { z } from 'zod';

// --- Schema: Individual source reference ---
const SourceSchema = z.object({
  document: z.string().describe('Filename or path of the source document'),
  chunk: z.number().int().min(0).describe('Chunk index within the document'),
  relevance: z.number().min(0).max(1).describe('Relevance score from retrieval'),
});

// --- Schema: QA response from the LLM ---
const QAResponseSchema = z.object({
  answer: z.string().min(1).describe('Natural language answer with source citations'),
  confidence: z.number().min(0).max(1).describe('Model confidence in the answer'),
  sources: z.array(SourceSchema).describe('Sources used to generate the answer'),
  gaps: z.array(z.string()).optional().describe('Information asked about but not found'),
});

// --- Schema: Document chunk stored in vector DB ---
const ChunkSchema = z.object({
  id: z.string(),
  text: z.string(),
  embedding: z.array(z.number()),
  metadata: z.object({
    filename: z.string(),
    chunkIndex: z.number(),
    totalChunks: z.number(),
    charStart: z.number(),
    charEnd: z.number(),
  }),
});

// --- Schema: Ingestion result ---
const IngestionResultSchema = z.object({
  documentsProcessed: z.number(),
  chunksCreated: z.number(),
  embeddingsGenerated: z.number(),
  errors: z.array(z.object({
    document: z.string(),
    error: z.string(),
  })),
});

// Type inference for TypeScript users
// type QAResponse = z.infer<typeof QAResponseSchema>;
// type Source = z.infer<typeof SourceSchema>;
// type Chunk = z.infer<typeof ChunkSchema>;

4. In-Memory Vector Store

For this example, we build a minimal in-memory vector store. In production, replace this with Pinecone, Chroma, pgvector, or any vector database (see 4.12).

// vector-store.js — Minimal in-memory vector DB for learning

class InMemoryVectorStore {
  constructor() {
    this.vectors = [];   // Array of { id, values, metadata }
  }

  // Store vectors with metadata
  async upsert(items) {
    for (const item of items) {
      const existingIdx = this.vectors.findIndex(v => v.id === item.id);
      if (existingIdx >= 0) {
        this.vectors[existingIdx] = item;
      } else {
        this.vectors.push(item);
      }
    }
    return { upsertedCount: items.length };
  }

  // Query by vector similarity (cosine similarity)
  async query({ vector, topK = 5, filter = {}, includeMetadata = true }) {
    let candidates = this.vectors;

    // Apply metadata filters
    for (const [key, value] of Object.entries(filter)) {
      candidates = candidates.filter(v => v.metadata[key] === value);
    }

    // Calculate cosine similarity for each candidate
    const scored = candidates.map(candidate => ({
      id: candidate.id,
      score: cosineSimilarity(vector, candidate.values),
      metadata: includeMetadata ? candidate.metadata : undefined,
    }));

    // Sort by score descending and return top-k
    scored.sort((a, b) => b.score - a.score);
    return { matches: scored.slice(0, topK) };
  }

  // Get total count
  get size() {
    return this.vectors.length;
  }

  // Clear all data
  clear() {
    this.vectors = [];
  }
}

// Cosine similarity between two vectors
function cosineSimilarity(a, b) {
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;

  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }

  if (normA === 0 || normB === 0) return 0;
  return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}

5. Document Chunking

Chunking splits documents into pieces that fit in the context window and are semantically meaningful.

// chunker.js — Text chunking with overlap

/**
 * Split text into overlapping chunks.
 *
 * @param {string} text - The full document text
 * @param {object} options
 * @param {number} options.chunkSize - Target characters per chunk (~500 tokens * 4 chars)
 * @param {number} options.chunkOverlap - Overlap characters between chunks
 * @returns {Array<{ text: string, charStart: number, charEnd: number }>}
 */
function chunkText(text, options = {}) {
  const {
    chunkSize = 2000,     // ~500 tokens at 4 chars/token
    chunkOverlap = 200,   // ~50 tokens overlap
  } = options;

  if (!text || text.length === 0) return [];
  if (text.length <= chunkSize) {
    return [{ text, charStart: 0, charEnd: text.length }];
  }

  const chunks = [];
  let start = 0;

  while (start < text.length) {
    let end = start + chunkSize;

    // Try to break at a paragraph boundary
    if (end < text.length) {
      const paragraphBreak = text.lastIndexOf('\n\n', end);
      if (paragraphBreak > start + chunkSize * 0.5) {
        end = paragraphBreak;
      } else {
        // Try sentence boundary
        const sentenceBreak = text.lastIndexOf('. ', end);
        if (sentenceBreak > start + chunkSize * 0.5) {
          end = sentenceBreak + 1; // Include the period
        }
      }
    } else {
      end = text.length;
    }

    chunks.push({
      text: text.slice(start, end).trim(),
      charStart: start,
      charEnd: end,
    });

    // Move forward by chunkSize minus overlap
    start = end - chunkOverlap;
    if (start >= text.length) break;

    // Prevent infinite loop
    if (start <= chunks[chunks.length - 1]?.charStart) {
      start = end;
    }
  }

  return chunks;
}

// Smarter chunking: respect Markdown headers
function chunkMarkdown(text, options = {}) {
  const { maxChunkSize = 2000 } = options;

  // Split by headers
  const sections = text.split(/(?=^#{1,3}\s)/m);

  const chunks = [];
  let currentChunk = '';
  let charOffset = 0;

  for (const section of sections) {
    if (currentChunk.length + section.length > maxChunkSize && currentChunk.length > 0) {
      chunks.push({
        text: currentChunk.trim(),
        charStart: charOffset,
        charEnd: charOffset + currentChunk.length,
      });
      charOffset += currentChunk.length;
      currentChunk = '';
    }
    currentChunk += section;
  }

  if (currentChunk.trim().length > 0) {
    chunks.push({
      text: currentChunk.trim(),
      charStart: charOffset,
      charEnd: charOffset + currentChunk.length,
    });
  }

  return chunks;
}

6. Embedding Pipeline

// embedder.js — Batch embedding with rate-limit handling

import OpenAI from 'openai';

const openai = new OpenAI();
const EMBEDDING_MODEL = 'text-embedding-3-small';
const EMBEDDING_DIMENSIONS = 1536;
const BATCH_SIZE = 100; // OpenAI supports up to 2048 inputs per request

/**
 * Embed an array of texts in batches.
 * Returns an array of embedding vectors in the same order as input.
 */
async function embedBatch(texts) {
  const allEmbeddings = [];

  for (let i = 0; i < texts.length; i += BATCH_SIZE) {
    const batch = texts.slice(i, i + BATCH_SIZE);

    try {
      const response = await openai.embeddings.create({
        model: EMBEDDING_MODEL,
        input: batch,
      });

      // Ensure embeddings are in the correct order
      const sorted = response.data.sort((a, b) => a.index - b.index);
      allEmbeddings.push(...sorted.map(d => d.embedding));

      console.log(`Embedded batch ${Math.floor(i / BATCH_SIZE) + 1}/${Math.ceil(texts.length / BATCH_SIZE)}`);
    } catch (error) {
      if (error.status === 429) {
        // Rate limited — wait and retry
        console.log('Rate limited, waiting 10 seconds...');
        await new Promise(resolve => setTimeout(resolve, 10000));
        i -= BATCH_SIZE; // Retry this batch
        continue;
      }
      throw error;
    }
  }

  return allEmbeddings;
}

/**
 * Embed a single text (for queries).
 */
async function embedQuery(text) {
  const response = await openai.embeddings.create({
    model: EMBEDDING_MODEL,
    input: text,
  });
  return response.data[0].embedding;
}

7. Ingestion Pipeline — Complete Implementation

// ingest.js — Full document ingestion pipeline

import { z } from 'zod';

const IngestionResultSchema = z.object({
  documentsProcessed: z.number(),
  chunksCreated: z.number(),
  embeddingsGenerated: z.number(),
  errors: z.array(z.object({
    document: z.string(),
    error: z.string(),
  })),
});

/**
 * Ingest documents into the vector store.
 *
 * @param {Array<{ id: string, filename: string, content: string }>} documents
 * @param {InMemoryVectorStore} vectorStore
 * @returns {z.infer<typeof IngestionResultSchema>}
 */
async function ingestDocuments(documents, vectorStore) {
  const result = {
    documentsProcessed: 0,
    chunksCreated: 0,
    embeddingsGenerated: 0,
    errors: [],
  };

  for (const doc of documents) {
    try {
      console.log(`Processing: ${doc.filename}`);

      // 1. Validate document
      if (!doc.content || doc.content.trim().length === 0) {
        result.errors.push({ document: doc.filename, error: 'Empty document' });
        continue;
      }

      // 2. Chunk the document
      const chunks = chunkText(doc.content, {
        chunkSize: 2000,    // ~500 tokens
        chunkOverlap: 200,  // ~50 tokens
      });

      if (chunks.length === 0) {
        result.errors.push({ document: doc.filename, error: 'No chunks produced' });
        continue;
      }

      console.log(`  Created ${chunks.length} chunks`);

      // 3. Embed all chunks
      const texts = chunks.map(c => c.text);
      const embeddings = await embedBatch(texts);

      console.log(`  Generated ${embeddings.length} embeddings`);

      // 4. Prepare vectors for storage
      const vectors = chunks.map((chunk, i) => ({
        id: `${doc.id}-chunk-${i}`,
        values: embeddings[i],
        metadata: {
          text: chunk.text,
          filename: doc.filename,
          documentId: doc.id,
          chunkIndex: i,
          totalChunks: chunks.length,
          charStart: chunk.charStart,
          charEnd: chunk.charEnd,
          ingestedAt: new Date().toISOString(),
        },
      }));

      // 5. Store in vector database
      await vectorStore.upsert(vectors);

      result.documentsProcessed++;
      result.chunksCreated += chunks.length;
      result.embeddingsGenerated += embeddings.length;

      console.log(`  Stored ${vectors.length} vectors`);
    } catch (error) {
      result.errors.push({
        document: doc.filename,
        error: error.message,
      });
      console.error(`  Error processing ${doc.filename}:`, error.message);
    }
  }

  // Validate the result
  const validated = IngestionResultSchema.parse(result);

  console.log('\n--- Ingestion Summary ---');
  console.log(`Documents processed: ${validated.documentsProcessed}`);
  console.log(`Chunks created: ${validated.chunksCreated}`);
  console.log(`Embeddings generated: ${validated.embeddingsGenerated}`);
  console.log(`Errors: ${validated.errors.length}`);

  return validated;
}

// --- Example: Ingest sample documents ---
async function exampleIngestion() {
  const vectorStore = new InMemoryVectorStore();

  const documents = [
    {
      id: 'doc-1',
      filename: 'employee-handbook.md',
      content: `# Employee Handbook

## Paid Time Off (PTO)

New employees receive 15 paid time off days per calendar year. PTO accrues monthly at a rate of 1.25 days per month. Unused PTO may be carried over up to a maximum of 5 days into the next calendar year.

After 5 years of continuous employment, PTO increases to 20 days per year. After 10 years, PTO increases to 25 days per year.

PTO requests must be submitted at least 2 weeks in advance through the HR portal. Requests during blackout periods (December 20-31, company events) require manager and VP approval.

## Remote Work Policy

Employees may work remotely up to 3 days per week with manager approval. A formal remote work agreement must be signed and filed with HR. Remote workers are expected to maintain core hours of 10am-3pm in their local timezone.

Equipment for remote work (monitor, keyboard, headset) is provided through a one-time $500 equipment allowance. Internet costs are not reimbursed.

## Health Benefits

Full-time employees are eligible for health insurance starting on the first day of the month following their start date. The company covers 80% of individual premiums and 60% of family premiums.

Dental and vision coverage is included in all plans. Mental health services are covered for up to 20 sessions per year.`,
    },
    {
      id: 'doc-2',
      filename: 'refund-policy.md',
      content: `# Refund Policy

## Standard Returns

Customers may return physical products within 30 days of purchase for a full refund. Items must be in original condition with all packaging and accessories included. A valid receipt or order confirmation is required.

Refunds are processed within 5-7 business days after the returned item is received and inspected. Refunds are issued to the original payment method.

## Digital Products

Digital products (software licenses, downloadable content, online courses) are non-refundable after download or activation. If a digital product is defective, customers may request a replacement or store credit within 14 days.

## Exceptions

Sale items are final sale and cannot be returned. Custom or personalized items are non-refundable. Gift cards are non-refundable and do not expire.

## How to Initiate a Return

1. Log into your account at shop.acme.com
2. Go to Order History
3. Select the item to return
4. Choose a reason for return
5. Print the prepaid shipping label
6. Ship the item within 7 days`,
    },
  ];

  const result = await ingestDocuments(documents, vectorStore);
  console.log(`\nVector store now contains ${vectorStore.size} vectors`);

  return { vectorStore, result };
}

8. Query Pipeline — Complete Implementation

// query.js — Full query pipeline with validation

import OpenAI from 'openai';
import { z } from 'zod';

const openai = new OpenAI();

// --- Response schema (must match what we ask the LLM to produce) ---
const QAResponseSchema = z.object({
  answer: z.string().min(1),
  confidence: z.number().min(0).max(1),
  sources: z.array(z.object({
    document: z.string(),
    chunk: z.number().int().min(0),
    relevance: z.number().min(0).max(1),
  })),
  gaps: z.array(z.string()).optional().default([]),
});

/**
 * Query the Document QA system.
 *
 * @param {string} userQuery - The user's natural language question
 * @param {InMemoryVectorStore} vectorStore - The vector store to search
 * @param {object} options - Configuration options
 * @returns {z.infer<typeof QAResponseSchema>}
 */
async function queryDocumentQA(userQuery, vectorStore, options = {}) {
  const {
    topK = 5,
    minScore = 0.6,
    model = 'gpt-4o',
    temperature = 0,
    maxRetries = 2,
  } = options;

  // === Step 1: Embed the query ===
  console.log(`\nQuery: "${userQuery}"`);
  const queryEmbedding = await embedQuery(userQuery);

  // === Step 2: Retrieve relevant chunks ===
  const searchResults = await vectorStore.query({
    vector: queryEmbedding,
    topK: topK,
    includeMetadata: true,
  });

  // Filter by minimum score
  const relevantChunks = searchResults.matches.filter(m => m.score >= minScore);

  console.log(`Retrieved ${relevantChunks.length} relevant chunks (of ${searchResults.matches.length} total)`);

  // Handle no results
  if (relevantChunks.length === 0) {
    return QAResponseSchema.parse({
      answer: "I don't have enough information in my knowledge base to answer that question.",
      confidence: 0,
      sources: [],
      gaps: [userQuery],
    });
  }

  // === Step 3: Build the prompt ===
  const formattedContext = relevantChunks.map((match, i) =>
    `[Source ${i + 1}: ${match.metadata.filename}, Chunk ${match.metadata.chunkIndex}]
${match.metadata.text}`
  ).join('\n\n---\n\n');

  const systemPrompt = `You are a document question-answering assistant.

RULES:
1. Answer ONLY from the provided CONTEXT. Do NOT use your training knowledge.
2. Cite sources using [Source N] notation.
3. If the context does not contain the answer, say "I don't have enough information" and set confidence to 0.
4. Never fabricate information not explicitly stated in the context.

OUTPUT FORMAT (valid JSON):
{
  "answer": "Your answer with [Source N] inline citations",
  "confidence": 0.0 to 1.0 based on how well the context answers the question,
  "sources": [
    { "document": "filename.md", "chunk": 0, "relevance": 0.95 }
  ],
  "gaps": ["information asked about but not found in context"]
}

CONFIDENCE GUIDE:
- 0.9-1.0: Context directly and completely answers the question
- 0.7-0.8: Context mostly answers with minor gaps
- 0.4-0.6: Context partially addresses the question
- 0.1-0.3: Context is tangentially related
- 0.0: Context does not address the question

CONTEXT:
${formattedContext}`;

  // === Step 4: Generate the answer ===
  let lastError = null;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const response = await openai.chat.completions.create({
        model,
        temperature,
        messages: [
          { role: 'system', content: systemPrompt },
          { role: 'user', content: userQuery },
        ],
        response_format: { type: 'json_object' },
      });

      const rawContent = response.choices[0].message.content;

      // === Step 5: Parse JSON ===
      let parsed;
      try {
        parsed = JSON.parse(rawContent);
      } catch (parseError) {
        // Try to extract JSON from text that might have markdown wrapping
        const jsonMatch = rawContent.match(/\{[\s\S]*\}/);
        if (jsonMatch) {
          parsed = JSON.parse(jsonMatch[0]);
        } else {
          throw new Error(`Failed to parse JSON: ${rawContent.slice(0, 200)}`);
        }
      }

      // === Step 6: Validate with Zod ===
      const validated = QAResponseSchema.parse(parsed);

      // === Step 7: Post-processing ===
      // Enrich sources with retrieval scores
      validated.sources = validated.sources.map(source => {
        const matchingChunk = relevantChunks.find(
          c => c.metadata.filename === source.document && c.metadata.chunkIndex === source.chunk
        );
        return {
          ...source,
          relevance: matchingChunk?.score || source.relevance,
        };
      });

      console.log(`Answer generated (confidence: ${validated.confidence})`);
      console.log(`Sources: ${validated.sources.map(s => s.document).join(', ')}`);

      return validated;
    } catch (error) {
      lastError = error;
      console.warn(`Attempt ${attempt + 1} failed: ${error.message}`);

      if (attempt < maxRetries) {
        // Wait before retrying (exponential backoff)
        await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
      }
    }
  }

  // All retries exhausted — return a safe fallback
  console.error('All attempts failed, returning fallback response');
  return QAResponseSchema.parse({
    answer: 'I encountered an error while processing your question. Please try again.',
    confidence: 0,
    sources: [],
    gaps: ['System error — could not generate answer'],
  });
}

9. Putting It All Together — Full Working Example

// main.js — Complete Document QA System

import OpenAI from 'openai';
import { z } from 'zod';

// ============================================================
// CONFIGURATION
// ============================================================

const openai = new OpenAI();
const EMBEDDING_MODEL = 'text-embedding-3-small';

// ============================================================
// SCHEMAS
// ============================================================

const QAResponseSchema = z.object({
  answer: z.string().min(1),
  confidence: z.number().min(0).max(1),
  sources: z.array(z.object({
    document: z.string(),
    chunk: z.number().int().min(0),
    relevance: z.number().min(0).max(1),
  })),
  gaps: z.array(z.string()).optional().default([]),
});

// ============================================================
// VECTOR STORE (in-memory for this example)
// ============================================================

class VectorStore {
  constructor() { this.vectors = []; }

  async upsert(items) {
    for (const item of items) {
      const idx = this.vectors.findIndex(v => v.id === item.id);
      if (idx >= 0) this.vectors[idx] = item;
      else this.vectors.push(item);
    }
  }

  async query({ vector, topK = 5 }) {
    const scored = this.vectors.map(v => ({
      id: v.id,
      score: cosine(vector, v.values),
      metadata: v.metadata,
    }));
    scored.sort((a, b) => b.score - a.score);
    return { matches: scored.slice(0, topK) };
  }

  get size() { return this.vectors.length; }
}

function cosine(a, b) {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i]; na += a[i] ** 2; nb += b[i] ** 2;
  }
  return na && nb ? dot / (Math.sqrt(na) * Math.sqrt(nb)) : 0;
}

// ============================================================
// CHUNKING
// ============================================================

function chunkText(text, { chunkSize = 2000, overlap = 200 } = {}) {
  if (!text || text.length <= chunkSize) {
    return [{ text: text?.trim() || '', start: 0, end: text?.length || 0 }];
  }
  const chunks = [];
  let pos = 0;
  while (pos < text.length) {
    let end = Math.min(pos + chunkSize, text.length);
    // Break at paragraph or sentence boundary
    if (end < text.length) {
      const paraBreak = text.lastIndexOf('\n\n', end);
      if (paraBreak > pos + chunkSize * 0.5) end = paraBreak;
      else {
        const sentBreak = text.lastIndexOf('. ', end);
        if (sentBreak > pos + chunkSize * 0.5) end = sentBreak + 1;
      }
    }
    chunks.push({ text: text.slice(pos, end).trim(), start: pos, end });
    pos = end - overlap;
    if (pos <= (chunks.at(-1)?.start ?? 0)) pos = end; // prevent loop
  }
  return chunks;
}

// ============================================================
// EMBEDDING
// ============================================================

async function embed(texts) {
  const res = await openai.embeddings.create({ model: EMBEDDING_MODEL, input: texts });
  return res.data.sort((a, b) => a.index - b.index).map(d => d.embedding);
}

async function embedOne(text) {
  const [vec] = await embed([text]);
  return vec;
}

// ============================================================
// INGESTION PIPELINE
// ============================================================

async function ingest(docs, store) {
  let totalChunks = 0;
  const errors = [];

  for (const doc of docs) {
    try {
      const chunks = chunkText(doc.content);
      const embeddings = await embed(chunks.map(c => c.text));

      const vectors = chunks.map((chunk, i) => ({
        id: `${doc.id}-${i}`,
        values: embeddings[i],
        metadata: {
          text: chunk.text,
          filename: doc.filename,
          chunkIndex: i,
          totalChunks: chunks.length,
        },
      }));

      await store.upsert(vectors);
      totalChunks += chunks.length;
      console.log(`  ${doc.filename}: ${chunks.length} chunks`);
    } catch (err) {
      errors.push({ document: doc.filename, error: err.message });
    }
  }

  console.log(`Ingested ${totalChunks} chunks from ${docs.length - errors.length} docs`);
  return { totalChunks, errors };
}

// ============================================================
// QUERY PIPELINE
// ============================================================

async function query(userQuery, store, options = {}) {
  const { topK = 5, minScore = 0.6, maxRetries = 2 } = options;

  // 1. Embed
  const qVec = await embedOne(userQuery);

  // 2. Retrieve
  const { matches } = await store.query({ vector: qVec, topK });
  const relevant = matches.filter(m => m.score >= minScore);

  if (relevant.length === 0) {
    return QAResponseSchema.parse({
      answer: "I don't have enough information in my knowledge base to answer that.",
      confidence: 0,
      sources: [],
    });
  }

  // 3. Build prompt
  const context = relevant.map((m, i) =>
    `[Source ${i + 1}: ${m.metadata.filename}, Chunk ${m.metadata.chunkIndex}]\n${m.metadata.text}`
  ).join('\n\n---\n\n');

  const systemPrompt = `You are a document QA assistant. Answer ONLY from the CONTEXT below.
Return valid JSON: { "answer": "...", "confidence": 0.0-1.0, "sources": [{ "document": "...", "chunk": N, "relevance": N }], "gaps": [] }
If the context doesn't answer the question, set confidence to 0.
Cite sources with [Source N] in your answer.

CONTEXT:
${context}`;

  // 4. Generate with retries
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const res = await openai.chat.completions.create({
        model: 'gpt-4o',
        temperature: 0,
        messages: [
          { role: 'system', content: systemPrompt },
          { role: 'user', content: userQuery },
        ],
        response_format: { type: 'json_object' },
      });

      const raw = res.choices[0].message.content;
      const parsed = JSON.parse(raw);
      const validated = QAResponseSchema.parse(parsed);

      // Enrich relevance scores from retrieval
      validated.sources = validated.sources.map(s => {
        const match = relevant.find(
          r => r.metadata.filename === s.document && r.metadata.chunkIndex === s.chunk
        );
        return { ...s, relevance: match?.score ?? s.relevance };
      });

      return validated;
    } catch (err) {
      console.warn(`Attempt ${attempt + 1} failed: ${err.message}`);
      if (attempt === maxRetries) {
        return QAResponseSchema.parse({
          answer: 'Error processing your question. Please try again.',
          confidence: 0,
          sources: [],
        });
      }
      await new Promise(r => setTimeout(r, 1000 * 2 ** attempt));
    }
  }
}

// ============================================================
// USAGE EXAMPLE
// ============================================================

async function main() {
  const store = new VectorStore();

  // --- Ingest ---
  console.log('=== INGESTION ===');
  await ingest([
    {
      id: 'handbook',
      filename: 'employee-handbook.md',
      content: `# Employee Handbook

## PTO Policy
New employees receive 15 paid time off days per year. PTO accrues monthly at 1.25 days per month. Maximum carryover is 5 days. After 5 years PTO increases to 20 days. After 10 years PTO increases to 25 days.

## Remote Work
Employees may work remotely up to 3 days per week with manager approval. Core hours are 10am-3pm local time. Equipment allowance is $500 one-time.

## Health Benefits
Health insurance starts the first of the month after start date. Company covers 80% of individual premiums. Mental health covered for 20 sessions per year.`,
    },
    {
      id: 'refund',
      filename: 'refund-policy.md',
      content: `# Refund Policy

Physical products may be returned within 30 days for a full refund. Items must be in original condition with receipt. Refunds processed in 5-7 business days.

Digital products are non-refundable after download. Defective digital products may be replaced within 14 days.

Sale items are final sale. Gift cards do not expire and are non-refundable.`,
    },
  ], store);

  console.log(`\nVector store: ${store.size} vectors\n`);

  // --- Query ---
  console.log('=== QUERIES ===\n');

  const q1 = await query('How many vacation days do new employees get?', store);
  console.log('Q1 Result:', JSON.stringify(q1, null, 2));

  console.log('\n---\n');

  const q2 = await query('What is the refund policy for digital products?', store);
  console.log('Q2 Result:', JSON.stringify(q2, null, 2));

  console.log('\n---\n');

  const q3 = await query('What is the dress code?', store);
  console.log('Q3 Result:', JSON.stringify(q3, null, 2));
}

main().catch(console.error);

Expected output

// Q1: "How many vacation days do new employees get?"
{
  "answer": "New employees receive 15 paid time off (PTO) days per year [Source 1]. PTO accrues monthly at a rate of 1.25 days per month. The maximum carryover to the next year is 5 days [Source 1].",
  "confidence": 0.95,
  "sources": [
    { "document": "employee-handbook.md", "chunk": 0, "relevance": 0.94 }
  ],
  "gaps": []
}

// Q2: "What is the refund policy for digital products?"
{
  "answer": "Digital products are non-refundable after download or activation [Source 1]. However, if a digital product is defective, customers may request a replacement within 14 days [Source 1].",
  "confidence": 0.92,
  "sources": [
    { "document": "refund-policy.md", "chunk": 0, "relevance": 0.91 }
  ],
  "gaps": []
}

// Q3: "What is the dress code?" (not in any document)
{
  "answer": "I don't have enough information in my knowledge base to answer that.",
  "confidence": 0,
  "sources": [],
  "gaps": ["dress code policy"]
}

10. Error Handling — Comprehensive Strategy

// errors.js — Error handling for the Document QA system

class RAGError extends Error {
  constructor(message, code, details = {}) {
    super(message);
    this.name = 'RAGError';
    this.code = code;
    this.details = details;
  }
}

// Error codes
const RAGErrorCodes = {
  EMBEDDING_FAILED: 'EMBEDDING_FAILED',
  RETRIEVAL_FAILED: 'RETRIEVAL_FAILED',
  NO_RESULTS: 'NO_RESULTS',
  LLM_FAILED: 'LLM_FAILED',
  PARSE_FAILED: 'PARSE_FAILED',
  VALIDATION_FAILED: 'VALIDATION_FAILED',
  RATE_LIMITED: 'RATE_LIMITED',
  CONTEXT_TOO_LONG: 'CONTEXT_TOO_LONG',
};

// Wrap the query pipeline with comprehensive error handling
async function safeQuery(userQuery, store, options = {}) {
  const startTime = Date.now();

  try {
    // Input validation
    if (!userQuery || typeof userQuery !== 'string') {
      throw new RAGError('Query must be a non-empty string', 'INVALID_INPUT');
    }

    if (userQuery.length > 10000) {
      throw new RAGError('Query too long (max 10,000 characters)', 'INVALID_INPUT');
    }

    const result = await query(userQuery, store, options);

    return {
      success: true,
      data: result,
      metadata: {
        latencyMs: Date.now() - startTime,
        query: userQuery,
      },
    };
  } catch (error) {
    console.error(`Query failed: ${error.message}`);

    // Categorize and handle specific errors
    if (error.status === 429) {
      return {
        success: false,
        error: {
          code: RAGErrorCodes.RATE_LIMITED,
          message: 'Service is busy. Please try again in a moment.',
          retryable: true,
          retryAfterMs: 5000,
        },
        metadata: { latencyMs: Date.now() - startTime },
      };
    }

    if (error instanceof z.ZodError) {
      return {
        success: false,
        error: {
          code: RAGErrorCodes.VALIDATION_FAILED,
          message: 'AI response did not match expected format.',
          details: error.errors,
          retryable: true,
        },
        metadata: { latencyMs: Date.now() - startTime },
      };
    }

    if (error instanceof SyntaxError) {
      return {
        success: false,
        error: {
          code: RAGErrorCodes.PARSE_FAILED,
          message: 'AI response was not valid JSON.',
          retryable: true,
        },
        metadata: { latencyMs: Date.now() - startTime },
      };
    }

    // Generic error
    return {
      success: false,
      error: {
        code: 'UNKNOWN',
        message: 'An unexpected error occurred. Please try again.',
        retryable: false,
      },
      metadata: { latencyMs: Date.now() - startTime },
    };
  }
}

11. Testing the Document QA System

// test-qa.js — Test suite for the Document QA system

// --- Test 1: Ingestion produces correct number of chunks ---
async function testIngestion() {
  const store = new VectorStore();
  const result = await ingest([{
    id: 'test-1',
    filename: 'test.md',
    content: 'This is a test document with enough content to produce at least one chunk.',
  }], store);

  console.assert(result.totalChunks >= 1, 'Should produce at least 1 chunk');
  console.assert(result.errors.length === 0, 'Should have no errors');
  console.assert(store.size >= 1, 'Store should have vectors');
  console.log('PASS: testIngestion');
}

// --- Test 2: Query returns valid schema ---
async function testQuerySchema() {
  const store = await createTestStore();
  const result = await query('What is the PTO policy?', store);

  // Validate against Zod schema
  const validation = QAResponseSchema.safeParse(result);
  console.assert(validation.success, `Schema validation failed: ${JSON.stringify(validation.error?.errors)}`);
  console.assert(typeof result.answer === 'string', 'Answer should be a string');
  console.assert(result.confidence >= 0 && result.confidence <= 1, 'Confidence should be 0-1');
  console.assert(Array.isArray(result.sources), 'Sources should be an array');
  console.log('PASS: testQuerySchema');
}

// --- Test 3: Irrelevant query returns low confidence ---
async function testIrrelevantQuery() {
  const store = await createTestStore();
  const result = await query('What is the recipe for chocolate cake?', store);

  console.assert(result.confidence <= 0.3, `Expected low confidence, got ${result.confidence}`);
  console.log('PASS: testIrrelevantQuery');
}

// --- Test 4: Answer cites real sources ---
async function testSourceCitation() {
  const store = await createTestStore();
  const result = await query('How many PTO days do new employees get?', store);

  console.assert(result.sources.length > 0, 'Should have at least one source');
  for (const source of result.sources) {
    console.assert(typeof source.document === 'string', 'Source document should be a string');
    console.assert(typeof source.chunk === 'number', 'Source chunk should be a number');
    console.assert(source.relevance >= 0 && source.relevance <= 1, 'Relevance should be 0-1');
  }
  console.log('PASS: testSourceCitation');
}

// --- Test 5: Empty store returns no-results response ---
async function testEmptyStore() {
  const store = new VectorStore();
  const result = await query('Any question at all?', store);

  console.assert(result.confidence === 0, 'Confidence should be 0 for empty store');
  console.assert(result.sources.length === 0, 'Should have no sources');
  console.log('PASS: testEmptyStore');
}

// --- Test 6: Chunking respects boundaries ---
function testChunking() {
  const text = 'First paragraph about topic A.\n\nSecond paragraph about topic B.\n\nThird paragraph about topic C.';
  const chunks = chunkText(text, { chunkSize: 60, overlap: 10 });

  console.assert(chunks.length >= 2, `Expected at least 2 chunks, got ${chunks.length}`);
  for (const chunk of chunks) {
    console.assert(chunk.text.length > 0, 'Chunk should not be empty');
    console.assert(chunk.start >= 0, 'charStart should be non-negative');
    console.assert(chunk.end > chunk.start, 'charEnd should be > charStart');
  }
  console.log('PASS: testChunking');
}

// --- Test 7: Zod validation catches bad responses ---
function testZodValidation() {
  // Missing required field
  const bad1 = QAResponseSchema.safeParse({ answer: 'test', confidence: 0.5 });
  console.assert(!bad1.success, 'Should reject missing sources');

  // Confidence out of range
  const bad2 = QAResponseSchema.safeParse({
    answer: 'test', confidence: 1.5, sources: [],
  });
  console.assert(!bad2.success, 'Should reject confidence > 1');

  // Valid response
  const good = QAResponseSchema.safeParse({
    answer: 'test', confidence: 0.8,
    sources: [{ document: 'test.md', chunk: 0, relevance: 0.9 }],
  });
  console.assert(good.success, 'Should accept valid response');

  console.log('PASS: testZodValidation');
}

// Helper: create a test vector store with known documents
async function createTestStore() {
  const store = new VectorStore();
  await ingest([{
    id: 'test-handbook',
    filename: 'employee-handbook.md',
    content: `New employees receive 15 PTO days per year. After 5 years, PTO increases to 20 days. Remote work is allowed 3 days per week with manager approval.`,
  }], store);
  return store;
}

// Run all tests
async function runTests() {
  console.log('Running Document QA tests...\n');

  testChunking();
  testZodValidation();
  await testIngestion();
  await testQuerySchema();
  await testIrrelevantQuery();
  await testSourceCitation();
  await testEmptyStore();

  console.log('\nAll tests passed.');
}

runTests().catch(console.error);

12. Production Considerations

Monitoring and observability

// Log every query for monitoring and debugging
async function queryWithLogging(userQuery, store, options = {}) {
  const traceId = crypto.randomUUID();
  const startTime = Date.now();

  console.log(JSON.stringify({
    event: 'rag_query_start',
    traceId,
    query: userQuery,
    timestamp: new Date().toISOString(),
  }));

  const result = await safeQuery(userQuery, store, options);

  console.log(JSON.stringify({
    event: 'rag_query_complete',
    traceId,
    success: result.success,
    confidence: result.data?.confidence,
    sourcesCount: result.data?.sources?.length,
    latencyMs: Date.now() - startTime,
    timestamp: new Date().toISOString(),
  }));

  return result;
}

Performance optimization

Optimization                        Impact
-------------------------------------------------------
Cache embeddings for repeated queries   -50ms per query
Batch embed during ingestion            -80% embedding time
Use smaller embedding model for speed   -30ms per embed
Pre-filter by metadata before search    -20ms per search
Cache frequent query results (TTL)      -200ms per cached hit
Use async/parallel where possible       -40% total latency

Security

// Sanitize user input before processing
function sanitizeQuery(query) {
  // Remove potential prompt injection attempts
  const sanitized = query
    .replace(/ignore previous instructions/gi, '')
    .replace(/system:/gi, '')
    .replace(/\bprompt\b.*\binjection\b/gi, '')
    .trim();

  if (sanitized.length === 0) {
    throw new Error('Query was empty after sanitization');
  }

  return sanitized;
}

13. Key Takeaways

  1. Two pipelines: Ingestion (load, chunk, embed, store) runs offline. Query (embed, retrieve, prompt, generate, validate) runs per request.
  2. Zod validation is non-negotiable — AI output is untrusted. Every response must pass schema validation before reaching your application.
  3. Chunking strategy matters — break at paragraph/sentence boundaries, use overlap to maintain continuity.
  4. Same embedding model everywhere — ingestion and query must use the identical model.
  5. Error handling at every step — embedding can fail, retrieval can return nothing, LLM can return bad JSON, validation can reject. Handle all of them.
  6. Retry with exponential backoff — transient failures (rate limits, API errors) are expected. Retry before failing.
  7. Test the full pipeline — schema validation, chunking, retrieval quality, and end-to-end queries.
  8. Log everything — trace ID, query, retrieved chunks, LLM response, confidence, latency. You need this for debugging and monitoring.

Explain-It Challenge

  1. Walk through what happens if the LLM returns { "answer": "...", "confidence": 1.5 } — how does Zod catch this?
  2. Why do we embed chunks during ingestion but only embed queries at runtime? Could we flip this?
  3. The system returns confidence: 0.92 for a question, but the answer is actually wrong. What went wrong and how would you improve the system?

Navigation: <- 4.13.c Prompt Construction for RAG | 4.13 Exercise Questions ->