Introduction
In this tutorial, we'll build a simple streaming decision agent that can navigate through a changing environment. Think of it like a robot that needs to find the best path to a goal while avoiding moving obstacles. This agent will use a technique called online A* planning to make decisions in real-time as the environment changes.
Our agent will work in a grid world where the goal and obstacles can move over time. The agent will continuously update its plan as new information comes in, making it reactive to changes in the environment. This is a fundamental concept in robotics and AI systems that need to operate in unpredictable real-world conditions.
Prerequisites
To follow this tutorial, you'll need:
- A computer with Python installed (Python 3.6 or higher)
- Basic understanding of Python programming concepts
- Some familiarity with grid-based pathfinding (though we'll explain this as we go)
Step-by-Step Instructions
1. Set Up Your Python Environment
First, let's create a new Python file called streaming_agent.py and start by importing the necessary libraries.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import random

We're importing NumPy for the grid arrays, matplotlib for visualization and animation, and random for generating the random environment changes.
2. Create the Grid World Environment
Let's define our grid world with a size of 20x20 cells. Each cell can be empty, an obstacle, or the goal.
class GridWorld:
    def __init__(self, width=20, height=20):
        self.width = width
        self.height = height
        self.grid = np.zeros((height, width))
        self.goal = (15, 15)
        self.obstacles = set()

    def add_obstacle(self, x, y):
        if 0 <= x < self.width and 0 <= y < self.height:
            self.obstacles.add((x, y))
            self.grid[y, x] = 1  # 1 represents an obstacle

    def remove_obstacle(self, x, y):
        # Keep the set and the grid array in sync
        self.obstacles.discard((x, y))
        if 0 <= x < self.width and 0 <= y < self.height:
            self.grid[y, x] = 0

    def is_obstacle(self, x, y):
        # Out-of-bounds cells are treated as blocked
        return (x, y) in self.obstacles or x < 0 or y < 0 or x >= self.width or y >= self.height

    def update_goal(self, new_x, new_y):
        # Clamp the goal so it never leaves the grid
        new_x = min(max(new_x, 0), self.width - 1)
        new_y = min(max(new_y, 0), self.height - 1)
        self.goal = (new_x, new_y)
This creates a basic grid world where we can add or remove obstacles and move the goal. The grid uses 0 for empty cells and 1 for obstacles, and anything outside the grid counts as blocked.
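Before moving on, it's worth seeing the bounds-plus-set obstacle check in isolation. This is a minimal standalone sketch mirroring the logic of GridWorld's is_obstacle, not the class itself:

```python
import numpy as np

# Standalone sketch of the obstacle check used by GridWorld: a set of (x, y)
# tuples for fast membership lookup, with out-of-bounds cells treated as blocked.
width, height = 5, 5
grid = np.zeros((height, width))  # rows index y, columns index x
obstacles = {(2, 3)}
grid[3, 2] = 1

def is_obstacle(x, y):
    return (x, y) in obstacles or x < 0 or y < 0 or x >= width or y >= height

print(is_obstacle(2, 3))   # True: blocked cell
print(is_obstacle(-1, 0))  # True: out of bounds counts as blocked
print(is_obstacle(0, 0))   # False: free cell
```

Treating out-of-bounds cells as obstacles is a small trick that saves the planner from needing a separate bounds check.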
3. Implement the Online A* Path Planner
Now we'll create a simple A* implementation that can work with our streaming environment.
import heapq  # priority queue for the A* open set

def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])  # Manhattan distance

class PathPlanner:
    def __init__(self, grid_world):
        self.grid_world = grid_world

    def find_path(self, start, goal, max_steps=100):
        # A* with a binary heap. max_steps caps how many cells we expand;
        # if the goal is not reached within the budget, we return a partial
        # path toward the most promising cell found so far.
        open_heap = [(heuristic(start, goal), start)]
        came_from = {}
        g_score = {start: 0}
        closed = set()
        best = start  # cell closest to the goal seen so far
        expanded = 0
        while open_heap and expanded < max_steps:
            _, current = heapq.heappop(open_heap)
            if current in closed:
                continue  # skip stale heap entries
            if current == goal:
                best = current
                break
            closed.add(current)
            expanded += 1
            if heuristic(current, goal) < heuristic(best, goal):
                best = current
            for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:  # 4-directional movement
                neighbor = (current[0] + dx, current[1] + dy)
                if self.grid_world.is_obstacle(neighbor[0], neighbor[1]):
                    continue
                tentative_g = g_score[current] + 1
                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g
                    heapq.heappush(open_heap, (tentative_g + heuristic(neighbor, goal), neighbor))
        # Reconstruct the (possibly partial) path from start to best
        path = [best]
        while path[-1] in came_from:
            path.append(came_from[path[-1]])
        return path[::-1]  # start first; a length-1 path means no progress was possible
This pathfinder searches for a route from start to goal with A*, using heapq as the priority queue for the open set and a closed set to avoid re-expanding cells. The max_steps budget caps the work per call, which matters when we replan every frame; when the budget runs out, the planner returns a partial path toward the best cell it has reached so far rather than failing outright.
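Why Manhattan distance? With unit-cost 4-directional moves, it never overestimates the true remaining cost (the property called admissibility), which is what lets A* find shortest paths. On an obstacle-free grid it is in fact exact:

```python
# The Manhattan heuristic from the planner above. With 4-directional,
# unit-cost moves, the agent must take at least |dx| horizontal steps plus
# |dy| vertical steps, so the estimate never exceeds the true remaining cost.
def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

print(heuristic((0, 0), (3, 4)))      # 7: three steps right plus four steps up
print(heuristic((15, 15), (15, 15)))  # 0: already at the goal
```

A Euclidean heuristic would also be admissible here, but it underestimates more on a 4-connected grid and so expands more cells.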
4. Create the Streaming Decision Agent
Our agent will continuously make decisions based on the current environment state.
class StreamingAgent:
    def __init__(self, grid_world):
        self.grid_world = grid_world
        self.planner = PathPlanner(grid_world)
        self.position = (0, 0)
        self.path = []
        self.replan_count = 0

    def update_position(self, new_x, new_y):
        self.position = (new_x, new_y)

    def get_next_move(self):
        # Replan when the plan is exhausted or its next step is now blocked
        if not self.path or self.grid_world.is_obstacle(*self.path[0]):
            self.plan_next_path()
        if self.path:
            return self.path.pop(0)
        return self.position  # stay in place if there is no usable path

    def plan_next_path(self):
        # Receding horizon: a small expansion budget means we only ever
        # plan a few steps ahead
        self.path = self.planner.find_path(self.position, self.grid_world.goal, max_steps=10)
        if self.path and self.path[0] == self.position:
            self.path.pop(0)  # drop the start cell so the first move is a real step
        self.replan_count += 1

    def update_environment(self):
        # Simulate external changes the agent has no control over
        if random.random() < 0.3:  # 30% chance to change something
            # Nudge a random obstacle by one cell
            if self.grid_world.obstacles:
                old = random.choice(list(self.grid_world.obstacles))
                new = (old[0] + random.randint(-1, 1),
                       old[1] + random.randint(-1, 1))
                self.grid_world.obstacles.discard(old)
                self.grid_world.grid[old[1], old[0]] = 0  # clear the old cell too
                self.grid_world.add_obstacle(new[0], new[1])
            # Nudge the goal by one cell, keeping it inside the grid
            new_goal = (min(max(self.grid_world.goal[0] + random.randint(-1, 1), 0),
                            self.grid_world.width - 1),
                        min(max(self.grid_world.goal[1] + random.randint(-1, 1), 0),
                            self.grid_world.height - 1))
            self.grid_world.update_goal(new_goal[0], new_goal[1])
This agent keeps track of its position and a short plan, replanning whenever the plan runs out or its next step becomes blocked. The update_environment method simulates external change: obstacles drift and the goal wanders, which is exactly what forces the agent to replan.
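The receding-horizon pattern can be seen in miniature without any of the grid machinery. In this sketch, plan_stub is a hypothetical stand-in for PathPlanner.find_path that only ever plans three steps ahead along a 1-D line:

```python
# Minimal receding-horizon loop: plan a short path, follow it, and replan
# only when it runs out. plan_stub is a hypothetical stand-in for find_path.
def plan_stub(pos, goal, horizon=3):
    step = 1 if goal > pos else -1
    return [pos + step * (i + 1) for i in range(min(horizon, abs(goal - pos)))]

pos, goal, path, replans = 0, 10, [], 0
while pos != goal:
    if not path:
        path = plan_stub(pos, goal)
        replans += 1
    pos = path.pop(0)

print(pos, replans)  # reaches 10 after 4 plans of at most 3 steps each
```

Because each plan is short, stale information is cheap: the agent never commits to more than a few steps before it gets a chance to look at the world again.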
5. Add Visualization
Let's make it easy to see what's happening with our agent.
def visualize_agent(agent, frame):
    plt.clf()
    grid = agent.grid_world.grid.copy()
    # Mark obstacles, goal, and agent with distinct values so they get
    # distinct colors (note: rows index y, columns index x)
    for obs in agent.grid_world.obstacles:
        grid[obs[1], obs[0]] = 2
    grid[agent.grid_world.goal[1], agent.grid_world.goal[0]] = 3
    grid[agent.position[1], agent.position[0]] = 4
    # Fix the color scale so colors stay stable from frame to frame
    plt.imshow(grid, cmap='viridis', vmin=0, vmax=4)
    plt.title(f'Frame {frame} - Agent at {agent.position} - Replans: {agent.replan_count}')
    plt.colorbar()
    # Draw the remaining planned path, if any
    if agent.path:
        path_x = [p[0] for p in agent.path]
        path_y = [p[1] for p in agent.path]
        plt.plot(path_x, path_y, 'r-', alpha=0.7, linewidth=2)
    plt.xticks(range(agent.grid_world.width))
    plt.yticks(range(agent.grid_world.height))
This visualization shows us the grid, obstacles, goal, and agent position with the planned path in red.
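If you are working on a machine without a display (a remote server or a CI job), the same imshow-based rendering can be written to a file instead of a window. A minimal sketch, assuming matplotlib's bundled non-interactive Agg backend:

```python
import matplotlib
matplotlib.use("Agg")  # non-interactive backend: render to files, not windows
import matplotlib.pyplot as plt
import numpy as np

# Render one static frame using the same value coding as visualize_agent
grid = np.zeros((20, 20))
grid[5, 5] = 2    # obstacle
grid[15, 15] = 3  # goal
grid[0, 0] = 4    # agent
plt.imshow(grid, cmap="viridis", vmin=0, vmax=4)
plt.title("Single frame, rendered headlessly")
plt.savefig("frame.png")
print("saved frame.png")
```

Note that matplotlib.use must be called before pyplot is imported for the backend choice to take effect reliably.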
6. Run the Simulation
Now let's put everything together in a main simulation loop.
def main_simulation():
    # Create the environment with a few random initial obstacles
    world = GridWorld()
    for i in range(5):
        world.add_obstacle(random.randint(2, 18), random.randint(2, 18))
    # Create the agent
    agent = StreamingAgent(world)
    # Set up the visualization
    fig = plt.figure(figsize=(10, 10))

    def animate(frame):
        agent.update_environment()                         # environment changes first
        next_move = agent.get_next_move()                  # agent reacts
        agent.update_position(next_move[0], next_move[1])  # move the agent
        visualize_agent(agent, frame)                      # draw the new state

    # Keep a reference to the animation so it is not garbage-collected
    ani = animation.FuncAnimation(fig, animate, frames=50, interval=500, repeat=False)
    plt.show()
    return agent

# Run the simulation
if __name__ == "__main__":
    agent = main_simulation()
This creates a complete simulation where we can watch the agent navigate through the changing environment.
Summary
In this tutorial, we built a streaming decision agent that can adapt to changing environments in real-time. We created:
- A grid world environment with obstacles and a moving goal
- An online path planner that finds routes with the A* algorithm
- A streaming agent that makes decisions based on current information
- A visualization system to see how the agent navigates
The agent demonstrates key concepts of partial reasoning (it doesn't plan the entire route), online replanning (it updates its plan when needed), and reactive adaptation (it responds to changes in the environment). This approach is essential for real-world applications like autonomous vehicles, robotics, and any system that needs to make decisions in dynamic situations.
While this is a simplified example, it demonstrates the core principles behind more advanced streaming decision agents used in industry today.