Quick Start Examples

Example 1: Simple Temperature Controller

from composabl import Agent, Skill, Sensor, MaintainGoal, Trainer

# Build an agent with a single temperature sensor
agent = Agent()
agent.add_sensors([
    Sensor("temperature",
           "Current temperature reading",
           lambda obs: obs["temp"]),
])

# Skill: hold temperature at the 25 degree setpoint (within 0.5)
temp_skill = Skill(
    "temp-controller",
    MaintainGoal("temperature",
                 "Maintain temperature at 25°C",
                 target=25.0,
                 stop_distance=0.5),
)
agent.add_skill(temp_skill)

# Train against a simulator running on this machine
local_trainer = Trainer({
    "target": {"local": {"address": "localhost:1337"}},
})
local_trainer.train(agent, train_cycles=10)

Example 2: Multi-Skill Agent

from composabl import (
    Agent, Skill, Sensor, SkillSelector,
    ApproachGoal, AvoidGoal, Trainer
)

# Agent observing planar position plus obstacle proximity
agent = Agent()

sensors = [
    Sensor("position_x", "X coordinate", lambda obs: obs[0]),
    Sensor("position_y", "Y coordinate", lambda obs: obs[1]),
    Sensor("obstacle_distance", "Distance to nearest obstacle",
           lambda obs: obs[2]),
]
agent.add_sensors(sensors)

# Skill: drive position_x toward the target coordinate
move_to_target = Skill(
    "move-to-target",
    ApproachGoal("position_x", "Reach target X", target=10.0),
)

# Skill: keep clear of obstacles (stop once 2.0 units away)
avoid_obstacles = Skill(
    "avoid-obstacles",
    AvoidGoal("obstacle_distance", "Stay away from obstacles",
              target=0.0, stop_distance=2.0),
)

# Selector logic that coordinates the two navigation skills
class NavigationSelector(SkillSelector):
    """Route control to obstacle avoidance when an obstacle is close,
    otherwise head for the target."""

    def select_skill(self, observation):
        # Safety first: inside a 3-unit radius, avoidance takes over.
        obstacle_nearby = observation["obstacle_distance"] < 3.0
        return "avoid-obstacles" if obstacle_nearby else "move-to-target"

# Wrap the selector logic in a selector skill over the two children
nav_selector = SkillSelector(
    "navigation",
    NavigationSelector,
    children=["move-to-target", "avoid-obstacles"],
)

# Assemble the agent hierarchy: leaf skills, then the selector
for skill in (move_to_target, avoid_obstacles, nav_selector):
    agent.add_skill(skill)

# Train against a Dockerized simulator, 4 instances in parallel
docker_trainer = Trainer({
    "target": {
        "docker": {"image": "composabl/sim-navigation:latest"},
    },
    "resources": {"sim_count": 4},
})
docker_trainer.train(agent, train_cycles=50)

Example 3: Custom Teacher Implementation

from composabl import Agent, Skill, SkillTeacher, Trainer

class CustomTeacher(SkillTeacher):
    """Example teacher with a distance-based reward and step-count termination.

    Reward is the negative absolute distance between ``obs["position"]`` and
    ``obs["target"]``, so the best attainable reward (0) is at the goal.
    """

    def __init__(self):
        # Number of environment steps seen so far; drives termination.
        self.step_count = 0

    async def compute_reward(self, obs, action, sim_reward):
        """Return -|position - target|: closer to the goal => higher reward."""
        distance_to_goal = abs(obs["position"] - obs["target"])
        return -distance_to_goal

    async def compute_success_criteria(self, obs, action):
        """Succeed once the agent is within 0.1 of the target."""
        return abs(obs["position"] - obs["target"]) < 0.1

    async def compute_termination(self, obs, action):
        """Terminate after 1000 steps or when position leaves [-100, 100]."""
        self.step_count += 1
        return self.step_count > 1000 or abs(obs["position"]) > 100

    async def transform_sensors(self, sensors, action):
        """Pass sensors through unchanged."""
        return sensors

    async def transform_action(self, obs, action):
        """Clip actions to the valid [-1, 1] range."""
        # Imported here because this example snippet never imports numpy at
        # the top; without this the original code raised NameError on `np`.
        import numpy as np
        return np.clip(action, -1, 1)

    async def filtered_sensor_space(self):
        """List the sensors this skill consumes."""
        return ["position", "velocity", "target"]

# Wrap the teacher in a skill, attach it, and train locally
agent = Agent()
agent.add_skill(Skill("custom-skill", CustomTeacher))

local_trainer = Trainer({"target": {"local": {"address": "localhost:1337"}}})
local_trainer.train(agent, train_cycles=20)

Complete Example: Industrial Controller

import os
from composabl import (
    Agent, Skill, Sensor, Scenario, Perceptor,
    MaintainGoal, MinimizeGoal, 
    SkillController, SkillSelector,
    Trainer
)

# Configure environment: the license key and EULA acceptance flag are read
# by the composabl SDK via environment variables.
os.environ["COMPOSABL_LICENSE"] = "your-license-key"
os.environ["COMPOSABL_EULA_AGREED"] = "1"

# Create perceptor for derivative calculation
class DerivativePerceptor(Perceptor):
    """Emit the time-derivative of ``obs["temperature"]``.

    Returns ``{"temperature_rate": 0}`` on the first call (no history yet)
    or when no measurable time has elapsed since the previous call.
    """

    def __init__(self):
        super().__init__()
        # Previous reading and its timestamp; None until the first compute().
        self.last_value = None
        self.last_time = None

    async def compute(self, obs_spec, obs):
        """Return the finite-difference derivative since the previous call."""
        import time
        # monotonic() never jumps backwards (unlike time.time() under NTP
        # adjustment), so dt — and the derivative's sign — stays trustworthy.
        current_time = time.monotonic()

        if self.last_value is None:
            derivative = 0
        else:
            dt = current_time - self.last_time
            # Guard against dt == 0 (two calls within clock resolution) to
            # avoid ZeroDivisionError.
            if dt > 0:
                derivative = (obs["temperature"] - self.last_value) / dt
            else:
                derivative = 0

        self.last_value = obs["temperature"]
        self.last_time = current_time

        return {"temperature_rate": derivative}

# Assemble the agent and its sensor suite
agent = Agent()

process_sensors = [
    Sensor("temperature", "Current temperature (°C)",
           lambda obs: obs["temp"]),
    Sensor("pressure", "Current pressure (bar)",
           lambda obs: obs["pressure"]),
    Sensor("flow_rate", "Flow rate (L/min)",
           lambda obs: obs["flow"]),
    Sensor("energy_consumption", "Energy usage (kW)",
           lambda obs: obs["energy"]),
]
agent.add_sensors(process_sensors)

# Derived signal: rate of temperature change
agent.add_perceptor(Perceptor("temp-derivative", DerivativePerceptor))

# Skill 1: hold reactor temperature at the 75.0 setpoint (within 2.0)
temp_control = Skill(
    "temperature-control",
    MaintainGoal("temperature",
                 "Maintain reactor temperature",
                 target=75.0,
                 stop_distance=2.0),
)

# Skill 2: drive energy usage down
energy_optimization = Skill(
    "energy-optimization",
    MinimizeGoal("energy_consumption",
                 "Minimize energy usage"),
)

# 3. Emergency shutdown controller
class EmergencyShutdown(SkillController):
    """Hard safety controller: issue a shutdown command when temperature or
    pressure exceeds its limit, otherwise defer to the other skills."""

    async def compute_action(self, obs, action):
        over_temp = obs["temperature"] > 100
        over_pressure = obs["pressure"] > 10
        if over_temp or over_pressure:
            # All-zero actuator command = full shutdown
            return [0, 0, 0]
        # Returning None lets the currently active skill act instead
        return None

    async def compute_success_criteria(self, obs, action):
        # Back in the safe envelope on both variables
        return obs["temperature"] < 90 and obs["pressure"] < 8

    async def filtered_sensor_space(self):
        # Only the two safety-critical sensors are needed
        return ["temperature", "pressure"]

emergency = Skill("emergency-shutdown", EmergencyShutdown)  # controller-backed skill (no training needed)

# 4. Coordinator selector
# 4. Coordinator selector
class ProcessCoordinator(SkillSelector):
    """Top-level selector: safety overrides first, then disturbance
    rejection, then economic optimization."""

    def select_skill(self, obs):
        # Safety overrides everything else
        if obs["temperature"] > 95 or obs["pressure"] > 9:
            return "emergency-shutdown"
        # A fast temperature swing needs active temperature control
        if obs["temperature_rate"] > 5:
            return "temperature-control"
        # Otherwise spend the control effort on saving energy
        return "energy-optimization"

# Selector skill wiring the three child skills together
coordinator = SkillSelector(
    "process-coordinator",
    ProcessCoordinator,
    children=["temperature-control",
              "energy-optimization",
              "emergency-shutdown"],
)

# Register the leaf skills first, then the coordinator on top
agent.add_skills([temp_control, energy_optimization, emergency])
agent.add_skill(coordinator)

# Training scenarios: nominal operation plus a high-temperature stress case
nominal = Scenario({
    "temperature": {"min": 70, "max": 80},
    "pressure": {"min": 5, "max": 7},
    "flow_rate": 100,
    "energy": {"min": 10, "max": 50},
})
high_temp = Scenario({
    "temperature": 90,
    "pressure": 8,
    "flow_rate": 150,
    "energy": 75,
})
scenarios = [nominal, high_temp]

# Full training configuration: simulator target, environment, RL algorithm,
# compute resources, network model, rollout settings, scenarios, recording.
training_config = {
    "target": {
        "docker": {
            "image": "composabl/sim-reactor:latest",
            "environment": {"SCENARIO_MODE": "variable"},
        }
    },
    "env": {
        "name": "reactor-control",
        "init": {
            "control_frequency": 10,  # Hz
            "simulation_speed": 100,  # 100x real-time
        },
    },
    "algorithm": {
        "name": "PPO",
        "config": {
            "lr": 0.0003,
            "gamma": 0.99,
            "lambda": 0.95,
            "clip_param": 0.2,
        },
    },
    "resources": {"sim_count": 8, "num_workers": 4},
    "model": {"fcnet_hiddens": [256, 256], "fcnet_activation": "relu"},
    "rollout": {"num_rollout_workers": 4, "num_envs_per_worker": 2},
    "scenarios": scenarios,
    "post_processing": {
        "record": {
            "enabled": True,
            "file_path": "./recordings",
            "gif_file_name": "reactor_control.gif",
        }
    },
}

trainer = Trainer(training_config)


def report_cycle(cycle, metrics):
    """Per-cycle progress callback: print the mean episode reward."""
    print(f"Cycle {cycle}: Reward = {metrics['episode_reward_mean']:.2f}")


trainer.train(
    agent,
    train_cycles=100,
    callbacks={"on_cycle_complete": report_cycle},
)

# Evaluate the trained agent over a handful of episodes
print("Evaluating trained agent...")
eval_results = trainer.evaluate(agent, num_episodes=10)
print(f"Average reward: {eval_results['episode_reward_mean']:.2f}")
print(f"Success rate: {eval_results['custom_metrics']['success_rate']:.2%}")

# Persist the trained agent, then package it for deployment
agent.export("./trained_agents/reactor_controller.json")
deployed_agent = trainer.package(agent)
print("Agent ready for deployment!")

# Release simulators and workers
trainer.close()

Last updated