# Goals API

Pre-built goal implementations for common control objectives. Each goal derives a reward from a named sensor; subclass a goal and override `compute_reward` to add custom shaping.
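All five goal types are imported from the top-level `composabl` package, as the sections below do individually. A combined import, for reference:

```python
from composabl import (
    ApproachGoal,
    AvoidGoal,
    MaintainGoal,
    MaximizeGoal,
    MinimizeGoal,
)
```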
## MaintainGoal

Holds a sensor value at a fixed target, within a tolerance band set by `stop_distance`.

```python
import numpy as np

from composabl import MaintainGoal

# Maintain temperature at a setpoint
temp_goal = MaintainGoal(
    sensor_name="temperature",
    description="Keep temperature at 25°C",
    target=25.0,
    stop_distance=0.5,  # Tolerance band around the target
)

# Maintain with specific reward shaping
class CustomMaintainGoal(MaintainGoal):
    def __init__(self):
        super().__init__("pressure", "Maintain pressure", target=1.0, stop_distance=0.05)

    async def compute_reward(self, obs, action, sim_reward):
        base_reward = await super().compute_reward(obs, action, sim_reward)
        # Penalize control effort so the agent holds the target with minimal actuation
        effort_penalty = -0.1 * np.sum(np.abs(action))
        return base_reward + effort_penalty
```
## ApproachGoal

Drives a sensor value toward a target, rewarding progress as the gap closes.

```python
from composabl import ApproachGoal

# Approach a position
position_goal = ApproachGoal(
    sensor_name="distance_to_target",
    description="Reach the target location",
    target=0.0,  # Zero distance means the target has been reached
)

# Approach with a custom success bonus
class TimedApproachGoal(ApproachGoal):
    def __init__(self):
        super().__init__("distance", "Reach target quickly", target=0.0)
        self.start_time = None

    async def compute_reward(self, obs, action, sim_reward):
        # Record the time of the first observation
        if self.start_time is None:
            self.start_time = obs.get("time", 0)
        base_reward = await super().compute_reward(obs, action, sim_reward)
        # Bonus for reaching the target quickly, decaying with elapsed time
        elapsed = obs.get("time", 0) - self.start_time
        speed_bonus = max(0, 10 - elapsed) if obs["distance"] < 0.1 else 0
        return base_reward + speed_bonus
```
## AvoidGoal

Keeps a sensor value away from a target value; `stop_distance` sets the minimum safe separation.

```python
import numpy as np

from composabl import AvoidGoal

# Avoid obstacles
obstacle_goal = AvoidGoal(
    sensor_name="nearest_obstacle_distance",
    description="Stay away from obstacles",
    target=0.0,         # Avoid zero distance (contact)
    stop_distance=2.0,  # Safe distance to maintain
)

# Multiple avoidance targets
class MultiAvoidGoal(AvoidGoal):
    def __init__(self, danger_zones):
        super().__init__("position", "Avoid danger zones", target=None)
        self.danger_zones = danger_zones

    async def compute_reward(self, obs, action, sim_reward):
        position = np.asarray(obs["position"])
        # Find the minimum distance to any danger zone
        min_distance = float("inf")
        for zone in self.danger_zones:
            distance = np.linalg.norm(position - zone)
            min_distance = min(min_distance, distance)
        # Reward grows with distance from the nearest danger zone
        return min_distance
```
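A minimal usage sketch for the class above, assuming 2-D positions and two hypothetical danger-zone coordinates (the array shapes must match the `"position"` sensor):

```python
import numpy as np

# Hypothetical zone coordinates for illustration
danger_zones = [np.array([0.0, 0.0]), np.array([5.0, 5.0])]
zone_goal = MultiAvoidGoal(danger_zones)
```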
## MaximizeGoal

Rewards increases in a sensor value; no target is required.

```python
from composabl import MaximizeGoal

# Maximize efficiency
efficiency_goal = MaximizeGoal(
    sensor_name="efficiency_metric",
    description="Maximize system efficiency",
)

# Maximize subject to constraints
class ConstrainedMaximizeGoal(MaximizeGoal):
    def __init__(self):
        super().__init__("production_rate", "Maximize production")

    async def compute_reward(self, obs, action, sim_reward):
        base_reward = await super().compute_reward(obs, action, sim_reward)
        # Penalize violations of the temperature and pressure constraints
        if obs["temperature"] > 100:
            base_reward -= 10
        if obs["pressure"] > 10:
            base_reward -= 10
        return base_reward
```
## MinimizeGoal

Rewards decreases in a sensor value; no target is required.

```python
from composabl import MinimizeGoal

# Minimize energy consumption
energy_goal = MinimizeGoal(
    sensor_name="power_consumption",
    description="Minimize energy usage",
)

# Minimize a weighted combination of objectives
class MultiObjectiveMinimizeGoal(MinimizeGoal):
    def __init__(self, weights):
        super().__init__("cost", "Minimize total cost")
        self.weights = weights

    async def compute_reward(self, obs, action, sim_reward):
        # Weighted sum of the observed cost metrics
        total_cost = sum(
            weight * obs[metric]
            for metric, weight in self.weights.items()
        )
        return -total_cost  # Negative because lower cost is better
```
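A usage sketch for the weighted variant; the keys are hypothetical sensor names and must each appear in the observation:

```python
# Hypothetical cost metrics and their relative weights
weights = {"power_consumption": 0.7, "maintenance_cost": 0.3}
cost_goal = MultiObjectiveMinimizeGoal(weights)
```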