LangGraph Development: 5 Patterns for Production-Safe Agents
The patterns that separate agents that work in demos from agents that survive real users: state checkpointing, human-in-the-loop gates, retry budgets with error routing, typed state validation, and observability hooks.
Building an AI agent that works in a demo is straightforward. Building one that survives a year of production traffic — with real users doing unexpected things, tools going down, LLMs hallucinating, and race conditions you never anticipated — is an entirely different engineering problem.
LangGraph is the right framework for production agents precisely because it forces you to model your agent as an explicit state machine. But the framework doesn't automatically give you production safety. You have to build it in.
Here are the five patterns we apply to every LangGraph deployment at Verel Systems.
These patterns assume LangGraph ≥ 0.2.x with the compiled graph API. The checkpointer, interrupt, and RunnableConfig interfaces referenced here are stable as of that release.
Pattern 1: State Checkpointing with Persistent Storage
The single biggest gap between demo agents and production agents is resumability. A demo agent runs to completion in one process. A production agent might run for 10 minutes, call 20 tools, and then get interrupted by a process restart, a network timeout, or a human who says "actually, stop and go back."
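LangGraph's checkpointing system serializes the graph state to storage after every step (each node transition). When the process restarts, the graph resumes from the last saved checkpoint. A node that was interrupted partway through runs again from its start, so nodes with side effects should be idempotent.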
Setting up Redis checkpointing
import redis
from langgraph.checkpoint.redis import RedisSaver  # provided by the langgraph-checkpoint-redis package
from langgraph.graph import StateGraph

# Connect to Redis (use a persistent instance, not an ephemeral cache)
redis_client = redis.Redis(host="localhost", port=6379, db=0)
checkpointer = RedisSaver(redis_client=redis_client)
checkpointer.setup()  # create the indices the saver needs; run once at startup

# Compile your graph with the checkpointer (builder is your StateGraph instance)
graph = builder.compile(checkpointer=checkpointer)

# Every invoke now takes a thread_id for state isolation
config = {"configurable": {"thread_id": "user-session-abc123"}}
result = graph.invoke({"input": user_message}, config=config)
# RedisSaver is synchronous; for `await graph.ainvoke(...)` use the async saver (AsyncRedisSaver)
For development: SQLite checkpointing
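import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# SQLite is great for local dev: no infra required
# check_same_thread=False lets the saver use the connection from worker threads
checkpointer = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))  # in-memory
# or: SqliteSaver(sqlite3.connect("checkpoints.db", check_same_thread=False))  # persistent file
# Note: in recent releases SqliteSaver.from_conn_string(...) is a context manager, used via `with ... as checkpointer:`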
What gets checkpointed
Every thread_id gets its own isolated state tree. The checkpointer stores:
- All state values at each node transition
- The current node the graph is at
- Pending messages and tool call results
- Any custom state keys your graph defines
This means a user can close their browser, return 6 hours later, and resume the workflow from its last checkpoint, including a graph that is paused at an interrupt or holding tool results, as long as you pass the same thread_id.
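To make the resume behavior concrete, here is a minimal standard-library sketch of the idea. It is not LangGraph's implementation: the `CheckpointStore` class, its table layout, and the three toy steps are hypothetical. It saves state per thread_id after each completed step and, on the next run, continues from the last saved step.

```python
import json
import sqlite3


class CheckpointStore:
    """Toy per-thread checkpoint store (illustrative; not LangGraph's schema)."""

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "thread_id TEXT PRIMARY KEY, next_step INTEGER, state TEXT)"
        )

    def save(self, thread_id: str, next_step: int, state: dict) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, next_step, json.dumps(state)),
        )
        self.conn.commit()

    def load(self, thread_id: str):
        row = self.conn.execute(
            "SELECT next_step, state FROM checkpoints WHERE thread_id = ?",
            (thread_id,),
        ).fetchone()
        # Unknown threads start at step 0 with empty state
        return (row[0], json.loads(row[1])) if row else (0, {})


def run(store, thread_id, steps, crash_before=None):
    """Run steps in order, checkpointing after each completed step."""
    start, state = store.load(thread_id)
    for i in range(start, len(steps)):
        if i == crash_before:
            raise RuntimeError("simulated process restart")
        state = {**state, **steps[i](state)}
        store.save(thread_id, i + 1, state)
    return state


steps = [
    lambda s: {"intent": "search"},
    lambda s: {"docs": ["doc-1"]},
    lambda s: {"answer": f"{s['intent']}:{len(s['docs'])}"},
]
```

Calling `run(store, "t1", steps, crash_before=2)` fails before the third step; calling `run(store, "t1", steps)` afterwards skips the two completed steps and executes only the third. A different thread_id starts from scratch.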
Don't use a single Redis instance for both checkpointing and ephemeral caching. Checkpoint data must survive cache eviction. Use maxmemory-policy noeviction on your checkpoint Redis instance, or use separate instances.
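One way to enforce this is a startup guard that refuses to boot when the checkpoint instance could evict keys. The sketch below assumes a redis-py style client whose `config_get` returns a dict; the function name `assert_noeviction` is hypothetical, and some managed Redis services restrict the CONFIG command, in which case this check would need a different source of truth.

```python
def assert_noeviction(client) -> None:
    """Raise if the Redis instance may evict checkpoint keys under memory pressure."""
    # redis-py's config_get returns a dict such as {"maxmemory-policy": "noeviction"}
    policy = client.config_get("maxmemory-policy").get("maxmemory-policy")
    if isinstance(policy, bytes):  # tolerate clients that return undecoded bytes
        policy = policy.decode()
    if policy != "noeviction":
        raise RuntimeError(
            f"checkpoint Redis uses maxmemory-policy={policy!r}; expected 'noeviction'"
        )
```

Calling `assert_noeviction(redis_client)` once before compiling the graph turns a silent data-loss risk into an immediate deployment failure.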
Pattern 2: Human-in-the-Loop Interrupt Gates
The most dangerous agents are the ones that take irreversible actions without confirmation. Deleting records, sending emails, charging customers, modifying production databases — these need a human approval gate before execution.
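LangGraph's interrupt function pauses a graph inside any node and returns control to the calling code. The graph stays in a "waiting" state (persisted via your checkpointer) until you resume it. On resume, the node that called interrupt runs again from its first line, so put side effects after the interrupt call rather than before it.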
Implementing an approval gate
from langgraph.types import interrupt, Command

def human_approval_node(state: AgentState):
    # This node raises an interrupt: the graph pauses here
    # and returns the pending action to the caller
    pending_action = state["pending_tool_call"]
    decision = interrupt({
        "type": "approval_required",
        "action": pending_action,
        "risk_level": state.get("risk_level", "medium"),
        "message": f"Approve: {pending_action['description']}?",
    })
    # decision is whatever value you pass when resuming
    if decision["approved"]:
        return {"approved": True}
    else:
        return {"approved": False, "rejection_reason": decision.get("reason")}

# In your API layer, resuming the graph:
async def handle_approval(thread_id: str, approved: bool, reason: str = ""):
    config = {"configurable": {"thread_id": thread_id}}
    result = await graph.ainvoke(
        Command(resume={"approved": approved, "reason": reason}),
        config=config,
    )
    return result
When to add interrupt gates
Apply interrupt gates to any action that is:
- Irreversible: deleting data, sending external communications
- High-cost: API calls that cost money per execution
- Externally visible: posting to social media, updating a CRM, triggering a payment
- Ambiguous: when the agent's intent is inferred from context and could be wrong
The pattern is: agent decides → interrupt for human review → human approves/rejects → agent executes or backtracks.
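The criteria above can be encoded as a routing function. The sketch below is one possible shape for a `route_by_risk` router that returns "needs_approval" or "safe_to_execute". The tool names, the `cost_usd` and `confidence` fields on the pending tool call, and both thresholds are hypothetical placeholders to adapt to your own tools.

```python
# Hypothetical tool names and thresholds; replace with your own inventory
IRREVERSIBLE_TOOLS = {"delete_record", "send_email"}
EXTERNALLY_VISIBLE_TOOLS = {"post_to_social", "update_crm", "charge_customer"}
COST_THRESHOLD_USD = 1.00
MIN_CONFIDENCE = 0.8


def route_by_risk(state: dict) -> str:
    """Return the name of the branch to take for the pending tool call."""
    call = state["pending_tool_call"]
    tool = call["name"]
    risky = (
        tool in IRREVERSIBLE_TOOLS
        or tool in EXTERNALLY_VISIBLE_TOOLS
        or call.get("cost_usd", 0.0) > COST_THRESHOLD_USD   # high-cost
        or call.get("confidence", 1.0) < MIN_CONFIDENCE      # ambiguous intent
    )
    return "needs_approval" if risky else "safe_to_execute"
```

Keeping the risk rules in one pure function makes them easy to unit test and to review when a new tool is added.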
Pattern 3: Retry Budgets with Exponential Backoff
Production agents call external services: LLM APIs, your internal databases, third-party tools. All of these fail — rate limits, timeouts, transient 500s. Without a retry strategy, a single API blip kills your entire workflow.
The naive fix (retry forever) causes thundering herd problems and blows your API rate limits. The right fix is a retry budget: a bounded retry count with exponential backoff and jitter.
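To see what a bounded budget means in wall-clock terms, the sketch below computes the delays slept between attempts. The helper name `backoff_delays` and its `rng` parameter are illustrative; the formula is capped exponential backoff with a jitter factor drawn from 50% to 100% of the capped delay.

```python
import random


def backoff_delays(max_attempts, base_delay, max_delay, jitter=False, rng=random.random):
    """Delays slept between attempts; there is one fewer delay than attempts."""
    delays = []
    for attempt in range(max_attempts - 1):
        delay = min(base_delay * (2 ** attempt), max_delay)  # exponential, capped
        if jitter:
            delay *= 0.5 + rng() * 0.5  # scale into [50%, 100%) of the capped delay
        delays.append(delay)
    return delays


print(backoff_delays(3, 1.0, 30.0))  # [1.0, 2.0]
print(backoff_delays(7, 1.0, 30.0))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

With three attempts, a base delay of 1 second, and no jitter, the worst case adds 3 seconds of waiting before the error is surfaced. With seven attempts the cap takes effect on the last delay, which is 30 seconds instead of 32.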
Implementing retry budgets on tool nodes
import asyncio
import random
from typing import TypedDict

# Assumption: the retryable errors come from the OpenAI SDK.
# Swap in the exception types your own client raises.
from openai import APITimeoutError, RateLimitError

class RetryConfig(TypedDict):
    max_attempts: int
    base_delay: float  # seconds
    max_delay: float  # seconds
    jitter: bool

DEFAULT_RETRY = RetryConfig(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    jitter=True,
)

async def with_retry(fn, config: RetryConfig = DEFAULT_RETRY):
    for attempt in range(config["max_attempts"]):
        try:
            return await fn()
        except (RateLimitError, APITimeoutError):
            if attempt == config["max_attempts"] - 1:
                raise  # budget exhausted: let the caller handle it
            delay = min(
                config["base_delay"] * (2 ** attempt),
                config["max_delay"],
            )
            if config["jitter"]:
                delay *= 0.5 + random.random() * 0.5  # scale to 50-100% of the computed delay
            await asyncio.sleep(delay)

# Use in a tool node:
async def call_crm_node(state: AgentState):
    async def _call():
        return await crm_client.get_customer(state["customer_id"])
    try:
        result = await with_retry(_call)
        return {"crm_result": result, "error": None}
    except Exception as e:
        # Don't crash the graph: return the error in state
        return {"crm_result": None, "error": str(e)}
Error routing in the graph
After a tool node, add a conditional edge that routes on error state:
def route_after_tool(state: AgentState) -> str:
    if state.get("error"):
        return "error_handler"
    return "next_step"

builder.add_conditional_edges("tool_node", route_after_tool, {
    "error_handler": "error_handler",
    "next_step": "reasoning_node",
})
This lets you handle errors explicitly — retry, degrade gracefully, notify the user, or interrupt for human intervention — rather than letting exceptions propagate unpredictably.
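As an illustration of what such a handler might do, the sketch below turns the error recorded in state into a partial state update. Everything here is hypothetical: the `retry_count` and `next` keys, the `MAX_GRAPH_RETRIES` budget, the substring check for timeouts, and the fallback message. A conditional edge after this node could route on the `next` value.

```python
MAX_GRAPH_RETRIES = 2  # hypothetical graph-level budget, separate from per-call retries


def error_handler_node(state: dict) -> dict:
    """Turn an error recorded in state into a partial state update."""
    error = state.get("error") or "unknown error"
    retries = state.get("retry_count", 0)
    if "timeout" in error.lower() and retries < MAX_GRAPH_RETRIES:
        # Transient failure: clear the error and ask the router to retry the tool
        return {"error": None, "retry_count": retries + 1, "next": "retry_tool"}
    # Persistent failure: degrade gracefully with a user-facing message
    return {
        "final_response": "I couldn't complete that request. Please try again later.",
        "next": "give_up",
    }
```

Counting retries in state keeps the graph-level loop bounded, in the same spirit as the per-call retry budget.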
Pattern 4: Typed State with Strict Validation
The most common source of subtle production bugs in LangGraph agents is unvalidated state mutations. Node A writes customer_id as an integer; Node B expects a string. Node C returns None when Node D expects a list. These bugs don't surface in demos because demos have clean, happy-path data.
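Use TypedDict or Pydantic for all state definitions. With a Pydantic state schema, LangGraph validates the state passed into each node at runtime; a TypedDict gives you static type checking only, so add explicit checks where bad data would be costly.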
Pydantic state with field validators
from pydantic import BaseModel, field_validator, model_validator
from typing import Optional, Literal

class AgentState(BaseModel):
    # Inputs
    user_input: str
    session_id: str
    # Workflow state
    intent: Optional[Literal["search", "create", "update", "delete"]] = None
    retrieved_docs: list[dict] = []
    pending_tool_call: Optional[dict] = None
    approved: Optional[bool] = None
    # Outputs
    final_response: Optional[str] = None
    error: Optional[str] = None

    @field_validator("session_id")
    @classmethod
    def session_id_format(cls, v: str) -> str:
        if not v.startswith("sess_"):
            raise ValueError("session_id must start with 'sess_'")
        return v

    @model_validator(mode="after")
    def validate_approval_consistency(self):
        if self.approved is True and self.pending_tool_call is None:
            raise ValueError("Cannot approve with no pending tool call")
        return self

# Using it in nodes: type errors are caught at runtime
def reasoning_node(state: AgentState) -> dict:
    # state is a validated Pydantic model
    # return a partial update (LangGraph merges with current state)
    return {"intent": "search"}
LangGraph state updates are merged, not replaced. Nodes return dicts with only the keys they're updating. This is intentional — but it means you can accidentally leave stale values in state. For fields that should be reset each turn (like pending_tool_call), explicitly set them to None in the appropriate node.
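The stale-value problem is easy to reproduce with a plain dict. The sketch below is a simplified model of the default behavior for keys without a custom reducer, where a returned key overwrites the old value and every other key carries over; `apply_update` is a hypothetical stand-in for what the framework does between nodes.

```python
def apply_update(state: dict, update: dict) -> dict:
    """Shallow merge: keys in update overwrite, all other keys carry over."""
    return {**state, **update}


state = {
    "pending_tool_call": {"name": "send_email"},
    "approved": True,
    "final_response": None,
}

# A node that forgets to reset per-turn fields leaves them behind
stale = apply_update(state, {"final_response": "Email sent."})

# A node that resets them explicitly
clean = apply_update(
    state,
    {"final_response": "Email sent.", "pending_tool_call": None, "approved": None},
)
```

In `stale`, the next turn still sees an approved `send_email` call. In `clean`, both fields are cleared, so a later approval check cannot act on the previous turn's decision.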
Pattern 5: Observability with LangSmith Tracing
You cannot debug a production agent without traces. An agent makes 5–20 LLM calls per user request, with routing decisions at each step. Without observability, a bug that manifests as "sometimes gives wrong answers" is nearly impossible to track down.
LangSmith is the first-party solution, and for LangGraph it's deeply integrated — traces show the full graph execution path, each node's input/output, token counts, latency by node, and tool call details.
Configuring LangSmith in production
import os
from langchain_core.tracers.langchain import wait_for_all_tracers

# Prefer setting these in the deployment environment; never hardcode the key
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = os.environ["LANGSMITH_API_KEY"]  # fails fast with KeyError if the key is missing
os.environ["LANGCHAIN_PROJECT"] = "verel-production-agent"

# Tag traces with metadata for filtering
config = {
    "configurable": {"thread_id": thread_id},
    "metadata": {
        "user_id": user_id,
        "session_type": "enterprise_rag",
        "environment": "production",
    },
    "tags": ["production", "v2.1"],
    "run_name": f"agent_run_{thread_id[:8]}",
}

# Before a short-lived process exits, flush any buffered traces
wait_for_all_tracers()
Custom metrics to track
Beyond LangSmith's built-in metrics, instrument these custom signals:
from langsmith import Client

ls_client = Client()

def log_agent_outcome(run_id: str, outcome: str, latency_ms: int):
    """Log custom metrics to LangSmith for aggregate dashboards."""
    ls_client.create_feedback(
        run_id=run_id,
        key="outcome",
        value=outcome,  # "success" | "human_rejected" | "tool_error" | "timeout"
        score=1.0 if outcome == "success" else 0.0,
    )
    ls_client.create_feedback(
        run_id=run_id,
        key="latency_ms",
        value=str(latency_ms),
        score=max(0, 1.0 - (latency_ms / 10000)),  # normalize: 10s = 0.0
    )
The three dashboards every production agent needs
- Error rate by node: which nodes fail most often, so you know where to harden
- Human approval rate: what % of actions trigger human review (too high → noisy, too low → unsafe)
- P95 latency by graph path: which execution paths are slow, informing optimization
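If you export run records to your own store, the three numbers behind these dashboards are simple aggregates. The sketch below assumes hypothetical record shapes (`node` and `error` per node event, `needed_approval` per run) and uses the nearest-rank definition of the 95th percentile; your tracing backend may compute percentiles differently.

```python
from collections import defaultdict


def error_rate_by_node(events):
    """Fraction of executions of each node that recorded an error."""
    totals, errors = defaultdict(int), defaultdict(int)
    for event in events:
        totals[event["node"]] += 1
        if event["error"]:
            errors[event["node"]] += 1
    return {node: errors[node] / totals[node] for node in totals}


def approval_rate(runs):
    """Fraction of runs that were routed to human review."""
    return sum(1 for run in runs if run["needed_approval"]) / len(runs)


def p95(values):
    """Nearest-rank 95th percentile, using integer math to avoid float rounding."""
    ordered = sorted(values)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n)
    return ordered[rank - 1]
```

Grouping latencies by the sequence of nodes a run visited before calling `p95` gives the per-path view described in the third dashboard.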
Putting it together: a production-safe agent skeleton
from langgraph.graph import StateGraph, END

builder = StateGraph(AgentState)  # Pattern 4
# Add nodes
builder.add_node("understand_intent", understand_intent_node)
builder.add_node("retrieve_context", retrieve_context_node)
builder.add_node("plan_action", plan_action_node)
builder.add_node("human_approval", human_approval_node)  # Pattern 2
builder.add_node("execute_tool", execute_tool_node)  # Pattern 3
builder.add_node("synthesize_response", synthesize_response_node)
builder.add_node("error_handler", error_handler_node)
# Add edges
builder.set_entry_point("understand_intent")
builder.add_edge("understand_intent", "retrieve_context")
builder.add_edge("retrieve_context", "plan_action")
# Route: does this action need human approval?
builder.add_conditional_edges("plan_action", route_by_risk, {
    "needs_approval": "human_approval",
    "safe_to_execute": "execute_tool",
})
# Route: did the human approve? A rejected action must never reach execute_tool.
# route_after_approval is a helper you define, returning "approved" or "rejected" from state.
builder.add_conditional_edges("human_approval", route_after_approval, {
    "approved": "execute_tool",
    "rejected": "synthesize_response",
})
# Route: did the tool succeed? Keys must match route_after_tool's return values.
builder.add_conditional_edges("execute_tool", route_after_tool, {
    "next_step": "synthesize_response",
    "error_handler": "error_handler",
})
builder.add_edge("synthesize_response", END)
builder.add_edge("error_handler", END)
# Compile with the checkpointer from Pattern 1. The interrupt() call inside
# human_approval_node pauses the graph, so interrupt_before is not needed here.
# Pattern 5 is applied per run through the config passed to invoke.
graph = builder.compile(checkpointer=checkpointer)
Summary: production-safety checklist
Before deploying any LangGraph agent to production:
- Checkpointer configured with persistent storage (Redis or Postgres, not in-memory)
- Human approval gates on all irreversible or externally visible actions
- Retry budgets on all external tool calls (max 3 attempts, exponential backoff + jitter)
- Pydantic/TypedDict state with explicit field types
- Error state handling: all nodes return errors in state rather than raising exceptions
- LangSmith tracing enabled with project tagging
- Custom metrics: outcome, latency, approval rate
- Load-tested with concurrent thread_id values (Redis checkpointer is thread-safe; SQLite is not)
Frequently asked questions
What's the difference between LangGraph and plain LangChain LCEL?
LCEL is great for stateless chains: a prompt goes in, a structured response comes out. LangGraph is for stateful, cyclical workflows where the agent makes decisions about what to do next based on accumulated context. If your workflow has branches, loops, or requires memory between turns, LCEL is the wrong tool.
How does LangGraph compare to CrewAI and AutoGen for production use?
CrewAI and AutoGen have simpler APIs that work well for prototypes. LangGraph is more verbose but gives you explicit control over state transitions, error handling, and persistence, which is what production requires. We've migrated several AutoGen prototypes to LangGraph specifically for the checkpointing and interrupt capabilities.
What's the performance overhead of checkpointing?
With Redis, checkpoint writes add 5–20ms per node transition. For a 10-node graph, that's 50–200ms overhead — acceptable for most business automation. If you're building a realtime UX, use interrupt_before sparingly and only checkpoint at meaningful state milestones.
Can I run LangGraph agents concurrently?
Yes, with proper checkpointing. Each concurrent execution needs its own thread_id. The Redis checkpointer handles concurrent writes safely. The SQLite checkpointer does not — use Redis in production.
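The isolation rule can be illustrated without any infrastructure. In the sketch below, a module-level dict named `STORE` stands in for a checkpointer keyed by thread_id, and `run_agent` is a hypothetical placeholder for a graph invocation; the point is only that each concurrent execution gets a fresh ID and never touches another execution's state.

```python
import asyncio
import uuid

STORE: dict[str, dict] = {}  # stand-in for a checkpointer keyed by thread_id


async def run_agent(user_message: str) -> str:
    """Simulate one agent execution and return its thread_id."""
    thread_id = f"thread-{uuid.uuid4()}"  # fresh ID per execution
    STORE[thread_id] = {"input": user_message}
    await asyncio.sleep(0)  # yield control, as a real LLM or tool call would
    STORE[thread_id]["output"] = user_message.upper()
    return thread_id


async def main(messages):
    # gather runs the executions concurrently and preserves input order
    return await asyncio.gather(*(run_agent(m) for m in messages))
```

Running `asyncio.run(main(["a", "b", "c"]))` returns three distinct thread IDs, each mapping to its own input and output in `STORE`. Reusing one thread_id across those calls would instead interleave writes into a single state.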
