Skills API
Skill Configuration
from composabl_core.config import SkillConfig

skill_config = SkillConfig(
    name="temperature-control",
    type="SkillTeacher",
    config={
        "learning_rate": 0.001,
        "hidden_layers": [128, 128],
        "activation": "tanh"
    }
)
Skills define agent behaviors through different implementation strategies.
Skill Types
1. SkillTeacher (Learning-based)
import numpy as np

from composabl import Skill, SkillTeacher

class CustomTeacher(SkillTeacher):
    def __init__(self, target_position=10.0):
        self.target = target_position
        self.episode_steps = 0

    async def compute_reward(self, transformed_obs, action, sim_reward):
        """Calculate reward for reinforcement learning"""
        distance = abs(transformed_obs["position"] - self.target)
        # Shaped reward: negative distance from the target
        reward = -distance
        # Bonus for reaching the target
        if distance < 0.1:
            reward += 100
        # Penalty for energy usage
        reward -= 0.1 * abs(action[0])
        return reward

    async def compute_success_criteria(self, transformed_obs, action):
        """Define success condition"""
        return abs(transformed_obs["position"] - self.target) < 0.1

    async def compute_termination(self, transformed_obs, action):
        """Define episode termination"""
        self.episode_steps += 1
        # Terminate on success
        if await self.compute_success_criteria(transformed_obs, action):
            return True
        # Terminate on failure conditions
        if abs(transformed_obs["position"]) > 100:  # Out of bounds
            return True
        # Terminate on timeout
        return self.episode_steps >= 1000

    async def transform_sensors(self, sensors, action):
        """Preprocess sensors if needed"""
        # Normalize position to [-1, 1]
        transformed = dict(sensors)
        if "position" in transformed:
            transformed["position"] = transformed["position"] / 50.0
        return transformed

    async def transform_action(self, transformed_obs, action):
        """Transform action to simulator space"""
        # Clip action to valid range
        return np.clip(action, -1, 1)

    async def filtered_sensor_space(self):
        """Specify which sensors this skill needs"""
        return ["position", "velocity", "target"]

    async def compute_action_mask(self, transformed_obs, action):
        """Optional: Define valid actions"""
        # Example: Disable reverse if at boundary
        if transformed_obs["position"] <= -50:
            return [True, False]  # Can only go forward
        elif transformed_obs["position"] >= 50:
            return [False, True]  # Can only go backward
        return None  # All actions valid
# Create skill with teacher
skill = Skill("reach-target", CustomTeacher(target_position=25.0))
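The methods above are normally invoked by the Composabl training runtime, but because they are plain async methods they can be sanity-checked directly. A minimal sketch, with observation and action values made up purely for illustration:

import asyncio

# Quick local check of the reward shaping and success criteria defined above.
# During training the runtime supplies transformed_obs and action from the
# simulator; the values below are illustrative only.
async def check_teacher():
    teacher = CustomTeacher(target_position=25.0)
    obs = {"position": 24.95, "velocity": 0.0, "target": 25.0}
    reward = await teacher.compute_reward(obs, [0.2], sim_reward=0.0)
    success = await teacher.compute_success_criteria(obs, [0.2])
    print(f"reward={reward:.2f}, success={success}")

asyncio.run(check_teacher())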
2. SkillController (Programmatic)
import numpy as np

from composabl import Skill, SkillController

class PIDController(SkillController):
    def __init__(self, kp=1.0, ki=0.1, kd=0.01):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.integral = 0
        self.last_error = 0

    async def compute_action(self, transformed_obs, action):
        """Compute PID control action"""
        # Calculate error
        error = transformed_obs["setpoint"] - transformed_obs["measurement"]
        # P term
        p_term = self.kp * error
        # I term
        self.integral += error
        i_term = self.ki * self.integral
        # D term
        derivative = error - self.last_error
        d_term = self.kd * derivative
        # Combined output
        output = p_term + i_term + d_term
        # Update state
        self.last_error = error
        return [output]

    async def compute_success_criteria(self, transformed_obs, action):
        """Success when error is small"""
        error = abs(transformed_obs["setpoint"] - transformed_obs["measurement"])
        return error < 0.01

    async def compute_termination(self, transformed_obs, action):
        """Never terminate - continuous control"""
        return False

    async def transform_sensors(self, sensors, action):
        """Pass through"""
        return sensors

    async def transform_action(self, transformed_obs, action):
        """Clip to actuator limits"""
        return np.clip(action, -100, 100)

    async def filtered_sensor_space(self):
        """Required sensors"""
        return ["measurement", "setpoint"]
# Create skill with controller
pid_skill = Skill("pid-control", PIDController(kp=2.0, ki=0.5, kd=0.1))
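Because compute_action is an ordinary async method, the controller can also be exercised outside the runtime. A short closed-loop sketch against a toy first-order plant; the plant model is purely illustrative, since in production the Composabl runtime closes this loop with the simulator:

import asyncio

# Drive the PIDController against a simple lag plant to show the measurement
# converging toward the setpoint. The plant dynamics are illustrative only.
async def check_pid():
    pid = PIDController(kp=2.0, ki=0.5, kd=0.1)
    measurement = 0.0
    for _ in range(50):
        obs = {"setpoint": 1.0, "measurement": measurement}
        action = await pid.compute_action(obs, None)
        measurement += 0.1 * (action[0] - measurement)  # first-order lag response
    print(f"final measurement: {measurement:.3f}")

asyncio.run(check_pid())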
3. SkillSelector
from composabl import SkillSelector

class AdaptiveSelector(SkillSelector):
    """Selects between different control strategies"""

    async def compute_action(self, transformed_obs, action):
        """Return selected skill index"""
        error = abs(transformed_obs["error"])
        if error > 10:
            return [0]  # Aggressive control
        elif error > 1:
            return [1]  # Moderate control
        else:
            return [2]  # Fine control

    async def compute_success_criteria(self, transformed_obs, action):
        """Success when system is stable"""
        return transformed_obs["error"] < 0.1 and transformed_obs["rate"] < 0.01

    async def filtered_sensor_space(self):
        return ["error", "rate", "mode"]

# Create selector with child skills
selector = SkillSelector(
    name="adaptive-control",
    implementation=AdaptiveSelector,
    children=["aggressive-pid", "moderate-pid", "fine-pid"]
)
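The children list refers to skills registered elsewhere in the agent by name. A hypothetical sketch of how those child skills might be created, reusing the PIDController pattern from the previous section; the names and gain values are illustrative only:

# Hypothetical child skills for the selector above, built from the
# PIDController shown earlier. The gain values are illustrative only.
aggressive_pid = Skill("aggressive-pid", PIDController(kp=5.0, ki=1.0, kd=0.0))
moderate_pid = Skill("moderate-pid", PIDController(kp=2.0, ki=0.5, kd=0.1))
fine_pid = Skill("fine-pid", PIDController(kp=0.5, ki=0.1, kd=0.05))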
4. Coordinated Skills
from composabl import (
    Skill,
    SkillCoordinatedSet,
    SkillCoordinatedPopulation,
    SkillPopulation,
    SkillCoach
)

# Coach for coordinated skills
class TeamCoach(SkillCoach):
    async def compute_reward(self, transformed_obs, action, sim_reward):
        """Reward for team coordination"""
        # Reward based on team performance
        team_distance = transformed_obs["team_spread"]
        target_reached = transformed_obs["targets_reached"]
        reward = target_reached * 10  # Reward for reaching targets
        reward -= team_distance * 0.1  # Penalty for spreading too far
        return reward

    async def compute_success_criteria(self, transformed_obs, action):
        return transformed_obs["all_targets_reached"]

    async def filtered_sensor_space(self):
        return ["team_spread", "targets_reached", "all_targets_reached"]

# Coordinate specific agents
# (Agent1Controller, Agent2Controller, and Agent3Controller are skill
# implementations defined elsewhere in the project.)
team_set = SkillCoordinatedSet(
    name="team-coordination",
    implementation=TeamCoach,
    skills=[
        Skill("agent-1", Agent1Controller),
        Skill("agent-2", Agent2Controller),
        Skill("agent-3", Agent3Controller)
    ]
)
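The population example below references a SwarmCoach implementation that is not defined on this page. A hypothetical sketch following the same SkillCoach pattern as TeamCoach; the sensor names are made up for illustration:

# Hypothetical SwarmCoach for the population example below, mirroring the
# TeamCoach pattern. Sensor names are illustrative only.
class SwarmCoach(SkillCoach):
    async def compute_reward(self, transformed_obs, action, sim_reward):
        # Encourage coverage while keeping the swarm compact
        return transformed_obs["coverage"] - 0.1 * transformed_obs["swarm_spread"]

    async def compute_success_criteria(self, transformed_obs, action):
        return transformed_obs["coverage"] >= 0.95

    async def filtered_sensor_space(self):
        return ["coverage", "swarm_spread"]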
# Coordinate a population
swarm = SkillCoordinatedPopulation(
    name="swarm-behavior",
    implementation=SwarmCoach,
    skills=[
        SkillPopulation("drone", DroneController, amount=10),
        SkillPopulation("scout", ScoutController, amount=2)
    ]
)
Skill Composition Patterns
# Hierarchical skill structure
navigation = SkillSelector("navigation", NavigationSelector, [
    Skill("path-planning", PathPlanner),
    SkillSelector("obstacle-avoidance", ObstacleSelector, [
        Skill("go-around", GoAroundObstacle),
        Skill("go-over", GoOverObstacle)
    ]),
    Skill("target-approach", ApproachTarget)
])
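The hierarchy above references selector and skill implementations (NavigationSelector, PathPlanner, and so on) that would be defined elsewhere in a real project. A hypothetical sketch of NavigationSelector, where the returned index maps to the position of a child in the list; the sensor names are illustrative:

# Hypothetical NavigationSelector for the hierarchy above. The returned index
# selects a child: 0 = path-planning, 1 = obstacle-avoidance, 2 = target-approach.
# Sensor names are illustrative only.
class NavigationSelector(SkillSelector):
    async def compute_action(self, transformed_obs, action):
        if transformed_obs["obstacle_distance"] < 2.0:
            return [1]  # hand control to obstacle avoidance
        if transformed_obs["distance_to_target"] < 5.0:
            return [2]  # close in on the target
        return [0]  # follow the planned path

    async def filtered_sensor_space(self):
        return ["obstacle_distance", "distance_to_target"]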
# Skill with fallback
class FallbackController(SkillSelector):
    async def compute_action(self, obs, action):
        # Try primary skill first
        if obs["system_health"] > 0.8:
            return [0]  # Normal operation
        else:
            return [1]  # Fallback/safe mode

fallback_skill = SkillSelector(
    "fault-tolerant-control",
    FallbackController,
    ["normal-control", "safe-mode-control"]
)
Per Skill Configuration
Algorithms
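Each block below is a per-skill training configuration. A minimal sketch of how one of these blocks might be attached to a skill, assuming the SkillConfig shown at the top of this page accepts an "algorithm" entry in its config dict; verify the exact shape against your SDK version:

# Hedged sketch: wiring an algorithm block into a per-skill configuration.
# Assumes SkillConfig accepts an "algorithm" entry; confirm for your SDK version.
from composabl_core.config import SkillConfig

ppo_skill_config = SkillConfig(
    name="reach-target",
    type="SkillTeacher",
    config={
        "algorithm": {
            "name": "PPO",
            "config": {"lr": 5e-5, "train_batch_size": 4000}
        }
    }
)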
PPO (Proximal Policy Optimization)
config = {
    "algorithm": {
        "name": "PPO",
        "config": {
            # Learning
            "lr": 5e-5,
            "lr_schedule": None,  # or [[0, 1e-3], [1000000, 1e-5]]
            # PPO specific
            "use_critic": True,
            "use_gae": True,
            "lambda": 0.95,
            "kl_coeff": 0.2,
            "kl_target": 0.01,
            "clip_param": 0.3,
            "vf_clip_param": 10.0,
            "entropy_coeff": 0.0,
            "entropy_coeff_schedule": None,
            # Training
            "num_sgd_iter": 30,
            "sgd_minibatch_size": 128,
            "shuffle_sequences": True,
            "vf_loss_coeff": 1.0,
            "model": {
                "vf_share_layers": True,
                "free_log_std": False
            },
            # GAE
            "gamma": 0.99,
            "normalize_advantages": True,
            # Batch settings
            "train_batch_size": 4000,
            "rollout_fragment_length": 200
        }
    }
}
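For intuition about clip_param: PPO clips the policy probability ratio before weighting it by the advantage, which bounds how far a single update can move the policy. An illustrative sketch of that objective, not Composabl or RLlib code:

import numpy as np

# Illustrative PPO clipped surrogate: the ratio r = pi_new / pi_old is clipped
# to [1 - clip_param, 1 + clip_param] before multiplying by the advantage.
def clipped_surrogate(ratio, advantage, clip_param=0.3):
    unclipped = ratio * advantage
    clipped = np.clip(ratio, 1.0 - clip_param, 1.0 + clip_param) * advantage
    return np.minimum(unclipped, clipped)

print(clipped_surrogate(np.array([0.5, 1.0, 2.0]), np.array([1.0, 1.0, 1.0])))
# -> [0.5 1.  1.3]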
SAC (Soft Actor-Critic)
config = {
    "algorithm": {
        "name": "SAC",
        "config": {
            # Learning
            "lr": 3e-4,
            "lr_schedule": None,
            # SAC specific
            "twin_q": True,
            "q_model_config": {
                "fcnet_hiddens": [256, 256],
                "fcnet_activation": "relu"
            },
            "policy_model_config": {
                "fcnet_hiddens": [256, 256],
                "fcnet_activation": "relu"
            },
            "tau": 5e-3,
            "target_network_update_freq": 1,
            "initial_alpha": 1.0,
            "target_entropy": "auto",
            # Replay buffer
            "replay_buffer_config": {
                "type": "MultiAgentPrioritizedReplayBuffer",
                "capacity": 1000000,
                "prioritized_replay": True,
                "prioritized_replay_alpha": 0.6,
                "prioritized_replay_beta": 0.4,
                "prioritized_replay_eps": 1e-6
            },
            # Training
            "train_batch_size": 256,
            "gamma": 0.99,
            "n_step": 1,
            "grad_clip": None,
            # Exploration
            "exploration_config": {
                "type": "StochasticSampling"
            }
        }
    }
}
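For intuition about tau: the target networks track the online networks through Polyak averaging, so small values make the targets change slowly. An illustrative sketch of that update rule, not Composabl or RLlib code:

# Illustrative soft target update implied by "tau":
# target_param <- tau * online_param + (1 - tau) * target_param
def soft_update(target_params, online_params, tau=5e-3):
    return [tau * o + (1.0 - tau) * t for t, o in zip(target_params, online_params)]

print(soft_update([0.0], [1.0]))  # -> [0.005]; the target drifts toward the online value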
DQN (Deep Q-Network)
config = {
    "algorithm": {
        "name": "DQN",
        "config": {
            # Learning
            "lr": 5e-4,
            "lr_schedule": None,
            # DQN specific
            "dueling": True,
            "double_q": True,
            "num_atoms": 1,
            "noisy": False,
            "sigma0": 0.5,
            # Replay buffer
            "replay_buffer_config": {
                "type": "MultiAgentReplayBuffer",
                "capacity": 100000
            },
            # Exploration
            "exploration_config": {
                "type": "EpsilonGreedy",
                "initial_epsilon": 1.0,
                "final_epsilon": 0.02,
                "epsilon_timesteps": 10000
            },
            # Training
            "train_batch_size": 32,
            "gamma": 0.99,
            "n_step": 1,
            "target_network_update_freq": 500,
            # Replay ratio / training intensity
            "replay_buffer_replay_ratio": 0.0,
            "training_intensity": None
        }
    }
}
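The exploration_config above typically corresponds to a linear decay of epsilon from initial_epsilon to final_epsilon over epsilon_timesteps. An illustrative sketch of that schedule, not the actual RLlib implementation:

# Illustrative linear epsilon decay implied by the exploration_config above.
def epsilon_at(t, initial=1.0, final=0.02, timesteps=10000):
    frac = min(t / timesteps, 1.0)
    return initial + frac * (final - initial)

print(epsilon_at(0), epsilon_at(5000), epsilon_at(20000))  # 1.0 0.51 0.02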
IMPALA
config = {
    "algorithm": {
        "name": "IMPALA",
        "config": {
            # Learning
            "lr": 0.0005,
            "lr_schedule": None,
            # IMPALA specific
            "vtrace": True,
            "vtrace_clip_rho_threshold": 1.0,
            "vtrace_clip_pg_rho_threshold": 1.0,
            # Architecture
            "num_workers": 16,
            "num_gpus": 1,
            "num_multi_gpu_tower_stacks": 1,
            "minibatch_buffer_size": 1,
            "num_sgd_iter": 1,
            "replay_proportion": 0.0,
            "replay_buffer_num_slots": 0,
            # Training
            "train_batch_size": 500,
            "rollout_fragment_length": 50,
            "max_sample_requests_in_flight_per_worker": 2,
            # Learner queue
            "learner_queue_size": 16,
            "learner_queue_timeout": 300,
            # Optimizer
            "grad_clip": 40.0,
            "opt_type": "adam",
            "decay": 0.99,
            "momentum": 0.0,
            "epsilon": 0.1,
            # Loss
            "vf_loss_coeff": 0.5,
            "entropy_coeff": 0.01
        }
    }
}
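The vtrace_clip_rho_threshold and vtrace_clip_pg_rho_threshold values cap the importance-sampling ratios used by V-trace for the value targets and the policy-gradient term, respectively. A simplified illustrative sketch of that clipping, not the full V-trace calculation:

import numpy as np

# Simplified illustration of the clipping behind the two thresholds above:
# importance ratios pi / mu are capped before use in the V-trace value targets
# (rho) and in the policy-gradient correction (pg_rho).
def clipped_rhos(pi_probs, mu_probs, clip_rho=1.0, clip_pg_rho=1.0):
    ratios = np.asarray(pi_probs) / np.asarray(mu_probs)
    return np.minimum(ratios, clip_rho), np.minimum(ratios, clip_pg_rho)

rho, pg_rho = clipped_rhos([0.9, 0.2], [0.3, 0.4])
print(rho, pg_rho)  # ratios 3.0 and 0.5 -> clipped to [1.  0.5] for both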
Custom Algorithm
from ray.rllib.algorithms.algorithm import Algorithm

class CustomAlgorithm(Algorithm):
    @classmethod
    def get_default_config(cls):
        config = super().get_default_config()
        config.update({
            "custom_param": 1.0,
            "special_lr": 0.001
        })
        return config

    def setup(self, config):
        super().setup(config)
        # Custom setup

    def training_step(self):
        # Custom training logic
        result = super().training_step()
        # custom_computation() is a user-defined helper, not part of RLlib
        result["custom_metric"] = self.custom_computation()
        return result

# Use custom algorithm
config = {
    "algorithm": {
        "name": "Custom",
        "class": CustomAlgorithm,
        "config": {
            "custom_param": 2.0,
            "special_lr": 0.0001
        }
    }
}