Introduction
In today's rapidly evolving AI landscape, companies are deploying autonomous AI agents to handle various tasks across their networks. These agents can perform complex operations, make decisions, and even coordinate with each other. However, as more agents are introduced, the lack of proper interaction infrastructure leads to chaos, duplicated work, and inefficient operations. This tutorial will guide you through creating a simple interaction framework for AI agents using Python and message queues, which will help you understand how to structure communication between autonomous AI agents.
Prerequisites
To follow along with this tutorial, you'll need:
- Python 3.7 or higher installed on your system
- Basic understanding of Python programming concepts
- Knowledge of how to install Python packages using pip
- Access to a terminal or command prompt
What You'll Build
In this tutorial, you'll create a simple interaction infrastructure for AI agents that can communicate through a message queue. You'll build two types of agents: a task executor and a task coordinator, which will demonstrate how agents can work together without direct communication.
Step 1: Setting Up Your Environment
1.1 Create a Project Directory
First, create a new directory for this project and navigate into it:
mkdir ai-agents-framework
cd ai-agents-framework
1.2 Install Required Packages
We'll use Redis as our message broker, which will serve as the interaction infrastructure. Install the required packages:
pip install redis
Step 2: Creating the Message Broker
2.1 Initialize Redis Connection
Let's start by creating a file called broker.py that will handle all communication between agents:
import redis
import json
# Initialize Redis connection
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
# Define channels for different types of messages
TASK_CHANNEL = 'task_queue'
COORDINATION_CHANNEL = 'coordination_queue'
# Function to send a message to a channel
def send_message(channel, message):
redis_client.publish(channel, json.dumps(message))
# Function to listen for messages on a channel
def listen_messages(channel, callback):
pubsub = redis_client.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
callback(json.loads(message['data']))
2.2 Understanding the Broker
The broker acts as the central communication hub. Agents send messages to specific channels, and other agents listen to these channels to receive information. This pattern allows agents to work independently while still being able to coordinate their actions.
Step 3: Creating an AI Agent
3.1 Create the Task Executor Agent
Now, create a file called executor.py that will represent an AI agent that performs tasks:
import time
import random
from broker import listen_messages, TASK_CHANNEL, send_message
# Simulate task execution
def execute_task(task):
print(f"Executing task: {task['description']}")
# Simulate work being done
time.sleep(random.randint(1, 3))
print(f"Completed task: {task['description']}")
# Send completion message
completion_message = {
'type': 'task_completed',
'task_id': task['id'],
'agent_id': 'executor_1',
'timestamp': time.time()
}
send_message(COORDINATION_CHANNEL, completion_message)
# Listen for tasks
def start_executor():
def handle_task(message):
if message['type'] == 'task_assigned':
execute_task(message['task'])
print("Task executor started, listening for tasks...")
listen_messages(TASK_CHANNEL, handle_task)
if __name__ == "__main__":
start_executor()
3.2 Understanding the Executor Agent
This agent listens for tasks on the task_queue channel. When it receives a task, it executes it and then sends a completion message to the coordination_queue channel. This ensures that other agents are informed when work is done.
Step 4: Creating a Coordinator Agent
4.1 Create the Coordinator Agent
Create a file called coordinator.py that will manage task assignments:
import time
import random
from broker import listen_messages, TASK_CHANNEL, COORDINATION_CHANNEL, send_message
# Simulate task creation
def create_task():
task_id = random.randint(1000, 9999)
task_description = f"Task {task_id} - Data processing"
task = {
'id': task_id,
'description': task_description,
'priority': random.choice(['high', 'medium', 'low'])
}
return task
# Assign tasks to executors
def assign_task():
task = create_task()
task_message = {
'type': 'task_assigned',
'task': task,
'timestamp': time.time()
}
print(f"Assigning task: {task['description']}")
send_message(TASK_CHANNEL, task_message)
# Listen for task completions
def start_coordinator():
def handle_completion(message):
if message['type'] == 'task_completed':
print(f"Task {message['task_id']} completed by {message['agent_id']}")
# Assign another task
time.sleep(2)
assign_task()
print("Coordinator started, listening for completions...")
listen_messages(COORDINATION_CHANNEL, handle_completion)
# Start by assigning a task
assign_task()
if __name__ == "__main__":
start_coordinator()
4.2 Understanding the Coordinator
The coordinator agent creates tasks and assigns them to executors via the message broker. It also listens for completion messages to know when tasks are done and can assign new ones. This demonstrates how agents can coordinate without direct communication.
Step 5: Running Your AI Agent System
5.1 Start Redis Server
Before running your agents, you need to have Redis running on your system. If you don't have Redis installed, you can download it from redis.io. Once installed, start the Redis server:
redis-server
5.2 Run the Coordinator Agent
In one terminal, run the coordinator agent:
python coordinator.py
5.3 Run the Executor Agent
In another terminal, run the executor agent:
python executor.py
5.4 Observe the Interaction
You should see messages flowing between the agents. The coordinator will assign tasks, the executor will complete them, and the coordinator will be notified of completion before assigning new tasks.
Summary
In this tutorial, you've created a basic interaction infrastructure for AI agents using Redis as a message broker. You've built two types of agents: a task executor that performs work and a task coordinator that assigns tasks and manages the workflow. This interaction framework prevents the chaos that occurs when autonomous agents operate without coordination.
The key concept here is that agents communicate through a central message queue rather than directly with each other. This approach provides several benefits:
- Decoupling: Agents don't need to know about each other directly
- Scalability: You can add more agents without changing existing code
- Reliability: Messages are stored until they are processed
- Flexibility: Agents can be developed and deployed independently
This simple framework demonstrates how interaction infrastructure can prevent automation waste by ensuring that AI agents work together efficiently rather than competing for resources or duplicating efforts.



