Episode 4 — Generative AI Engineering / 4.19 — Multi Agent Architecture Concerns

4.19.d — Managing Shared State

In one sentence: Shared state is the data that flows between agents in a pipeline -- and how you design, pass, mutate, and persist that state determines whether your multi-agent system is reliable and debuggable or a tangled mess of invisible dependencies.

Navigation: <- 4.19.c Debugging Across Agents | 4.19.e -- When Not to Use Multi-Agent ->


1. What Is Shared State?

In a multi-agent pipeline, shared state is any data that one agent produces and another agent consumes. It is the connective tissue that links agents into a coherent workflow.

SHARED STATE IN A 3-AGENT PIPELINE:

  User Input: "Summarize this article about climate change"

  +-- Pipeline State Object ------------------------------------------+
  |                                                                    |
  |  userInput: "Summarize this article about..."                      |
  |                                                                    |
  |  [Agent A: Classifier] writes:                                     |
  |    classification: { intent: "summarize", topic: "climate" }       |
  |                                                                    |
  |  [Agent B: Retriever] reads classification, writes:                |
  |    documents: [{ title: "...", content: "..." }, ...]              |
  |                                                                    |
  |  [Agent C: Generator] reads documents + classification, writes:    |
  |    response: "Climate change refers to..."                         |
  |                                                                    |
  +--------------------------------------------------------------------+

  Each agent READS from state and WRITES to state.
  The state object is the single source of truth for the pipeline.

Types of shared data

| Type                 | Example                                            | Challenge                                    |
|----------------------|----------------------------------------------------|----------------------------------------------|
| Input data           | Original user message, uploaded file               | Must be preserved unchanged                  |
| Intermediate results | Classification, extracted entities, search results | Produced by one agent, consumed by another   |
| Metadata             | Trace ID, timestamps, token counts                 | Important for debugging, not for agent logic |
| Configuration        | Model parameters, feature flags                    | Should be read-only                          |
| Accumulated context  | Conversation history, memory                       | Grows over time, needs management            |

2. State Management Patterns

Pattern 1: Pipeline state object (recommended for most cases)

A single state object travels through the pipeline. Each agent reads what it needs and writes its output.

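// Define the pipeline state shape in one place
// (plain JavaScript: the shape is a convention, not an enforced type)
function createPipelineState(userInput) {
  return {
    // Metadata set once at creation (treat as read-only)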
    traceId: crypto.randomUUID(),
    startedAt: new Date().toISOString(),
    userInput,

    // Agent outputs (populated as pipeline runs)
    classification: null,
    documents: null,
    response: null,

    // Tracking
    agentTimings: {},
    errors: [],
  };
}

// Each agent receives the full state and returns its contribution
async function classifyAgent(state) {
  const result = await callLLM({
    systemPrompt: 'Classify the user intent. Return JSON: { intent, topic, complexity }',
    userMessage: state.userInput,
  });
  return { classification: JSON.parse(result) };
}

async function retrieveAgent(state) {
  const docs = await searchKnowledgeBase(
    state.classification.topic,
    state.classification.intent
  );
  return { documents: docs };
}

async function generateAgent(state) {
  const response = await callLLM({
    systemPrompt: 'Generate a response using the provided documents.',
    userMessage: JSON.stringify({
      question: state.userInput,
      intent: state.classification.intent,
      sources: state.documents,
    }),
  });
  return { response };
}

// Pipeline runner merges each agent's output into state
async function runPipeline(userInput) {
  let state = createPipelineState(userInput);

  const agents = [
    { name: 'classifier', fn: classifyAgent },
    { name: 'retriever', fn: retrieveAgent },
    { name: 'generator', fn: generateAgent },
  ];

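  for (const agent of agents) {
    const start = performance.now();
    try {
      const agentOutput = await agent.fn(state);
      // Merge agent output into a new state object and record timing without mutating the old one
      state = {
        ...state,
        ...agentOutput,
        agentTimings: { ...state.agentTimings, [agent.name]: performance.now() - start },
      };
    } catch (error) {
      // Attach the partial state so the caller can inspect what completed before the failure
      error.pipelineState = {
        ...state,
        errors: [...state.errors, { agent: agent.name, error: error.message }],
      };
      throw error;
    }
  }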

  return state;
}
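
`classifyAgent` above calls `JSON.parse` directly on the raw model output, which throws if the model wraps the JSON in a Markdown code fence or adds prose around it. The following is a minimal sketch of a more forgiving parse step, assuming the reply contains exactly one top-level JSON object. `parseAgentJson` is a hypothetical helper name, not part of any library.

```javascript
// Hypothetical helper: extract and parse the single JSON object in an LLM reply.
// Assumes one top-level object, possibly surrounded by a code fence or prose.
function parseAgentJson(raw, requiredKeys = []) {
  const start = raw.indexOf('{');
  const end = raw.lastIndexOf('}');
  if (start === -1 || end <= start) {
    throw new Error('Agent output contains no JSON object');
  }

  let parsed;
  try {
    parsed = JSON.parse(raw.slice(start, end + 1));
  } catch (error) {
    throw new Error(`Agent output is not valid JSON: ${error.message}`);
  }

  const missing = requiredKeys.filter((key) => !(key in parsed));
  if (missing.length > 0) {
    throw new Error(`Agent output is missing keys: ${missing.join(', ')}`);
  }
  return parsed;
}

// Simulated model reply wrapped in a Markdown code fence
const fence = '`'.repeat(3);
const reply = `${fence}json\n{ "intent": "summarize", "topic": "climate" }\n${fence}`;
const classification = parseAgentJson(reply, ['intent', 'topic']);
console.log(classification.intent); // summarize
```

Failing inside the agent with a specific message keeps the error next to its cause instead of surfacing later as `undefined` in a downstream agent.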

Pattern 2: Context passing (explicit parameters)

Instead of a shared state object, each agent receives only the specific data it needs. This is more explicit and reduces the risk of unintended coupling.

// Each agent takes only its required inputs
async function classifyIntent(userInput) {
  // Only receives userInput -- no access to downstream state
  return callLLM({
    systemPrompt: 'Classify intent. Return JSON: { intent, topic }',
    userMessage: userInput,
  });
}

async function retrieveDocuments(topic, intent) {
  // Only receives what it needs from classification
  return searchKnowledgeBase(topic, intent);
}

async function generateResponse(userInput, intent, documents) {
  // Receives specific data, not the entire state
  return callLLM({
    systemPrompt: 'Generate a response using provided sources.',
    userMessage: JSON.stringify({ question: userInput, intent, sources: documents }),
  });
}

// Pipeline explicitly threads data between agents
async function runPipeline(userInput) {
  const classification = await classifyIntent(userInput);
  const { intent, topic } = JSON.parse(classification);

  const documents = await retrieveDocuments(topic, intent);

  const response = await generateResponse(userInput, intent, documents);

  return response;
}

Pattern comparison

+-----------------------+--------------------------+-----------------------+
|  Pattern              | Pros                     | Cons                  |
|-----------------------+--------------------------+-----------------------|
|  Pipeline State       | - Easy to add agents     | - Agent can access    |
|  Object               | - Full context available |   data it shouldn't   |
|                       | - Easy to log/debug      | - Harder to test      |
|                       | - State is serializable  |   agents in isolation |
|-----------------------+--------------------------+-----------------------|
|  Context Passing      | - Explicit deps          | - Verbose wiring      |
|  (explicit params)    | - Easy to test alone     | - Adding agents       |
|                       | - Clear interfaces       |   requires rewiring   |
|                       | - No hidden coupling     | - Harder to log       |
+-----------------------+--------------------------+-----------------------+

Recommendation: Start with the pipeline state object for most multi-agent systems. Switch to explicit context passing if you find agents are accessing data they shouldn't, or if you need strict interface contracts.
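
One way to keep the pipeline state object while limiting what each agent can see is to have the runner hand each agent a view that contains only the fields it declares. This is a minimal sketch: the `reads` property and the `pickFields` and `scopedAgent` helpers are assumptions for illustration, not part of the code above.

```javascript
// Copy only the named fields from state (fields that do not exist are skipped).
function pickFields(state, fields) {
  const view = {};
  for (const field of fields) {
    if (field in state) view[field] = state[field];
  }
  return Object.freeze(view);
}

// Wrap an agent so it receives only the fields listed in `reads`.
function scopedAgent({ name, reads, fn }) {
  return { name, fn: (state) => fn(pickFields(state, reads)) };
}

// Example: the retriever declares that it reads only `classification`.
const retriever = scopedAgent({
  name: 'retriever',
  reads: ['classification'],
  fn: async (view) => ({
    seenFields: Object.keys(view), // stand-in for a real search call
  }),
});

const state = {
  userInput: 'Summarize this article',
  classification: { intent: 'summarize', topic: 'climate' },
  documents: null,
};

retriever.fn(state).then((out) => console.log(out.seenFields)); // [ 'classification' ]
```

The wrapped agent keeps the `{ name, fn }` shape, so it can sit in the same `agents` array as unwrapped agents.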


3. Immutable State vs Mutable State

Mutable state (dangerous)

// DANGEROUS: Agents modify the state object directly
async function dangerousPipeline(userInput) {
  const state = { userInput, data: {} };

  await classifyAgent(state);
  // state.data.intent is now set

  await retrieveAgent(state);
  // state.data.documents is now set
  // BUT: retrieveAgent might have ALSO modified state.data.intent (bug!)

  await generateAgent(state);
  // Who knows what state looks like now?

  return state;
}

// The problem: any agent can modify any part of state
async function retrieveAgent(state) {
  // This agent accidentally overwrites another agent's data
  state.data.intent = 'search'; // BUG! Changed classification result
  state.data.documents = await search(state.data.intent);
}

Immutable state (safe)

// SAFE: Each agent returns new data; the runner merges it into a new state
async function safePipeline(userInput) {
  const initialState = Object.freeze({
    userInput,
    classification: null,
    documents: null,
    response: null,
  });

  // Agent A: returns ONLY its contribution
  const classResult = await classifyAgent(initialState);
  const stateAfterA = Object.freeze({ ...initialState, ...classResult });

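  // Agent B: receives frozen state, so it cannot reassign top-level fields.
  // Note: Object.freeze is shallow; nested objects such as state.classification stay mutable.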
  const retrieveResult = await retrieveAgent(stateAfterA);
  const stateAfterB = Object.freeze({ ...stateAfterA, ...retrieveResult });

  // Agent C: receives everything, cannot reassign earlier top-level results
  const generateResult = await generateAgent(stateAfterB);
  const finalState = Object.freeze({ ...stateAfterB, ...generateResult });

  return finalState;
}

// Utility: pipeline runner with immutable state
async function immutablePipelineRunner(initialState, agents) {
  let state = Object.freeze({ ...initialState });

  for (const agent of agents) {
    const contribution = await agent.fn(state);

    // Keep only fields that are still unset; never let an agent overwrite earlier results.
    // Build a filtered copy instead of deleting keys from the agent's own return value.
    const accepted = {};
    for (const [key, value] of Object.entries(contribution ?? {})) {
      if (state[key] !== null && state[key] !== undefined) {
        console.warn(
          `[WARNING] Agent "${agent.name}" tried to overwrite existing field "${key}". Ignoring.`
        );
        continue;
      }
      accepted[key] = value;
    }

    state = Object.freeze({ ...state, ...accepted });
  }

  return state;
}
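
`Object.freeze` is shallow: it blocks reassigning `state.classification`, but `state.classification.intent` stays writable unless the nested object is frozen too. The following is a minimal sketch of a recursive freeze, assuming state holds only plain objects and arrays with no cycles. `deepFreeze` is a hypothetical helper, not a built-in.

```javascript
// Recursively freeze a plain object/array tree (assumes no cycles).
function deepFreeze(value) {
  if (value === null || typeof value !== 'object') return value;
  for (const key of Object.keys(value)) {
    deepFreeze(value[key]);
  }
  return Object.freeze(value);
}

// Shallow freeze: the nested object can still be changed.
const shallow = Object.freeze({ classification: { intent: 'billing' } });
shallow.classification.intent = 'search';
console.log(shallow.classification.intent); // search

// Deep freeze: the nested write is rejected.
const deep = deepFreeze({ classification: { intent: 'billing' } });
try {
  deep.classification.intent = 'search'; // throws TypeError in strict mode and ES modules
} catch (error) {
  // In non-strict scripts the write is silently ignored instead of throwing.
}
console.log(deep.classification.intent); // billing
```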

Why immutability matters

MUTABLE STATE TIMELINE:
  State: { intent: null }
  After Agent A: { intent: "billing" }
  After Agent B: { intent: "search" }     <-- ACCIDENTALLY OVERWRITTEN!
  After Agent C: generates response for "search" intent
  Result: Wrong answer. Hard to debug because state was mutated in place.

IMMUTABLE STATE TIMELINE:
  State v0: { intent: null }                      (frozen)
  State v1: { intent: "billing" }                 (frozen, v0 preserved)
  State v2: { intent: "billing", docs: [...] }    (frozen, v1 preserved)
  State v3: { intent: "billing", docs: [...], response: "..." } (frozen)
  Result: Each version is preserved. If something goes wrong, you can
          compare state at each step. Agent B cannot reassign intent on
          the frozen state; it can only return new fields for the runner
          to merge.
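
Keeping every state version makes that comparison mechanical. The following is a minimal sketch, assuming state is JSON-serializable. `changedKeys` is a hypothetical helper that reports which top-level fields differ between two versions.

```javascript
// Report top-level fields whose JSON representation differs between two versions.
// Note: JSON comparison is sensitive to key order inside nested objects.
function changedKeys(before, after) {
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  return [...keys].filter(
    (key) => JSON.stringify(before[key]) !== JSON.stringify(after[key])
  );
}

// Versions collected by a runner, one per pipeline step.
const history = [
  { intent: null },
  { intent: 'billing' },
  { intent: 'billing', docs: ['refund-policy'] },
];

for (let i = 1; i < history.length; i++) {
  console.log(`v${i - 1} -> v${i}:`, changedKeys(history[i - 1], history[i]));
}
// v0 -> v1: [ 'intent' ]
// v1 -> v2: [ 'docs' ]
```

A step that reports a changed key it does not own (for example, the retriever changing `intent`) points directly at the agent that caused the problem.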

4. Race Conditions in Parallel Agent Execution

When agents run in parallel (via Promise.all), they can overwrite each other's results if more than one of them writes to the same shared state object.

The problem

// DANGEROUS: Parallel agents writing to the same state object
async function brokenParallelPipeline(userInput) {
  const state = { userInput, results: {} };

  // These run at the same time and both write to state.results
  await Promise.all([
    sentimentAgent(state),  // writes state.results.sentiment
    summaryAgent(state),    // writes state.results.summary
    keywordAgent(state),    // writes state.results.keywords
  ]);

  // Did all three writes survive? Only if every agent assigns to its own
  // property. JavaScript runs one callback at a time, but agents interleave
  // at every await, so the order in which they finish is unpredictable.
  // If an agent does: state.results = { sentiment: "..." } instead of
  // state.results.sentiment = "...", it overwrites the others' results.

  return state;
}
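
The hazard is easiest to see with an agent that reads state, awaits, and then writes back a whole object built from its stale read. The following is a minimal sketch that reproduces this lost update, using hypothetical stand-in agents and timers in place of LLM calls.

```javascript
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Each stand-in agent snapshots results, does slow work, then REPLACES results.
async function replacingAgent(state, key, value, ms) {
  const snapshot = { ...state.results }; // stale as soon as we await
  await delay(ms);
  state.results = { ...snapshot, [key]: value };
}

async function demoLostUpdate() {
  const state = { results: {} };
  await Promise.all([
    replacingAgent(state, 'sentiment', 'positive', 10),
    replacingAgent(state, 'summary', 'short text', 5),
  ]);
  return state.results;
}

demoLostUpdate().then((results) => {
  // Both agents took their snapshot before either wrote, so the last
  // writer replaces the other agent's result and only one key survives.
  console.log(Object.keys(results).length); // 1
});
```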

The solution: collect results, merge after

// SAFE: Parallel agents return independent results; merge after
async function safeParallelPipeline(userInput) {
  const [sentimentResult, summaryResult, keywordResult] = await Promise.all([
    analyzeSentiment(userInput),  // Returns { sentiment: "positive" }
    generateSummary(userInput),   // Returns { summary: "..." }
    extractKeywords(userInput),   // Returns { keywords: [...] }
  ]);

  // Merge results into state AFTER all agents complete
  const state = Object.freeze({
    userInput,
    ...sentimentResult,
    ...summaryResult,
    ...keywordResult,
  });

  return state;
}
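
Spreading parallel results into one object lets a later result silently overwrite an earlier one when two agents return the same field name. The following is a minimal sketch of a merge that stores each result under its agent's name, so a shared field such as `confidence` cannot collide. `mergeNamespaced` and the sample outputs are hypothetical.

```javascript
// Merge parallel results under one key per agent; reject duplicate agent names.
function mergeNamespaced(base, namedResults) {
  const merged = { ...base };
  for (const { name, output } of namedResults) {
    if (name in merged) {
      throw new Error(`Duplicate result namespace: "${name}"`);
    }
    merged[name] = output;
  }
  return Object.freeze(merged);
}

const state = mergeNamespaced(
  { userInput: 'Great product, slow shipping.' },
  [
    { name: 'sentiment', output: { label: 'mixed', confidence: 0.72 } },
    { name: 'keywords', output: { terms: ['product', 'shipping'], confidence: 0.9 } },
  ]
);

console.log(state.sentiment.confidence, state.keywords.confidence); // 0.72 0.9
```

The trade-off is that downstream agents read `state.sentiment.label` rather than a flat `state.sentiment`, which makes the producer of each field explicit.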

Parallel agents with dependency on shared resources

// Problem: Multiple agents querying the same rate-limited API
async function rateLimitedParallel(userInput) {
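  // If all 3 agents call OpenAI simultaneously and you hit rate limits,
  // some will fail. Use a semaphore or queue.
  // Note: this semaphore is created per call, so it only limits the agents
  // in this one pipeline run. Share one instance to limit across requests.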

  const semaphore = new Semaphore(2); // Max 2 concurrent LLM calls

  const results = await Promise.all([
    semaphore.acquire().then(() =>
      analyzeSentiment(userInput).finally(() => semaphore.release())
    ),
    semaphore.acquire().then(() =>
      generateSummary(userInput).finally(() => semaphore.release())
    ),
    semaphore.acquire().then(() =>
      extractKeywords(userInput).finally(() => semaphore.release())
    ),
  ]);

  return results;
}

// Simple semaphore implementation
class Semaphore {
  constructor(max) {
    this.max = max;
    this.current = 0;
    this.queue = [];
  }

  acquire() {
    return new Promise((resolve) => {
      if (this.current < this.max) {
        this.current++;
        resolve();
      } else {
        this.queue.push(resolve);
      }
    });
  }

  release() {
    this.current--;
    if (this.queue.length > 0) {
      this.current++;
      const next = this.queue.shift();
      next();
    }
  }
}
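
Pairing every `acquire()` with a `release()` by hand is easy to get wrong. A minimal alternative sketch wraps the same idea in a closure: `createLimiter` (a hypothetical helper, not a library API) returns a `limit(task)` function that runs at most `max` tasks at once and always frees its slot, even if the task throws.

```javascript
function createLimiter(max) {
  let active = 0;
  const waiting = [];

  const next = () => {
    if (active >= max || waiting.length === 0) return;
    active++;
    const { task, resolve, reject } = waiting.shift();
    // Promise.resolve().then(task) also turns a synchronous throw into a rejection.
    Promise.resolve()
      .then(task)
      .then(resolve, reject)
      .finally(() => {
        active--;
        next();
      });
  };

  return function limit(task) {
    return new Promise((resolve, reject) => {
      waiting.push({ task, resolve, reject });
      next();
    });
  };
}

// Demo: five stand-in tasks, at most two running at the same time.
async function demoLimiter() {
  const limit = createLimiter(2);
  let running = 0;
  let peak = 0;
  const fakeCall = (id) => async () => {
    running++;
    peak = Math.max(peak, running);
    await new Promise((resolve) => setTimeout(resolve, 5));
    running--;
    return id;
  };
  const ids = await Promise.all([1, 2, 3, 4, 5].map((id) => limit(fakeCall(id))));
  return { ids, peak };
}

demoLimiter().then(({ ids, peak }) => console.log(ids, peak)); // [ 1, 2, 3, 4, 5 ] 2
```

Creating the limiter once at module scope, rather than inside each pipeline run, would cap concurrency across all requests in the process.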

5. State Schema Design

Define your state schema up front. This prevents agents from producing unexpected structures and makes the pipeline self-documenting.

Using a schema to validate state

// Define the expected state shape at each pipeline stage
const STATE_SCHEMAS = {
  initial: {
    required: ['traceId', 'userInput'],
    properties: {
      traceId: { type: 'string' },
      userInput: { type: 'string' },
    },
  },

  afterClassification: {
    required: ['traceId', 'userInput', 'classification'],
    properties: {
      traceId: { type: 'string' },
      userInput: { type: 'string' },
      classification: {
        type: 'object',
        required: ['intent', 'topic', 'complexity'],
        properties: {
          intent: { type: 'string', enum: ['question', 'summarize', 'analyze', 'compare'] },
          topic: { type: 'string' },
          complexity: { type: 'string', enum: ['simple', 'moderate', 'complex'] },
        },
      },
    },
  },

  afterRetrieval: {
    required: ['traceId', 'userInput', 'classification', 'documents'],
    properties: {
      // ... inherits above, plus:
      documents: {
        type: 'array',
        items: {
          type: 'object',
          required: ['title', 'content', 'relevanceScore'],
          properties: {
            title: { type: 'string' },
            content: { type: 'string' },
            relevanceScore: { type: 'number', minimum: 0, maximum: 1 },
          },
        },
      },
    },
  },

  final: {
    required: ['traceId', 'userInput', 'classification', 'documents', 'response'],
    properties: {
      // ... inherits above, plus:
      response: { type: 'string', minLength: 1 },
    },
  },
};
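
The `// ... inherits above` comments describe intent, but an object literal does not inherit anything: as written, `afterRetrieval.properties` contains only `documents`. The following is a minimal sketch of a helper that builds each stage from the previous one so the inheritance is real. `extendSchema` is a hypothetical name, and the schemas here are trimmed for brevity.

```javascript
// Build a stage schema by adding required fields and properties to a base schema.
function extendSchema(base, extra) {
  return {
    required: [...new Set([...base.required, ...(extra.required ?? [])])],
    properties: { ...base.properties, ...(extra.properties ?? {}) },
  };
}

const initial = {
  required: ['traceId', 'userInput'],
  properties: { traceId: { type: 'string' }, userInput: { type: 'string' } },
};

const afterClassification = extendSchema(initial, {
  required: ['classification'],
  properties: { classification: { type: 'object' } },
});

console.log(afterClassification.required);
// [ 'traceId', 'userInput', 'classification' ]
console.log(Object.keys(afterClassification.properties));
// [ 'traceId', 'userInput', 'classification' ]
```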

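// Validate state after each agent
// Recursively checks required fields, types, enums, and simple bounds,
// so nested rules such as classification.intent are actually enforced.
function collectErrors(value, rules, path, errors) {
  const actualType =
    value === null ? 'null' : Array.isArray(value) ? 'array' : typeof value;

  if (rules.type && actualType !== rules.type) {
    errors.push(`${path} should be ${rules.type}, got ${actualType}`);
    return; // Skip deeper checks when the type is already wrong
  }
  if (rules.enum && !rules.enum.includes(value)) {
    errors.push(`${path} must be one of: ${rules.enum.join(', ')}. Got: ${value}`);
  }
  if (rules.minLength !== undefined && value.length < rules.minLength) {
    errors.push(`${path} must have length >= ${rules.minLength}`);
  }
  if (rules.minimum !== undefined && value < rules.minimum) {
    errors.push(`${path} must be >= ${rules.minimum}`);
  }
  if (rules.maximum !== undefined && value > rules.maximum) {
    errors.push(`${path} must be <= ${rules.maximum}`);
  }
  if (rules.items) {
    value.forEach((item, i) => collectErrors(item, rules.items, `${path}[${i}]`, errors));
  }

  // Check required fields, then recurse into each declared property
  for (const field of rules.required ?? []) {
    if (value[field] === null || value[field] === undefined) {
      errors.push(`Missing required field: ${path ? `${path}.${field}` : field}`);
    }
  }
  for (const [field, fieldRules] of Object.entries(rules.properties ?? {})) {
    if (value[field] !== null && value[field] !== undefined) {
      collectErrors(value[field], fieldRules, path ? `${path}.${field}` : field, errors);
    }
  }
}

function validateState(state, schemaName) {
  const schema = STATE_SCHEMAS[schemaName];
  if (!schema) throw new Error(`Unknown state schema: ${schemaName}`);

  const errors = [];
  collectErrors(state, schema, '', errors);

  if (errors.length > 0) {
    console.error(`State validation failed (${schemaName}):`);
    errors.forEach((e) => console.error(`  - ${e}`));
    throw new Error(`Invalid state at ${schemaName}: ${errors.join('; ')}`);
  }

  return true;
}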

// Usage in pipeline
async function validatedPipeline(userInput) {
  let state = createPipelineState(userInput);
  validateState(state, 'initial');

  state = { ...state, ...(await classifyAgent(state)) };
  validateState(state, 'afterClassification');

  state = { ...state, ...(await retrieveAgent(state)) };
  validateState(state, 'afterRetrieval');

  state = { ...state, ...(await generateAgent(state)) };
  validateState(state, 'final');

  return state;
}

State evolution diagram

+----------------------------------------------------------------------+
|  STATE EVOLUTION THROUGH PIPELINE                                    |
+----------------------------------------------------------------------+
|                                                                      |
|  Stage 0 (initial):                                                  |
|  { traceId, userInput }                                              |
|       |                                                              |
|       v                                                              |
|  [Classifier Agent]                                                  |
|       |                                                              |
|       v                                                              |
|  Stage 1 (afterClassification):                                      |
|  { traceId, userInput, classification: { intent, topic, complexity }}|
|       |                                                              |
|       v                                                              |
|  [Retriever Agent]                                                   |
|       |                                                              |
|       v                                                              |
|  Stage 2 (afterRetrieval):                                           |
|  { traceId, userInput, classification, documents: [...] }            |
|       |                                                              |
|       v                                                              |
|  [Generator Agent]                                                   |
|       |                                                              |
|       v                                                              |
|  Stage 3 (final):                                                    |
|  { traceId, userInput, classification, documents, response: "..." }  |
|                                                                      |
+----------------------------------------------------------------------+

6. Persisting State for Resumable Pipelines

Long-running pipelines can fail midway. If state is only in memory, you lose all progress. Persisting state allows you to resume from the last successful step.

class PersistentPipelineRunner {
  constructor(storageDir = './pipeline-state') {
    this.storageDir = storageDir;
  }

  statePath(traceId) {
    return `${this.storageDir}/${traceId}.json`;
  }

  async saveState(state) {
    const fs = await import('fs/promises');
    await fs.mkdir(this.storageDir, { recursive: true });
    await fs.writeFile(
      this.statePath(state.traceId),
      JSON.stringify(state, null, 2)
    );
  }

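  async loadState(traceId) {
    const fs = await import('fs/promises');
    try {
      const data = await fs.readFile(this.statePath(traceId), 'utf-8');
      return JSON.parse(data);
    } catch (error) {
      if (error.code === 'ENOENT') return null; // No checkpoint exists for this trace
      throw error; // Surface corrupt or unreadable checkpoints instead of hiding them
    }
  }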

  async run(userInput, agents) {
    // createPipelineState already assigns a traceId
    const state = { ...createPipelineState(userInput), completedAgents: [] };

    await this.saveState(state);
    console.log(`Pipeline started: ${state.traceId}`);

    return this.execute(state, agents);
  }

  // Runs every agent not yet listed in state.completedAgents, checkpointing after each one.
  // Shared by run() (fresh state) and resume() (state loaded from disk).
  async execute(initialState, agents) {
    let state = initialState;
    const { traceId } = state;

    for (const agent of agents) {
      if (state.completedAgents.includes(agent.name)) {
        console.log(`Skipping ${agent.name} (already completed)`);
        continue;
      }

      try {
        console.log(`Running ${agent.name}...`);
        const contribution = await agent.fn(state);
        state = {
          ...state,
          ...contribution,
          completedAgents: [...state.completedAgents, agent.name],
          lastError: null, // Clear any error left over from a previous attempt
        };
        await this.saveState(state); // Checkpoint after each agent
        console.log(`${agent.name} completed. State saved.`);
      } catch (error) {
        state = { ...state, lastError: { agent: agent.name, message: error.message } };
        await this.saveState(state);
        console.error(`${agent.name} failed: ${error.message}`);
        console.log(`Pipeline paused at ${agent.name}. Resume with: resume("${traceId}", agents)`);
        throw error;
      }
    }

    state = { ...state, completedAt: new Date().toISOString() };
    await this.saveState(state);
    return state;
  }

  async resume(traceId, agents) {
    const state = await this.loadState(traceId);
    if (!state) throw new Error(`No saved state for trace ${traceId}`);

    console.log(`Resuming pipeline ${traceId}`);
    console.log(`Completed agents: ${state.completedAgents.join(', ')}`);
    console.log(`Resuming from: ${state.lastError?.agent || 'next agent'}`);

    // Continue from the saved state; agents in completedAgents are skipped
    return this.execute(state, agents);
  }
}

// Usage
const runner = new PersistentPipelineRunner();

const agents = [
  { name: 'classifier', fn: classifyAgent },
  { name: 'retriever', fn: retrieveAgent },
  { name: 'generator', fn: generateAgent },
];

try {
  const result = await runner.run('Explain quantum computing', agents);
} catch (error) {
  // Pipeline failed midway -- state is saved
  // Later, fix the issue and resume using the trace ID printed in the log output:
  // const result = await runner.resume('<trace-id>', agents);
}
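
If the process crashes while `writeFile` is partway through, the checkpoint can be left truncated and `loadState` will fail to parse it. The following is a minimal sketch of a safer write, assuming Node.js and that the temporary file and the target live on the same filesystem (where a rename is atomic on POSIX systems). `saveStateAtomic` is a hypothetical replacement for `saveState`.

```javascript
async function saveStateAtomic(storageDir, state) {
  const fs = await import('node:fs/promises');
  const path = await import('node:path');

  await fs.mkdir(storageDir, { recursive: true });
  const finalPath = path.join(storageDir, `${state.traceId}.json`);
  const tempPath = `${finalPath}.${process.pid}.tmp`;

  // Write the full checkpoint to a temp file, then swap it into place.
  await fs.writeFile(tempPath, JSON.stringify(state, null, 2));
  await fs.rename(tempPath, finalPath);
  return finalPath;
}

// Demo: write a checkpoint into a throwaway directory and read it back.
async function demoCheckpoint() {
  const fs = await import('node:fs/promises');
  const os = await import('node:os');
  const path = await import('node:path');

  const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'pipeline-state-'));
  const file = await saveStateAtomic(dir, {
    traceId: 'demo-trace',
    completedAgents: ['classifier'],
  });
  const loaded = JSON.parse(await fs.readFile(file, 'utf-8'));
  await fs.rm(dir, { recursive: true, force: true });
  return loaded;
}

demoCheckpoint().then((loaded) => console.log(loaded.completedAgents)); // [ 'classifier' ]
```

A reader of the checkpoint file then sees either the previous complete version or the new complete version, never a partial write.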

When to persist state

+-----------------------------------------------------------------+
|  PERSIST STATE WHEN:                                            |
+-----------------------------------------------------------------+
|  - Pipeline takes > 30 seconds (user might navigate away)       |
|  - Pipeline involves expensive operations (don't repeat them)   |
|  - Pipeline processes user-uploaded data (can't re-upload)      |
|  - Pipeline is part of a batch job (need retry capability)      |
|  - Regulatory requirements demand audit trail                   |
+-----------------------------------------------------------------+
|  SKIP PERSISTENCE WHEN:                                         |
+-----------------------------------------------------------------+
|  - Pipeline is fast (< 5 seconds)                               |
|  - Pipeline is cheap to retry                                   |
|  - Pipeline runs in a request/response cycle (stateless API)    |
+-----------------------------------------------------------------+

7. Anti-Patterns in State Management

| Anti-Pattern           | Problem                                               | Solution                                        |
|------------------------|-------------------------------------------------------|-------------------------------------------------|
| Global mutable state   | Any agent can corrupt any data                        | Use immutable state + merge pattern             |
| Implicit dependencies  | Agent B reads a field that Agent A might not have set | Schema validation after each step               |
| Oversized state        | Passing 100K tokens of context to every agent         | Each agent reads only what it needs             |
| No state versioning    | Can't tell what state looked like at step 2           | Save snapshots at each pipeline stage           |
| Stringified everything | Agents pass JSON strings instead of objects           | Use typed state objects, parse once             |
| No error state         | When an agent fails, state is inconsistent            | Include error fields, validate before each step |

8. Key Takeaways

  1. Shared state is the connective tissue of multi-agent pipelines. Design it explicitly from the start -- don't let it emerge accidentally.
  2. Use a pipeline state object for most systems. Each agent reads from it and returns its contribution, which the runner merges in.
  3. Prefer immutable state. Use Object.freeze() and the spread operator to create new state versions rather than mutating in place. Object.freeze() is shallow, so nested objects need freezing too if agents must not change them.
  4. Parallel agents must not write to the same state. Collect parallel results independently, then merge them after all agents complete.
  5. Define a state schema and validate it after each agent. Catch malformed data immediately, not three agents later.
  6. Persist state for long-running or expensive pipelines. This enables resumption after failures and provides an audit trail.
  7. Each agent should use only the data it needs. Even when agents share one state object, avoid putting the entire accumulated state into every agent's prompt -- this wastes tokens and creates hidden coupling.

Explain-It Challenge

  1. Agent B expects state.classification.intent to be one of ["question", "summarize", "analyze"], but Agent A returns "ask_question". How would your state schema catch this, and what happens if it doesn't?
  2. Two parallel agents both return a field called confidence. How do you handle the naming collision when merging results?
  3. Your pipeline processes a 50-page document and fails at Agent 4 of 6. Without persistent state, how much work is wasted? Design a resumable solution.
