Perceptors API
Perceptors API
Process sensor data before it reaches skills.
Basic Perceptor
from composabl import Perceptor, PerceptorImpl
class NoiseFilter(PerceptorImpl):
def __init__(self, alpha=0.1):
self.alpha = alpha
self.filtered_value = None
async def compute(self, obs_spec, obs):
"""Exponential moving average filter"""
raw_value = obs["noisy_sensor"]
if self.filtered_value is None:
self.filtered_value = raw_value
else:
# EMA filter
self.filtered_value = (
self.alpha * raw_value +
(1 - self.alpha) * self.filtered_value
)
return {"filtered_sensor": self.filtered_value}
def filtered_sensor_space(self, obs):
"""Specify input sensors"""
return ["noisy_sensor"]
# Create and add to agent
filter_perceptor = Perceptor("noise-filter", NoiseFilter(alpha=0.2))
agent.add_perceptor(filter_perceptor)
Advanced Perceptors
# State estimator perceptor
class KalmanFilter(PerceptorImpl):
def __init__(self):
self.x = np.zeros(2) # State: [position, velocity]
self.P = np.eye(2) # Covariance
self.F = np.array([[1, 0.1], [0, 1]]) # State transition
self.H = np.array([[1, 0]]) # Measurement matrix
self.R = 0.1 # Measurement noise
self.Q = np.array([[0.01, 0], [0, 0.01]]) # Process noise
async def compute(self, obs_spec, obs):
# Prediction step
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
# Update step
z = obs["position_measurement"]
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / S
self.x = self.x + K * y
self.P = (np.eye(2) - K @ self.H) @ self.P
return {
"estimated_position": self.x[0],
"estimated_velocity": self.x[1]
}
def filtered_sensor_space(self, obs):
return ["position_measurement"]
# Feature engineering perceptor
class FeatureEngineer(PerceptorImpl):
def __init__(self):
self.history = []
self.window_size = 10
async def compute(self, obs_spec, obs):
# Add to history
self.history.append(obs["value"])
if len(self.history) > self.window_size:
self.history.pop(0)
# Calculate features
if len(self.history) >= 2:
features = {
"value": obs["value"],
"mean": np.mean(self.history),
"std": np.std(self.history),
"trend": self.history[-1] - self.history[0],
"acceleration": 0
}
if len(self.history) >= 3:
# Second derivative
features["acceleration"] = (
self.history[-1] - 2*self.history[-2] + self.history[-3]
)
else:
features = {
"value": obs["value"],
"mean": obs["value"],
"std": 0,
"trend": 0,
"acceleration": 0
}
return features
def filtered_sensor_space(self, obs):
return ["value"]
Last updated