Quick Start Examples
Example 1: Simple Temperature Controller
from composabl import Agent, Skill, Sensor, MaintainGoal, Trainer

# Create an agent
agent = Agent()

# Add a sensor
agent.add_sensors([
    Sensor("temperature",
           "Current temperature reading",
           lambda obs: obs["temp"])
])

# Create a skill with a maintain goal
skill = Skill("temp-controller",
              MaintainGoal("temperature",
                           "Maintain temperature at 25°C",
                           target=25.0,
                           stop_distance=0.5))
agent.add_skill(skill)

# Train locally
trainer = Trainer({
    "target": {
        "local": {"address": "localhost:1337"}
    }
})
trainer.train(agent, train_cycles=10)
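Once training finishes, you can check the result and save the agent definition. This is a minimal sketch reusing the trainer.evaluate and agent.export calls shown in the complete example at the end of this page; the exact metric keys returned by evaluate may vary by SDK version.

# Evaluate and export (API usage as in the complete example below;
# the 'episode_reward_mean' key is assumed to match that example)
eval_results = trainer.evaluate(agent, num_episodes=5)
print(f"Average reward: {eval_results['episode_reward_mean']:.2f}")

agent.export("./trained_agents/temp_controller.json")
trainer.close()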
Example 2: Multi-Skill Agent
from composabl import (
    Agent, Skill, Sensor, SkillSelector,
    ApproachGoal, AvoidGoal, Trainer
)

# Create agent with multiple sensors
agent = Agent()
agent.add_sensors([
    Sensor("position_x", "X coordinate", lambda obs: obs[0]),
    Sensor("position_y", "Y coordinate", lambda obs: obs[1]),
    Sensor("obstacle_distance", "Distance to nearest obstacle",
           lambda obs: obs[2])
])

# Create navigation skills
move_to_target = Skill(
    "move-to-target",
    ApproachGoal("position_x", "Reach target X", target=10.0)
)
avoid_obstacles = Skill(
    "avoid-obstacles",
    AvoidGoal("obstacle_distance", "Stay away from obstacles",
              target=0.0, stop_distance=2.0)
)

# Create a selector to coordinate skills
class NavigationSelector(SkillSelector):
    def select_skill(self, observation):
        if observation["obstacle_distance"] < 3.0:
            return "avoid-obstacles"
        return "move-to-target"

navigation = SkillSelector(
    "navigation",
    NavigationSelector,
    children=["move-to-target", "avoid-obstacles"]
)

# Build the agent
agent.add_skill(move_to_target)
agent.add_skill(avoid_obstacles)
agent.add_skill(navigation)

# Train with Docker simulator
trainer = Trainer({
    "target": {
        "docker": {
            "image": "composabl/sim-navigation:latest"
        }
    },
    "resources": {
        "sim_count": 4  # Run 4 parallel simulators
    }
})
trainer.train(agent, train_cycles=50)
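To expose the selector to varied starting conditions, you can also randomize initial states with Scenario, passed under the "scenarios" config key exactly as in the complete example below. The state names and ranges here are illustrative assumptions; the real keys depend on what the composabl/sim-navigation simulator exposes.

from composabl import Scenario

# Illustrative scenario ranges (assumed state keys for this simulator)
scenarios = [
    Scenario({
        "position_x": {"min": -10, "max": 10},
        "position_y": {"min": -10, "max": 10},
        "obstacle_distance": {"min": 1, "max": 20}
    })
]

trainer = Trainer({
    "target": {"docker": {"image": "composabl/sim-navigation:latest"}},
    "resources": {"sim_count": 4},
    "scenarios": scenarios  # same config key as in the complete example
})
trainer.train(agent, train_cycles=50)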
Example 3: Custom Teacher Implementation
import numpy as np

from composabl import Agent, Skill, SkillTeacher, Trainer

class CustomTeacher(SkillTeacher):
    def __init__(self):
        self.step_count = 0

    async def compute_reward(self, obs, action, sim_reward):
        # Custom reward logic: negative distance to the goal
        distance_to_goal = abs(obs["position"] - obs["target"])
        return -distance_to_goal

    async def compute_success_criteria(self, obs, action):
        # Success when close to the target
        return abs(obs["position"] - obs["target"]) < 0.1

    async def compute_termination(self, obs, action):
        # Terminate after 1000 steps or if out of bounds
        self.step_count += 1
        return self.step_count > 1000 or abs(obs["position"]) > 100

    async def transform_sensors(self, sensors, action):
        # Pass sensors through unchanged
        return sensors

    async def transform_action(self, obs, action):
        # Clip actions to the valid range
        return np.clip(action, -1, 1)

    async def filtered_sensor_space(self):
        # Specify which sensors this skill needs
        return ["position", "velocity", "target"]

# Create and train the agent
agent = Agent()
agent.add_skill(Skill("custom-skill", CustomTeacher))
trainer = Trainer({"target": {"local": {"address": "localhost:1337"}}})
trainer.train(agent, train_cycles=20)
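To watch how the custom reward shapes behavior during training, the train call also accepts callbacks, as the complete example below shows. A minimal sketch; the callbacks argument and the 'episode_reward_mean' metric key are taken from that example and may differ by SDK version.

# Per-cycle progress logging (callback pattern from the complete example)
def on_cycle_complete(cycle, metrics):
    print(f"Cycle {cycle}: reward = {metrics['episode_reward_mean']:.2f}")

trainer.train(
    agent,
    train_cycles=20,
    callbacks={"on_cycle_complete": on_cycle_complete}
)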
Complete Example: Industrial Controller
import os
import time

from composabl import (
    Agent, Skill, Sensor, Scenario, Perceptor,
    MaintainGoal, MinimizeGoal,
    SkillController, SkillSelector,
    Trainer
)

# Configure environment
os.environ["COMPOSABL_LICENSE"] = "your-license-key"
os.environ["COMPOSABL_EULA_AGREED"] = "1"

# Create a perceptor that computes the temperature's rate of change;
# its "temperature_rate" output is consumed by the coordinator below
class DerivativePerceptor(Perceptor):
    def __init__(self):
        super().__init__()
        self.last_value = None
        self.last_time = None

    async def compute(self, obs_spec, obs):
        current_time = time.time()
        if self.last_value is not None:
            dt = current_time - self.last_time
            # Guard against a zero-length time step
            derivative = ((obs["temperature"] - self.last_value) / dt
                          if dt > 0 else 0.0)
        else:
            derivative = 0.0
        self.last_value = obs["temperature"]
        self.last_time = current_time
        return {"temperature_rate": derivative}
# Create the agent
agent = Agent()

# Add sensors
agent.add_sensors([
    Sensor("temperature", "Current temperature (°C)",
           lambda obs: obs["temp"]),
    Sensor("pressure", "Current pressure (bar)",
           lambda obs: obs["pressure"]),
    Sensor("flow_rate", "Flow rate (L/min)",
           lambda obs: obs["flow"]),
    Sensor("energy_consumption", "Energy usage (kW)",
           lambda obs: obs["energy"])
])

# Add perceptor
agent.add_perceptor(Perceptor("temp-derivative", DerivativePerceptor))

# Create skills
# 1. Temperature control
temp_control = Skill(
    "temperature-control",
    MaintainGoal("temperature",
                 "Maintain reactor temperature",
                 target=75.0,
                 stop_distance=2.0)
)

# 2. Energy optimization
energy_optimization = Skill(
    "energy-optimization",
    MinimizeGoal("energy_consumption",
                 "Minimize energy usage")
)

# 3. Emergency shutdown controller
class EmergencyShutdown(SkillController):
    async def compute_action(self, obs, action):
        if obs["temperature"] > 100 or obs["pressure"] > 10:
            return [0, 0, 0]  # Shutdown action
        return None  # Let other skills handle it

    async def compute_success_criteria(self, obs, action):
        return obs["temperature"] < 90 and obs["pressure"] < 8

    async def filtered_sensor_space(self):
        return ["temperature", "pressure"]

emergency = Skill("emergency-shutdown", EmergencyShutdown)

# 4. Coordinator selector
class ProcessCoordinator(SkillSelector):
    def select_skill(self, obs):
        # Emergency takes priority
        if obs["temperature"] > 95 or obs["pressure"] > 9:
            return "emergency-shutdown"
        # Rapid temperature change: hand control to the temperature skill
        elif obs["temperature_rate"] > 5:
            return "temperature-control"
        # Normal operation: optimize energy use
        else:
            return "energy-optimization"

coordinator = SkillSelector(
    "process-coordinator",
    ProcessCoordinator,
    children=["temperature-control",
              "energy-optimization",
              "emergency-shutdown"]
)

# Build the agent hierarchy
agent.add_skills([temp_control, energy_optimization, emergency])
agent.add_skill(coordinator)
# Define training scenarios
scenarios = [
    Scenario({
        "temperature": {"min": 70, "max": 80},
        "pressure": {"min": 5, "max": 7},
        "flow_rate": 100,
        "energy": {"min": 10, "max": 50}
    }),
    Scenario({
        "temperature": 90,  # High-temperature scenario
        "pressure": 8,
        "flow_rate": 150,
        "energy": 75
    })
]

# Configure training
config = {
    "target": {
        "docker": {
            "image": "composabl/sim-reactor:latest",
            "environment": {
                "SCENARIO_MODE": "variable"
            }
        }
    },
    "env": {
        "name": "reactor-control",
        "init": {
            "control_frequency": 10,  # Hz
            "simulation_speed": 100   # 100x real time
        }
    },
    "algorithm": {
        "name": "PPO",
        "config": {
            "lr": 0.0003,
            "gamma": 0.99,
            "lambda": 0.95,
            "clip_param": 0.2
        }
    },
    "resources": {
        "sim_count": 8,
        "num_workers": 4
    },
    "model": {
        "fcnet_hiddens": [256, 256],
        "fcnet_activation": "relu"
    },
    "rollout": {
        "num_rollout_workers": 4,
        "num_envs_per_worker": 2
    },
    "scenarios": scenarios,
    "post_processing": {
        "record": {
            "enabled": True,
            "file_path": "./recordings",
            "gif_file_name": "reactor_control.gif"
        }
    }
}
# Train the agent
trainer = Trainer(config)

# Train with callbacks
def on_cycle_complete(cycle, metrics):
    print(f"Cycle {cycle}: Reward = {metrics['episode_reward_mean']:.2f}")

trainer.train(
    agent,
    train_cycles=100,
    callbacks={"on_cycle_complete": on_cycle_complete}
)

# Evaluate the trained agent
print("Evaluating trained agent...")
eval_results = trainer.evaluate(agent, num_episodes=10)
print(f"Average reward: {eval_results['episode_reward_mean']:.2f}")
print(f"Success rate: {eval_results['custom_metrics']['success_rate']:.2%}")

# Export the trained agent
agent.export("./trained_agents/reactor_controller.json")

# Package for deployment
deployed_agent = trainer.package(agent)
print("Agent ready for deployment!")

# Clean up
trainer.close()
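For a quick sanity check of the exported artifact, you can open it with the standard json module. This sketch assumes export writes plain JSON, as the .json path above suggests; the actual structure depends on the SDK version.

import json

# Inspect the exported agent definition (plain-JSON export assumed;
# top-level fields depend on the composabl SDK version)
with open("./trained_agents/reactor_controller.json") as f:
    agent_spec = json.load(f)
print(sorted(agent_spec))  # list the top-level keys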