How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments
Back to Tutorials
aiTutorialbeginner

How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

March 11, 20264 views5 min read

Learn to build a streaming decision agent that navigates dynamic environments using online A* planning and reactive adaptation techniques.

Introduction

In this tutorial, we'll build a simple streaming decision agent that can navigate through a changing environment. Think of it like a robot that needs to find the best path to a goal while avoiding moving obstacles. This agent will use a technique called online A* planning to make decisions in real-time as the environment changes.

Our agent will work in a grid world where the goal and obstacles can move over time. The agent will continuously update its plan as new information comes in, making it reactive to changes in the environment. This is a fundamental concept in robotics and AI systems that need to operate in unpredictable real-world conditions.

Prerequisites

To follow this tutorial, you'll need:

  • A computer with Python installed (Python 3.6 or higher)
  • Basic understanding of Python programming concepts
  • Some familiarity with grid-based pathfinding (though we'll explain this as we go)

Step-by-Step Instructions

1. Set Up Your Python Environment

First, let's create a new Python file called streaming_agent.py and start by importing the necessary libraries.

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
import random

We're importing NumPy for mathematical operations, matplotlib for visualization, deque for efficient list operations, and random for generating random movements.

2. Create the Grid World Environment

Let's define our grid world with a size of 20x20 cells. Each cell can be empty, an obstacle, or the goal.

class GridWorld:
    def __init__(self, width=20, height=20):
        self.width = width
        self.height = height
        self.grid = np.zeros((height, width))
        self.goal = (15, 15)
        self.obstacles = set()
        
    def add_obstacle(self, x, y):
        if 0 <= x < self.width and 0 <= y < self.height:
            self.obstacles.add((x, y))
            self.grid[y, x] = 1  # 1 represents an obstacle
            
    def is_obstacle(self, x, y):
        return (x, y) in self.obstacles or x < 0 or y < 0 or x >= self.width or y >= self.height
        
    def update_goal(self, new_x, new_y):
        self.goal = (new_x, new_y)

This creates a basic grid world where we can add obstacles and move the goal. The grid uses 0 for empty spaces and 1 for obstacles.

3. Implement the Online A* Path Planner

Now we'll create a simple A* implementation that can work with our streaming environment.

def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])  # Manhattan distance

class PathPlanner:
    def __init__(self, grid_world):
        self.grid_world = grid_world
        
    def find_path(self, start, goal, max_steps=100):
        # Simple A* implementation
        open_set = [(0, start)]
        came_from = {}
        g_score = {start: 0}
        f_score = {start: heuristic(start, goal)}
        
        while open_set:
            current = min(open_set, key=lambda x: x[0])[1]
            open_set.remove((f_score[current], current))
            
            if current == goal:
                path = []
                while current in came_from:
                    path.append(current)
                    current = came_from[current]
                path.append(start)
                return path[::-1]
            
            for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:  # 4-directional movement
                neighbor = (current[0] + dx, current[1] + dy)
                
                if self.grid_world.is_obstacle(neighbor[0], neighbor[1]):
                    continue
                
                tentative_g_score = g_score[current] + 1
                
                if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g_score
                    f_score[neighbor] = g_score[neighbor] + heuristic(neighbor, goal)
                    open_set.append((f_score[neighbor], neighbor))
                    
                    if len(open_set) > max_steps:
                        break
        
        return []  # No path found

This pathfinder looks for a route from start to goal using the A* algorithm. It's simplified but demonstrates the core concept of pathfinding in our dynamic environment.

4. Create the Streaming Decision Agent

Our agent will continuously make decisions based on the current environment state.

class StreamingAgent:
    def __init__(self, grid_world):
        self.grid_world = grid_world
        self.planner = PathPlanner(grid_world)
        self.position = (0, 0)
        self.path = []
        self.replan_count = 0
        
    def update_position(self, new_x, new_y):
        self.position = (new_x, new_y)
        
    def get_next_move(self):
        if not self.path:
            self.plan_next_path()
            
        if self.path:
            next_move = self.path.pop(0)
            return next_move
        else:
            return self.position  # Stay in place if no path
            
    def plan_next_path(self):
        # Plan only a few steps ahead (receding horizon)
        self.path = self.planner.find_path(self.position, self.grid_world.goal, max_steps=10)
        self.replan_count += 1
        
    def update_environment(self):
        # Simulate environment changes
        if random.random() < 0.3:  # 30% chance to change something
            # Move obstacles
            if self.grid_world.obstacles:
                old_obstacle = random.choice(list(self.grid_world.obstacles))
                new_obstacle = (old_obstacle[0] + random.randint(-1, 1), 
                               old_obstacle[1] + random.randint(-1, 1))
                self.grid_world.obstacles.remove(old_obstacle)
                self.grid_world.add_obstacle(new_obstacle[0], new_obstacle[1])
                
            # Move goal
            new_goal = (self.grid_world.goal[0] + random.randint(-1, 1), 
                       self.grid_world.goal[1] + random.randint(-1, 1))
            self.grid_world.update_goal(new_goal[0], new_goal[1])

This agent keeps track of its position and path, replanning when needed. It updates the environment to simulate real-world changes.

5. Add Visualization

Let's make it easy to see what's happening with our agent.

def visualize_agent(agent, frame):
    plt.clf()
    grid = agent.grid_world.grid.copy()
    
    # Mark obstacles
    for obs in agent.grid_world.obstacles:
        grid[obs[1], obs[0]] = 2  # Different color for obstacles
    
    # Mark goal
    grid[agent.grid_world.goal[1], agent.grid_world.goal[0]] = 3  # Different color for goal
    
    # Mark agent position
    grid[agent.position[1], agent.position[0]] = 4  # Different color for agent
    
    plt.imshow(grid, cmap='viridis')
    plt.title(f'Frame {frame} - Agent at {agent.position} - Replans: {agent.replan_count}')
    plt.colorbar()
    
    # Draw path if exists
    if agent.path:
        path_x = [p[0] for p in agent.path]
        path_y = [p[1] for p in agent.path]
        plt.plot(path_x, path_y, 'r-', alpha=0.7, linewidth=2)
        
    plt.xticks(range(agent.grid_world.width))
    plt.yticks(range(agent.grid_world.height))

This visualization shows us the grid, obstacles, goal, and agent position with the planned path in red.

6. Run the Simulation

Now let's put everything together in a main simulation loop.

def main_simulation():
    # Create environment
    world = GridWorld()
    
    # Add some initial obstacles
    for i in range(5):
        world.add_obstacle(random.randint(2, 18), random.randint(2, 18))
    
    # Create agent
    agent = StreamingAgent(world)
    
    # Set up visualization
    fig = plt.figure(figsize=(10, 10))
    
    def animate(frame):
        agent.update_environment()  # Update environment
        next_move = agent.get_next_move()  # Get next move
        agent.update_position(next_move[0], next_move[1])  # Move agent
        visualize_agent(agent, frame)  # Visualize
        
    # Create animation
    ani = animation.FuncAnimation(fig, animate, frames=50, interval=500, repeat=False)
    plt.show()
    
    return agent

# Run the simulation
if __name__ == "__main__":
    agent = main_simulation()

This creates a complete simulation where we can watch the agent navigate through the changing environment.

Summary

In this tutorial, we built a streaming decision agent that can adapt to changing environments in real-time. We created:

  • A grid world environment with obstacles and a moving goal
  • An online path planner that finds routes using A* algorithm
  • A streaming agent that makes decisions based on current information
  • A visualization system to see how the agent navigates

The agent demonstrates key concepts of partial reasoning (it doesn't plan the entire route), online replanning (it updates its plan when needed), and reactive adaptation (it responds to changes in the environment). This approach is essential for real-world applications like autonomous vehicles, robotics, and any system that needs to make decisions in dynamic situations.

While this is a simplified example, it demonstrates the core principles behind more advanced streaming decision agents used in industry today.

Source: MarkTechPost

Related Articles