Introduction
In this tutorial, we'll build a lightweight, vision-language-action-inspired embodied agent that learns to perceive, plan, predict, and replan directly from pixel observations. The agent operates in a simplified grid-world environment, taking RGB frames as input rather than symbolic state variables, and combines latent world modeling with model predictive control to choose actions. This setup demonstrates how an embodied agent can navigate its environment from visual input alone, without hand-crafted environment models or predefined knowledge.
By the end of this tutorial, you'll have a working agent that can navigate a grid world, learn from its environment, and make decisions based on visual observations.
Prerequisites
- Python 3.7+
- NumPy
- Matplotlib
- OpenCV (cv2)
- PyTorch (for neural network components)
Note: This tutorial assumes familiarity with basic machine learning concepts, neural networks, and reinforcement learning principles.
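If any of these are missing, a standard pip install covers them all:

pip install numpy matplotlib opencv-python torch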
Step-by-Step Instructions
1. Set up the Environment
First, create a new Python file and import the necessary libraries:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import random
Why: We're importing essential libraries for numerical computation, visualization, image processing, and neural network implementation.
2. Create the Grid World Environment
Define a simple grid world with obstacles and a goal:
class GridWorld:
    def __init__(self, width=10, height=10):
        self.width = width
        self.height = height
        self.grid = np.zeros((height, width))
        # Add some obstacles
        self.grid[3:5, 3:7] = 1
        self.grid[7:9, 2:5] = 1
        self.start = (0, 0)
        self.goal = (9, 9)
        self.agent_pos = self.start

    def reset(self):
        self.agent_pos = self.start
        return self.get_observation()

    def get_observation(self):
        # Render the grid as an RGB frame. Draw obstacles first, then the
        # goal, then the agent, so the agent stays visible when it reaches
        # the goal cell.
        obs = np.zeros((self.height, self.width, 3))
        obs[self.grid == 1] = [128, 128, 128]  # Obstacles in gray
        obs[self.goal] = [255, 0, 0]           # Goal in red
        obs[self.agent_pos] = [0, 255, 0]      # Agent in green
        return obs.astype(np.uint8)

    def step(self, action):
        # Action: 0=up, 1=down, 2=left, 3=right
        new_pos = list(self.agent_pos)
        if action == 0:
            new_pos[0] = max(0, new_pos[0] - 1)
        elif action == 1:
            new_pos[0] = min(self.height - 1, new_pos[0] + 1)
        elif action == 2:
            new_pos[1] = max(0, new_pos[1] - 1)
        elif action == 3:
            new_pos[1] = min(self.width - 1, new_pos[1] + 1)
        # Only move if the target cell is free; otherwise stay in place
        if self.grid[new_pos[0], new_pos[1]] == 0:
            self.agent_pos = tuple(new_pos)
        reward = -1  # Small penalty for each step
        done = (self.agent_pos == self.goal)
        if done:
            reward = 100  # Large reward for reaching the goal
        return self.get_observation(), reward, done
Why: This creates a simplified environment where the agent can learn to navigate from start to goal while avoiding obstacles. The RGB frames represent the agent's visual perception.
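A quick check that the environment behaves as expected:

env = GridWorld()
obs = env.reset()
print(obs.shape, obs.dtype)      # (10, 10, 3) uint8
obs, reward, done = env.step(3)  # move right
print(reward, done)              # -1 False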
3. Implement the World Model
Build a lightweight world model using a simple encoder-decoder architecture:
class WorldModel(nn.Module):
    def __init__(self, input_channels=3, latent_dim=64):
        super(WorldModel, self).__init__()
        # Encoder: pixels -> latent vector. The adaptive pooling lets the
        # encoder accept any input resolution.
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten(),
            nn.Linear(64 * 4 * 4, latent_dim),
            nn.ReLU()
        )
        # Decoder: latent vector -> 16x16 RGB frame with values in [0, 1].
        # Because the decoder always upsamples 4x4 -> 8x8 -> 16x16,
        # observations must be resized to 16x16 and scaled to [0, 1]
        # before computing the reconstruction loss.
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64 * 4 * 4),
            nn.ReLU(),
            nn.Unflatten(1, (64, 4, 4)),
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, input_channels, 3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()
        )
        # Dynamics model over latents (not used in forward(); a full agent
        # would roll this out to predict future latent states)
        self.dynamics = nn.LSTM(latent_dim, latent_dim, batch_first=True)

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent
Why: The world model learns to compress visual observations into latent representations and reconstruct them, enabling the agent to predict future states based on its current understanding of the environment.
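A quick shape check confirms the resolution contract: the decoder always produces 16x16 frames, so that's the size we'll feed in during training:

model = WorldModel()
dummy = torch.rand(1, 3, 16, 16)  # one 16x16 RGB frame with values in [0, 1]
reconstructed, latent = model(dummy)
print(latent.shape)         # torch.Size([1, 64])
print(reconstructed.shape)  # torch.Size([1, 3, 16, 16]) -- matches the input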
4. Implement Model Predictive Control (MPC)
Create a simple MPC controller that plans ahead:
class MPCController:
    def __init__(self, model, horizon=5, num_actions=4):
        self.model = model
        self.horizon = horizon
        self.num_actions = num_actions

    def plan(self, current_obs, goal_obs):
        best_score = float('-inf')
        best_action = 0
        goal_pos = self._agent_pos(goal_obs)
        # Simple greedy planning: score each constant-action rollout and
        # keep the action whose imagined trajectory scores best
        for action in range(self.num_actions):
            obs = current_obs
            total_reward = 0
            for t in range(self.horizon):
                obs = self._simple_transition(obs, action)
                # Raw frame-to-frame L2 distance is flat (every mismatched
                # pixel costs the same wherever it is), so score the grid
                # distance between the agent pixel and the goal pixel
                pos = self._agent_pos(obs).astype(np.float32)
                dist = np.linalg.norm(pos - goal_pos)
                total_reward -= dist
                if dist < 1:
                    total_reward += 100
                    break
            if total_reward > best_score:
                best_score = total_reward
                best_action = action
        return best_action

    def _agent_pos(self, obs):
        # The agent is the green pixel ([0, 255, 0])
        hits = np.argwhere((obs[:, :, 0] == 0) & (obs[:, :, 1] == 255))
        return hits[0] if len(hits) else np.zeros(2)

    def _simple_transition(self, obs, action):
        # Hand-coded stand-in for a learned dynamics model: move the green
        # agent pixel one cell, respecting walls and gray obstacle pixels
        obs = obs.copy()
        r, c = self._agent_pos(obs).astype(int)
        nr, nc = r, c
        if action == 0:
            nr = max(0, r - 1)
        elif action == 1:
            nr = min(obs.shape[0] - 1, r + 1)
        elif action == 2:
            nc = max(0, c - 1)
        elif action == 3:
            nc = min(obs.shape[1] - 1, c + 1)
        if not np.array_equal(obs[nr, nc], [128, 128, 128]):
            obs[r, c] = [0, 0, 0]
            obs[nr, nc] = [0, 255, 0]
        return obs
Why: MPC lets the agent look ahead and choose the action whose imagined rollout scores best, making it more capable than purely reactive control. In this minimal version, planning runs over rendered frames with a hand-coded pixel transition; a full system would roll out the learned latent dynamics model instead. A usage sketch follows below.
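Here's a quick check of the planner on the environment from Step 2; the goal frame is rendered by temporarily parking the agent on the goal cell:

env = GridWorld()
obs = env.reset()

# Render the target frame: the observation with the agent on the goal
env.agent_pos = env.goal
goal_obs = env.get_observation()
env.agent_pos = env.start

mpc = MPCController(model=None)  # the stub planner doesn't consult the model yet
print(mpc.plan(obs, goal_obs))   # 1 (down): the first action that shrinks the distance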
5. Train the Agent
Implement the training loop:
def train_agent(episodes=1000, max_steps=200):
    env = GridWorld()
    model = WorldModel()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    # Training loop
    for episode in range(episodes):
        obs = env.reset()
        done = False
        total_reward = 0
        steps = 0
        while not done and steps < max_steps:
            # Resize to the decoder's 16x16 output resolution, scale to
            # [0, 1] to match the Sigmoid output, and move channels first
            frame = cv2.resize(obs, (16, 16), interpolation=cv2.INTER_NEAREST) / 255.0
            obs_tensor = torch.tensor(frame, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0)
            # Reconstruction objective: compress and rebuild the current frame
            reconstructed, latent = model(obs_tensor)
            loss = criterion(reconstructed, obs_tensor)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Random exploration policy (for demonstration)
            action = random.randint(0, 3)
            obs, reward, done = env.step(action)
            total_reward += reward
            steps += 1
        if episode % 100 == 0:
            print(f'Episode {episode}, Total Reward: {total_reward}, Loss: {loss.item():.4f}')
    return model
Why: This training process teaches the world model to accurately reconstruct visual observations, which is essential for the agent's ability to predict future states.
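Before committing to the full 1,000 episodes, a short smoke run is a reasonable sanity check (the checkpoint filename here is just a placeholder):

model = train_agent(episodes=101)  # long enough to see two progress lines
torch.save(model.state_dict(), 'world_model.pt')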
6. Run the Complete Agent
Combine everything into a working agent:
def run_agent(max_steps=100):
    env = GridWorld()
    model = WorldModel()
    mpc = MPCController(model)
    obs = env.reset()
    # Render the goal frame once: what the observation looks like with the
    # agent sitting on the goal cell
    env.agent_pos = env.goal
    goal_obs = env.get_observation()
    env.agent_pos = env.start
    done = False
    steps = 0
    while not done and steps < max_steps:
        # Plan using MPC against the goal frame
        action = mpc.plan(obs, goal_obs)
        # Execute action
        obs, reward, done = env.step(action)
        # Visualize without blocking on every frame
        plt.imshow(obs)
        plt.title(f'Action: {action}, Reward: {reward}')
        plt.pause(0.5)
        plt.clf()
        steps += 1
    print('Goal reached!' if done else 'Step limit reached.')
Why: This final step demonstrates the complete agent working in the environment, combining world modeling with MPC for intelligent decision-making.
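Finally, a minimal entry point ties the two stages together. Note that in this simplified version the trained world model isn't yet wired into the planner, which still relies on the hand-coded pixel transition:

if __name__ == '__main__':
    train_agent(episodes=200)  # pretrain the world model
    run_agent()                # plan with the hand-coded pixel transition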
Summary
In this tutorial, we've built a lightweight embodied agent, inspired by vision-language-action systems, that navigates a grid world from raw pixels. The agent pairs a world model for perception with model predictive control for planning. While this is a simplified implementation (the latent dynamics model is defined but not yet driving the planner), it demonstrates the core loop behind more complex embodied AI systems: encode the observation, imagine candidate futures, and act on the rollout that scores best.
This approach showcases how embodied agents can learn complex behaviors from raw pixel observations, without requiring explicit environmental models or symbolic representations. The techniques demonstrated here form the foundation for more sophisticated agents that can operate in real-world environments.