LangGraphの循環参照問題を完全解決!確実なシリアライズとメモリリーク対策
LangGraphの循環参照問題:原因と影響
LangGraphを使ったAIエージェント開発中に、突然「Circular reference detected」というエラーに遭遇したことはありませんか?このエラーは開発の大きな障壁となり、多くの開発者を悩ませています。
循環参照(Circular Reference)とは、オブジェクトA がオブジェクトB を参照し、同時にオブジェクトB がオブジェクトA を参照するような状態を指します。LangGraphでは、状態管理やチェックポイントの保存時に内部でシリアライズ処理が行われますが、このとき循環参照が存在すると無限ループに陥り、エラーが発生します。
# 循環参照の簡単な例
obj_a = {}
obj_b = {}
obj_a['partner'] = obj_b # obj_aがobj_bを参照
obj_b['partner'] = obj_a # obj_bがobj_aを参照
LangGraphで特に問題となるのは、以下のような状況です:
- 複雑な状態オブジェクト:TypedDictやPydanticモデルを使った状態管理で、相互参照が発生
- 非同期キューの使用:
asyncio.Queue
などを状態に含める場合 - マルチエージェントシステム:複数のエージェントが互いに参照し合う設計
この問題は単なるエラーにとどまらず、以下のような深刻な影響をもたらします:
- アプリケーションのクラッシュやハングアップ
- 状態のチェックポイント保存が不可能になる
- 長時間実行中のタスクのデータ損失リスク
- デバッグが難しく、原因特定に時間がかかる
「優れたエンジニアは問題を予防する」という格言があるように、LangGraphでは循環参照を事前に防ぐ設計が重要です。次のセクションでは、これを回避するための具体的な戦略を紹介します。
循環参照を回避するState設計のベストプラクティス
LangGraphでは状態設計が非常に重要です。循環参照を避けるためには、いくつかの設計原則に従うことが効果的です。
参照の方向性を一方向に保つ
相互参照の最も安全な解決策は、参照を一方向に統一することです。親から子へ、または子から親への参照のどちらかだけを維持し、双方向の参照を避けましょう。
from typing import TypedDict, List
# 良い例:一方向の参照
class ChildState(TypedDict):
name: str
data: dict
class ParentState(TypedDict):
children: List[ChildState] # 親から子への参照のみ
# 悪い例:双方向の参照
class BadChildState(TypedDict):
name: str
parent: 'BadParentState' # 循環参照の原因
class BadParentState(TypedDict):
children: List[BadChildState]
IDによる間接参照の活用
オブジェクトを直接参照する代わりに、IDを使った間接参照を活用することで循環参照を回避できます。
from typing import TypedDict, List, Dict
# IDによる間接参照を使用した設計
class AgentState(TypedDict):
id: str
name: str
data: dict
class SystemState(TypedDict):
agents: Dict[str, AgentState] # IDをキーとした辞書
connections: List[tuple[str, str]] # IDのペアでエージェント間の関係を表現
イミュータブルな状態設計
可能な限り、状態オブジェクトをイミュータブル(不変)に保つことも有効な戦略です。Python 3.7以降では、dataclasses.dataclass(frozen=True)
や@property
を使ってイミュータブルなオブジェクトを作成できます。
from dataclasses import dataclass
from typing import List, Dict, FrozenSet
@dataclass(frozen=True)
class ImmutableAgentState:
id: str
name: str
data: Dict[str, str]
connections: FrozenSet[str] # 変更不可能な参照セット
状態の分離と抽象化
複雑な状態を管理するためには、関心の分離原則に従って状態を機能ごとに分割しましょう。
from typing import TypedDict, List, Optional
# メッセージ状態
class MessagesState(TypedDict):
history: List[dict]
# ツール状態
class ToolsState(TypedDict):
available_tools: List[str]
tool_results: List[dict]
# 組み合わせた状態
class AgentState(TypedDict):
messages: MessagesState
tools: ToolsState
metadata: dict
循環参照のリスクがある構造の回避
特定のデータ構造は循環参照を起こしやすいので注意が必要です:
asyncio.Queue
やその他の非同期プリミティブ- 深くネストした辞書や配列
- 双方向リンクされたデータ構造(双方向リンクリストなど)
- 内部状態を共有するクロージャー関数
これらを状態に含める場合は、シリアライズ可能な形式に変換する方法を用意しておく必要があります。
現実世界の問題を解決するためには「シンプルに保ち、必要な場合にだけ複雑にする」というアプローチが効果的です。次のセクションでは、既に循環参照が発生している場合の対処法を見ていきましょう。
Deepコピーとシリアライズ処理の最適化テクニック
状態設計で循環参照を避けられない場合や、既存のコードに循環参照が含まれている場合、深いコピー(Deep Copy)とシリアライズ処理を最適化することで問題を解決できます。
循環参照を検出・処理するDeepコピー実装
循環参照を含むオブジェクトを安全にコピーするには、既に処理したオブジェクトを追跡するメカニズムが必要です。以下はPythonでの実装例です:
def deep_copy_with_cycle_detection(obj, memo=None):
if memo is None:
memo = {}
# 基本型(プリミティブ)はそのままコピー
if isinstance(obj, (int, float, str, bool, type(None))):
return obj
# オブジェクトIDを取得
obj_id = id(obj)
# 既に処理済みのオブジェクトなら参照を返す(循環参照対策)
if obj_id in memo:
return memo[obj_id]
# 新しいオブジェクトを作成して記録
if isinstance(obj, list):
new_obj = []
memo[obj_id] = new_obj # 先に記録して循環参照に対応
for item in obj:
new_obj.append(deep_copy_with_cycle_detection(item, memo))
return new_obj
if isinstance(obj, dict):
new_obj = {}
memo[obj_id] = new_obj # 先に記録して循環参照に対応
for key, value in obj.items():
# キーも再帰的にコピー(必要な場合)
if isinstance(key, (list, dict, set)):
copied_key = deep_copy_with_cycle_detection(key, memo)
else:
copied_key = key
new_obj[copied_key] = deep_copy_with_cycle_detection(value, memo)
return new_obj
# その他のオブジェクト型(シリアライズ可能なものに変換)
return str(obj)
LangGraphのシリアライズ処理の理解と最適化
LangGraphでは内部的にjsonplus
というモジュールを使用してシリアライズを行います。このモジュールの_serialize
関数が循環参照を検出します。この知識を活用して、シリアライズ前に状態を処理することが可能です。
# LangGraphでシリアライズする前の状態の前処理
def prepare_state_for_serialization(state):
# 非シリアライズ可能なオブジェクトを処理
if isinstance(state, dict):
clean_state = {}
for key, value in state.items():
# asyncio.Queueなどの特殊オブジェクトは変換
if hasattr(value, '__class__') and value.__class__.__name__ == 'Queue':
clean_state[key] = {'_type': 'Queue', 'items': list(value._queue)}
# 関数や非シリアライズ可能なオブジェクトは文字列に変換
elif callable(value) or hasattr(value, '__dict__'):
clean_state[key] = str(value)
else:
clean_state[key] = prepare_state_for_serialization(value)
return clean_state
elif isinstance(state, list):
return [prepare_state_for_serialization(item) for item in state]
else:
return state
ディープコピーの落とし穴と回避策
ディープコピーを実装する際に陥りやすい問題と解決策を理解しておきましょう:
パフォーマンス問題:大きなオブジェクトグラフのディープコピーは計算コストが高い
- 解決策:必要な部分だけをコピーする部分的ディープコピーを実装する
カスタムクラスの問題:標準の
copy.deepcopy
がカスタムクラスを正しく処理できない- 解決策:
__deepcopy__
メソッドをクラスに実装する
- 解決策:
非シリアライズ可能なオブジェクト:関数やクロージャなど一部のオブジェクトはシリアライズできない
- 解決策:プレースホルダーに置き換えてから、後で復元する仕組みを用意する
無限再帰防止:カスタム実装でも循環参照による無限再帰を防ぐメカニズムが必要
- 解決策:前述のように処理済みオブジェクトを追跡する
「プログラミングとは複雑さを管理する芸術である」という言葉がありますが、循環参照問題においても同様です。次のセクションでは、より具体的なクリーンアップ関数の実装方法を見ていきましょう。
cleanFunctions実装によるシリアライズエラーの解決
LangGraphでは、状態のシリアライズ処理を制御する「クリーン関数(cleanFunctions)」を実装することで、循環参照問題を効果的に解決できます。このアプローチはLangGraphのチェックポイント処理やコールバックでも利用可能です。
cleanFunctionsの基本
cleanFunctionsは、状態オブジェクトを変換し、シリアライズ可能な形式にする関数群です。LangGraphのグラフ作成時にこれらの関数を登録することで、シリアライズ前に自動的に適用されます。
from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph
# 状態の定義
class MyState(TypedDict):
messages: List[Dict[str, Any]]
tools: Dict[str, Any]
complex_object: Any # シリアライズが難しいオブジェクト
# クリーン関数の実装
def clean_complex_object(state):
if 'complex_object' in state and state['complex_object'] is not None:
# 複雑なオブジェクトをシリアライズ可能な形式に変換
serializable_data = {
'type': state['complex_object'].__class__.__name__,
'data': str(state['complex_object'])
}
cleaned_state = state.copy()
cleaned_state['complex_object'] = serializable_data
return cleaned_state
return state
# グラフの作成と設定
graph = StateGraph(MyState)
# クリーン関数の登録
graph.set_clean_functions([clean_complex_object])
# 以降は通常のノードとエッジの追加
# ...
複数のクリーン関数を組み合わせる
複雑な状態には、複数のクリーン関数を組み合わせると効果的です。関数は登録順に適用されます。
# 特定のキーを削除するクリーン関数
def remove_private_keys(state):
if isinstance(state, dict):
cleaned_state = state.copy()
# 先頭がアンダースコアのプライベートキーを削除
for key in list(cleaned_state.keys()):
if isinstance(key, str) and key.startswith('_'):
del cleaned_state[key]
return cleaned_state
return state
# asyncio.Queueをシリアライズ可能な形式に変換
def clean_async_queues(state):
if isinstance(state, dict):
cleaned_state = state.copy()
for key, value in cleaned_state.items():
if hasattr(value, '__class__') and value.__class__.__name__ == 'Queue':
try:
# キューの内容を抽出(可能な場合)
cleaned_state[key] = {
'_queue_type': 'asyncio.Queue',
'items': list(value._queue) if hasattr(value, '_queue') else []
}
except:
# 抽出できない場合はプレースホルダーで代用
cleaned_state[key] = {'_queue_type': 'asyncio.Queue', 'items': []}
return cleaned_state
return state
# 複数のクリーン関数を登録
graph.set_clean_functions([
remove_private_keys,
clean_async_queues,
clean_complex_object
])
リストアコールバックの実装
cleanFunctionsで変換した状態を元に戻すためのコールバックも実装しておくと、状態の復元が可能になります。
# 復元用コールバックの実装
def restore_complex_objects(state):
if isinstance(state, dict):
restored_state = state.copy()
# complex_objectを復元
if 'complex_object' in restored_state and isinstance(restored_state['complex_object'], dict):
obj_data = restored_state['complex_object']
if 'type' in obj_data and obj_data['type'] == 'CustomObject':
# ここで実際のオブジェクトを再構築
# (この例では単純化のため文字列を返す)
restored_state['complex_object'] = f"Restored {obj_data['data']}"
# asyncio.Queueを復元
for key, value in restored_state.items():
if isinstance(value, dict) and value.get('_queue_type') == 'asyncio.Queue':
import asyncio
# 新しいキューを作成して項目を追加
queue = asyncio.Queue()
for item in value.get('items', []):
queue.put_nowait(item)
restored_state[key] = queue
return restored_state
return state
# 復元コールバックを登録(例:チェックポイント作成時)
checkpoint = InMemoryCheckpoint()
checkpoint.restore_callbacks.append(restore_complex_objects)
実際の使用例:エラーケース対応
LangGraphでよく見られる循環参照エラーのケースと、cleanFunctionsによる解決例を見てみましょう:
# エラーケース: LangGraphでよく発生する循環参照エラー
# TypeError: Circular reference detected
# 問題の原因となるステート
class ProblematicState(TypedDict):
agent: 'Agent' # 循環参照を引き起こす可能性がある
messages: List[Dict]
# クリーン関数による解決
def clean_agent_reference(state):
if 'agent' in state and hasattr(state['agent'], '__dict__'):
# エージェントオブジェクトをシリアライズ可能な情報のみ抽出
agent_info = {
'id': getattr(state['agent'], 'id', 'unknown'),
'type': state['agent'].__class__.__name__,
# その他必要な情報
}
cleaned_state = dict(state)
cleaned_state['agent'] = agent_info
return cleaned_state
return state
このようなcleanFunctions実装を活用することで、LangGraphの循環参照問題を効果的に解決できます。「シンプルな解決策が最良の解決策である」という原則に従い、必要最小限の変換を行うことがポイントです。
次のセクションでは、より複雑な状態管理のための実践的なパターンを見ていきましょう。
複雑な状態管理のための実践的パターン
実際のLangGraphアプリケーションでは、複雑な状態管理が必要になることがよくあります。ここでは循環参照を回避しながら、高度な状態管理を実現するためのパターンを紹介します。
状態の不変性(イミュータビリティ)パターン
状態を不変(イミュータブル)に保つことで、予期しない副作用や循環参照を防ぐことができます。このパターンでは、状態の更新時に新しいオブジェクトを作成します。
from typing import TypedDict, List, Dict, Any
from copy import deepcopy
# 不変な状態管理パターン
class AgentState(TypedDict):
messages: List[Dict[str, Any]]
context: Dict[str, Any]
def add_message(state: AgentState, message: Dict[str, Any]) -> AgentState:
"""メッセージを状態に追加する関数(不変性を保つ)"""
# 新しい状態オブジェクトを作成
new_state = dict(state)
# 新しいメッセージリストを作成(既存のリストはそのまま)
new_state['messages'] = state['messages'] + [message]
return new_state
def update_context(state: AgentState, key: str, value: Any) -> AgentState:
"""コンテキストを更新する関数(不変性を保つ)"""
# 新しい状態オブジェクトを作成
new_state = dict(state)
# 新しいコンテキスト辞書を作成
new_state['context'] = dict(state['context'])
new_state['context'][key] = value
return new_state
プロキシパターン
プロキシパターンを使用すると、実際のオブジェクトへの直接アクセスを制御し、循環参照を避けることができます。
from typing import Dict, List, Any, Optional
# プロキシクラスの実装
class AgentProxy:
"""エージェントへのアクセスを管理するプロキシ"""
def __init__(self, agent_id: str, registry: 'AgentRegistry'):
self.agent_id = agent_id
self._registry = registry
def send_message(self, message: Dict[str, Any]) -> None:
"""メッセージを実際のエージェントに送信"""
real_agent = self._registry.get_agent(self.agent_id)
if real_agent:
real_agent.receive_message(message)
# 必要なメソッドのみを提供し、内部実装は隠蔽
# エージェントレジストリ
class AgentRegistry:
"""エージェントの参照を管理するレジストリ"""
def __init__(self):
self._agents: Dict[str, 'Agent'] = {}
def register(self, agent: 'Agent') -> None:
"""エージェントを登録"""
self._agents[agent.id] = agent
def get_agent(self, agent_id: str) -> Optional['Agent']:
"""IDからエージェントを取得"""
return self._agents.get(agent_id)
def get_proxy(self, agent_id: str) -> AgentProxy:
"""エージェントのプロキシを取得"""
return AgentProxy(agent_id, self)
# 実際のエージェントクラス
class Agent:
def __init__(self, agent_id: str, registry: AgentRegistry):
self.id = agent_id
self.messages: List[Dict[str, Any]] = []
self._registry = registry
# 自身を登録
registry.register(self)
def receive_message(self, message: Dict[str, Any]) -> None:
"""メッセージを受信"""
self.messages.append(message)
def send_to(self, recipient_id: str, content: str) -> None:
"""他のエージェントにメッセージを送信(プロキシ経由)"""
# 直接参照せず、プロキシを使用
recipient_proxy = self._registry.get_proxy(recipient_id)
message = {"from": self.id, "content": content}
recipient_proxy.send_message(message)
メモイゼーションパターン
計算コストが高い処理や状態変換を最適化するために、メモイゼーションを活用します。LangGraph内での循環参照を避けながら、効率的な状態管理が可能になります。
from functools import lru_cache
from typing import Dict, Any, Tuple, Hashable
class StateManager:
"""状態管理とメモイゼーションを行うマネージャ"""
def __init__(self):
self._states: Dict[str, Dict[str, Any]] = {}
@lru_cache(maxsize=100)
def transform_state(self, state_id: str, operation: str, *args) -> str:
"""状態を変換し、結果をキャッシュ"""
state = self._states[state_id]
if operation == "add_message":
new_state = dict(state)
new_state['messages'] = state['messages'] + [args[0]]
elif operation == "update_context":
new_state = dict(state)
new_state['context'] = dict(state['context'])
new_state['context'][args[0]] = args[1]
else:
raise ValueError(f"Unknown operation: {operation}")
# 新しい状態IDを生成
new_id = f"{state_id}_{operation}_{hash(args)}"
self._states[new_id] = new_state
return new_id
def get_state(self, state_id: str) -> Dict[str, Any]:
"""状態を取得"""
return self._states[state_id]
def create_state(self, initial_state: Dict[str, Any]) -> str:
"""新しい状態を作成"""
state_id = f"state_{len(self._states)}"
self._states[state_id] = initial_state
return state_id
観察者(Observer)パターン
複数のコンポーネントが状態の変更を監視する必要がある場合、観察者パターンを使用して直接的な循環参照を避けることができます。
from typing import Dict, List, Any, Callable, Set
class EventEmitter:
"""イベントの発行と購読を管理"""
def __init__(self):
self._listeners: Dict[str, Set[Callable]] = {}
def on(self, event: str, listener: Callable) -> None:
"""イベントリスナーを登録"""
if event not in self._listeners:
self._listeners[event] = set()
self._listeners[event].add(listener)
def off(self, event: str, listener: Callable) -> None:
"""イベントリスナーを削除"""
if event in self._listeners and listener in self._listeners[event]:
self._listeners[event].remove(listener)
def emit(self, event: str, *args, **kwargs) -> None:
"""イベントを発行"""
if event in self._listeners:
for listener in self._listeners[event]:
listener(*args, **kwargs)
# 使用例
class StateObservable:
"""状態の変更を通知する基底クラス"""
def __init__(self):
self.events = EventEmitter()
self._state: Dict[str, Any] = {}
def update_state(self, key: str, value: Any) -> None:
"""状態を更新し、変更を通知"""
old_value = self._state.get(key)
self._state[key] = value
self.events.emit('state:change', key, value, old_value)
self.events.emit(f'state:{key}:change', value, old_value)
これらのパターンを適切に組み合わせることで、循環参照を避けながら複雑な状態管理を実現できます。「単純さは究極の洗練である」というレオナルド・ダ・ヴィンチの言葉のとおり、シンプルな原則に基づいた設計が、最も堅牢なシステムにつながります。
次のセクションでは、マルチエージェントシステムにおける循環参照回避に焦点を当てます。
マルチエージェントシステムにおける循環参照回避戦略
LangGraphを使用したマルチエージェントシステムでは、エージェント間の相互作用や共有状態が循環参照の主な原因となります。ここでは、マルチエージェントシステムにおける循環参照回避のための具体的な戦略を見ていきましょう。
メッセージパッシングアーキテクチャ
エージェント間で直接参照を持たせる代わりに、メッセージパッシングを使用することで循環参照を回避できます。
from typing import TypedDict, List, Dict, Any
from uuid import uuid4
# メッセージの定義
class Message(TypedDict):
id: str
sender: str
recipient: str
content: Any
metadata: Dict[str, Any]
# エージェントの状態
class AgentState(TypedDict):
id: str
name: str
inbox: List[Message]
outbox: List[Message]
memory: Dict[str, Any]
# メッセージバス(エージェント間の通信を仲介)
class MessageBus:
def __init__(self):
self._queues: Dict[str, List[Message]] = {}
def register_agent(self, agent_id: str) -> None:
"""エージェントを登録"""
if agent_id not in self._queues:
self._queues[agent_id] = []
def send(self, message: Message) -> None:
"""メッセージを送信"""
recipient = message['recipient']
if recipient in self._queues:
self._queues[recipient].append(message)
def receive(self, agent_id: str) -> List[Message]:
"""メッセージを受信"""
messages = self._queues.get(agent_id, []).copy()
self._queues[agent_id] = []
return messages
# マルチエージェントシステム
class MultiAgentSystem:
def __init__(self):
self.message_bus = MessageBus()
self.agents: Dict[str, AgentState] = {}
def add_agent(self, name: str) -> str:
"""エージェントを追加"""
agent_id = str(uuid4())
agent_state: AgentState = {
'id': agent_id,
'name': name,
'inbox': [],
'outbox': [],
'memory': {}
}
self.agents[agent_id] = agent_state
self.message_bus.register_agent(agent_id)
return agent_id
def process_messages(self) -> None:
"""全てのエージェントのoutboxからメッセージを送信"""
for agent_id, agent in self.agents.items():
for message in agent['outbox']:
self.message_bus.send(message)
agent['outbox'] = []
# 各エージェントのinboxを更新
for agent_id, agent in self.agents.items():
new_messages = self.message_bus.receive(agent_id)
agent['inbox'].extend(new_messages)
ブラックボードパターン
ブラックボードパターンは、エージェント間の直接参照を避けながら、共有状態を管理するための効果的な方法です。
from typing import Dict, List, Any, Callable, Set
from uuid import uuid4
# ブラックボード(共有状態)
class Blackboard:
def __init__(self):
self._data: Dict[str, Any] = {}
self._subscribers: Dict[str, Set[Callable]] = {}
def set(self, key: str, value: Any) -> None:
"""データを設定"""
self._data[key] = value
# 購読者に通知
if key in self._subscribers:
for callback in self._subscribers[key]:
callback(key, value)
def get(self, key: str, default=None) -> Any:
"""データを取得"""
return self._data.get(key, default)
def subscribe(self, key: str, callback: Callable) -> None:
"""変更通知を購読"""
if key not in self._subscribers:
self._subscribers[key] = set()
self._subscribers[key].add(callback)
def unsubscribe(self, key: str, callback: Callable) -> None:
"""購読を解除"""
if key in self._subscribers and callback in self._subscribers[key]:
self._subscribers[key].remove(callback)
# ブラックボードを使用するエージェント
class BlackboardAgent:
def __init__(self, name: str, blackboard: Blackboard):
self.id = str(uuid4())
self.name = name
self._blackboard = blackboard
self._local_state: Dict[str, Any] = {}
# ブラックボードの変更を監視
blackboard.subscribe('global_task', self._on_task_update)
def _on_task_update(self, key: str, value: Any) -> None:
"""タスク更新時のコールバック"""
if 'assignee' in value and value['assignee'] == self.id:
self._process_task(value)
def _process_task(self, task: Dict[str, Any]) -> None:
"""タスクを処理"""
# タスク処理のロジック
result = f"Task {task['id']} processed by {self.name}"
# 結果をブラックボードに書き込み(直接エージェントを参照せず)
self._blackboard.set(f"result_{task['id']}", {
'task_id': task['id'],
'processor_id': self.id,
'result': result
})
エージェントクラス設計のベストプラクティス
循環参照を避けるマルチエージェントシステムでは、エージェントクラスの設計が重要です。
from typing import Dict, List, Any, Optional
from uuid import uuid4
import copy
# エージェントクラス設計のベストプラクティス
class Agent:
def __init__(self, name: str):
self.id = str(uuid4())
self.name = name
self._state: Dict[str, Any] = {
'memory': {},
'tasks': []
}
# シリアライズ可能な表現を返す
def to_dict(self) -> Dict[str, Any]:
"""エージェントのシリアライズ可能な表現を取得"""
return {
'id': self.id,
'name': self.name,
'state': copy.deepcopy(self._state)
}
# 外部状態参照用のIDを使用
def reference(self) -> Dict[str, str]:
"""エージェントの参照情報を取得"""
return {
'id': self.id,
'name': self.name,
'type': self.__class__.__name__
}
# デシリアライズ用のファクトリメソッド
@classmethod
def from_dict(cls, data: Dict[str, Any], **kwargs) -> 'Agent':
"""エージェントをデシリアライズ"""
agent = cls(data['name'])
agent.id = data['id']
agent._state = copy.deepcopy(data['state'])
return agent
LangGraphによるマルチエージェントワークフローの例
実際のLangGraphを使用したマルチエージェントシステムでの循環参照回避の例を見てみましょう。
from typing import TypedDict, List, Dict, Any, Annotated
from langgraph.graph import StateGraph
import operator
# エージェントのメッセージ
class AgentMessage(TypedDict):
agent_id: str
content: str
metadata: Dict[str, Any]
# システム状態
class SystemState(TypedDict):
agents: Dict[str, Dict[str, Any]] # エージェントはIDで参照
messages: Annotated[List[AgentMessage], operator.add] # メッセージの履歴
current_agent: str # 現在実行中のエージェントID
task: Dict[str, Any] # 現在のタスク情報
# エージェントノード関数
def agent_node(state: SystemState, agent_id: str):
"""エージェントのアクション処理"""
current_agent = state['agents'][agent_id]
task = state['task']
# エージェントのロジックを実行
response = f"Agent {current_agent['name']} processed: {task['description']}"
# メッセージを追加
new_message: AgentMessage = {
'agent_id': agent_id,
'content': response,
'metadata': {'task_id': task.get('id')}
}
# 更新された状態を返す(直接参照ではなくIDを使用)
return {
'messages': [new_message],
'current_agent': 'coordinator' # 次に実行するエージェントのID
}
# コーディネータノード関数
def coordinator_node(state: SystemState):
"""次のエージェントを決定"""
messages = state['messages']
task = state['task']
# タスク完了かどうかを確認
if len(messages) >= 3: # 例: 3つのメッセージでタスク完了
return {'current_agent': 'end'}
# 次のエージェントを選択(ここではシンプルに)
agent_ids = list(state['agents'].keys())
next_agent = agent_ids[len(messages) % len(agent_ids)]
return {'current_agent': next_agent}
# 条件分岐関数
def route_to_next_agent(state: SystemState):
"""次のエージェントにルーティング"""
return state['current_agent']
# グラフ構築
graph = StateGraph(SystemState)
# ノード追加
graph.add_node('coordinator', coordinator_node)
graph.add_node('agent1', lambda state: agent_node(state, 'agent1'))
graph.add_node('agent2', lambda state: agent_node(state, 'agent2'))
graph.add_node('agent3', lambda state: agent_node(state, 'agent3'))
# エッジ追加
graph.add_conditional_edges('coordinator', route_to_next_agent,
{'agent1': 'agent1', 'agent2': 'agent2', 'agent3': 'agent3', 'end': None})
graph.add_edge('agent1', 'coordinator')
graph.add_edge('agent2', 'coordinator')
graph.add_edge('agent3', 'coordinator')
# グラフのコンパイル
workflow = graph.compile()
これらの戦略を適用することで、マルチエージェントシステムにおける循環参照問題を効果的に回避できます。重要なのは、直接参照を避け、IDやメッセージパッシングを介した間接的な通信を行うことです。
「複雑なものをシンプルにするのではなく、シンプルなものを組み合わせて複雑なものを作る」という原則に従うことで、安定性が高く、メンテナンスしやすいマルチエージェントシステムを構築できます。