Perceptors API

Perceptors process raw sensor data before it reaches skills. Use them to filter noise, estimate hidden state, or engineer new features from existing sensors.

Basic Perceptor

from composabl import Perceptor, PerceptorImpl

class NoiseFilter(PerceptorImpl):
    def __init__(self, alpha=0.1):
        self.alpha = alpha
        self.filtered_value = None
        
    async def compute(self, obs_spec, obs):
        """Exponential moving average filter"""
        raw_value = obs["noisy_sensor"]
        
        if self.filtered_value is None:
            self.filtered_value = raw_value
        else:
            # EMA filter
            self.filtered_value = (
                self.alpha * raw_value + 
                (1 - self.alpha) * self.filtered_value
            )
        
        return {"filtered_sensor": self.filtered_value}
    
    def filtered_sensor_space(self, obs):
        """Specify input sensors"""
        return ["noisy_sensor"]

# Create and add to agent
filter_perceptor = Perceptor("noise-filter", NoiseFilter(alpha=0.2))
agent.add_perceptor(filter_perceptor)
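
The same pattern extends to simple derived signals. The sketch below follows the structure of the example above; the sensor names (tank_temperature, high_temp_alarm) and the threshold values are illustrative, not part of any real agent.

# Threshold perceptor (sensor names and threshold are illustrative)
class HighTempAlarm(PerceptorImpl):
    def __init__(self, threshold=350.0):
        self.threshold = threshold

    async def compute(self, obs_spec, obs):
        # Emit 0.0/1.0 so the derived sensor stays numeric
        return {"high_temp_alarm": float(obs["tank_temperature"] > self.threshold)}

    def filtered_sensor_space(self, obs):
        return ["tank_temperature"]

alarm_perceptor = Perceptor("high-temp-alarm", HighTempAlarm(threshold=375.0))
agent.add_perceptor(alarm_perceptor)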

Advanced Perceptors

import numpy as np

# State estimator perceptor
class KalmanFilter(PerceptorImpl):
    def __init__(self):
        self.x = np.zeros(2)  # State: [position, velocity]
        self.P = np.eye(2)    # Covariance
        self.F = np.array([[1, 0.1], [0, 1]])  # State transition
        self.H = np.array([[1, 0]])  # Measurement matrix
        self.R = 0.1  # Measurement noise
        self.Q = np.array([[0.01, 0], [0, 0.01]])  # Process noise
        
    async def compute(self, obs_spec, obs):
        # Prediction step
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        
        # Update step
        z = obs["position_measurement"]
        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T / S
        
        # K has shape (2, 1) and y shape (1,); matmul keeps x a length-2 vector
        self.x = self.x + K @ y
        self.P = (np.eye(2) - K @ self.H) @ self.P
        
        return {
            "estimated_position": self.x[0],
            "estimated_velocity": self.x[1]
        }
    
    def filtered_sensor_space(self, obs):
        return ["position_measurement"]
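
# As with the basic example, the estimator can be wrapped in a Perceptor and
# added to the agent; the name "kalman-filter" below is illustrative.
kalman_perceptor = Perceptor("kalman-filter", KalmanFilter())
agent.add_perceptor(kalman_perceptor)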

# Feature engineering perceptor
class FeatureEngineer(PerceptorImpl):
    def __init__(self):
        self.history = []
        self.window_size = 10
        
    async def compute(self, obs_spec, obs):
        # Add to history
        self.history.append(obs["value"])
        if len(self.history) > self.window_size:
            self.history.pop(0)
        
        # Calculate features
        if len(self.history) >= 2:
            features = {
                "value": obs["value"],
                "mean": np.mean(self.history),
                "std": np.std(self.history),
                "trend": self.history[-1] - self.history[0],
                "acceleration": 0
            }
            
            if len(self.history) >= 3:
                # Second derivative
                features["acceleration"] = (
                    self.history[-1] - 2*self.history[-2] + self.history[-3]
                )
        else:
            features = {
                "value": obs["value"],
                "mean": obs["value"],
                "std": 0,
                "trend": 0,
                "acceleration": 0
            }
        
        return features
    
    def filtered_sensor_space(self, obs):
        return ["value"]
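
To sanity-check a perceptor outside a full agent run, its compute coroutine can be awaited directly with synthetic observations. This is a minimal, framework-independent sketch; obs_spec is passed as None only because the example above never reads it, and the sample readings are arbitrary.

import asyncio

async def demo():
    fe = FeatureEngineer()
    for step, reading in enumerate([1.0, 1.5, 2.5, 4.0]):
        # Feed one synthetic observation per step and inspect the derived features
        features = await fe.compute(None, {"value": reading})
        print(step, features)

asyncio.run(demo())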
