تطوير LangGraph: 5 أنماط لبناء وكلاء آمنين للبيئات الإنتاجية
الأنماط التي تفصل بين الوكلاء المصممين للعروض التجريبية وأولئك القادرين على الصمود أمام المستخدمين الحقيقيين: حفظ حالة النظام (state checkpointing)، بوابات التدخل البشري (human-in-the-loop)، ميزانيات إعادة المحاولة، معالجة أخطاء الأدوات، وأدوات المراقبة.
بناء وكيل ذكاء اصطناعي (AI agent) يعمل بشكل ممتاز في عرض تجريبي (demo) هو أمر بسيط. لكن بناء وكيل يصمد لعام كامل في بيئة إنتاجية (production environment) — مع مستخدمين حقيقيين يقومون بتصرفات غير متوقعة، وأدوات تتوقف عن العمل، ونماذج لغوية (LLMs) تهلوس، وحالات تسابق (race conditions) لم تكن في الحسبان — هو تحدٍ هندسي مختلف تماماً.
يُعد إطار العمل LangGraph الخيار الأمثل لبناء الوكلاء الإنتاجيين، لأنه يجبرك على نمذجة الوكيل كآلة حالة صريحة (explicit state machine). لكن إطار العمل لا يمنحك الأمان في بيئة الإنتاج تلقائياً؛ بل يجب عليك بناؤه بنفسك.
إليك الأنماط الخمسة التي نطبقها في كل عملية نشر لـ LangGraph في Verel Systems.
تفترض هذه الأنماط استخدام إصدار LangGraph ≥ 0.2.x مع واجهة compiled graph API. واجهات checkpointer و interrupt و RunnableConfig المشار إليها هنا مستقرة بدءاً من هذا الإصدار.
النمط 1: حفظ حالة النظام (State Checkpointing) باستخدام تخزين دائم
الفجوة الأكبر بين وكلاء العروض التجريبية والوكلاء الإنتاجيين هي قابلية الاستئناف (resumability). فوكيل العرض التجريبي يعمل حتى يكتمل في عملية واحدة (process). أما الوكيل الإنتاجي فقد يعمل لمدة 10 دقائق، ويستدعي 20 أداة، ثم ينقطع بسبب إعادة تشغيل الخادم، أو انتهاء مهلة الشبكة (network timeout)، أو مستخدم يقرر فجأة "توقف، تراجع للوراء".
يقوم نظام الـ checkpointing في LangGraph بحفظ حالة الرسم البياني (graph state) بالكامل وتصديرها (serialize) إلى وحدة التخزين عند كل انتقال بين العقد (nodes). وعند إعادة تشغيل العملية، يستأنف الرسم البياني عمله بدقة من النقطة التي توقف عندها.
إعداد حفظ الحالة باستخدام Redis
from langgraph.checkpoint.redis import RedisSaver
from langgraph.graph import StateGraph
import redis
# Connect to Redis (use a persistent instance, not an ephemeral cache)
redis_client = redis.Redis(host="localhost", port=6379, db=0)
checkpointer = RedisSaver(redis_client)
# Compile your graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)
# Every invoke now takes a thread_id for state isolation
config = {"configurable": {"thread_id": "user-session-abc123"}}
result = await graph.ainvoke({"input": user_message}, config=config)
للتطوير: حفظ الحالة باستخدام SQLite
from langgraph.checkpoint.sqlite import SqliteSaver
# SQLite is great for local dev — no infra required
checkpointer = SqliteSaver.from_conn_string(":memory:") # in-memory
# or: SqliteSaver.from_conn_string("checkpoints.db") # persistent file
ما الذي يتم حفظه في الـ Checkpoint؟
تحصل كل thread_id على شجرة حالة معزولة خاصة بها. يقوم الـ checkpointer بتخزين:
- ▸جميع قيم الحالة عند كل انتقال بين العقد
- ▸العقدة الحالية التي يقف عندها الرسم البياني
- ▸الرسائل المعلقة ونتائج استدعاء الأدوات
- ▸أي مفاتيح حالة مخصصة يحددها الرسم البياني الخاص بك
هذا يعني أن المستخدم يمكنه إغلاق متصفحه، والعودة بعد 6 ساعات، واستئناف (resume) نفس سير العمل بدقة — بما في ذلك الحالات التي كانت في منتصف استدعاء الأداة — طالما قمت بتمرير نفس الـ thread_id.
لا تستخدم مثيل Redis واحداً لحفظ الحالة (checkpointing) والتخزين المؤقت المؤقت (ephemeral caching) معاً. يجب أن تظل بيانات الـ checkpoint آمنة من الحذف التلقائي (cache eviction). استخدم سياسة maxmemory-policy noeviction على مثيل Redis المخصص للـ checkpoint، أو استخدم مثيلات منفصلة.
النمط 2: بوابات التدخل البشري (Human-in-the-Loop Interrupt Gates)
أخطر الوكلاء هم أولئك الذين يتخذون إجراءات غير قابلة للتراجع دون تأكيد. حذف السجلات، إرسال البريد الإلكتروني، خصم المبالغ المادية من العملاء، تعديل قواعد البيانات الإنتاجية — كل هذه العمليات تتطلب بوابة موافقة بشرية قبل التنفيذ.
تقوم دالة interrupt في LangGraph بإيقاف الرسم البياني مؤقتاً عند أي عقدة وإعادة التحكم إلى الكود المستدعي. يظل الرسم البياني في حالة "انتظار" (محفوظة عبر الـ checkpointer الخاص بك) حتى تقوم باستئنافه.
تطبيق بوابة الموافقة
from langgraph.types import interrupt, Command
def human_approval_node(state: AgentState):
# This node raises an interrupt — the graph pauses here
# and returns the pending action to the caller
pending_tool_call = state["pending_tool_call"]
decision = interrupt({
"type": "approval_required",
"action": pending_tool_call,
"risk_level": state.get("risk_level", "medium"),
"message": f"Approve: {pending_tool_call['description']}?",
})
# decision is whatever value you pass when resuming
if decision["approved"]:
return {"approved": True}
else:
return {"approved": False, "rejection_reason": decision.get("reason")}
# In your API layer, resuming the graph:
async def handle_approval(thread_id: str, approved: bool, reason: str = ""):
config = {"configurable": {"thread_id": thread_id}}
result = await graph.ainvoke(
Command(resume={"approved": approved, "reason": reason}),
config=config
)
return result
متى يجب إضافة بوابات التدخل البشري؟
قم بتطبيق بوابات التدخل (interrupt gates) على أي إجراء يتصف بالتالي:
- ▸غير قابل للتراجع — مثل حذف البيانات أو إرسال مراسلات خارجية
- ▸عالي التكلفة — استدعاءات واجهات البرمجة (APIs) التي تكلف أموالاً عند كل تشغيل
- ▸مرئي خارجياً — النشر على وسائل التواصل الاجتماعي، تحديث أنظمة إدارة علاقات العملاء (CRM)، أو معالجة عمليات الدفع
- ▸غامض — عندما يتم استنتاج نية الوكيل من السياق وقد تكون خاطئة
النمط المتبع هو: الوكيل يقرر ← إيقاف للمراجعة البشرية ← الإنسان يوافق أو يرفض ← الوكيل ينفذ أو يتراجع.
النمط 3: ميزانيات إعادة المحاولة مع التراجع الأسي (Retry Budgets with Exponential Backoff)
تستدعي الوكلاء في البيئة الإنتاجية خدمات خارجية: واجهات برمجة التطبيقات للنماذج اللغوية (LLM APIs)، قواعد بياناتك الداخلية، وأدوات الطرف الثالث. كل هذه الخدمات معرضة للفشل — مثل حدود معدل الاستخدام (rate limits)، انتهاء المهلة (timeouts)، أو أخطاء الخادم المؤقتة (500s). بدون استراتيجية إعادة محاولة، فإن أي خلل بسيط في الـ API كفيل بإيقاف سير العمل بالكامل.
الحل البدائي (إعادة المحاولة اللانهائية) يتسبب في مشاكل تراكم الطلبات (thundering herd) ويستهلك حدود الاستخدام الخاصة بالـ API بسرعة. الحل الصحيح هو ميزانية إعادة المحاولة (retry budget): عدد محدد من المحاولات مع تراجع أسي (exponential backoff) وإضافة تباين عشوائي (jitter).
تطبيق ميزانيات إعادة المحاولة على عقد الأدوات
import asyncio
import random
from typing import TypedDict
class RetryConfig(TypedDict):
max_attempts: int
base_delay: float # seconds
max_delay: float # seconds
jitter: bool
DEFAULT_RETRY = RetryConfig(
max_attempts=3,
base_delay=1.0,
max_delay=30.0,
jitter=True,
)
async def with_retry(fn, config: RetryConfig = DEFAULT_RETRY):
for attempt in range(config["max_attempts"]):
try:
return await fn()
except (RateLimitError, APITimeoutError) as e:
if attempt == config["max_attempts"] - 1:
raise # exhaust budget → let the graph handle it
delay = min(
config["base_delay"] * (2 ** attempt),
config["max_delay"],
)
if config["jitter"]:
delay *= (0.5 + random.random() * 0.5) # ±50% jitter
await asyncio.sleep(delay)
# Use in a tool node:
async def call_crm_node(state: AgentState):
async def _call():
return await crm_client.get_customer(state["customer_id"])
try:
result = await with_retry(_call)
return {"crm_result": result, "error": None}
except Exception as e:
# Don't crash the graph — return the error in state
return {"crm_result": None, "error": str(e)}
توجيه الأخطاء في الرسم البياني
بعد عقدة الأداة (tool node)، أضف حافة شرطية (conditional edge) تقوم بالتوجيه بناءً على حالة الخطأ:
def route_after_tool(state: AgentState) -> str:
if state.get("error"):
return "error_handler"
return "next_step"
builder.add_conditional_edges("tool_node", route_after_tool, {
"error_handler": "error_handler",
"next_step": "reasoning_node",
})
يتيح لك هذا معالجة الأخطاء بشكل صريح — سواء بإعادة المحاولة، أو التراجع التدريجي الآمن (degrade gracefully)، أو تنبيه المستخدم، أو التوقف لطلب تدخل بشري — بدلاً من ترك الاستثناءات (exceptions) تنتشر بشكل غير متوقع وتتسبب في انهيار النظام.
النمط 4: الحالة المحددة النوع مع التحقق الصارم (Typed State with Strict Validation)
المصدر الأكثر شيوعاً للأخطاء البرمجية الخفية في بيئات الإنتاج مع LangGraph هو تعديل الحالة دون التحقق من صحتها (unvalidated state mutations). على سبيل المثال، تقوم العقدة A بكتابة customer_id كعدد صحيح (integer)، بينما تتوقعه العقدة B كسلسلة نصية (string). أو تعيد العقدة C قيمة فارغة None بينما تتوقع العقدة D قائمة (list). هذه الأخطاء لا تظهر في العروض التجريبية لأن البيانات هناك تكون دائماً مثالية وخالية من المشاكل.
استخدم TypedDict أو Pydantic لجميع تعريفات الحالة، وقم بالتحقق من صحتها عند حدود كل عقدة.
حالة Pydantic مع مدققات الحقول
from pydantic import BaseModel, field_validator, model_validator
from typing import Optional, Literal
class AgentState(BaseModel):
# Inputs
user_input: str
session_id: str
# Workflow state
intent: Optional[Literal["search", "create", "update", "delete"]] = None
retrieved_docs: list[dict] = []
pending_tool_call: Optional[dict] = None
approved: Optional[bool] = None
# Outputs
final_response: Optional[str] = None
error: Optional[str] = None
@field_validator("session_id")
@classmethod
def session_id_format(cls, v: str) -> str:
if not v.startswith("sess_"):
raise ValueError("session_id must start with 'sess_'")
return v
@model_validator(mode="after")
def validate_approval_consistency(self):
if self.approved is True and self.pending_tool_call is None:
raise ValueError("Cannot approve with no pending tool call")
return self
# Using it in nodes — type errors caught at runtime
def reasoning_node(state: AgentState) -> dict:
# state is a validated Pydantic model
# return a partial update (LangGraph merges with current state)
return {"intent": "search"}
تحديثات الحالة في LangGraph يتم دمجها ولا يتم استبدالها (merged, not replaced). تعيد العقد قواميس (dicts) تحتوي فقط على المفاتيح التي تقوم بتحديثها. هذا التصميم مقصود — ولكنه يعني أنك قد تترك قيمًا قديمة في الحالة دون قصد. بالنسبة للحقول التي يجب إعادة تعيينها في كل دورة (مثل pending_tool_call)، قم بتعيينها صراحةً إلى None في العقدة المناسبة.
النمط 5: المراقبة باستخدام تتبع LangSmith (Observability with LangSmith Tracing)
لا يمكن تتبع أخطاء الوكيل في بيئة الإنتاج وتصحيحها بدون تتبع المسارات (traces). فالوكيل يجري ما بين 5 إلى 20 استدعاءً للنموذج اللغوي (LLM) لكل طلب مستخدم، مع اتخاذ قرارات توجيهية في كل خطوة. بدون أدوات المراقبة (observability)، يصبح تتبع خطأ يظهر على شكل "إعطاء إجابات خاطئة أحياناً" أمراً شبه مستحيل.
تُعد منصة LangSmith الحل الرسمي المتكامل، وهي مدمجة بعمق مع LangGraph — حيث تعرض التتبعات مسار تنفيذ الرسم البياني بالكامل، ومدخلات ومخرجات كل عقدة، وعدد الرموز (token counts)، وزمن الاستجابة (latency) لكل عقدة، وتفاصيل استدعاء الأدوات.
إعداد LangSmith في بيئة الإنتاج
import os
from langchain_core.tracers.langchain import wait_for_all_tracers
# Set these in your environment (never hardcode)
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGSMITH_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "verel-production-agent"
# Tag traces with metadata for filtering
config = {
"configurable": {"thread_id": thread_id},
"metadata": {
"user_id": user_id,
"session_type": "enterprise_rag",
"environment": "production",
},
"tags": ["production", "v2.1"],
"run_name": f"agent_run_{thread_id[:8]}",
}
مقاييس مخصصة للتتبع
بالإضافة إلى المقاييس المدمجة في LangSmith، قم بقياس هذه الإشارات المخصصة:
from langsmith import Client
ls_client = Client()
def log_agent_outcome(run_id: str, outcome: str, latency_ms: int):
"""Log custom metrics to LangSmith for aggregate dashboards."""
ls_client.create_feedback(
run_id=run_id,
key="outcome",
value=outcome, # "success" | "human_rejected" | "tool_error" | "timeout"
score=1.0 if outcome == "success" else 0.0,
)
ls_client.create_feedback(
run_id=run_id,
key="latency_ms",
value=str(latency_ms),
score=max(0, 1.0 - (latency_ms / 10000)), # normalize: 10s = 0.0
)
لوحات البيانات الثلاث التي يحتاجها كل وكيل في بيئة الإنتاج
- ▸معدل الخطأ لكل عقدة (Error rate by node) — لمعرفة العقد الأكثر فشلاً والتركيز على تحصينها.
- ▸معدل الموافقة البشرية (Human approval rate) — النسبة المئوية للإجراءات التي تتطلب مراجعة بشرية (النسبة المرتفعة جداً تسبب إزعاجاً، والمنخفضة جداً تعني ضعف الأمان).
- ▸زمن الاستجابة P95 حسب مسار الرسم البياني (P95 latency by graph path) — لتحديد مسارات التنفيذ البطيئة وتحسينها.
تجميع الأنماط: هيكل وكيل آمن للبيئات الإنتاجية
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis import RedisSaver
builder = StateGraph(AgentState)
# Add nodes
builder.add_node("understand_intent", understand_intent_node)
builder.add_node("retrieve_context", retrieve_context_node)
builder.add_node("plan_action", plan_action_node)
builder.add_node("human_approval", human_approval_node) # Pattern 2
builder.add_node("execute_tool", execute_tool_node) # Pattern 3
builder.add_node("synthesize_response", synthesize_response_node)
builder.add_node("error_handler", error_handler_node)
# Add edges
builder.set_entry_point("understand_intent")
builder.add_edge("understand_intent", "retrieve_context")
builder.add_edge("retrieve_context", "plan_action")
# Route: does this action need human approval?
builder.add_conditional_edges("plan_action", route_by_risk, {
"needs_approval": "human_approval",
"safe_to_execute": "execute_tool",
})
builder.add_edge("human_approval", "execute_tool")
# Route: did the tool succeed?
builder.add_conditional_edges("execute_tool", route_after_tool, {
"success": "synthesize_response",
"error_handler": "error_handler",
})
builder.add_edge("synthesize_response", END)
builder.add_edge("error_handler", END)
# Compile with all five patterns active
graph = builder.compile(
checkpointer=RedisSaver(redis_client), # Pattern 1
interrupt_before=["human_approval"], # Pattern 2
)
خلاصة: قائمة التحقق لأمان بيئة الإنتاج
قبل نشر أي وكيل LangGraph في بيئة الإنتاج، تأكد من:
- ▸ إعداد الـ checkpointer باستخدام تخزين دائم (Redis أو Postgres، وليس الذاكرة المؤقتة in-memory)
- ▸ تفعيل بوابات الموافقة البشرية لجميع الإجراءات غير القابلة للتراجع أو المرئية خارجياً
- ▸ تحديد ميزانيات إعادة المحاولة لجميع استدعاءات الأدوات الخارجية (3 محاولات كحد أقصى، مع تراجع أسي + تباين عشوائي jitter)
- ▸ استخدام حالة Pydantic/TypedDict مع تحديد أنواع الحقول بشكل صريح
- ▸ معالجة حالة الخطأ — يجب أن تعيد جميع العقد الأخطاء ضمن الحالة بدلاً من إطلاق استثناءات تؤدي لتوقف النظام
- ▸ تفعيل تتبع LangSmith مع تصنيف المشروع عبر الوسوم (tagging)
- ▸ تتبع المقاييس المخصصة: النتيجة النهائية، زمن الاستجابة، ومعدل الموافقة
- ▸ اختبار الأداء تحت الضغط مع قيم
thread_idمتزامنة (حيث أن checkpointer الخاص بـ Redis آمن برمجياً للتزامن thread-safe، بينما SQLite ليس كذلك)
الأسئلة الشائعة
ما الفرق بين LangGraph و LangChain LCEL التقليدي؟ يُعد LCEL ممتازاً لسلاسل العمليات عديمة الحالة (stateless chains) — حيث تدخل المدخلات ويخرج رد مهيكل مباشرة. أما LangGraph فهو مصمم لتدفقات العمل المعقدة ذات الحالة (stateful) والتكرارية (cyclical)، حيث يتخذ الوكيل قرارات بشأن الخطوة التالية بناءً على السياق المتراكم. إذا كان سير العمل لديك يحتوي على تفرعات، أو حلقات تكرارية (loops)، أو يتطلب ذاكرة بين الخطوات، فإن LCEL ليس الأداة المناسبة.
كيف يقارن LangGraph بـ CrewAI و AutoGen للاستخدام في بيئات الإنتاج؟ تمتلك أطر العمل CrewAI و AutoGen واجهات برمجة تطبيقات (APIs) أبسط وتعمل بشكل ممتاز في النماذج الأولية (prototypes). أما LangGraph فهو يتطلب كتابة كود أكثر تفصيلاً، ولكنه يمنحك تحكماً كاملاً وصريحاً في انتقالات الحالة، ومعالجة الأخطاء، وحفظ البيانات — وهو بالضبط ما تتطلبه بيئة الإنتاج. لقد قمنا في Verel Systems بنقل العديد من النماذج الأولية من AutoGen إلى LangGraph خصيصاً للاستفادة من ميزات حفظ الحالة (checkpointing) والتدخل البشري (interrupt).
ما هو التأثير على الأداء الناتج عن حفظ الحالة (checkpointing)؟
باستخدام Redis، تضيف عمليات كتابة الـ checkpoint ما بين 5 إلى 20 مللي ثانية عند كل انتقال بين العقد. بالنسبة لرسم بياني يحتوي على 10 عقد، فإن هذا يعني زيادة تتراوح بين 50 إلى 200 مللي ثانية — وهو أمر مقبول تماماً لمعظم أنظمة أتمتة الأعمال. أما إذا كنت تبني تجربة مستخدم فورية (realtime UX)، فاستخدم interrupt_before بحذر واقتصر على حفظ الحالة عند المحطات الرئيسية والهامة فقط.
هل يمكنني تشغيل وكلاء LangGraph بشكل متزامن؟
نعم، بشرط استخدام نظام حفظ حالة (checkpointing) مناسب. يتطلب كل تشغيل متزامن معرف thread_id فريداً خاصاً به. يتعامل الـ checkpointer الخاص بـ Redis مع عمليات الكتابة المتزامنة بأمان تام، بينما لا يدعم SQLite ذلك بكفاءة — لذا ننصح باستخدام Redis دائماً في بيئات الإنتاج.
