Perceptor Management
Create New Perceptor
composabl perceptor new \
  --name derivative-calc \
  --description "Calculates derivative of sensor values" \
  --location ./perceptors/
Generated Perceptor Structure
# perceptor.py
import time

from composabl_core import PerceptorImpl


class DemoPerceptor(PerceptorImpl):
    def __init__(self, *args, **kwargs):
        self.previous_value = None
        self.previous_time = None

    async def compute(self, obs_spec, obs):
        current_value = obs["counter"]
        current_time = time.time()
        value_derived = 0
        if self.previous_value is not None and self.previous_time is not None:
            time_delta = current_time - self.previous_time
            if time_delta > 0:
                value_derived = (current_value - self.previous_value) / time_delta
        self.previous_value = current_value
        self.previous_time = current_time
        return {"counter_derived": value_derived}

    def filtered_sensor_space(self, obs):
        return ["counter"]
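The generated compute method is a finite-difference derivative: the change in the "counter" value divided by the elapsed time since the previous call, with 0 returned on the first call. To see how it behaves without installing the SDK, here is a minimal stand-alone sketch that mirrors the same logic. It is not part of the generated output: the class name DerivativeSketch, the injected clock parameter, and the run_demo helper are all hypothetical, introduced only so the timestamps are deterministic. The generated perceptor subclasses PerceptorImpl and calls time.time() directly.

```python
import asyncio


class DerivativeSketch:
    """Stand-alone stand-in for the generated perceptor (no composabl_core import).

    The `clock` parameter is a hypothetical addition for deterministic runs;
    the generated code calls time.time() directly.
    """

    def __init__(self, clock):
        self.clock = clock
        self.previous_value = None
        self.previous_time = None

    async def compute(self, obs_spec, obs):
        current_value = obs["counter"]
        current_time = self.clock()
        value_derived = 0  # first call has no history, so the derivative is 0
        if self.previous_value is not None and self.previous_time is not None:
            time_delta = current_time - self.previous_time
            if time_delta > 0:  # guard against division by zero
                value_derived = (current_value - self.previous_value) / time_delta
        # Remember this sample for the next call
        self.previous_value = current_value
        self.previous_time = current_time
        return {"counter_derived": value_derived}


async def run_demo():
    ticks = iter([0.0, 0.5, 1.5])  # fake timestamps, in seconds
    perceptor = DerivativeSketch(clock=lambda: next(ticks))
    results = []
    for counter in (10, 12, 9):
        out = await perceptor.compute(None, {"counter": counter})
        results.append(out["counter_derived"])
    return results


demo_results = asyncio.run(run_demo())
print(demo_results)  # [0, 4.0, -3.0]
```

Because the perceptor keeps previous_value and previous_time on the instance, each instance tracks exactly one stream of observations, and the first value it emits is always 0.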
Perceptor Commands
# Publish perceptor
composabl perceptor publish ./perceptors/derivative-calc/
# List perceptors
composabl perceptor list
# Delete perceptor
composabl perceptor delete