State Management in LangGraph
State management is the foundation of LangGraph's power. It allows workflows to maintain context, make decisions based on accumulated information, and persist state across executions.
🗄️ Understanding State
What is State in LangGraph?
State is the shared data structure that flows through your workflow nodes. It contains all the information needed for processing decisions and maintains context throughout the execution.
State Characteristics
- Immutable - Nodes return update dictionaries that LangGraph merges into a new state, rather than mutating the existing state in place
- Typed - Defined using TypedDict for type safety and clarity
- Accumulative - Can build up information across multiple nodes
- Accessible - All nodes can read and update state
📝 TypedDict State Definitions
Basic State Structure
Define your state using TypedDict for type safety and IDE support.
from typing import TypedDict, List, Annotated, Optional
import operator
# Simple state
class SimpleState(TypedDict):
    """Minimal workflow state: a question, its answer, and a confidence score."""

    question: str  # user question being processed
    answer: str  # generated answer
    confidence: float  # answer confidence, expected in [0.0, 1.0]
# Complex state with annotations
class ResearchState(TypedDict):
    """State shared by the research-workflow node examples below."""

    topic: str  # research subject supplied by the caller
    research_data: str  # raw findings produced by the research node
    analysis: str  # analysis derived from research_data
    confidence: float  # quality score, expected in [0.0, 1.0]
    # operator.add reducer: lists returned by nodes are concatenated onto
    # the existing value instead of replacing it.
    completed_steps: Annotated[List[str], operator.add]
    metadata: Optional[dict]  # free-form info (timestamps, config, ...)
# Agent state with messages
class AgentState(TypedDict):
    """State for agent-style workflows that accumulate messages and task records."""

    messages: Annotated[List[str], operator.add]  # Accumulates messages
    current_task: str
    task_history: Annotated[List[dict], operator.add]  # Accumulates task records
    context: dict  # shared scratch space available to all nodes
    current_step: int
Advanced State Patterns
Nested State Structures
class DocumentAnalysis(TypedDict):
    """One analyzed document: its raw text plus descriptive metadata."""

    content: str
    metadata: dict
class WorkflowState(TypedDict):
    """Top-level workflow state that nests DocumentAnalysis records."""

    documents: List[DocumentAnalysis]  # nested typed structures
    current_phase: str
    results: dict
    errors: List[str]  # accumulated error messages
    configuration: dict
State with Enums
from enum import Enum
class WorkflowPhase(Enum):
    """Closed set of workflow phases; the values are the serialized names."""

    INITIALIZATION = "init"
    PROCESSING = "processing"
    REVIEW = "review"
    COMPLETION = "complete"
# NOTE(review): this redefines the WorkflowState declared in the previous
# example; in a single module the second definition shadows the first.
class WorkflowState(TypedDict):
    phase: WorkflowPhase  # current phase as an enum member, not a raw string
    data: dict
    results: Optional[dict]  # None until the workflow produces results
🔄 State Updates and Transitions
Node State Updates
Each node function receives the current state and returns updates.
from langchain_openai import ChatOpenAI

# Shared LLM client used by every node example below.
# NOTE(review): model name is dated — confirm the desired model for your app.
llm = ChatOpenAI(model="gpt-3.5-turbo")
def research_node(state: "ResearchState"):
    """Research node that updates state with findings."""
    # Build the research prompt from the current topic and query the LLM.
    subject = state["topic"]
    reply = llm.invoke(
        f"Research the topic: {subject}. Provide key information and insights."
    )
    # Partial update: LangGraph merges these keys into the existing state;
    # completed_steps is concatenated via its operator.add reducer.
    update = {
        "research_data": reply.content,
        "completed_steps": ["research_completed"],
    }
    return update
def analysis_node(state: ResearchState):
"""Analysis node that processes research data."""
research_data = state["research_data"]
prompt = f"Analyze this research data: {research_data}"
response = llm.invoke(prompt)
# Update multiple state fields
return {
"analysis": response.content,
"confidence": 0.85,
"completed_steps": ["analysis_completed"]
}Conditional State Updates
Update state based on conditions and previous values.
def quality_check_node(state: ResearchState):
"""Quality check node that evaluates confidence."""
confidence = state.get("confidence", 0.0)
analysis = state["analysis"]
if confidence < 0.7:
# Low confidence - request more research
return {
"confidence": confidence,
"completed_steps": ["quality_check_failed"]
}
else:
# High confidence - approve analysis
return {
"confidence": min(confidence, 1.0), # Cap at 1.0
"completed_steps": ["quality_check_passed"]
}💾 Persistence and Checkpoints
Memory Checkpoints
Save state snapshots to enable resuming workflows.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# Memory-based checkpoint (for development)
memory_saver = MemorySaver()
# SQLite-based checkpoint (for production)
sqlite_saver = SqliteSaver.from_conn_string("workflows.db")
# Compile graph with checkpointing
workflow = StateGraph(ResearchState)
# ... add nodes and edges ...
app = workflow.compile(checkpointer=sqlite_saver)
# Execute with thread ID for persistence
config = {"configurable": {"thread_id": "research-001"}}
# First execution
result1 = app.invoke(initial_state, config=config)
# Later resume from same thread
result2 = app.invoke(updated_state, config=config)Advanced Checkpoint Management
Checkpoint Time Travel
# Get current state
current_state = app.get_state(config=config)
print(f"Current step: {current_state.next}")
# Get checkpoint history
history = list(app.get_state_history(config=config))
for checkpoint in history[-3:]: # Last 3 checkpoints
print(f"Checkpoint: {checkpoint.config['checkpoint_id']}")
print(f"State: {checkpoint.values}")Checkpoint Filtering
# Filter checkpoints by metadata
# (keeps only snapshots whose metadata tagged the "research" phase)
filtered_history = [
    cp for cp in app.get_state_history(config=config)
    if cp.metadata.get("phase") == "research"
]
🗃️ State Serialization
Custom State Serialization
Handle complex objects that need special serialization.
import json
from datetime import datetime
from langchain_core.messages import BaseMessage

class CustomState(TypedDict):
    """State containing objects that JSON cannot serialize directly."""

    messages: List[BaseMessage]  # rich message objects, not plain strings
    timestamp: datetime  # needs ISO-format conversion for storage
    complex_data: dict
def custom_serializer(state: "CustomState") -> dict:
    """Convert a CustomState into a JSON-friendly dict.

    Messages are flattened via their .dict() method and the timestamp is
    rendered as an ISO-8601 string; complex_data is passed through as-is.
    """
    serialized_messages = [message.dict() for message in state["messages"]]
    return {
        "messages": serialized_messages,
        "timestamp": state["timestamp"].isoformat(),
        "complex_data": state["complex_data"],
    }
def custom_deserializer(data: dict) -> CustomState:
"""Custom deserialization from stored data."""
from langchain_core.messages import messages_from_dict
return {
"messages": messages_from_dict(data["messages"]),
"timestamp": datetime.fromisoformat(data["timestamp"]),
"complex_data": data["complex_data"]
}🔄 State Accumulation Patterns
Message Accumulation
Common pattern for chat-like workflows.
class ConversationState(TypedDict):
    """Chat-style state whose message list grows across turns."""

    # operator.add reducer: each node's returned messages are appended.
    messages: Annotated[List[BaseMessage], operator.add]
    conversation_summary: str
    user_context: dict
def add_user_message(state: "ConversationState", new_message: str):
    """Append one user turn; the messages reducer merges it into the list."""
    from langchain_core.messages import HumanMessage

    return {"messages": [HumanMessage(content=new_message)]}
def add_ai_response(state: "ConversationState", response: str):
    """Append one AI turn; the messages reducer merges it into the list."""
    from langchain_core.messages import AIMessage

    return {"messages": [AIMessage(content=response)]}
def update_summary(state: ConversationState):
"""Update conversation summary."""
messages = state["messages"]
recent_messages = messages[-6:] # Last 3 exchanges
prompt = f"Summarize this conversation: {recent_messages}"
summary = llm.invoke(prompt)
return {"conversation_summary": summary.content}Task Tracking
Track completed tasks and progress through workflows.
class ProjectState(TypedDict):
    """Project-tracking state with an accumulating task log."""

    project_name: str
    current_phase: str
    completed_tasks: Annotated[List[dict], operator.add]  # task records append
    current_task: Optional[dict]  # None when no task is in flight
    progress: float  # fraction complete, expected in [0.0, 1.0]
def add_completed_task(state: ProjectState, task_name: str, result: dict):
"""Add a completed task to the project."""
task_record = {
"task": task_name,
"completed_at": datetime.now().isoformat(),
"result": result
}
# Update progress
completed_count = len(state["completed_tasks"]) + 1
total_tasks = 5 # Define your total
return {
"completed_tasks": [task_record],
"progress": completed_count / total_tasks
}🎛️ State Configuration and Initialization
Default State Values
Set up initial state with sensible defaults.
def create_initial_state(topic: str) -> "ResearchState":
    """Build a fresh ResearchState for *topic* with sensible defaults."""
    # Creation metadata lets later nodes see when/how the state was built.
    metadata = {
        "created_at": datetime.now().isoformat(),
        "version": "1.0",
    }
    return {
        "topic": topic,
        "research_data": "",
        "analysis": "",
        "confidence": 0.0,
        "completed_steps": [],
        "metadata": metadata,
    }
# Usage
initial_state = create_initial_state("AI in healthcare")
result = app.invoke(initial_state)
Dynamic State Configuration
Configure state based on runtime parameters.
def configure_state(topic: str, config: dict) -> "ResearchState":
    """Build an initial state whose metadata reflects runtime parameters."""
    return {
        "topic": topic,
        "research_data": "",
        "analysis": "",
        "confidence": 0.0,
        "completed_steps": [],
        "metadata": {
            # Defaults mirror the original: "medium" complexity, depth 3.
            "complexity": config.get("complexity", "medium"),
            "depth": config.get("depth", 3),
            "created_at": datetime.now().isoformat(),
        },
    }
# Usage with different configurations
simple_state = configure_state("AI", {"complexity": "simple", "depth": 1})
complex_state = configure_state("AI", {"complexity": "detailed", "depth": 5})
🔍 State Debugging and Monitoring
State Inspection
Monitor state changes during workflow execution.
def debug_node(state: "ResearchState"):
    """Dump selected state fields to stdout and contribute no updates."""
    report = [
        "=== Current State ===",
        f"Topic: {state.get('topic')}",
        f"Completed Steps: {state.get('completed_steps', [])}",
        f"Confidence: {state.get('confidence', 0.0)}",
        "===================",
    ]
    # One joined print produces the same output as line-by-line prints.
    print("\n".join(report))
    # Empty update: merging {} leaves the state untouched.
    return {}
# Add debug node to workflow
workflow.add_node("debug", debug_node)
# Splice the debug node between the research and analysis steps.
workflow.add_edge("research", "debug")
workflow.add_edge("debug", "analysis")
State Validation
Validate state integrity and required fields.
def validate_state(state: "ResearchState") -> bool:
    """Return True when required fields are present and confidence is in range."""
    # Required fields must exist AND be truthy (an empty topic is invalid);
    # dict.get covers both the missing-key and falsy-value cases.
    required_fields = ("topic",)
    if any(not state.get(field) for field in required_fields):
        return False
    # Confidence must stay within its documented [0.0, 1.0] range.
    return 0.0 <= state.get("confidence", 0.0) <= 1.0
def validation_node(state: ResearchState):
"""Node that validates state before processing."""
if not validate_state(state):
raise ValueError("Invalid state detected")
return {"completed_steps": ["validation_passed"]}State Metrics and Analytics
Track state changes and workflow performance.
class StateMetrics:
    """Collects per-node state-change records and basic error counters."""

    def __init__(self):
        self.state_changes = []    # chronological change records
        self.execution_times = []  # reserved for timing data
        self.error_count = 0

    def record_state_change(self, node: str, state_before: dict, state_after: dict):
        """Append a change record for *node* with a field-level diff."""
        record = {
            "node": node,
            "timestamp": datetime.now().isoformat(),
            "changes": self._compute_changes(state_before, state_after),
        }
        self.state_changes.append(record)

    def _compute_changes(self, before: dict, after: dict) -> dict:
        """Diff two states: keys added or altered in *after*, with old/new values."""
        return {
            key: {"from": before.get(key), "to": value}
            for key, value in after.items()
            if key not in before or before[key] != value
        }
# Usage in nodes
metrics = StateMetrics()

def tracked_node(state: ResearchState):
    """Node with state tracking."""
    # Snapshot before processing so the diff has a baseline.
    state_before = state.copy()
    # Node processing
    result = research_node(state)
    # Record metrics
    # NOTE(review): this diffs the ORIGINAL state against the node's PARTIAL
    # update (result), not against the merged state — confirm that is intended.
    metrics.record_state_change("research", state_before, result)
    return result
🎯 Best Practices
1. State Design Principles
- Keep it minimal - Only include necessary fields
- Use clear naming - Descriptive field names
- Type consistently - Use consistent types
- Document structure - Comments explaining complex fields
2. Performance Considerations
# ✅ Good: Specific field updates
def efficient_node(state: dict):
    # Return only the fields that changed; LangGraph merges them in.
    return {"specific_field": "new_value"}
# ❌ Avoid: Reconstructing entire state
def inefficient_node(state: dict):
    # Echoing unchanged fields adds needless copies (and can re-trigger
    # accumulating reducers on annotated fields).
    return {
        "field1": state["field1"],  # Unnecessary copy
        "field2": state["field2"],  # Unnecessary copy
        "field3": "new_value"
    }
3. Error Handling
def safe_node(state: ResearchState):
"""Node with error handling."""
try:
# Main processing
result = process_data(state)
return result
except Exception as e:
# Log error and return safe state
print(f"Error in node: {e}")
return {
"completed_steps": ["node_failed"],
"error": str(e)
}4. State Testing
def test_state_transitions():
"""Test state update logic."""
initial_state = {
"topic": "test",
"confidence": 0.0,
"completed_steps": []
}
# Test node
result = research_node(initial_state)
# Assertions
assert "research_data" in result
assert "research_completed" in result["completed_steps"]
assert result["confidence"] >= 0.0Master state management to build robust, reliable LangGraph workflows. Next, explore workflow patterns to create sophisticated processing logic.