Goals API

Pre-built goal implementations for common objectives.

MaintainGoal

from composabl import MaintainGoal

# Maintain temperature
# MaintainGoal presumably drives the named sensor toward `target` and treats
# readings within `stop_distance` of it as on-goal — confirm against the
# MaintainGoal reference.
temp_goal = MaintainGoal(
    sensor_name="temperature",  # observation key to hold steady
    description="Keep temperature at 25°C",
    target=25.0,  # setpoint to maintain
    stop_distance=0.5  # Tolerance band
)

# Maintain with specific reward shaping
class CustomMaintainGoal(MaintainGoal):
    """Maintain pressure at 1.0 while penalizing aggressive control actions.

    Extends the base maintain reward with an L1 penalty on the action
    vector so the agent learns smooth, low-effort control.
    """

    def __init__(self):
        super().__init__("pressure", "Maintain pressure", target=1.0, stop_distance=0.05)

    async def compute_reward(self, obs, action, sim_reward):
        """Return the base maintain reward minus a control-effort penalty."""
        # Local import so the example snippet is runnable as shown; the
        # original used `np` without importing it anywhere.
        import numpy as np

        base_reward = await super().compute_reward(obs, action, sim_reward)

        # Add penalty for control effort: L1 norm of the action scaled by 0.1,
        # so larger control moves cost more reward.
        effort_penalty = -0.1 * np.sum(np.abs(action))

        return base_reward + effort_penalty

ApproachGoal

from composabl import ApproachGoal

# Approach a position
# ApproachGoal presumably rewards moving the sensor value toward `target`;
# the sensor here is already a distance, so the target is zero.
position_goal = ApproachGoal(
    sensor_name="distance_to_target",  # observation key to drive down
    description="Reach the target location",
    target=0.0  # Zero distance
)

# Approach with custom success
class TimedApproachGoal(ApproachGoal):
    """Approach goal that adds a bonus for reaching the target quickly.

    Remembers the episode time seen on the first reward call, then grants
    a decaying bonus (up to 10) once the agent is within 0.1 of the target.

    NOTE(review): ``start_time`` is never reset, so the speed bonus is only
    meaningful for the first episode unless the goal object is recreated —
    confirm against the framework's episode lifecycle.
    """

    def __init__(self):
        super().__init__("distance", "Reach target quickly", target=0.0)
        self.start_time = None

    async def compute_reward(self, obs, action, sim_reward):
        now = obs.get("time", 0)
        if self.start_time is None:
            # First call: remember when the approach started.
            self.start_time = now

        base_reward = await super().compute_reward(obs, action, sim_reward)

        # Decaying speed bonus, awarded only once the target is nearly reached.
        speed_bonus = 0
        if obs["distance"] < 0.1:
            speed_bonus = max(0, 10 - (now - self.start_time))

        return base_reward + speed_bonus

AvoidGoal

from composabl import AvoidGoal

# Avoid obstacles
# AvoidGoal presumably pushes the sensor value away from `target`; with
# target=0.0 and stop_distance=2.0 the agent is rewarded for keeping the
# nearest obstacle at least 2.0 units away — confirm against the AvoidGoal
# reference.
obstacle_goal = AvoidGoal(
    sensor_name="nearest_obstacle_distance",  # observation key to keep large
    description="Stay away from obstacles",
    target=0.0,  # Avoid zero distance
    stop_distance=2.0  # Safe distance
)

# Multiple avoidance targets
class MultiAvoidGoal(AvoidGoal):
    """Avoidance goal that keeps the agent away from several danger zones.

    Replaces the base reward entirely: the reward is the Euclidean distance
    to the nearest danger zone, so staying far from every zone scores
    highest.
    """

    def __init__(self, danger_zones):
        # target=None: the multi-zone distance computation below replaces the
        # single-sensor avoidance logic of the base class.
        super().__init__("position", "Avoid danger zones", target=None)
        self.danger_zones = danger_zones

    async def compute_reward(self, obs, action, sim_reward):
        """Return the distance from obs["position"] to the nearest zone."""
        # Local import so the example snippet is runnable as shown; the
        # original used `np` without importing it anywhere.
        import numpy as np

        position = obs["position"]

        # Minimum distance to any danger zone; infinite when no zones are
        # configured (same result as the original manual loop).
        return min(
            (np.linalg.norm(position - zone) for zone in self.danger_zones),
            default=float('inf'),
        )

MaximizeGoal

from composabl import MaximizeGoal

# Maximize efficiency
# MaximizeGoal takes no target; it presumably rewards higher readings of the
# named sensor — confirm against the MaximizeGoal reference.
efficiency_goal = MaximizeGoal(
    sensor_name="efficiency_metric",  # observation key to drive up
    description="Maximize system efficiency"
)

# Maximize with constraints
class ConstrainedMaximizeGoal(MaximizeGoal):
    """Maximize production rate while discouraging constraint violations.

    Starts from the base maximize reward and subtracts a flat penalty of 10
    for each operating limit (temperature > 100, pressure > 10) that is
    exceeded in the current observation.
    """

    def __init__(self):
        super().__init__("production_rate", "Maximize production")

    async def compute_reward(self, obs, action, sim_reward):
        reward = await super().compute_reward(obs, action, sim_reward)

        # Flat -10 penalty per violated operating limit.
        limits = (("temperature", 100), ("pressure", 10))
        penalty = 10 * sum(1 for sensor, cap in limits if obs[sensor] > cap)

        return reward - penalty

MinimizeGoal

from composabl import MinimizeGoal

# Minimize energy consumption
# MinimizeGoal takes no target; it presumably rewards lower readings of the
# named sensor — confirm against the MinimizeGoal reference.
energy_goal = MinimizeGoal(
    sensor_name="power_consumption",  # observation key to drive down
    description="Minimize energy usage"
)

# Minimize with multiple objectives
class MultiObjectiveMinimizeGoal(MinimizeGoal):
    """Minimize a weighted combination of several cost metrics.

    ``weights`` maps an observation key to its weight in the total cost.
    The reward is the negated weighted sum, so lower total cost yields a
    higher reward. Note the base-class reward is replaced, not extended.
    """

    def __init__(self, weights):
        super().__init__("cost", "Minimize total cost")
        self.weights = weights

    async def compute_reward(self, obs, action, sim_reward):
        # Accumulate the weighted cost, then negate it: we are minimizing,
        # and the trainer maximizes reward.
        total = 0
        for metric, weight in self.weights.items():
            total += weight * obs[metric]
        return -total

Last updated