Workflows in LangGraph
Workflows are the heart of LangGraph, defining how information flows through your AI system. They enable complex processing patterns beyond simple linear chains, including conditional logic, loops, and parallel execution.
⚙️ Workflow Types
1. Sequential Workflows
Linear processing where each node processes the output of the previous node.
2. Conditional Workflows
Dynamic routing based on state and conditions.
3. Looping Workflows
Iterative processing with cycles and feedback loops.
4. Parallel Workflows
Concurrent execution of multiple processing branches.
📋 Sequential Workflows
Basic Sequential Pattern
The simplest workflow pattern with linear progression.
```python
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict

llm = ChatOpenAI(model="gpt-3.5-turbo")

class DocumentState(TypedDict):
    raw_text: str
    cleaned_text: str
    summary: str
    analysis: str

def preprocess_node(state: DocumentState):
    """Preprocess the raw text."""
    raw_text = state["raw_text"]
    prompt = f"Clean and preprocess this text:\n{raw_text}"
    response = llm.invoke(prompt)
    return {"cleaned_text": response.content}

def summarize_node(state: DocumentState):
    """Summarize the cleaned text."""
    cleaned_text = state["cleaned_text"]
    prompt = f"Summarize this text:\n{cleaned_text}"
    response = llm.invoke(prompt)
    return {"summary": response.content}

def analyze_node(state: DocumentState):
    """Analyze the summary."""
    summary = state["summary"]
    prompt = f"Analyze this summary for key insights:\n{summary}"
    response = llm.invoke(prompt)
    return {"analysis": response.content}

# Build sequential workflow
workflow = StateGraph(DocumentState)

# Add nodes
workflow.add_node("preprocess", preprocess_node)
workflow.add_node("summarize", summarize_node)
workflow.add_node("analyze", analyze_node)

# Define sequential flow
workflow.set_entry_point("preprocess")
workflow.add_edge("preprocess", "summarize")
workflow.add_edge("summarize", "analyze")
workflow.add_edge("analyze", END)

# Compile and run
app = workflow.compile()
result = app.invoke({
    "raw_text": "Your document text here..."
})
```
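If you want to observe intermediate progress rather than just the final state, a compiled graph can also be streamed. A minimal sketch using the `updates` stream mode, which yields one dict per completed node:

```python
# Stream the graph instead of invoking it to watch each node's
# state update as it completes (stream_mode="updates" yields one
# dict per node, keyed by node name).
for update in app.stream(
    {"raw_text": "Your document text here..."},
    stream_mode="updates",
):
    for node_name, node_output in update.items():
        print(f"{node_name} finished -> keys updated: {list(node_output)}")
```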
Advanced Sequential with Error Handling

```python
class ProcessingState(TypedDict):
    data: str
    processed_data: str
    error: str
    step: int
    completed: bool

def processing_node(state: ProcessingState):
    """Processing node with error handling."""
    data = state["data"]
    step = state["step"]
    try:
        # Simulate processing
        if step < 3:
            processed_data = f"Processed_{data}_step_{step + 1}"
            return {
                "processed_data": processed_data,
                "step": step + 1,
                "completed": step >= 2
            }
        else:
            return {"completed": True}
    except Exception as e:
        return {
            "error": str(e),
            "completed": False
        }

def should_continue(state: ProcessingState):
    """Check if processing should continue."""
    if state.get("error"):
        return "error"
    elif state.get("completed", False):
        return "end"
    else:
        return "continue"

# Build workflow with error handling
workflow = StateGraph(ProcessingState)
workflow.add_node("process", processing_node)
workflow.set_entry_point("process")  # required: the graph cannot compile without an entry point
workflow.add_conditional_edges(
    "process",
    should_continue,
    {
        "continue": "process",  # Loop back
        "end": END,
        "error": END
    }
)
```
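The snippet above never compiles or runs the graph. A minimal way to exercise it, assuming illustrative initial values (the loop runs `process` until `should_continue` returns `"end"` or `"error"`):

```python
app = workflow.compile()

# Seed the loop counters; the graph iterates "process" until
# should_continue() routes to END.
result = app.invoke({
    "data": "raw_payload",
    "processed_data": "",
    "error": "",
    "step": 0,
    "completed": False,
})
print(result["processed_data"])  # e.g. "Processed_raw_payload_step_3"
```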
🔀 Conditional Workflows
Dynamic Routing Based on State
Create workflows that make decisions based on current conditions.
```python
class AnalysisState(TypedDict):
    input_text: str
    category: str
    sentiment: str
    response: str
    confidence: float

def categorize_node(state: AnalysisState):
    """Categorize the input text."""
    text = state["input_text"]
    prompt = f"Categorize this text (technical/business/personal/other): {text}"
    response = llm.invoke(prompt)
    return {"category": response.content.strip().lower()}

def sentiment_node(state: AnalysisState):
    """Analyze sentiment of the text."""
    text = state["input_text"]
    prompt = f"Analyze sentiment (positive/negative/neutral): {text}"
    response = llm.invoke(prompt)
    return {"sentiment": response.content.strip().lower()}

def technical_response(state: AnalysisState):
    """Generate technical response."""
    return {
        "response": "I'll help you with the technical aspects.",
        "confidence": 0.9
    }

def business_response(state: AnalysisState):
    """Generate business response."""
    return {
        "response": "Let's discuss the business implications.",
        "confidence": 0.8
    }

def personal_response(state: AnalysisState):
    """Generate personal response."""
    return {
        "response": "I understand your personal perspective.",
        "confidence": 0.7
    }

def default_response(state: AnalysisState):
    """Generate default response."""
    return {
        "response": "I'll help you with this request.",
        "confidence": 0.6
    }

def route_by_category(state: AnalysisState):
    """Route to the appropriate response node based on category."""
    category = state["category"]
    routing_map = {
        "technical": "technical_handler",
        "business": "business_handler",
        "personal": "personal_handler"
    }
    return routing_map.get(category, "default_handler")

# Build conditional workflow
workflow = StateGraph(AnalysisState)

# Add nodes
workflow.add_node("categorize", categorize_node)
workflow.add_node("sentiment", sentiment_node)
workflow.add_node("technical_handler", technical_response)
workflow.add_node("business_handler", business_response)
workflow.add_node("personal_handler", personal_response)
workflow.add_node("default_handler", default_response)

# Set entry and analysis flow
workflow.set_entry_point("categorize")
workflow.add_edge("categorize", "sentiment")

# Add conditional routing
workflow.add_conditional_edges(
    "sentiment",
    route_by_category,
    {
        "technical_handler": "technical_handler",
        "business_handler": "business_handler",
        "personal_handler": "personal_handler",
        "default_handler": "default_handler"
    }
)

# All response nodes end the workflow
workflow.add_edge("technical_handler", END)
workflow.add_edge("business_handler", END)
workflow.add_edge("personal_handler", END)
workflow.add_edge("default_handler", END)

app = workflow.compile()
```
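To see the routing take effect, invoke the compiled app with a sample input (the text below is illustrative; in practice the model's category answer may carry punctuation and need normalization before it matches the routing map):

```python
# Different inputs take different paths: the router reads the
# category written upstream and picks the matching handler.
result = app.invoke({"input_text": "How do I tune my database indexes?"})
print(result["response"], result["confidence"])
```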
Multi-level Conditional Logic

```python
class DecisionState(TypedDict):
    request: str
    priority: str  # high, medium, low
    category: str  # technical, general, urgent
    assigned_to: str
    processing_time: int
    status: str

def assess_priority(state: DecisionState):
    """Assess request priority."""
    request = state["request"]
    prompt = f"Assess priority (high/medium/low): {request}"
    response = llm.invoke(prompt)
    return {"priority": response.content.strip().lower()}

def categorize_request(state: DecisionState):
    """Categorize the request."""
    request = state["request"]
    prompt = f"Categorize request (technical/general/urgent): {request}"
    response = llm.invoke(prompt)
    return {"category": response.content.strip().lower()}

def route_urgent(state: DecisionState):
    """Handle urgent requests immediately."""
    return {
        "assigned_to": "senior_team",
        "processing_time": 5,
        "status": "immediate_processing"
    }

def route_high_technical(state: DecisionState):
    """Handle high priority technical requests."""
    return {
        "assigned_to": "technical_team",
        "processing_time": 30,
        "status": "queued_for_tech"
    }

def route_standard(state: DecisionState):
    """Handle standard requests."""
    return {
        "assigned_to": "general_team",
        "processing_time": 60,
        "status": "standard_queue"
    }

def complex_routing(state: DecisionState):
    """Complex routing based on multiple factors."""
    priority = state["priority"]
    category = state["category"]
    if category == "urgent":
        return "urgent_handler"
    elif priority == "high" and category == "technical":
        return "high_tech_handler"
    else:
        return "standard_handler"

# Build complex conditional workflow
workflow = StateGraph(DecisionState)
workflow.add_node("assess_priority", assess_priority)
workflow.add_node("categorize", categorize_request)
workflow.add_node("urgent_handler", route_urgent)
workflow.add_node("high_tech_handler", route_high_technical)
workflow.add_node("standard_handler", route_standard)

workflow.set_entry_point("assess_priority")
workflow.add_edge("assess_priority", "categorize")
workflow.add_conditional_edges(
    "categorize",
    complex_routing,
    {
        "urgent_handler": "urgent_handler",
        "high_tech_handler": "high_tech_handler",
        "standard_handler": "standard_handler"
    }
)
workflow.add_edge("urgent_handler", END)
workflow.add_edge("high_tech_handler", END)
workflow.add_edge("standard_handler", END)
```
🔄 Looping Workflows
Iterative Processing
Create workflows that loop until conditions are met.
```python
from langgraph.graph import StateGraph, END

class IterativeState(TypedDict):
    target_quality: float
    current_quality: float
    attempts: int
    max_attempts: int
    content: str
    improved_content: str
    feedback: str
    complete: bool

def improve_content(state: IterativeState):
    """Improve content quality."""
    content = state["content"]
    if state["attempts"] > 0:
        feedback = state.get("feedback", "")
        prompt = f"Improve this content based on feedback:\nOriginal: {content}\nFeedback: {feedback}"
    else:
        prompt = f"Improve this content:\n{content}"
    response = llm.invoke(prompt)
    return {
        "improved_content": response.content,
        "attempts": state["attempts"] + 1
    }

def evaluate_quality(state: IterativeState):
    """Evaluate the quality of improved content."""
    content = state["improved_content"]
    prompt = f"Rate content quality (0.0-1.0): {content}"
    response = llm.invoke(prompt)
    try:
        quality = float(response.content.strip())
        return {"current_quality": min(max(quality, 0.0), 1.0)}
    except ValueError:
        return {"current_quality": 0.5}  # Default if parsing fails

def should_continue_improving(state: IterativeState):
    """Check if improvement should continue."""
    current_quality = state["current_quality"]
    target_quality = state["target_quality"]
    attempts = state["attempts"]
    max_attempts = state["max_attempts"]
    if current_quality >= target_quality:
        return "complete"
    elif attempts >= max_attempts:
        return "max_attempts_reached"
    else:
        return "continue"

def generate_feedback(state: IterativeState):
    """Generate feedback for improvement."""
    content = state["improved_content"]
    target_quality = state["target_quality"]
    current_quality = state["current_quality"]
    prompt = f"Content quality is {current_quality:.2f}, target is {target_quality:.2f}. Suggest improvements: {content}"
    response = llm.invoke(prompt)
    return {"feedback": response.content}

def finalize(state: IterativeState):
    """Finalize the content."""
    return {
        "content": state["improved_content"],
        "complete": True
    }

def max_attempts_handler(state: IterativeState):
    """Handle maximum attempts reached."""
    return {
        "content": state["improved_content"],
        "complete": True,
        "feedback": f"Max attempts ({state['max_attempts']}) reached. Current quality: {state['current_quality']:.2f}"
    }

# Build looping workflow
workflow = StateGraph(IterativeState)
workflow.add_node("improve", improve_content)
workflow.add_node("evaluate", evaluate_quality)
workflow.add_node("generate_feedback", generate_feedback)
workflow.add_node("finalize", finalize)
workflow.add_node("max_attempts_handler", max_attempts_handler)

workflow.set_entry_point("improve")
workflow.add_edge("improve", "evaluate")
workflow.add_conditional_edges(
    "evaluate",
    should_continue_improving,
    {
        "continue": "generate_feedback",
        "complete": "finalize",
        "max_attempts_reached": "max_attempts_handler"
    }
)
workflow.add_edge("generate_feedback", "improve")  # Loop back
workflow.add_edge("finalize", END)
workflow.add_edge("max_attempts_handler", END)

app = workflow.compile()

# Run with initial state
result = app.invoke({
    "target_quality": 0.9,
    "current_quality": 0.5,
    "attempts": 0,
    "max_attempts": 5,
    "content": "Initial content to improve",
    "improved_content": "",
    "feedback": "",
    "complete": False
})
```
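One caveat worth knowing: every node execution in a cycle counts toward LangGraph's recursion limit, which defaults to 25 supersteps. If you raise `max_attempts`, pass a higher limit via the config argument; a sketch:

```python
# Longer improvement loops need a higher recursion limit, since
# each improve/evaluate/feedback step counts toward it.
result = app.invoke(
    {
        "target_quality": 0.95,
        "current_quality": 0.5,
        "attempts": 0,
        "max_attempts": 10,
        "content": "Initial content to improve",
        "improved_content": "",
        "feedback": "",
        "complete": False,
    },
    config={"recursion_limit": 100},
)
```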
🔀 Parallel Workflows
Concurrent Processing
Execute multiple branches simultaneously and combine results.
```python
from datetime import datetime

from langgraph.graph import StateGraph, START, END

class ParallelState(TypedDict):
    query: str
    web_results: str
    database_results: str
    api_results: str
    combined_results: str
    timestamp: str

def web_search_node(state: ParallelState):
    """Perform web search."""
    query = state["query"]
    prompt = f"Simulate web search for: {query}"
    response = llm.invoke(prompt)
    return {"web_results": f"Web: {response.content}"}

def database_search_node(state: ParallelState):
    """Perform database search."""
    query = state["query"]
    prompt = f"Simulate database search for: {query}"
    response = llm.invoke(prompt)
    return {"database_results": f"Database: {response.content}"}

def api_search_node(state: ParallelState):
    """Perform API search."""
    query = state["query"]
    prompt = f"Simulate API search for: {query}"
    response = llm.invoke(prompt)
    return {"api_results": f"API: {response.content}"}

def combine_results(state: ParallelState):
    """Combine results from all sources."""
    web = state["web_results"]
    db = state["database_results"]
    api = state["api_results"]
    prompt = f"Combine these search results:\nWeb: {web}\nDatabase: {db}\nAPI: {api}"
    response = llm.invoke(prompt)
    return {
        "combined_results": response.content,
        "timestamp": datetime.now().isoformat()
    }

# Build parallel workflow
workflow = StateGraph(ParallelState)
workflow.add_node("web_search", web_search_node)
workflow.add_node("database_search", database_search_node)
workflow.add_node("api_search", api_search_node)
workflow.add_node("combine", combine_results)

# Fan out from START: all three searches run in the same superstep
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "database_search")
workflow.add_edge(START, "api_search")

# All search nodes converge to combine
workflow.add_edge("web_search", "combine")
workflow.add_edge("database_search", "combine")
workflow.add_edge("api_search", "combine")
workflow.add_edge("combine", END)

app = workflow.compile()
```
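Because each branch writes to a distinct state key, the concurrent updates merge cleanly without a reducer. A minimal run (the query text is illustrative):

```python
result = app.invoke({"query": "LangGraph parallel execution"})
print(result["combined_results"])
print(result["timestamp"])
```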
Parallel with Error Handling
When several parallel branches may write the same state key (here, `errors`), annotate that key with a reducer such as `operator.add` so concurrent updates are merged instead of raising an update conflict.

```python
import operator
from typing import Annotated

class RobustParallelState(TypedDict):
    task: str
    processor1_result: str
    processor2_result: str
    processor3_result: str
    # Reducer lets parallel branches append errors concurrently
    errors: Annotated[list, operator.add]
    successful_results: list
    final_result: str

def robust_processor1(state: RobustParallelState):
    """First parallel processor with error handling."""
    try:
        task = state["task"]
        result = f"Processor1 completed: {task}"
        return {"processor1_result": result}
    except Exception as e:
        return {"errors": [f"Processor1 error: {str(e)}"]}

def robust_processor2(state: RobustParallelState):
    """Second parallel processor with error handling."""
    try:
        task = state["task"]
        result = f"Processor2 completed: {task}"
        return {"processor2_result": result}
    except Exception as e:
        return {"errors": [f"Processor2 error: {str(e)}"]}

def robust_processor3(state: RobustParallelState):
    """Third parallel processor with error handling."""
    try:
        task = state["task"]
        result = f"Processor3 completed: {task}"
        return {"processor3_result": result}
    except Exception as e:
        return {"errors": [f"Processor3 error: {str(e)}"]}

def collect_results(state: RobustParallelState):
    """Collect successful results."""
    results = []
    if state.get("processor1_result"):
        results.append(state["processor1_result"])
    if state.get("processor2_result"):
        results.append(state["processor2_result"])
    if state.get("processor3_result"):
        results.append(state["processor3_result"])
    return {"successful_results": results}

def finalize_robust(state: RobustParallelState):
    """Finalize with robust handling."""
    successful_results = state["successful_results"]
    errors = state.get("errors", [])
    if successful_results:
        prompt = f"Process these successful results: {successful_results}"
        if errors:
            prompt += f"\n\nErrors occurred: {errors}"
        response = llm.invoke(prompt)
        return {"final_result": response.content}
    else:
        return {"final_result": f"All processors failed. Errors: {errors}"}

# Build robust parallel workflow
workflow = StateGraph(RobustParallelState)
workflow.add_node("processor1", robust_processor1)
workflow.add_node("processor2", robust_processor2)
workflow.add_node("processor3", robust_processor3)
workflow.add_node("collect", collect_results)
workflow.add_node("finalize", finalize_robust)

# Parallel execution
workflow.add_edge(START, "processor1")
workflow.add_edge(START, "processor2")
workflow.add_edge(START, "processor3")

# Convergence
workflow.add_edge("processor1", "collect")
workflow.add_edge("processor2", "collect")
workflow.add_edge("processor3", "collect")
workflow.add_edge("collect", "finalize")
workflow.add_edge("finalize", END)
```
🎯 Advanced Workflow Patterns
State Machine Workflow
Create finite state machine patterns.
```python
class StateMachineState(TypedDict):
    current_state: str
    event: str
    context: dict
    history: list

class StateMachine:
    """Finite state machine for workflow control."""

    def __init__(self):
        # Map each state to the events it accepts
        self.states = {
            "idle": ["start"],
            "processing": ["complete", "error", "pause"],
            "paused": ["resume", "cancel"],
            "completed": ["reset"],
            "error": ["retry", "reset"]
        }

    def get_next_states(self, current_state: str) -> list:
        return self.states.get(current_state, [])

    def can_transition(self, current_state: str, event: str) -> bool:
        return event in self.get_next_states(current_state)

def state_machine_node(state: StateMachineState):
    """State machine transition handler."""
    current_state = state["current_state"]
    event = state["event"]
    sm = StateMachine()
    if sm.can_transition(current_state, event):
        # Perform state transition
        if current_state == "idle" and event == "start":
            return {"current_state": "processing", "history": state.get("history", []) + ["idle -> processing"]}
        elif current_state == "processing" and event == "complete":
            return {"current_state": "completed", "history": state.get("history", []) + ["processing -> completed"]}
        # ... more transitions
    else:
        return {"current_state": "error", "history": state.get("history", []) + [f"{current_state} -> error (invalid event: {event})"]}
```
Pipeline Workflow with Stages

```python
from typing import Any

class PipelineState(TypedDict):
    input_data: Any
    stages: list
    current_stage: int
    stage_results: dict
    completed: bool
    pipeline_result: Any

def pipeline_processor(state: PipelineState):
    """Process current pipeline stage."""
    stages = state["stages"]
    current_stage = state["current_stage"]
    if current_stage < len(stages):
        stage_name = stages[current_stage]
        input_data = state.get("input_data")
        # Process stage
        if stage_name == "validate":
            result = f"Validated: {input_data}"
        elif stage_name == "transform":
            result = f"Transformed: {input_data}"
        elif stage_name == "analyze":
            result = f"Analyzed: {input_data}"
        else:
            result = f"Unknown stage: {stage_name}"
        # Update stage results
        stage_results = state.get("stage_results", {})
        stage_results[stage_name] = result
        return {
            "stage_results": stage_results,
            "current_stage": current_stage + 1,
            "completed": current_stage >= len(stages) - 1
        }
    else:
        return {"completed": True}

def should_continue_pipeline(state: PipelineState):
    """Check if pipeline should continue."""
    if state.get("completed", False):
        return "finalize"
    else:
        return "continue"

def finalize_pipeline(state: PipelineState):
    """Finalize pipeline execution."""
    stage_results = state.get("stage_results", {})
    # Combine all stage results
    combined_result = " | ".join(stage_results.values())
    return {"pipeline_result": combined_result}

# Build pipeline workflow
workflow = StateGraph(PipelineState)
workflow.add_node("process_stage", pipeline_processor)
workflow.add_node("finalize", finalize_pipeline)
workflow.set_entry_point("process_stage")  # required entry point
workflow.add_conditional_edges(
    "process_stage",
    should_continue_pipeline,
    {
        "continue": "process_stage",
        "finalize": "finalize"
    }
)
workflow.add_edge("finalize", END)
```
🎯 Best Practices
1. Workflow Design
- Keep workflows focused - Single responsibility per workflow
- Test individual nodes - Unit test node functions (see the sketch after this list)
- Handle edge cases - Consider all possible states
- Document transitions - Clear workflow documentation
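Because each node and router is a plain function from state to a result, it can be unit tested without compiling a graph. A sketch using pytest-style asserts against the `should_continue` router from the error-handling example:

```python
def test_should_continue_routes_on_error():
    # The router is a pure function of state, so no graph is needed
    assert should_continue({"error": "boom"}) == "error"
    assert should_continue({"completed": True}) == "end"
    assert should_continue({"completed": False}) == "continue"
```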
2. Performance Optimization
- Parallelize when possible - Use parallel workflows for independent tasks
- Minimize state size - Keep state payloads manageable
- Use caching - Cache expensive operations (a memoization sketch follows this list)
- Monitor execution - Track performance metrics
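One lightweight way to apply the caching point is to memoize LLM calls on the prompt string. A minimal sketch using an in-memory dict (not production-grade, but enough to skip repeated identical calls within a run):

```python
_llm_cache: dict[str, str] = {}

def cached_llm_call(prompt: str) -> str:
    """Return a cached completion for repeated identical prompts."""
    if prompt not in _llm_cache:
        _llm_cache[prompt] = llm.invoke(prompt).content
    return _llm_cache[prompt]
```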
3. Error Handling
- Validate inputs - Check state before processing
- Graceful degradation - Handle failures gracefully
- Retry mechanisms - Implement retry logic for transient failures (see the backoff sketch after this list)
- Logging - Comprehensive logging for debugging
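For retries, a simple exponential-backoff wrapper around a node function is often enough. A sketch (the wrapper and its parameters are illustrative, not a LangGraph API):

```python
import time

def with_retries(fn, attempts: int = 3, base_delay: float = 1.0):
    """Retry a node function on exceptions with exponential backoff."""
    def wrapper(state):
        for attempt in range(attempts):
            try:
                return fn(state)
            except Exception:
                if attempt == attempts - 1:
                    raise  # give up after the last attempt
                time.sleep(base_delay * (2 ** attempt))
    return wrapper

# Usage: workflow.add_node("process", with_retries(processing_node))
```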
4. Testing Strategies
```python
import time

def test_workflow_transitions():
    """Test all possible workflow paths."""
    # Test normal flow
    result1 = app.invoke({"input": "test"})
    assert result1["status"] == "complete"

    # Test error conditions
    result2 = app.invoke({"input": ""})  # Empty input
    assert result2["status"] == "error"

def test_parallel_execution():
    """Test parallel workflow execution."""
    start_time = time.time()
    result = app.invoke({"task": "parallel test"})
    execution_time = time.time() - start_time

    # Parallel execution should be faster than sequential;
    # expected_sequential_time is a baseline you measure yourself
    assert execution_time < expected_sequential_time
```

Master workflow patterns to create sophisticated, adaptive AI systems. Next, explore agents to build autonomous decision-making capabilities.