Introduction
In this tutorial, we'll build a multi-agent communication system using Pydantic and structured messaging patterns. It implements an ACP (Agent Communication Protocol)-style message bus. Agents do not call one another directly. Instead, they publish structured messages to a shared bus, which routes each message to its receiver. This decoupling makes the system modular, traceable, and easier to extend. The code uses a small, hand-written, in-process message bus rather than LangGraph's graph APIs. It is a foundation for production use, not a finished production system.
By the end of this tutorial, you'll have a working system with three specialized agents (planner, executor, and reviewer). They communicate through a shared message bus using structured messages, with a recorded message history and basic logging.
Prerequisites
- Python 3.8+
- Basic understanding of Python and AI concepts
- LangGraph library installed
- Pydantic v2 installed
- Basic knowledge of message queues and shared state patterns
Step-by-Step Instructions
1. Install Required Dependencies
We need two packages. Pydantic handles structured data validation, and LangGraph provides graph-based orchestration. The code in this tutorial does not import LangGraph yet, so only Pydantic v2 is required to run the examples.
```bash
pip install langgraph "pydantic>=2"
```
2. Define the Shared State Schema
First, we'll create a Pydantic model that represents the state of a task. Agents pass this state to one another inside message payloads, so every agent reads and updates the same structure.
```python
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field


class TaskState(BaseModel):
    """Shared state for one task, passed between agents in message payloads."""

    task_id: str
    description: str
    status: str = Field(default="pending")
    steps: List[str] = Field(default_factory=list)
    results: List[str] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)
    timestamp: datetime = Field(default_factory=datetime.now)
    current_step: Optional[str] = None
    completed_steps: int = 0
```
Why this step? Pydantic validates and coerces field values when a model is constructed. Malformed data therefore raises a ValidationError at the boundary instead of causing confusing failures later in another agent. The schema also documents the structure that every agent can rely on.
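To see this validation in action, the sketch below re-declares a trimmed copy of TaskState and constructs it with good and bad input. It assumes Pydantic v2, whose default lax mode coerces the numeric string "3" to an int but rejects "three".

```python
from pydantic import BaseModel, Field, ValidationError


class TaskState(BaseModel):
    # Trimmed copy of the schema above: just the fields this demo needs.
    task_id: str
    description: str
    status: str = Field(default="pending")
    completed_steps: int = 0


# Lax mode (the Pydantic v2 default) coerces a numeric string to an int.
ok = TaskState(task_id="t1", description="demo", completed_steps="3")
print(ok.completed_steps)  # 3

# A non-numeric string cannot be coerced, so construction fails immediately.
caught = None
try:
    TaskState(task_id="t2", description="demo", completed_steps="three")
except ValidationError as exc:
    caught = exc
    print(exc.errors()[0]["loc"])  # ('completed_steps',)
```

The error's `loc` points at the offending field, so the agent that built the bad state is easy to identify.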
3. Create the ACP Message Schema
Next, we'll define a structured message format that agents use to communicate. This follows ACP principles by standardizing how information flows between agents.
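```python
from typing import Any, Dict


class ACPMessage(BaseModel):
    """A structured message exchanged between agents via the bus."""

    message_id: str
    sender: str    # name of the sending agent
    receiver: str  # name of the agent that should handle the message
    action: str    # what the receiver is asked to do, e.g. "execute_task"
    payload: Dict[str, Any]
    timestamp: datetime = Field(default_factory=datetime.now)
    correlation_id: Optional[str] = None  # links messages from one workflow
```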
Why this step? Every message records who sent it, who should receive it, what action is requested, and when it was created. This makes debugging and monitoring much easier. The optional correlation_id can tie together all messages that belong to the same workflow.
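The sketch below shows how correlation_id can be carried through a conversation and how a message survives a trip through JSON, for example in a log line or over a network hop. It assumes Pydantic v2 (`model_dump_json` / `model_validate_json`). The `reply` helper is hypothetical and not part of the tutorial's code.

```python
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class ACPMessage(BaseModel):
    # Same fields as the schema above.
    message_id: str
    sender: str
    receiver: str
    action: str
    payload: Dict[str, Any]
    timestamp: datetime = Field(default_factory=datetime.now)
    correlation_id: Optional[str] = None


def reply(original: ACPMessage, sender: str, receiver: str, action: str,
          payload: Dict[str, Any]) -> ACPMessage:
    """Hypothetical helper: build a follow-up message in the same workflow."""
    return ACPMessage(
        message_id=str(uuid.uuid4()),
        sender=sender,
        receiver=receiver,
        action=action,
        payload=payload,
        # Reuse the workflow's ID, or start one from the first message's ID.
        correlation_id=original.correlation_id or original.message_id,
    )


first = ACPMessage(message_id="task_001", sender="user", receiver="planner",
                   action="create_task", payload={"description": "Process customer data"})
second = reply(first, "planner", "executor", "execute_task", {"task": {}})

# Serialize to a JSON string, then validate it back into a model.
wire = second.model_dump_json()
restored = ACPMessage.model_validate_json(wire)
print(restored.correlation_id)  # task_001
```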
4. Implement the Message Bus
We'll create a simple in-process message bus. It routes each ACP message to the callbacks subscribed under the receiver's name and keeps a history of every message published.
```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[ACPMessage], None]


class MessageBus:
    """In-process bus that routes messages to subscribers by receiver name."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Handler]] = defaultdict(list)
        self.message_history: List[ACPMessage] = []

    def subscribe(self, agent_name: str, callback: Handler) -> None:
        self.subscribers[agent_name].append(callback)

    def publish(self, message: ACPMessage) -> None:
        # Record the message first, then deliver it synchronously.
        self.message_history.append(message)
        for callback in self.subscribers.get(message.receiver, []):
            callback(message)

    def get_message_history(self) -> List[ACPMessage]:
        return list(self.message_history)
```
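Why this step? The message bus decouples agents from each other. Each agent knows only the bus and the names of the agents it sends to, so you can add, replace, or test an agent without touching the others. Delivery is synchronous: publish() calls handlers directly, so a handler that publishes a new message runs the next handler inside its own call.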
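Because delivery is nested, an exception raised in any handler in the bus above propagates back through every publish() call to the original caller, and later messages in the chain are never sent. The following is a minimal sketch of one alternative. Messages go into a FIFO queue that only the outermost publish() drains, and a failing handler is recorded instead of aborting delivery. `QueuedMessageBus`, `dead_letters`, and the `Message` dataclass (a stand-in for ACPMessage so the sketch runs on its own) are all hypothetical names.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, Callable, Deque, Dict, List, Tuple


@dataclass
class Message:
    # Minimal stand-in for ACPMessage: only the fields routing needs.
    sender: str
    receiver: str
    action: str
    payload: Dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Message], None]


class QueuedMessageBus:
    """Hypothetical variant: FIFO delivery instead of nested callbacks."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Handler]] = defaultdict(list)
        self.queue: Deque[Message] = deque()
        self.history: List[Message] = []
        self.dead_letters: List[Tuple[Message, Exception]] = []
        self._draining = False

    def subscribe(self, agent_name: str, callback: Handler) -> None:
        self.subscribers[agent_name].append(callback)

    def publish(self, message: Message) -> None:
        self.history.append(message)
        self.queue.append(message)
        # A handler that publishes during delivery only enqueues;
        # the outermost publish() call runs the delivery loop.
        if not self._draining:
            self._drain()

    def _drain(self) -> None:
        self._draining = True
        try:
            while self.queue:
                message = self.queue.popleft()
                for callback in self.subscribers.get(message.receiver, []):
                    try:
                        callback(message)
                    except Exception as exc:  # isolate one failing handler
                        self.dead_letters.append((message, exc))
        finally:
            self._draining = False


# Usage: the executor fails, but the reviewer is still notified.
bus = QueuedMessageBus()
delivered: List[str] = []


def planner(msg: Message) -> None:
    delivered.append(msg.action)
    bus.publish(Message("planner", "executor", "execute_task"))
    bus.publish(Message("planner", "reviewer", "notify"))


def executor(msg: Message) -> None:
    delivered.append(msg.action)
    raise RuntimeError("simulated failure")


def reviewer(msg: Message) -> None:
    delivered.append(msg.action)


bus.subscribe("planner", planner)
bus.subscribe("executor", executor)
bus.subscribe("reviewer", reviewer)
bus.publish(Message("user", "planner", "create_task"))
print(delivered)               # ['create_task', 'execute_task', 'notify']
print(len(bus.dead_letters))   # 1
```

The trade-off is that a failure no longer stops the workflow, so something has to inspect `dead_letters`. With the original bus, the exception surfaces immediately to whoever called publish().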
5. Create Specialized Agents
We'll implement three agents: a planner that creates tasks, an executor that performs work, and a reviewer that validates results.
```python
class PlannerAgent:
    def __init__(self, message_bus: MessageBus) -> None:
        self.message_bus = message_bus
        self.message_bus.subscribe("planner", self.handle_message)

    def handle_message(self, message: ACPMessage) -> None:
        if message.action == "create_task":
            task_state = TaskState(
                task_id=message.message_id,
                description=message.payload["description"],
                status="created",
            )
            # Hand the task to the executor; mode="json" keeps the payload
            # JSON-serializable (the datetime becomes an ISO string).
            executor_msg = ACPMessage(
                message_id=f"exec_{message.message_id}",
                sender="planner",
                receiver="executor",
                action="execute_task",
                payload={"task": task_state.model_dump(mode="json")},
                correlation_id=message.correlation_id or message.message_id,
            )
            self.message_bus.publish(executor_msg)


class ExecutorAgent:
    def __init__(self, message_bus: MessageBus) -> None:
        self.message_bus = message_bus
        self.message_bus.subscribe("executor", self.handle_message)

    def handle_message(self, message: ACPMessage) -> None:
        if message.action == "execute_task":
            task = message.payload["task"]
            # Simulate task execution
            steps = ["step_1", "step_2", "step_3"]
            task["steps"] = steps
            task["completed_steps"] = len(steps)
            task["status"] = "completed"
            # Send the results to the reviewer
            review_msg = ACPMessage(
                message_id=f"review_{message.message_id}",
                sender="executor",
                receiver="reviewer",
                action="review_results",
                payload={"task": task},
                correlation_id=message.correlation_id,
            )
            self.message_bus.publish(review_msg)


class ReviewerAgent:
    def __init__(self, message_bus: MessageBus) -> None:
        self.message_bus = message_bus
        self.message_bus.subscribe("reviewer", self.handle_message)

    def handle_message(self, message: ACPMessage) -> None:
        if message.action == "review_results":
            task = message.payload["task"]
            # Approve only if every planned step was completed
            if task["completed_steps"] == len(task["steps"]):
                task["status"] = "approved"
                print(f"Task {task['task_id']} approved with {task['completed_steps']} steps")
            else:
                task["status"] = "rejected"
                print(f"Task {task['task_id']} rejected")
```
Why this step? Each agent has a specific responsibility, following the single responsibility principle. This makes the system modular and easier to test and extend.
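Because an agent talks to the bus only through subscribe() and publish(), a `unittest.mock.Mock` can stand in for the bus in a unit test. The sketch below tests the executor in isolation. To keep it self-contained, it assumes a dataclass stand-in for the Pydantic ACPMessage and a condensed copy of ExecutorAgent.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from unittest.mock import Mock


@dataclass
class ACPMessage:
    # Stand-in for the Pydantic model from step 3.
    message_id: str
    sender: str
    receiver: str
    action: str
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None


class ExecutorAgent:
    # Condensed copy of the executor from step 5.
    def __init__(self, message_bus) -> None:
        self.message_bus = message_bus
        self.message_bus.subscribe("executor", self.handle_message)

    def handle_message(self, message: ACPMessage) -> None:
        if message.action == "execute_task":
            task = message.payload["task"]
            steps = ["step_1", "step_2", "step_3"]
            task["steps"] = steps
            task["completed_steps"] = len(steps)
            task["status"] = "completed"
            self.message_bus.publish(ACPMessage(
                message_id=f"review_{message.message_id}",
                sender="executor",
                receiver="reviewer",
                action="review_results",
                payload={"task": task},
            ))


# The Mock records every call, so no real bus or other agents are needed.
fake_bus = Mock()
agent = ExecutorAgent(fake_bus)
fake_bus.subscribe.assert_called_once_with("executor", agent.handle_message)

agent.handle_message(ACPMessage(
    message_id="exec_task_001", sender="planner", receiver="executor",
    action="execute_task", payload={"task": {"task_id": "task_001"}},
))

fake_bus.publish.assert_called_once()
sent = fake_bus.publish.call_args.args[0]
print(sent.receiver, sent.action, sent.payload["task"]["completed_steps"])
# reviewer review_results 3
```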
6. Initialize and Run the System
Now we'll put everything together and run a sample workflow.
```python
def main() -> None:
    # Initialize components; each agent subscribes itself to the bus
    message_bus = MessageBus()
    planner = PlannerAgent(message_bus)
    executor = ExecutorAgent(message_bus)
    reviewer = ReviewerAgent(message_bus)

    # Create the initial task request
    initial_msg = ACPMessage(
        message_id="task_001",
        sender="user",
        receiver="planner",
        action="create_task",
        payload={"description": "Process customer data"},
    )

    # Start the workflow; delivery is synchronous, so this returns
    # only after the reviewer has handled the task
    message_bus.publish(initial_msg)

    # Print message history
    print("Message History:")
    for msg in message_bus.get_message_history():
        print(f"{msg.sender} -> {msg.receiver}: {msg.action}")


if __name__ == "__main__":
    main()
```
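Why this step? This runs the whole workflow end to end. The user's create_task message reaches the planner, which sends execute_task to the executor, which sends review_results to the reviewer. The reviewer prints its approval while messages are still being delivered. The history printed afterwards lists all three messages in the order they were published.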
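With a single task the flat history is easy to read. Once several tasks share one bus, however, their messages interleave. Assuming every message in a workflow carries the same correlation_id, a short helper can regroup the history per workflow. The sketch below uses a NamedTuple stand-in for ACPMessage, and `traces_by_correlation` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Optional


class Msg(NamedTuple):
    # Stand-in for ACPMessage with only the fields the trace needs.
    sender: str
    receiver: str
    action: str
    correlation_id: Optional[str]


def traces_by_correlation(history: List[Msg]) -> Dict[str, List[str]]:
    """Hypothetical helper: group 'sender -> receiver: action' lines per workflow."""
    traces: Dict[str, List[str]] = defaultdict(list)
    for msg in history:
        key = msg.correlation_id or "<uncorrelated>"
        traces[key].append(f"{msg.sender} -> {msg.receiver}: {msg.action}")
    return dict(traces)


# Two interleaved workflows, in publication order.
history = [
    Msg("user", "planner", "create_task", "task_001"),
    Msg("user", "planner", "create_task", "task_002"),
    Msg("planner", "executor", "execute_task", "task_001"),
    Msg("planner", "executor", "execute_task", "task_002"),
    Msg("executor", "reviewer", "review_results", "task_001"),
]

for workflow, lines in traces_by_correlation(history).items():
    print(workflow)
    for line in lines:
        print("  " + line)
```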
7. Add Logging and Persistence
To move the system closer to production use, we'll add basic logging and file-based state persistence.
```python
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Add logging to agents by overriding the handler. PlannerAgent.__init__
# subscribes self.handle_message, which resolves to this override.
class LoggedPlannerAgent(PlannerAgent):
    def handle_message(self, message: ACPMessage) -> None:
        logger.info("Planner received %s from %s", message.action, message.sender)
        super().handle_message(message)


# For persistence, you could use simple file-based storage like this
# or integrate with a database.
class PersistentState:
    def __init__(self, filename: str = "state.json") -> None:
        self.filename = filename
        self.state: dict = {}
        self.load()

    def save(self) -> None:
        # Values must be JSON-serializable, e.g. TaskState.model_dump(mode="json")
        with open(self.filename, "w") as f:
            json.dump(self.state, f, indent=2)

    def load(self) -> None:
        try:
            with open(self.filename) as f:
                self.state = json.load(f)
        except FileNotFoundError:
            self.state = {}
```
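Why this step? Logging gives visibility into system behavior for debugging and monitoring. Persistence lets state survive a restart, provided you call save() whenever the state changes. Because the state is written with json.dump, store JSON-ready values such as TaskState.model_dump(mode="json") rather than model objects.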
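One weakness of writing the state file in place is that a crash mid-write can leave a truncated, unreadable state.json. A common remedy is to write to a temporary file in the same directory and then swap it in with `os.replace`, which POSIX systems perform atomically. The sketch below shows this under those assumptions. `save_state_atomically` and `load_state` are hypothetical helpers, and `default=str` is one simple choice for values json cannot encode, such as datetime.

```python
import json
import os
import tempfile
from datetime import datetime
from typing import Any, Dict


def save_state_atomically(state: Dict[str, Any], filename: str) -> None:
    """Hypothetical helper: write JSON to a temp file, then swap it into place."""
    directory = os.path.dirname(os.path.abspath(filename))
    # The temp file must be on the same filesystem for os.replace to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            # default=str converts values json can't encode (e.g. datetime).
            json.dump(state, f, default=str)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, filename)
    except BaseException:
        os.unlink(tmp_path)  # never leave a half-written temp file behind
        raise


def load_state(filename: str) -> Dict[str, Any]:
    try:
        with open(filename) as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.json")
    save_state_atomically(
        {"task_001": {"status": "approved", "timestamp": datetime(2024, 1, 1)}}, path
    )
    restored = load_state(path)
    print(restored)
    # {'task_001': {'status': 'approved', 'timestamp': '2024-01-01 00:00:00'}}
```

Readers of the file therefore see either the old state or the new one, never a partial write.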
Summary
In this tutorial, we've built a multi-agent communication system around an ACP-style message bus and a Pydantic-validated task state. We've created three specialized agents that communicate through a central message bus, validated shared data with Pydantic, and added basic logging and persistence.
This system provides several key benefits:
- Modularity: Each agent has a clear responsibility
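- Traceability: Every message is recorded in the bus's history with its sender, receiver, action, and timestamp
- Extensibility: New agents subscribe to the bus without any change to the bus itself
- Groundwork for production: Basic logging and state persistence are in place; error handling, retries, and concurrent delivery are natural next steps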
This architecture can be extended with more sophisticated features like message queuing, distributed state management, or integration with external databases for complex enterprise applications.

