Episode 4 — Generative AI Engineering / 4.19 — Multi Agent Architecture Concerns
4.19.d — Managing Shared State
In one sentence: Shared state is the data that flows between agents in a pipeline -- and how you design, pass, mutate, and persist that state determines whether your multi-agent system is reliable and debuggable or a tangled mess of invisible dependencies.
Navigation: <- 4.19.c Debugging Across Agents | 4.19.e -- When Not to Use Multi-Agent ->
1. What Is Shared State?
In a multi-agent pipeline, shared state is any data that one agent produces and another agent consumes. It is the connective tissue that links agents into a coherent workflow.
SHARED STATE IN A 3-AGENT PIPELINE:
User Input: "Summarize this article about climate change"
+-- Pipeline State Object ------------------------------------------+
| |
| userInput: "Summarize this article about..." |
| |
| [Agent A: Classifier] writes: |
| classification: { intent: "summarize", topic: "climate" } |
| |
| [Agent B: Retriever] reads classification, writes: |
| documents: [{ title: "...", content: "..." }, ...] |
| |
| [Agent C: Generator] reads documents + classification, writes: |
| response: "Climate change refers to..." |
| |
+--------------------------------------------------------------------+
Each agent READS from state and WRITES to state.
The state object is the single source of truth for the pipeline.
Types of shared data
| Type | Example | Challenge |
|---|---|---|
| Input data | Original user message, uploaded file | Must be preserved unchanged |
| Intermediate results | Classification, extracted entities, search results | Produced by one agent, consumed by another |
| Metadata | Trace ID, timestamps, token counts | Important for debugging, not for agent logic |
| Configuration | Model parameters, feature flags | Should be read-only |
| Accumulated context | Conversation history, memory | Grows over time, needs management |
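A minimal sketch of one state object carrying all five types (field names here are illustrative, not a fixed API):

```javascript
// Illustrative state object covering the five data types above.
// Field names are examples, not a required shape.
const state = {
  // Input data: preserved unchanged for the whole run
  userInput: 'Summarize this article about climate change',
  // Metadata: for tracing and debugging, not agent logic
  traceId: 'a1b2c3d4',
  startedAt: new Date().toISOString(),
  // Configuration: frozen so no agent can mutate it
  config: Object.freeze({ model: 'example-model', temperature: 0.2 }),
  // Intermediate results: null until the producing agent runs
  classification: null,
  documents: null,
  // Accumulated context: grows over time and needs a trimming policy
  history: [],
};
```

Keeping configuration frozen and intermediate results explicitly `null` makes it obvious, at a glance, which fields are inputs, which are pending, and which must never change.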
2. State Management Patterns
Pattern 1: Pipeline state object (recommended for most cases)
A single state object travels through the pipeline. Each agent reads what it needs and writes its output.
// Define a typed pipeline state
function createPipelineState(userInput) {
return {
// Immutable metadata
traceId: crypto.randomUUID(),
startedAt: new Date().toISOString(),
userInput,
// Agent outputs (populated as pipeline runs)
classification: null,
documents: null,
response: null,
// Tracking
agentTimings: {},
errors: [],
};
}
// Each agent receives the full state and returns its contribution
async function classifyAgent(state) {
const result = await callLLM({
systemPrompt: 'Classify the user intent. Return JSON: { intent, topic, complexity }',
userMessage: state.userInput,
});
return { classification: JSON.parse(result) };
}
async function retrieveAgent(state) {
const docs = await searchKnowledgeBase(
state.classification.topic,
state.classification.intent
);
return { documents: docs };
}
async function generateAgent(state) {
const response = await callLLM({
systemPrompt: 'Generate a response using the provided documents.',
userMessage: JSON.stringify({
question: state.userInput,
intent: state.classification.intent,
sources: state.documents,
}),
});
return { response };
}
// Pipeline runner merges each agent's output into state
async function runPipeline(userInput) {
let state = createPipelineState(userInput);
const agents = [
{ name: 'classifier', fn: classifyAgent },
{ name: 'retriever', fn: retrieveAgent },
{ name: 'generator', fn: generateAgent },
];
for (const agent of agents) {
const start = performance.now();
try {
const agentOutput = await agent.fn(state);
state = { ...state, ...agentOutput }; // Merge agent output into state
state.agentTimings[agent.name] = performance.now() - start;
} catch (error) {
state.errors.push({ agent: agent.name, error: error.message });
throw error;
}
}
return state;
}
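To see the read-merge mechanics without any external services, here is a toy, self-contained version of the runner above: the LLM and search calls are replaced with stub agents, and the stub outputs are invented purely for illustration.

```javascript
// Toy pipeline: same merge pattern as runPipeline, but with stub
// agents instead of real LLM/search calls (outputs are invented).
async function runToyPipeline(userInput) {
  let state = { userInput, classification: null, response: null, agentTimings: {} };
  const agents = [
    { name: 'classifier', fn: async () => ({ classification: { intent: 'summarize' } }) },
    { name: 'generator', fn: async (s) => ({ response: `[${s.classification.intent}] ${s.userInput}` }) },
  ];
  for (const agent of agents) {
    const start = performance.now();
    const output = await agent.fn(state); // Agent returns only its contribution
    state = { ...state, ...output };      // Runner merges it into a new state
    state.agentTimings[agent.name] = performance.now() - start;
  }
  return state;
}
```

Calling `runToyPipeline('explain DNS')` resolves to a state whose `response` is `"[summarize] explain DNS"`, with a timing entry recorded per agent.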
Pattern 2: Context passing (explicit parameters)
Instead of a shared state object, each agent receives only the specific data it needs. More explicit, less risk of unintended coupling.
// Each agent takes only its required inputs
async function classifyIntent(userInput) {
// Only receives userInput -- no access to downstream state
return callLLM({
systemPrompt: 'Classify intent. Return JSON: { intent, topic }',
userMessage: userInput,
});
}
async function retrieveDocuments(topic, intent) {
// Only receives what it needs from classification
return searchKnowledgeBase(topic, intent);
}
async function generateResponse(userInput, intent, documents) {
// Receives specific data, not the entire state
return callLLM({
systemPrompt: 'Generate a response using provided sources.',
userMessage: JSON.stringify({ question: userInput, intent, sources: documents }),
});
}
// Pipeline explicitly threads data between agents
async function runPipeline(userInput) {
const classification = await classifyIntent(userInput);
const { intent, topic } = JSON.parse(classification);
const documents = await retrieveDocuments(topic, intent);
const response = await generateResponse(userInput, intent, documents);
return response;
}
Pattern comparison
+-------------------------------------------------------------------+
| Pattern | Pros | Cons |
|-----------------------+-----------------------+--------------------|
| Pipeline State | - Easy to add agents | - Agent can access |
| Object | - Full context avail | data it shouldn't|
| | - Easy to log/debug | - Harder to test |
| | - State is serializable| agents in isolation|
|-----------------------+-----------------------+--------------------|
| Context Passing | - Explicit deps | - Verbose wiring |
| (explicit params) | - Easy to test alone | - Adding agents |
| | - Clear interfaces | requires rewiring|
| | - No hidden coupling | - Harder to log |
+-----------------------+-----------------------+--------------------+
Recommendation: Start with the pipeline state object for most multi-agent systems. Switch to explicit context passing if you find agents are accessing data they shouldn't, or if you need strict interface contracts.
3. Immutable State vs Mutable State
Mutable state (dangerous)
// DANGEROUS: Agents modify the state object directly
async function dangerousPipeline(userInput) {
const state = { userInput, data: {} };
await classifyAgent(state);
// state.data.intent is now set
await retrieveAgent(state);
// state.data.documents is now set
// BUT: retrieveAgent might have ALSO modified state.data.intent (bug!)
await generateAgent(state);
// Who knows what state looks like now?
return state;
}
// The problem: any agent can modify any part of state
async function retrieveAgent(state) {
// This agent accidentally overwrites another agent's data
state.data.intent = 'search'; // BUG! Changed classification result
state.data.documents = await search(state.data.intent);
}
Immutable state (safe)
// SAFE: Each agent returns new data; the runner merges it into a new state
async function safePipeline(userInput) {
const initialState = Object.freeze({
userInput,
classification: null,
documents: null,
response: null,
});
// Agent A: returns ONLY its contribution
const classResult = await classifyAgent(initialState);
const stateAfterA = Object.freeze({ ...initialState, ...classResult });
// Agent B: receives frozen state, cannot modify classification
const retrieveResult = await retrieveAgent(stateAfterA);
const stateAfterB = Object.freeze({ ...stateAfterA, ...retrieveResult });
// Agent C: receives everything, cannot modify earlier results
const generateResult = await generateAgent(stateAfterB);
const finalState = Object.freeze({ ...stateAfterB, ...generateResult });
return finalState;
}
// Utility: pipeline runner with immutable state
async function immutablePipelineRunner(initialState, agents) {
let state = Object.freeze({ ...initialState });
for (const agent of agents) {
const contribution = await agent.fn(state);
// Validate that the agent didn't try to overwrite existing fields
for (const key of Object.keys(contribution)) {
if (state[key] !== null && state[key] !== undefined) {
console.warn(
`[WARNING] Agent "${agent.name}" tried to overwrite existing field "${key}". Ignoring.`
);
delete contribution[key];
}
}
state = Object.freeze({ ...state, ...contribution });
}
return state;
}
Why immutability matters
MUTABLE STATE TIMELINE:
State: { intent: null }
After Agent A: { intent: "billing" }
After Agent B: { intent: "search" } <-- ACCIDENTALLY OVERWRITTEN!
After Agent C: generates response for "search" intent
Result: Wrong answer. Hard to debug because state was mutated in place.
IMMUTABLE STATE TIMELINE:
State v0: { intent: null } (frozen)
State v1: { intent: "billing" } (frozen, v0 preserved)
State v2: { intent: "billing", docs: [...] } (frozen, v1 preserved)
State v3: { intent: "billing", docs: [...], response: "..." } (frozen)
Result: Each version is preserved. If something goes wrong, you can
compare state at each step. Agent B CANNOT overwrite intent.
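Two caveats about `Object.freeze()` are worth knowing before relying on it, sketched below: writes to frozen objects fail silently in sloppy mode but throw in strict mode, and freezing is shallow.

```javascript
// 1) A write to a frozen object fails silently in sloppy mode and
//    throws a TypeError in strict mode (ES modules are always strict),
//    so run pipelines as modules to surface overwrite bugs loudly.
const v1 = Object.freeze({ intent: 'billing' });
try {
  v1.intent = 'search'; // Agent B tries to overwrite
} catch (err) {
  // TypeError in strict mode; in sloppy mode we never get here
}
// Either way, the overwrite did not happen:
console.log(v1.intent); // 'billing'

// 2) Freezing is shallow: nested objects stay mutable unless you
//    freeze them too (or use a recursive deep-freeze helper).
const v2 = Object.freeze({ classification: { intent: 'billing' } });
v2.classification.intent = 'search'; // NOT blocked -- nested object is unfrozen
console.log(v2.classification.intent); // 'search'
```

The second caveat matters for pipeline state, which is usually nested: freeze each level you want protected, or apply a deep-freeze helper to the whole state object.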
4. Race Conditions in Parallel Agent Execution
When agents run in parallel (via Promise.all), they can cause race conditions if they both try to modify shared state.
The problem
// DANGEROUS: Parallel agents writing to the same state object
async function brokenParallelPipeline(userInput) {
const state = { userInput, results: {} };
// These run at the same time and both write to state.results
await Promise.all([
sentimentAgent(state), // writes state.results.sentiment
summaryAgent(state), // writes state.results.summary
keywordAgent(state), // writes state.results.keywords
]);
// JavaScript is single-threaded, so three writes to distinct keys
// will all land. The danger is anything beyond a single write: a
// read-modify-write that spans an await can interleave with another
// agent, and if an agent does state.results = { sentiment: "..." }
// instead of state.results.sentiment = "...", it silently discards
// the others' results.
return state;
}
The solution: collect results, merge after
// SAFE: Parallel agents return independent results; merge after
async function safeParallelPipeline(userInput) {
const [sentimentResult, summaryResult, keywordResult] = await Promise.all([
analyzeSentiment(userInput), // Returns { sentiment: "positive" }
generateSummary(userInput), // Returns { summary: "..." }
extractKeywords(userInput), // Returns { keywords: [...] }
]);
// Merge results into state AFTER all agents complete
const state = Object.freeze({
userInput,
...sentimentResult,
...summaryResult,
...keywordResult,
});
return state;
}
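One gap in a plain spread-merge: if two parallel agents return the same field name, the last one silently wins. A stricter merge can fail loudly instead; the sketch below uses an illustrative helper name, `mergeResults`.

```javascript
// Stricter merge for parallel results: throws on duplicate keys
// instead of letting the last writer win, so two agents both
// returning a field like "confidence" fail loudly.
function mergeResults(...results) {
  const merged = {};
  for (const result of results) {
    for (const [key, value] of Object.entries(result)) {
      if (key in merged) {
        throw new Error(`Merge collision on field "${key}"`);
      }
      merged[key] = value;
    }
  }
  return Object.freeze(merged);
}

// Namespacing each agent's output under its own key sidesteps
// collisions entirely:
const merged = mergeResults(
  { sentiment: { label: 'positive', confidence: 0.91 } },
  { keywords: { terms: ['climate'], confidence: 0.78 } }
);
```

Namespacing by agent name, as shown in the call above, is usually the simpler long-term fix: each agent owns one top-level key, and collisions become impossible by construction.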
Parallel agents with dependency on shared resources
// Problem: Multiple agents querying the same rate-limited API
async function rateLimitedParallel(userInput) {
// If all 3 agents call OpenAI simultaneously and you hit rate limits,
// some will fail. Use a semaphore or queue.
const semaphore = new Semaphore(2); // Max 2 concurrent LLM calls
const results = await Promise.all([
semaphore.acquire().then(() =>
analyzeSentiment(userInput).finally(() => semaphore.release())
),
semaphore.acquire().then(() =>
generateSummary(userInput).finally(() => semaphore.release())
),
semaphore.acquire().then(() =>
extractKeywords(userInput).finally(() => semaphore.release())
),
]);
return results;
}
// Simple semaphore implementation
class Semaphore {
constructor(max) {
this.max = max;
this.current = 0;
this.queue = [];
}
acquire() {
return new Promise((resolve) => {
if (this.current < this.max) {
this.current++;
resolve();
} else {
this.queue.push(resolve);
}
});
}
release() {
this.current--;
if (this.queue.length > 0) {
this.current++;
const next = this.queue.shift();
next();
}
}
}
5. State Schema Design
Define your state schema up front. This prevents agents from producing unexpected structures and makes the pipeline self-documenting.
Using a schema to validate state
// Define the expected state shape at each pipeline stage
const STATE_SCHEMAS = {
initial: {
required: ['traceId', 'userInput'],
properties: {
traceId: { type: 'string' },
userInput: { type: 'string' },
},
},
afterClassification: {
required: ['traceId', 'userInput', 'classification'],
properties: {
traceId: { type: 'string' },
userInput: { type: 'string' },
classification: {
type: 'object',
required: ['intent', 'topic', 'complexity'],
properties: {
intent: { type: 'string', enum: ['question', 'summarize', 'analyze', 'compare'] },
topic: { type: 'string' },
complexity: { type: 'string', enum: ['simple', 'moderate', 'complex'] },
},
},
},
},
afterRetrieval: {
required: ['traceId', 'userInput', 'classification', 'documents'],
properties: {
// ... inherits above, plus:
documents: {
type: 'array',
items: {
type: 'object',
required: ['title', 'content', 'relevanceScore'],
properties: {
title: { type: 'string' },
content: { type: 'string' },
relevanceScore: { type: 'number', minimum: 0, maximum: 1 },
},
},
},
},
},
final: {
required: ['traceId', 'userInput', 'classification', 'documents', 'response'],
properties: {
// ... inherits above, plus:
response: { type: 'string', minLength: 1 },
},
},
};
// Validate state after each agent. Recurses into nested objects so
// nested rules (e.g. the classification.intent enum) actually fire.
function validateState(state, schemaName) {
  const schema = STATE_SCHEMAS[schemaName];
  const errors = [];
  checkAgainstSchema(state, schema, '', errors);
  if (errors.length > 0) {
    console.error(`State validation failed (${schemaName}):`);
    errors.forEach((e) => console.error(`  - ${e}`));
    throw new Error(`Invalid state at ${schemaName}: ${errors.join('; ')}`);
  }
  return true;
}
function checkAgainstSchema(value, schema, path, errors) {
  // Check required fields
  for (const field of schema.required || []) {
    if (value[field] === null || value[field] === undefined) {
      errors.push(`Missing required field: ${path}${field}`);
    }
  }
  // Check types, enums, and nested objects
  for (const [field, rules] of Object.entries(schema.properties || {})) {
    const v = value[field];
    if (v === null || v === undefined) continue;
    const fieldPath = `${path}${field}`;
    if (rules.type === 'string' && typeof v !== 'string') {
      errors.push(`${fieldPath} should be string, got ${typeof v}`);
    }
    if (rules.type === 'array' && !Array.isArray(v)) {
      errors.push(`${fieldPath} should be array, got ${typeof v}`);
    }
    if (rules.enum && !rules.enum.includes(v)) {
      errors.push(`${fieldPath} must be one of: ${rules.enum.join(', ')}. Got: ${v}`);
    }
    if (rules.type === 'object' && typeof v === 'object') {
      checkAgainstSchema(v, rules, `${fieldPath}.`, errors);
    }
  }
}
// Usage in pipeline
async function validatedPipeline(userInput) {
let state = createPipelineState(userInput);
validateState(state, 'initial');
state = { ...state, ...(await classifyAgent(state)) };
validateState(state, 'afterClassification');
state = { ...state, ...(await retrieveAgent(state)) };
validateState(state, 'afterRetrieval');
state = { ...state, ...(await generateAgent(state)) };
validateState(state, 'final');
return state;
}
State evolution diagram
+----------------------------------------------------------------------+
| STATE EVOLUTION THROUGH PIPELINE |
+----------------------------------------------------------------------+
| |
| Stage 0 (initial): |
| { traceId, userInput } |
| | |
| v |
| [Classifier Agent] |
| | |
| v |
| Stage 1 (afterClassification): |
| { traceId, userInput, classification: { intent, topic, complexity }}|
| | |
| v |
| [Retriever Agent] |
| | |
| v |
| Stage 2 (afterRetrieval): |
| { traceId, userInput, classification, documents: [...] } |
| | |
| v |
| [Generator Agent] |
| | |
| v |
| Stage 3 (final): |
| { traceId, userInput, classification, documents, response: "..." } |
| |
+----------------------------------------------------------------------+
6. Persisting State for Resumable Pipelines
Long-running pipelines can fail midway. If state is only in memory, you lose all progress. Persisting state allows you to resume from the last successful step.
class PersistentPipelineRunner {
constructor(storageDir = './pipeline-state') {
this.storageDir = storageDir;
}
statePath(traceId) {
return `${this.storageDir}/${traceId}.json`;
}
async saveState(state) {
const fs = await import('fs/promises');
await fs.mkdir(this.storageDir, { recursive: true });
await fs.writeFile(
this.statePath(state.traceId),
JSON.stringify(state, null, 2)
);
}
async loadState(traceId) {
const fs = await import('fs/promises');
try {
const data = await fs.readFile(this.statePath(traceId), 'utf-8');
return JSON.parse(data);
} catch {
return null;
}
}
  async run(userInput, agents) {
    const state = createPipelineState(userInput);
    state.completedAgents = [];
    await this.saveState(state);
    console.log(`Pipeline started: ${state.traceId}`);
    return this.execute(state, agents);
  }
  // Shared loop: used by run() for fresh pipelines and by resume()
  // for saved ones, so resuming actually skips completed agents
  async execute(state, agents) {
    for (const agent of agents) {
      if (state.completedAgents.includes(agent.name)) {
        console.log(`Skipping ${agent.name} (already completed)`);
        continue;
      }
      try {
        console.log(`Running ${agent.name}...`);
        const contribution = await agent.fn(state);
        state = { ...state, ...contribution };
        state.completedAgents.push(agent.name);
        await this.saveState(state); // Checkpoint after each agent
        console.log(`${agent.name} completed. State saved.`);
      } catch (error) {
        state.lastError = { agent: agent.name, message: error.message };
        await this.saveState(state);
        console.error(`${agent.name} failed: ${error.message}`);
        console.log(`Pipeline paused at ${agent.name}. Resume with: resume("${state.traceId}")`);
        throw error;
      }
    }
    state.completedAt = new Date().toISOString();
    await this.saveState(state);
    return state;
  }
  async resume(traceId, agents) {
    const state = await this.loadState(traceId);
    if (!state) throw new Error(`No saved state for trace ${traceId}`);
    console.log(`Resuming pipeline ${traceId}`);
    console.log(`Completed agents: ${state.completedAgents.join(', ')}`);
    console.log(`Resuming from: ${state.lastError?.agent || 'next agent'}`);
    // Continue from the SAVED state (not a fresh one), so prior work
    // and the original traceId are preserved
    return this.execute(state, agents);
  }
}
// Usage
const runner = new PersistentPipelineRunner();
const agents = [
{ name: 'classifier', fn: classifyAgent },
{ name: 'retriever', fn: retrieveAgent },
{ name: 'generator', fn: generateAgent },
];
try {
const result = await runner.run('Explain quantum computing', agents);
} catch (error) {
// Pipeline failed midway -- state is saved
// Later, fix the issue and resume:
// const result = await runner.resume('trace-id-from-error', agents);
}
When to persist state
+-----------------------------------------------------------------+
| PERSIST STATE WHEN: |
+-----------------------------------------------------------------+
| - Pipeline takes > 30 seconds (user might navigate away) |
| - Pipeline involves expensive operations (don't repeat them) |
| - Pipeline processes user-uploaded data (can't re-upload) |
| - Pipeline is part of a batch job (need retry capability) |
| - Regulatory requirements demand audit trail |
+-----------------------------------------------------------------+
| SKIP PERSISTENCE WHEN: |
+-----------------------------------------------------------------+
| - Pipeline is fast (< 5 seconds) |
| - Pipeline is cheap to retry |
| - Pipeline runs in a request/response cycle (stateless API) |
+-----------------------------------------------------------------+
7. Anti-Patterns in State Management
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Global mutable state | Any agent can corrupt any data | Use immutable state + merge pattern |
| Implicit dependencies | Agent B reads a field that Agent A might not have set | Schema validation after each step |
| Oversized state | Passing 100K tokens of context to every agent | Each agent reads only what it needs |
| No state versioning | Can't tell what state looked like at step 2 | Save snapshots at each pipeline stage |
| Stringified everything | Agents pass JSON strings instead of objects | Use typed state objects, parse once |
| No error state | When an agent fails, state is inconsistent | Include error fields, validate before each step |
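One mitigation for both oversized state and implicit dependencies is sketched below: each agent declares which fields it reads, and the runner hands it a frozen projection rather than the whole state. The `reads` convention and `projectState` are illustrative names, not part of any framework.

```javascript
// Give each agent a frozen view of only its declared fields.
// An undeclared field access fails at projection time, making the
// dependency explicit instead of hidden.
function projectState(state, fields) {
  const view = {};
  for (const field of fields) {
    if (!(field in state)) {
      throw new Error(`Agent declared unknown state field "${field}"`);
    }
    view[field] = state[field];
  }
  return Object.freeze(view);
}

// In a runner: const view = projectState(state, agent.reads);
const state = {
  userInput: 'Summarize this',
  classification: { intent: 'summarize' },
  documents: [{ title: 'Doc 1' }],
};
const view = projectState(state, ['userInput', 'classification']);
// view has no `documents` field -- the dependency set is explicit
```

This keeps the convenience of a single pipeline state object while recovering the main benefit of explicit context passing: every agent's inputs are declared, visible, and testable.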
8. Key Takeaways
- Shared state is the connective tissue of multi-agent pipelines. Design it explicitly from the start -- don't let it emerge accidentally.
- Use a pipeline state object for most systems. Each agent reads from it and returns its contribution, which the runner merges in.
- Prefer immutable state. Use Object.freeze() and the spread operator to create new state versions rather than mutating in place.
- Parallel agents must not write to the same state. Collect parallel results independently, then merge them after all agents complete.
- Define a state schema and validate it after each agent. Catch malformed data immediately, not three agents later.
- Persist state for long-running or expensive pipelines. This enables resumption after failures and provides an audit trail.
- Each agent should receive only the data it needs. Avoid passing the entire context window to every agent -- this wastes tokens and creates hidden coupling.
Explain-It Challenge
- Agent B expects state.classification.intent to be one of ["question", "summarize", "analyze"], but Agent A returns "ask_question". How would your state schema catch this, and what happens if it doesn't?
- Two parallel agents both return a field called confidence. How do you handle the naming collision when merging results?
- Your pipeline processes a 50-page document and fails at Agent 4 of 6. Without persistent state, how much work is wasted? Design a resumable solution.
Navigation: <- 4.19.c Debugging Across Agents | 4.19.e -- When Not to Use Multi-Agent ->