Perceptor Management

Create New Perceptor

# Create a new perceptor (name, description, and target location are passed as flags)
composabl perceptor new \
  --name derivative-calc \
  --description "Calculates derivative of sensor values" \
  --location ./perceptors/

Generated Perceptor Structure

# perceptor.py
import time
from composabl_core import PerceptorImpl

class DemoPerceptor(PerceptorImpl):
    """Perceptor that estimates the rate of change of the ``counter`` sensor.

    Each call to :meth:`compute` compares the current ``counter`` reading
    with the reading seen on the previous call and divides the difference by
    the time elapsed between the two calls.
    """

    def __init__(self, *args, **kwargs):
        """Start with no history, so the first ``compute`` call reports 0."""
        # NOTE(review): the base-class initializer is not invoked here;
        # confirm whether PerceptorImpl requires super().__init__().
        # Reading and timestamp recorded by the previous compute() call.
        self.previous_value = None
        self.previous_time = None

    async def compute(self, obs_spec, obs):
        """Return the per-second change of ``obs["counter"]`` since the last call.

        Args:
            obs_spec: Not used by this perceptor.
            obs: Mapping of sensor readings; must contain the key
                ``"counter"``.

        Returns:
            A dict ``{"counter_derived": value}``. ``value`` is 0 on the
            first call and whenever no time has elapsed since the previous
            call.

        Raises:
            KeyError: If ``obs`` has no ``"counter"`` entry.
        """
        current_value = obs["counter"]
        # Use the monotonic clock for interval measurement: unlike the wall
        # clock (time.time()), it never jumps when the system time is
        # adjusted, so the elapsed time (and the derivative) stays correct.
        current_time = time.monotonic()
        value_derived = 0

        if self.previous_value is not None and self.previous_time is not None:
            time_delta = current_time - self.previous_time
            # Guard against division by zero when two calls land on the
            # same clock tick.
            if time_delta > 0:
                value_derived = (current_value - self.previous_value) / time_delta

        # Remember this sample for the next call.
        self.previous_value = current_value
        self.previous_time = current_time

        return {"counter_derived": value_derived}

    def filtered_sensor_space(self, obs):
        """Return the names of the sensors this perceptor reads."""
        return ["counter"]

Perceptor Commands

# Publish the perceptor found in the given directory
composabl perceptor publish ./perceptors/derivative-calc/

# List perceptors
composabl perceptor list

# Delete a perceptor
# NOTE(review): no target argument is shown here; confirm how the perceptor to delete is selected.
composabl perceptor delete

Last updated