How to Design a Production-Grade Multi-Agent Communication System Using LangGraph Structured Message Bus, ACP Logging, and Persistent Shared State Architecture

March 1, 2026

Learn to build a production-grade multi-agent communication system using LangGraph, Pydantic, and structured messaging patterns with ACP-style protocols.

Introduction

In this tutorial, we'll build a production-grade multi-agent communication system using LangGraph, Pydantic, and structured messaging patterns. The system implements an ACP (Agent Communication Protocol)-style message bus: agents exchange structured messages and shared state through the bus instead of calling each other directly. This keeps agents modular, makes their interactions traceable, and lets the system grow without rewiring existing agents.

By the end of this tutorial, you'll have a working system with three specialized agents (planner, executor, and reviewer) that communicate through structured messages on a shared bus, with basic logging and state persistence.

Prerequisites

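  • Python 3.9+ (recent LangGraph releases no longer support 3.8; check the requirement of the version you install)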
  • Basic understanding of Python and AI concepts
  • LangGraph library installed
  • Pydantic v2 installed
  • Basic knowledge of message queues and shared state patterns

Step-by-Step Instructions

1. Install Required Dependencies

We need two packages for our multi-agent system. LangGraph provides graph-based orchestration, while Pydantic handles structured data validation.

pip install langgraph pydantic

2. Define the Shared State Schema

First, we'll create a Pydantic model that represents our shared state. This schema will be used by all agents to read and update the system's state.

from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime


class TaskState(BaseModel):
    task_id: str
    description: str
    status: str = Field(default="pending")
    steps: List[str] = Field(default_factory=list)
    results: List[str] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)
    timestamp: datetime = Field(default_factory=datetime.now)
    current_step: Optional[str] = None
    completed_steps: int = 0

Why this step? Pydantic models give every agent the same validated view of the shared state. Malformed data is rejected when a model is constructed, instead of surfacing later as a runtime error inside an agent, and the schema doubles as documentation of the shared state structure.
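The validation behavior described above can be sketched as follows. This is a minimal illustration, assuming Pydantic v2 and a trimmed copy of `TaskState`; the `try_build` helper is hypothetical and not part of the tutorial's code.

```python
from typing import List, Optional

from pydantic import BaseModel, Field, ValidationError


class TaskState(BaseModel):
    # Trimmed copy of the schema above, repeated so the example runs on its own.
    task_id: str
    description: str
    status: str = Field(default="pending")
    steps: List[str] = Field(default_factory=list)
    completed_steps: int = 0


def try_build(data: dict) -> Optional[TaskState]:
    """Hypothetical helper: return a TaskState, or None if validation fails."""
    try:
        return TaskState(**data)
    except ValidationError as exc:
        # exc.errors() lists each failing field with its location and reason.
        for err in exc.errors():
            print(f"invalid field {err['loc']}: {err['msg']}")
        return None


# Valid input: defaults fill in the fields that were not supplied.
good = try_build({"task_id": "task_001", "description": "Process customer data"})

# Invalid input: "description" is missing and "completed_steps" is not an integer.
bad = try_build({"task_id": "task_002", "completed_steps": "many"})
```

Rejecting bad input at the boundary means an agent never has to guess whether the state it received is complete.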

3. Create the ACP Message Schema

Next, we'll define a structured message format that agents use to communicate. This follows ACP principles by standardizing how information flows between agents.

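from pydantic import ConfigDict


class ACPMessage(BaseModel):
    # Pydantic v2 replaces the nested `class Config` with `model_config`.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    message_id: str                       # unique ID of this message
    sender: str                           # name of the agent that sent it
    receiver: str                         # name of the agent that should handle it
    action: str                           # what the receiver is asked to do
    payload: dict                         # action-specific data
    timestamp: datetime = Field(default_factory=datetime.now)
    correlation_id: Optional[str] = None  # links messages of one workflow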

Why this step? The ACP message schema provides traceability and context. Each message carries a sender, a receiver, and an action, and the optional correlation_id ties related messages together, which makes debugging and monitoring easier in production systems.
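To make the role of `correlation_id` concrete, here is a minimal sketch, assuming Pydantic v2. The `make_reply` helper and the `task_created` and `acknowledge` action names are hypothetical; the idea is only that every reply inherits the ID of the message that started the conversation.

```python
import uuid
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class ACPMessage(BaseModel):
    # Same fields as the schema above, repeated so the example runs on its own.
    message_id: str
    sender: str
    receiver: str
    action: str
    payload: dict
    timestamp: datetime = Field(default_factory=datetime.now)
    correlation_id: Optional[str] = None


def make_reply(original: ACPMessage, action: str, payload: dict) -> ACPMessage:
    """Hypothetical helper: build a reply that stays in the same conversation."""
    return ACPMessage(
        message_id=uuid.uuid4().hex,
        sender=original.receiver,
        receiver=original.sender,
        action=action,
        payload=payload,
        # The first message of a conversation has no correlation_id,
        # so fall back to its own message_id as the conversation key.
        correlation_id=original.correlation_id or original.message_id,
    )


request = ACPMessage(
    message_id="task_001",
    sender="user",
    receiver="planner",
    action="create_task",
    payload={"description": "Process customer data"},
)
reply = make_reply(request, "task_created", {"ok": True})
follow_up = make_reply(reply, "acknowledge", {})
```

Filtering a message history on one `correlation_id` then yields the full trace of a single workflow.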

4. Implement the Message Bus

We'll create a simple message bus that handles routing and delivery of ACP messages between agents.

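from collections import defaultdict
from typing import Callable, Dict, List

# Signature every subscriber callback must follow.
MessageHandler = Callable[[ACPMessage], None]


class MessageBus:
    def __init__(self):
        # Maps an agent name to the callbacks registered for it.
        self.subscribers: Dict[str, List[MessageHandler]] = defaultdict(list)
        # Every published message, in publish order.
        self.message_history: List[ACPMessage] = []

    def subscribe(self, agent_name: str, callback: MessageHandler) -> None:
        self.subscribers[agent_name].append(callback)

    def publish(self, message: ACPMessage) -> None:
        # Record first so the history is complete even if a callback raises.
        self.message_history.append(message)
        # Delivery is synchronous: each callback runs before publish() returns.
        for callback in self.subscribers.get(message.receiver, []):
            callback(message)

    def get_message_history(self) -> List[ACPMessage]:
        return self.message_history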

Why this step? The message bus decouples agents from each other, allowing for easier scaling and maintenance. Agents only need to know about the message bus, not about each other directly.
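Decoupling also means the bus is the natural place to contain failures. The following is a sketch under stated assumptions, not the tutorial's implementation: `SafeMessageBus`, its `dead_letters` list, and the dataclass `Message` stand-in are hypothetical names introduced for illustration, and only the standard library is used.

```python
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, DefaultDict, List

logger = logging.getLogger("message_bus")


@dataclass
class Message:
    # Stand-in for ACPMessage with only the fields this sketch needs.
    sender: str
    receiver: str
    action: str
    payload: dict = field(default_factory=dict)


Handler = Callable[[Message], None]


class SafeMessageBus:
    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.message_history: List[Message] = []
        self.dead_letters: List[Message] = []

    def subscribe(self, agent_name: str, callback: Handler) -> None:
        self.subscribers[agent_name].append(callback)

    def publish(self, message: Message) -> None:
        self.message_history.append(message)
        handlers = self.subscribers.get(message.receiver, [])
        if not handlers:
            # Nobody is listening: keep the message instead of dropping it silently.
            self.dead_letters.append(message)
            return
        for handler in handlers:
            try:
                handler(message)
            except Exception:
                # One failing subscriber must not block the others.
                logger.exception("handler failed for action %r", message.action)
                self.dead_letters.append(message)


received: List[Message] = []


def broken_handler(message: Message) -> None:
    raise RuntimeError("simulated failure")


bus = SafeMessageBus()
bus.subscribe("executor", broken_handler)
bus.subscribe("executor", received.append)
bus.publish(Message("planner", "executor", "execute_task"))
bus.publish(Message("planner", "nobody", "execute_task"))
```

Here the second subscriber still receives the message after the first one raises, and both the failed delivery and the message with no receiver end up in `dead_letters` for later inspection.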

5. Create Specialized Agents

We'll implement three agents: a planner that creates tasks, an executor that performs work, and a reviewer that validates results.

class PlannerAgent:
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.message_bus.subscribe("planner", self.handle_message)

    def handle_message(self, message: ACPMessage):
        if message.action == "create_task":
            task_state = TaskState(
                task_id=message.message_id,
                description=message.payload["description"],
                status="created"
            )
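            # Send to executor. model_dump(mode="json") is the Pydantic v2
            # replacement for .dict() and turns the datetime into a string.
            executor_msg = ACPMessage(
                message_id=f"exec_{message.message_id}",
                sender="planner",
                receiver="executor",
                action="execute_task",
                payload={"task": task_state.model_dump(mode="json")},
                correlation_id=message.correlation_id or message.message_id,
            )
            self.message_bus.publish(executor_msg)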


class ExecutorAgent:
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.message_bus.subscribe("executor", self.handle_message)

    def handle_message(self, message: ACPMessage):
        if message.action == "execute_task":
            task = message.payload["task"]
            # Simulate task execution
            steps = ["step_1", "step_2", "step_3"]
            task["steps"] = steps
            task["completed_steps"] = len(steps)
            task["status"] = "completed"
            
            # Send to reviewer, passing the correlation_id along for tracing
            review_msg = ACPMessage(
                message_id=f"review_{message.message_id}",
                sender="executor",
                receiver="reviewer",
                action="review_results",
                payload={"task": task},
                correlation_id=message.correlation_id,
            )
            self.message_bus.publish(review_msg)


class ReviewerAgent:
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.message_bus.subscribe("reviewer", self.handle_message)

    def handle_message(self, message: ACPMessage):
        if message.action == "review_results":
            task = message.payload["task"]
            task["status"] = "approved"
            print(f"Task {task['task_id']} approved with {task['completed_steps']} steps")

Why this step? Each agent has a specific responsibility, following the single responsibility principle. This makes the system modular and easier to test and extend.
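Because each agent only talks to the bus, it can be tested without the other agents. The sketch below assumes a hypothetical `RecordingBus` test double and a dataclass `Message` stand-in for `ACPMessage`, so it runs with the standard library alone; the executor logic mirrors the one above.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Message:
    # Stand-in for ACPMessage so the example runs without Pydantic.
    message_id: str
    sender: str
    receiver: str
    action: str
    payload: dict = field(default_factory=dict)


class RecordingBus:
    """Hypothetical test double: records what is published, delivers nothing."""

    def __init__(self) -> None:
        self.published: List[Message] = []
        self.handlers: Dict[str, Callable[[Message], None]] = {}

    def subscribe(self, agent_name: str, callback: Callable[[Message], None]) -> None:
        self.handlers[agent_name] = callback

    def publish(self, message: Message) -> None:
        self.published.append(message)


class ExecutorAgent:
    # Same logic as the executor above, adapted to the stand-in Message type.
    def __init__(self, message_bus: RecordingBus) -> None:
        self.message_bus = message_bus
        self.message_bus.subscribe("executor", self.handle_message)

    def handle_message(self, message: Message) -> None:
        if message.action != "execute_task":
            return
        task = dict(message.payload["task"])  # copy, so the input is not mutated
        task["steps"] = ["step_1", "step_2", "step_3"]
        task["completed_steps"] = len(task["steps"])
        task["status"] = "completed"
        self.message_bus.publish(
            Message(f"review_{message.message_id}", "executor", "reviewer",
                    "review_results", {"task": task})
        )


def test_executor_forwards_completed_task() -> None:
    bus = RecordingBus()
    ExecutorAgent(bus)
    incoming = Message("exec_task_001", "planner", "executor", "execute_task",
                       {"task": {"task_id": "task_001", "status": "created"}})
    bus.handlers["executor"](incoming)

    assert len(bus.published) == 1
    outgoing = bus.published[0]
    assert outgoing.receiver == "reviewer"
    assert outgoing.payload["task"]["status"] == "completed"
    # The incoming payload is left untouched.
    assert incoming.payload["task"]["status"] == "created"


test_executor_forwards_completed_task()
```

The same pattern applies to the planner and reviewer: feed one message in, then assert on what was published.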

6. Initialize and Run the System

Now we'll put everything together and run a sample workflow.

def main():
    # Initialize components
    message_bus = MessageBus()
    planner = PlannerAgent(message_bus)
    executor = ExecutorAgent(message_bus)
    reviewer = ReviewerAgent(message_bus)

    # Create initial task
    initial_msg = ACPMessage(
        message_id="task_001",
        sender="user",
        receiver="planner",
        action="create_task",
        payload={"description": "Process customer data"}
    )

    # Start the workflow
    message_bus.publish(initial_msg)

    # Print message history
    print("Message History:")
    for msg in message_bus.get_message_history():
        print(f"{msg.sender} -> {msg.receiver}: {msg.action}")

if __name__ == "__main__":
    main()

Why this step? This demonstrates how the entire system works end-to-end, showing how messages flow through the system and how each agent responds to different actions.

7. Add Logging and Persistence

To make this production-ready, we'll add basic logging and state persistence.

import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Add logging to an agent by wrapping its handler
class LoggedPlannerAgent(PlannerAgent):
    def handle_message(self, message: ACPMessage):
        logger.info(
            "Planner received %s from %s (id=%s)",
            message.action, message.sender, message.message_id,
        )
        super().handle_message(message)


# For persistence, you could add a simple file-based storage
# or integrate with a database
class PersistentState:
    def __init__(self, filename="state.json"):
        self.filename = filename
        self.state = {}
        self.load()

    def save(self):
        # Values in self.state must be JSON-serializable.
        with open(self.filename, "w") as f:
            json.dump(self.state, f)

    def load(self):
        try:
            with open(self.filename, "r") as f:
                self.state = json.load(f)
        except FileNotFoundError:
            # First run: start with an empty state.
            self.state = {}

Why this step? Logging provides visibility into system behavior for debugging and monitoring. Persistence ensures that state is maintained across system restarts, which is critical for production systems.
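Two refinements are worth considering here: logging each message as one machine-readable line, and writing state so that a crash cannot leave a half-written file. The sketch below uses only the standard library; `log_message`, `save_state_atomically`, and `load_state` are hypothetical helper names, and the atomicity of `os.replace` is assumed to hold as it does on POSIX systems when source and target are on the same filesystem.

```python
import json
import logging
import os
import tempfile
from datetime import datetime, timezone

logger = logging.getLogger("acp")


def log_message(sender: str, receiver: str, action: str, correlation_id: str) -> str:
    """Hypothetical helper: emit one ACP message as a single JSON log line."""
    record = json.dumps(
        {
            "ts": datetime.now(timezone.utc).isoformat(),
            "sender": sender,
            "receiver": receiver,
            "action": action,
            "correlation_id": correlation_id,
        },
        sort_keys=True,
    )
    logger.info(record)
    return record


def save_state_atomically(state: dict, filename: str) -> None:
    """Write to a temp file in the same directory, then swap it into place."""
    directory = os.path.dirname(os.path.abspath(filename))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())
        # Readers see either the old file or the new one, never a partial write.
        os.replace(tmp_path, filename)
    except BaseException:
        # Clean up the temp file if anything failed before the swap.
        os.unlink(tmp_path)
        raise


def load_state(filename: str) -> dict:
    try:
        with open(filename, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "state.json")
    save_state_atomically({"task_001": "approved"}, path)
    restored = load_state(path)
    missing = load_state(os.path.join(workdir, "absent.json"))

line = log_message("planner", "executor", "execute_task", "task_001")
```

JSON log lines can be filtered by `correlation_id` with ordinary log tooling, which complements the in-memory message history.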

Summary

In this tutorial, we've built a multi-agent communication system around a structured message bus, ACP-style messages, and a shared state schema. Three specialized agents communicate through a centralized message bus, Pydantic validates the data they exchange, and basic logging and file-based persistence provide a starting point for production use.

This system provides several key benefits:

  • Modularity: Each agent has a clear responsibility
  • Traceability: Every message is recorded in the bus's message history and can be logged
  • Scalability: New agents can be added without modifying existing ones
  • Production-readiness: Includes basic logging and file-based state persistence as a foundation for further hardening

This architecture can be extended with more sophisticated features like message queuing, distributed state management, or integration with external databases for complex enterprise applications.
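As one illustration of the message-queuing extension, the sketch below replaces direct callback invocation with an `asyncio.Queue`, so publishing only enqueues and a worker loop delivers. `QueuedBus`, `run_until_idle`, and the dataclass `Message` stand-in are hypothetical names, and the three handlers are stubs that only forward a message.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List


@dataclass
class Message:
    # Stand-in for ACPMessage with only the fields this sketch needs.
    sender: str
    receiver: str
    action: str
    payload: dict = field(default_factory=dict)


Handler = Callable[[Message], Awaitable[None]]


class QueuedBus:
    """Hypothetical async bus: publish enqueues, a worker loop delivers."""

    def __init__(self) -> None:
        self.queue: "asyncio.Queue[Message]" = asyncio.Queue()
        self.handlers: Dict[str, Handler] = {}
        self.delivered: List[str] = []

    def subscribe(self, agent_name: str, handler: Handler) -> None:
        self.handlers[agent_name] = handler

    async def publish(self, message: Message) -> None:
        await self.queue.put(message)

    async def run_until_idle(self) -> None:
        # Drain the queue; handlers may publish follow-up messages as they run.
        while not self.queue.empty():
            message = await self.queue.get()
            handler = self.handlers.get(message.receiver)
            if handler is not None:
                await handler(message)
                self.delivered.append(f"{message.sender}->{message.receiver}")
            self.queue.task_done()


async def demo() -> List[str]:
    bus = QueuedBus()

    async def planner(message: Message) -> None:
        await bus.publish(Message("planner", "executor", "execute_task"))

    async def executor(message: Message) -> None:
        await bus.publish(Message("executor", "reviewer", "review_results"))

    async def reviewer(message: Message) -> None:
        return None

    bus.subscribe("planner", planner)
    bus.subscribe("executor", executor)
    bus.subscribe("reviewer", reviewer)
    await bus.publish(Message("user", "planner", "create_task"))
    await bus.run_until_idle()
    return bus.delivered


delivery_order = asyncio.run(demo())
```

Unlike the synchronous bus, handlers here never run nested inside one another, which keeps call stacks flat and makes it easier to swap the in-process queue for an external broker later.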

Source: MarkTechPost
