AI Technology
LangGraph
Workflows

Workflows in LangGraph

Workflows are the heart of LangGraph, defining how information flows through your AI system. They enable complex processing patterns beyond simple linear chains, including conditional logic, loops, and parallel execution.

⚙️ Workflow Types

1. Sequential Workflows

Linear processing where each node processes the output of the previous node.

2. Conditional Workflows

Dynamic routing based on state and conditions.

3. Looping Workflows

Iterative processing with cycles and feedback loops.

4. Parallel Workflows

Concurrent execution of multiple processing branches.

📋 Sequential Workflows

Basic Sequential Pattern

The simplest workflow pattern with linear progression.

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict
 
llm = ChatOpenAI(model="gpt-3.5-turbo")
 
class DocumentState(TypedDict):
    raw_text: str
    cleaned_text: str
    summary: str
    analysis: str
 
def preprocess_node(state: DocumentState):
    """Preprocess the raw text."""
    raw_text = state["raw_text"]
 
    prompt = f"Clean and preprocess this text:\n{raw_text}"
    response = llm.invoke(prompt)
 
    return {"cleaned_text": response.content}
 
def summarize_node(state: DocumentState):
    """Summarize the cleaned text."""
    cleaned_text = state["cleaned_text"]
 
    prompt = f"Summarize this text:\n{cleaned_text}"
    response = llm.invoke(prompt)
 
    return {"summary": response.content}
 
def analyze_node(state: DocumentState):
    """Analyze the summary."""
    summary = state["summary"]
 
    prompt = f"Analyze this summary for key insights:\n{summary}"
    response = llm.invoke(prompt)
 
    return {"analysis": response.content}
 
# Build sequential workflow
workflow = StateGraph(DocumentState)
 
# Add nodes
workflow.add_node("preprocess", preprocess_node)
workflow.add_node("summarize", summarize_node)
workflow.add_node("analyze", analyze_node)
 
# Define sequential flow
workflow.set_entry_point("preprocess")
workflow.add_edge("preprocess", "summarize")
workflow.add_edge("summarize", "analyze")
workflow.add_edge("analyze", END)
 
# Compile and run
app = workflow.compile()
 
result = app.invoke({
    "raw_text": "Your document text here..."
})

Advanced Sequential with Error Handling

class ProcessingState(TypedDict):
    data: str
    processed_data: str
    error: str
    step: int
    completed: bool
 
def processing_node(state: ProcessingState):
    """Processing node with error handling."""
    data = state["data"]
    step = state["step"]
 
    try:
        # Simulate processing
        if step < 3:
            processed_data = f"Processed_{data}_step_{step + 1}"
            return {
                "processed_data": processed_data,
                "step": step + 1,
                "completed": step >= 2
            }
        else:
            return {"completed": True}
    except Exception as e:
        return {
            "error": str(e),
            "completed": False
        }
 
def should_continue(state: ProcessingState):
    """Check if processing should continue."""
    if state.get("error"):
        return "error"
    elif state.get("completed", False):
        return "end"
    else:
        return "continue"
 
# Build workflow with error handling
workflow = StateGraph(ProcessingState)
workflow.add_node("process", processing_node)
 
workflow.add_conditional_edges(
    "process",
    should_continue,
    {
        "continue": "process",  # Loop back
        "end": END,
        "error": END
    }
)

🔀 Conditional Workflows

Dynamic Routing Based on State

Create workflows that make decisions based on current conditions.

class AnalysisState(TypedDict):
    input_text: str
    category: str
    sentiment: str
    response: str
    confidence: float
 
def categorize_node(state: AnalysisState):
    """Categorize the input text."""
    text = state["input_text"]
 
    prompt = f"Categorize this text (technical/business/personal/other): {text}"
    response = llm.invoke(prompt)
 
    return {"category": response.content.strip().lower()}
 
def sentiment_node(state: AnalysisState):
    """Analyze sentiment of the text."""
    text = state["input_text"]
 
    prompt = f"Analyze sentiment (positive/negative/neutral): {text}"
    response = llm.invoke(prompt)
 
    return {"sentiment": response.content.strip().lower()}
 
def technical_response(state: AnalysisState):
    """Generate technical response."""
    return {
        "response": "I'll help you with the technical aspects.",
        "confidence": 0.9
    }
 
def business_response(state: AnalysisState):
    """Generate business response."""
    return {
        "response": "Let's discuss the business implications.",
        "confidence": 0.8
    }
 
def personal_response(state: AnalysisState):
    """Generate personal response."""
    return {
        "response": "I understand your personal perspective.",
        "confidence": 0.7
    }
 
def default_response(state: AnalysisState):
    """Generate default response."""
    return {
        "response": "I'll help you with this request.",
        "confidence": 0.6
    }
 
def route_by_category(state: AnalysisState):
    """Route to appropriate response based on category."""
    category = state["category"]
 
    routing_map = {
        "technical": "technical_handler",
        "business": "business_handler",
        "personal": "personal_handler"
    }
 
    return routing_map.get(category, "default_handler")
 
# Build conditional workflow
workflow = StateGraph(AnalysisState)
 
# Add nodes
workflow.add_node("categorize", categorize_node)
workflow.add_node("sentiment", sentiment_node)
workflow.add_node("technical_handler", technical_response)
workflow.add_node("business_handler", business_response)
workflow.add_node("personal_handler", personal_response)
workflow.add_node("default_handler", default_response)
 
# Set entry and analysis flow
workflow.set_entry_point("categorize")
workflow.add_edge("categorize", "sentiment")
 
# Add conditional routing
workflow.add_conditional_edges(
    "sentiment",
    route_by_category,
    {
        "technical_handler": "technical_handler",
        "business_handler": "business_handler",
        "personal_handler": "personal_handler",
        "default_handler": "default_handler"
    }
)
 
# All response nodes end the workflow
workflow.add_edge("technical_handler", END)
workflow.add_edge("business_handler", END)
workflow.add_edge("personal_handler", END)
workflow.add_edge("default_handler", END)
 
app = workflow.compile()

Multi-level Conditional Logic

class DecisionState(TypedDict):
    request: str
    priority: str  # high, medium, low
    category: str  # technical, general, urgent
    assigned_to: str
    processing_time: int
    status: str
 
def assess_priority(state: DecisionState):
    """Assess request priority."""
    request = state["request"]
 
    prompt = f"Assess priority (high/medium/low): {request}"
    response = llm.invoke(prompt)
 
    return {"priority": response.content.strip().lower()}
 
def categorize_request(state: DecisionState):
    """Categorize the request."""
    request = state["request"]
 
    prompt = f"Categorize request (technical/general/urgent): {request}"
    response = llm.invoke(prompt)
 
    return {"category": response.content.strip().lower()}
 
def route_urgent(state: DecisionState):
    """Handle urgent requests immediately."""
    return {
        "assigned_to": "senior_team",
        "processing_time": 5,
        "status": "immediate_processing"
    }
 
def route_high_technical(state: DecisionState):
    """Handle high priority technical requests."""
    return {
        "assigned_to": "technical_team",
        "processing_time": 30,
        "status": "queued_for_tech"
    }
 
def route_standard(state: DecisionState):
    """Handle standard requests."""
    return {
        "assigned_to": "general_team",
        "processing_time": 60,
        "status": "standard_queue"
    }
 
def complex_routing(state: DecisionState):
    """Complex routing based on multiple factors."""
    priority = state["priority"]
    category = state["category"]
 
    if category == "urgent":
        return "urgent_handler"
    elif priority == "high" and category == "technical":
        return "high_tech_handler"
    else:
        return "standard_handler"
 
# Build complex conditional workflow
workflow = StateGraph(DecisionState)
 
workflow.add_node("assess_priority", assess_priority)
workflow.add_node("categorize", categorize_request)
workflow.add_node("urgent_handler", route_urgent)
workflow.add_node("high_tech_handler", route_high_technical)
workflow.add_node("standard_handler", route_standard)
 
workflow.set_entry_point("assess_priority")
workflow.add_edge("assess_priority", "categorize")
 
workflow.add_conditional_edges(
    "categorize",
    complex_routing,
    {
        "urgent_handler": "urgent_handler",
        "high_tech_handler": "high_tech_handler",
        "standard_handler": "standard_handler"
    }
)
 
workflow.add_edge("urgent_handler", END)
workflow.add_edge("high_tech_handler", END)
workflow.add_edge("standard_handler", END)

🔄 Looping Workflows

Iterative Processing

Create workflows that loop until conditions are met.

from langgraph.graph import StateGraph, END
 
class IterativeState(TypedDict):
    target_quality: float
    current_quality: float
    attempts: int
    max_attempts: int
    content: str
    improved_content: str
    feedback: str
    complete: bool
 
def improve_content(state: IterativeState):
    """Improve content quality."""
    content = state["content"]
    current_quality = state["current_quality"]
 
    if state["attempts"] > 0:
        feedback = state.get("feedback", "")
        prompt = f"Improve this content based on feedback:\nOriginal: {content}\nFeedback: {feedback}"
    else:
        prompt = f"Improve this content:\n{content}"
 
    response = llm.invoke(prompt)
 
    return {
        "improved_content": response.content,
        "attempts": state["attempts"] + 1
    }
 
def evaluate_quality(state: IterativeState):
    """Evaluate the quality of improved content."""
    content = state["improved_content"]
 
    prompt = f"Rate content quality (0.0-1.0): {content}"
    response = llm.invoke(prompt)
 
    try:
        quality = float(response.content.strip())
        return {"current_quality": min(max(quality, 0.0), 1.0)}
    except ValueError:
        return {"current_quality": 0.5}  # Default if parsing fails
 
def should_continue_improving(state: IterativeState):
    """Check if improvement should continue."""
    current_quality = state["current_quality"]
    target_quality = state["target_quality"]
    attempts = state["attempts"]
    max_attempts = state["max_attempts"]
 
    if current_quality >= target_quality:
        return "complete"
    elif attempts >= max_attempts:
        return "max_attempts_reached"
    else:
        return "continue"
 
def generate_feedback(state: IterativeState):
    """Generate feedback for improvement."""
    content = state["improved_content"]
    target_quality = state["target_quality"]
    current_quality = state["current_quality"]
 
    prompt = f"Content quality is {current_quality:.2f}, target is {target_quality:.2f}. Suggest improvements: {content}"
    response = llm.invoke(prompt)
 
    return {"feedback": response.content}
 
def finalize(state: IterativeState):
    """Finalize the content."""
    return {
        "content": state["improved_content"],
        "complete": True
    }
 
def max_attempts_handler(state: IterativeState):
    """Handle maximum attempts reached."""
    return {
        "content": state["improved_content"],
        "complete": True,
        "feedback": f"Max attempts ({state['max_attempts']}) reached. Current quality: {state['current_quality']:.2f}"
    }
 
# Build looping workflow
workflow = StateGraph(IterativeState)
 
workflow.add_node("improve", improve_content)
workflow.add_node("evaluate", evaluate_quality)
workflow.add_node("generate_feedback", generate_feedback)
workflow.add_node("finalize", finalize)
workflow.add_node("max_attempts_handler", max_attempts_handler)
 
workflow.set_entry_point("improve")
workflow.add_edge("improve", "evaluate")
 
workflow.add_conditional_edges(
    "evaluate",
    should_continue_improving,
    {
        "continue": "generate_feedback",
        "complete": "finalize",
        "max_attempts_reached": "max_attempts_handler"
    }
)
 
workflow.add_edge("generate_feedback", "improve")  # Loop back
workflow.add_edge("finalize", END)
workflow.add_edge("max_attempts_handler", END)
 
app = workflow.compile()
 
# Run with initial state
result = app.invoke({
    "target_quality": 0.9,
    "current_quality": 0.5,
    "attempts": 0,
    "max_attempts": 5,
    "content": "Initial content to improve",
    "improved_content": "",
    "feedback": "",
    "complete": False
})

🔀 Parallel Workflows

Concurrent Processing

Execute multiple branches simultaneously and combine results.

from langgraph.graph import StateGraph, START, END
 
class ParallelState(TypedDict):
    query: str
    web_results: str
    database_results: str
    api_results: str
    combined_results: str
    timestamp: str
 
def web_search_node(state: ParallelState):
    """Perform web search."""
    query = state["query"]
 
    prompt = f"Simulate web search for: {query}"
    response = llm.invoke(prompt)
 
    return {"web_results": f"Web: {response.content}"}
 
def database_search_node(state: ParallelState):
    """Perform database search."""
    query = state["query"]
 
    prompt = f"Simulate database search for: {query}"
    response = llm.invoke(prompt)
 
    return {"database_results": f"Database: {response.content}"}
 
def api_search_node(state: ParallelState):
    """Perform API search."""
    query = state["query"]
 
    prompt = f"Simulate API search for: {query}"
    response = llm.invoke(prompt)
 
    return {"api_results": f"API: {response.content}"}
 
def combine_results(state: ParallelState):
    """Combine results from all sources."""
    web = state["web_results"]
    db = state["database_results"]
    api = state["api_results"]
 
    prompt = f"Combine these search results:\nWeb: {web}\nDatabase: {db}\nAPI: {api}"
    response = llm.invoke(prompt)
 
    return {
        "combined_results": response.content,
        "timestamp": datetime.now().isoformat()
    }
 
# Build parallel workflow
workflow = StateGraph(ParallelState)
 
workflow.add_node("web_search", web_search_node)
workflow.add_node("database_search", database_search_node)
workflow.add_node("api_search", api_search_node)
workflow.add_node("combine", combine_results)
 
# Set entry point
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "database_search")
workflow.add_edge(START, "api_search")
 
# All search nodes converge to combine
workflow.add_edge("web_search", "combine")
workflow.add_edge("database_search", "combine")
workflow.add_edge("api_search", "combine")
workflow.add_edge("combine", END)
 
app = workflow.compile()

Parallel with Error Handling

class RobustParallelState(TypedDict):
    task: str
    processor1_result: str
    processor2_result: str
    processor3_result: str
    errors: list
    successful_results: list
    final_result: str
 
def robust_processor1(state: RobustParallelState):
    """First parallel processor with error handling."""
    try:
        task = state["task"]
        result = f"Processor1 completed: {task}"
        return {"processor1_result": result}
    except Exception as e:
        return {"errors": state.get("errors", []) + [f"Processor1 error: {str(e)}"]}
 
def robust_processor2(state: RobustParallelState):
    """Second parallel processor with error handling."""
    try:
        task = state["task"]
        result = f"Processor2 completed: {task}"
        return {"processor2_result": result}
    except Exception as e:
        return {"errors": state.get("errors", []) + [f"Processor2 error: {str(e)}"]}
 
def robust_processor3(state: RobustParallelState):
    """Third parallel processor with error handling."""
    try:
        task = state["task"]
        result = f"Processor3 completed: {task}"
        return {"processor3_result": result}
    except Exception as e:
        return {"errors": state.get("errors", []) + [f"Processor3 error: {str(e)}"]}
 
def collect_results(state: RobustParallelState):
    """Collect successful results."""
    results = []
 
    if "processor1_result" in state:
        results.append(state["processor1_result"])
    if "processor2_result" in state:
        results.append(state["processor2_result"])
    if "processor3_result" in state:
        results.append(state["processor3_result"])
 
    return {"successful_results": results}
 
def finalize_robust(state: RobustParallelState):
    """Finalize with robust handling."""
    successful_results = state["successful_results"]
    errors = state.get("errors", [])
 
    if successful_results:
        prompt = f"Process these successful results: {successful_results}"
        if errors:
            prompt += f"\n\nErrors occurred: {errors}"
 
        response = llm.invoke(prompt)
        return {"final_result": response.content}
    else:
        return {"final_result": f"All processors failed. Errors: {errors}"}
 
# Build robust parallel workflow
workflow = StateGraph(RobustParallelState)
 
workflow.add_node("processor1", robust_processor1)
workflow.add_node("processor2", robust_processor2)
workflow.add_node("processor3", robust_processor3)
workflow.add_node("collect", collect_results)
workflow.add_node("finalize", finalize_robust)
 
# Parallel execution
workflow.add_edge(START, "processor1")
workflow.add_edge(START, "processor2")
workflow.add_edge(START, "processor3")
 
# Convergence
workflow.add_edge("processor1", "collect")
workflow.add_edge("processor2", "collect")
workflow.add_edge("processor3", "collect")
workflow.add_edge("collect", "finalize")
workflow.add_edge("finalize", END)

🎯 Advanced Workflow Patterns

State Machine Workflow

Create finite state machine patterns.

class StateMachineState(TypedDict):
    current_state: str
    event: str
    context: dict
    history: list
 
class StateMachine:
    """Finite state machine for workflow control."""
 
    def __init__(self):
        self.states = {
            "idle": ["start"],
            "processing": ["complete", "error", "pause"],
            "paused": ["resume", "cancel"],
            "completed": ["reset"],
            "error": ["retry", "reset"]
        }
 
    def get_next_states(self, current_state: str) -> list:
        return self.states.get(current_state, [])
 
    def can_transition(self, current_state: str, event: str) -> bool:
        return event in self.get_next_states(current_state)
 
def state_machine_node(state: StateMachineState):
    """State machine transition handler."""
    current_state = state["current_state"]
    event = state["event"]
 
    sm = StateMachine()
 
    if sm.can_transition(current_state, event):
        # Perform state transition
        if current_state == "idle" and event == "start":
            return {"current_state": "processing", "history": ["idle -> processing"]}
        elif current_state == "processing" and event == "complete":
            return {"current_state": "completed", "history": state.get("history", []) + ["processing -> completed"]}
        # ... more transitions
    else:
        return {"current_state": "error", "history": state.get("history", []) + [f"{current_state} -> error (invalid event: {event})"]}

Pipeline Workflow with Stages

class PipelineState(TypedDict):
    input_data: any
    stages: list
    current_stage: int
    stage_results: dict
    completed: bool
    pipeline_result: any
 
def pipeline_processor(state: PipelineState):
    """Process current pipeline stage."""
    stages = state["stages"]
    current_stage = state["current_stage"]
 
    if current_stage < len(stages):
        stage_name = stages[current_stage]
        input_data = state.get("input_data")
 
        # Process stage
        if stage_name == "validate":
            result = f"Validated: {input_data}"
        elif stage_name == "transform":
            result = f"Transformed: {input_data}"
        elif stage_name == "analyze":
            result = f"Analyzed: {input_data}"
        else:
            result = f"Unknown stage: {stage_name}"
 
        # Update stage results
        stage_results = state.get("stage_results", {})
        stage_results[stage_name] = result
 
        return {
            "stage_results": stage_results,
            "current_stage": current_stage + 1,
            "completed": current_stage >= len(stages) - 1
        }
    else:
        return {"completed": True}
 
def should_continue_pipeline(state: PipelineState):
    """Check if pipeline should continue."""
    if state.get("completed", False):
        return "finalize"
    else:
        return "continue"
 
def finalize_pipeline(state: PipelineState):
    """Finalize pipeline execution."""
    stage_results = state.get("stage_results", {})
 
    # Combine all stage results
    combined_result = " | ".join(stage_results.values())
 
    return {"pipeline_result": combined_result}
 
# Build pipeline workflow
workflow = StateGraph(PipelineState)
 
workflow.add_node("process_stage", pipeline_processor)
workflow.add_node("finalize", finalize_pipeline)
 
workflow.add_conditional_edges(
    "process_stage",
    should_continue_pipeline,
    {
        "continue": "process_stage",
        "finalize": "finalize"
    }
)
 
workflow.add_edge("finalize", END)

🎯 Best Practices

1. Workflow Design

  • Keep workflows focused - Single responsibility per workflow
  • Test individual nodes - Unit test node functions
  • Handle edge cases - Consider all possible states
  • Document transitions - Clear workflow documentation

2. Performance Optimization

  • Parallelize when possible - Use parallel workflows for independent tasks
  • Minimize state size - Keep state payloads manageable
  • Use caching - Cache expensive operations
  • Monitor execution - Track performance metrics

3. Error Handling

  • Validate inputs - Check state before processing
  • Graceful degradation - Handle failures gracefully
  • Retry mechanisms - Implement retry logic for transient failures
  • Logging - Comprehensive logging for debugging

4. Testing Strategies

def test_workflow_transitions():
    """Test all possible workflow paths."""
    # Test normal flow
    result1 = app.invoke({"input": "test"})
    assert result1["status"] == "complete"
 
    # Test error conditions
    result2 = app.invoke({"input": ""})  # Empty input
    assert result2["status"] == "error"
 
def test_parallel_execution():
    """Test parallel workflow execution."""
    start_time = time.time()
    result = app.invoke({"task": "parallel test"})
    execution_time = time.time() - start_time
 
    # Parallel execution should be faster than sequential
    assert execution_time < expected_sequential_time

Master workflow patterns to create sophisticated, adaptive AI systems. Next, explore agents to build autonomous decision-making capabilities.