Along with the strategy pattern and the perception pattern, the plan-execute pattern is one of the major patterns of machine teaching. In this pattern, the skills work together in a skill group, with the first skill determining what the action should be and the second skill determining how to achieve it.
What makes this agent special is that it combines DRL and MPC, the technologies from the two single-skilled agents (the worst performers), to create the best-performing agent.
In this example, the DRL skill first uses its powers of learning and experimentation to determine the goal temperature for the cooling jacket — the set point. It then passes this information on to the MPC skill, which uses its powers of control and execution to direct the agent on what action to take to achieve the desired temperature.
Remember how the strategy pattern is like a math class where each student solves the problems they are best at, as assigned by the teacher? In the plan-execute pattern, the students work in groups to solve problems together. Let’s say Student A is good at translating word problems into equations, while Student B is good at solving equations. Student A works on each problem first, and then passes it over to Student B, who produces the solution. No teacher is needed here, because the students divide each problem the same way.
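To make the division of labor concrete, here is a minimal sketch of the handoff in plain Python. DRLPlanner and MPCExecutor are illustrative stand-ins, not Composabl APIs: the first skill decides what the set point should be, and the second decides how to move toward it.

class DRLPlanner:
    """Stand-in for the DRL skill: decides *what* the goal should be."""

    def plan(self, obs: dict) -> float:
        # A trained DRL policy would map observations to a set point here;
        # a fixed goal temperature (K) stands in for it.
        return 311.0


class MPCExecutor:
    """Stand-in for the MPC skill: decides *how* to reach the goal."""

    def execute(self, obs: dict, set_point: float) -> float:
        # An MPC controller would optimize a trajectory toward the set point;
        # a simple proportional step stands in for it here.
        return 0.1 * (set_point - obs['Tc'])


obs = {'Tc': 300.0}
set_point = DRLPlanner().plan(obs)              # plan: choose the goal
action = MPCExecutor().execute(obs, set_point)  # execute: act toward it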
Let's get started configuring this agent!
1. Publish the MPC Skill to Your Project
This agent has two skills: control_full_reaction and mpc-skill-group. We have already published control_full_reaction to our project, so we only need to publish mpc-skill-group to build this agent in the Agent Builder UI. To publish mpc-skill-group to your project, open your favorite code editor and terminal. In the terminal, navigate to the skills folder of the Industrial Mixer repo and run this command with the Composabl CLI.
composabl skill publish mpc-skill-group
Return to the Agent Builder UI and refresh the page. The skills will appear in the skills menu on the left side of the page.
Explore the Code Files
All skills, perceptors, and selectors contain a minimum of two files: a config file and a Python file with the code that the agent will use.
pyproject.toml is the config file, with the information shown below.
The Python file for this agent defines a controller class, with explanations in inline comments.
MPC Skill Group Controller Skill
pyproject.toml
[project]
name = "MPC Skill Group"
version = "0.1.0"
description = "MPC prepared for Skill Group"
authors = [{ name = "John Doe", email = "john.doe@composabl.com" }]
dependencies = [
"composabl-core",
"scipy",
"casadi==3.6.6",
"do_mpc==4.6.5"
]
[composabl]
type = "skill-controller"
entrypoint = "mpc_skill_group.controller:Controller"
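The entrypoint field in the [composabl] section tells the platform which class to load for this skill: the Controller class in the mpc_skill_group.controller module.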
controller.py
import numpy as np
import do_mpc
from casadi import *  # provides the symbolic exp() used in the model equations
from scipy import interpolate

from composabl_core import SkillController

# time step (seconds) between state updates
Δt = 1
class Controller(SkillController):
def __init__(self, *args, **kwargs):
"""
Initialize the Controller skill with default values.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
# Initialize a counter to track the number of actions computed
self.count = 0
async def compute_action(self, obs, action):
"""
Compute the control action (ΔTc) based on current observations using Model Predictive Control (MPC).
Args:
obs (list or dict): Current sensor observations.
If list, expected order: ['T', 'Tc', 'Ca', 'Cref', 'Tref']
action: The previous action taken (not directly used but considered for ΔTc calculation).
Returns:
list: A list containing the computed change in Tc (ΔTc).
Example: [ΔTc]
"""
# Convert observations to dictionary if they are provided as a list
        if isinstance(obs, list):
obs = {
'T': obs[0],
'Tc': obs[1],
'Ca': obs[2],
'Cref': obs[3],
'Tref': obs[4]
}
# else:
# # Uncomment if you need to ensure all values are floats
# for key, value in obs.items():
# obs[key] = float(value)
# Handle action input: ensure it's a float value
        if isinstance(action, (list, np.ndarray)):
            action = action[0]
        elif isinstance(action, dict):
            assert isinstance(action['action'], float)
            action = float(action['action'])
else:
action = float(action)
# Initialize noise variable (currently set to 0; can be modified for stochasticity)
noise = 0
# Extract and convert sensor readings to float
CrSP = float(obs['Cref']) # Reference concentration
Ca0 = float(obs['Ca']) # Actual concentration at current step
T0 = float(obs['T']) # Temperature at current step
Tc0 = float(obs['Tc']) + action # Cooling liquid temperature adjusted by action
# Define constants for the CSTR model
F = 1 # Volumetric flow rate (m³/h)
V = 1 # Reactor volume (m³)
        k0 = 34930800    # Arrhenius pre-exponential factor (1/h)
        E = 11843        # Activation energy per mole (kcal/kmol)
        R = 1.985875     # Ideal gas constant (kcal/(kmol·K))
ΔH = -5960 # Heat of reaction per mole (kcal/kmol)
phoCp = 500 # Density multiplied by heat capacity (kcal/(m³·K))
UA = 150 # Overall heat transfer coefficient multiplied by tank area (kcal/(K·h))
Cafin = 10 # Inlet concentration (kmol/m³)
Tf = 298.2 # Feed temperature (K)
# --- MPC MODEL SETUP ---
model_type = 'continuous' # Define model type: 'discrete' or 'continuous'
model = do_mpc.model.Model(model_type)
# Define state variables
Ca = model.set_variable(var_type='_x', var_name='Ca', shape=(1,1)) # Concentration
T = model.set_variable(var_type='_x', var_name='T', shape=(1,1)) # Temperature
# Define measurements (if any) with optional measurement noise
model.set_meas('Ca', Ca, meas_noise=True)
model.set_meas('T', T, meas_noise=True)
# Define control input
Tc = model.set_variable(var_type='_u', var_name='Tc') # Cooling liquid temperature
# Define time-varying parameters (TVPs)
model.set_variable(var_type='_tvp', var_name='Caf') # Inlet concentration (kmol/m³)
model.set_variable(var_type='_tvp', var_name='Tref') # Reference temperature (K)
# Define model equations (right-hand side)
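        # The two ODEs below are the standard CSTR material and energy balances:
        #   dCa/dt = F/V*(Cafin - Ca) - k0*exp(-E/(R*T))*Ca
        #   dT/dt  = F/V*(Tf - T) - (ΔH/phoCp)*k0*exp(-E/(R*T))*Ca - UA/(phoCp*V)*(T - Tc)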
model.set_rhs('Ca', (F/V * (Cafin - Ca)) - (k0 * exp(-E/(R*T))*Ca))
model.set_rhs('T', (F/V *(Tf-T)) - ((ΔH/phoCp)*(k0 * exp(-E/(R*T))*Ca)) - ((UA /(phoCp*V)) *(T-Tc)))
# Finalize model setup
model.setup()
# --- CONTROLLER SETUP ---
mpc = do_mpc.controller.MPC(model)
setup_mpc = {
'n_horizon': 20, # Prediction horizon
'n_robust': 1, # Number of robust steps
'open_loop': 0, # Open-loop setting
't_step': Δt, # Time step (seconds)
'store_full_solution': True # Store full solution
}
mpc.set_param(**setup_mpc)
# Suppress IPOPT solver output for cleaner logs
        suppress_ipopt = {'ipopt.print_level': 0, 'ipopt.sb': 'yes', 'print_time': 0}
        mpc.set_param(nlpsol_opts=suppress_ipopt)
# Scaling for states and inputs to improve numerical stability
mpc.scaling['_x', 'T'] = 100
mpc.scaling['_u', 'Tc'] = 100
# --- OBJECTIVE FUNCTION ---
_x = model.x
_tvp = model.tvp
_u = model.u
# Define terminal and stage cost
mterm = ((_x['Ca'] - CrSP))**2 # Terminal cost
lterm = ((_x['Ca'] - CrSP))**2 # Stage cost
mpc.set_objective(mterm=mterm, lterm=lterm)
# Define control input penalties to discourage large control actions
mpc.set_rterm(Tc=1.5) # Input penalty for Tc
# --- CONSTRAINTS ---
# Bounds for state variables
mpc.bounds['lower', '_x', 'Ca'] = 0.1 # Minimum concentration
mpc.bounds['upper', '_x', 'Ca'] = 12 # Maximum concentration
mpc.bounds['upper', '_x', 'T'] = 400 # Maximum temperature
mpc.bounds['lower', '_x', 'T'] = 100 # Minimum temperature
# Bounds for control inputs
mpc.bounds['lower', '_u', 'Tc'] = 273 # Minimum cooling temperature (K)
mpc.bounds['upper', '_u', 'Tc'] = 322 # Maximum cooling temperature (K)
# --- TIME-VARYING PARAMETERS (TVPs) SETUP ---
# Define templates for TVPs
tvp_temp_1 = mpc.get_tvp_template()
tvp_temp_1['_tvp', :] = np.array([8.5698])
tvp_temp_2 = mpc.get_tvp_template()
tvp_temp_2['_tvp', :] = np.array([2])
tvp_temp_3 = mpc.get_tvp_template()
tvp_temp_3['_tvp', :] = np.array([2])
# Define a function to update TVPs based on current time
def tvp_fun(t_now):
p1 = 22 # Time step 1
p2 = 74 # Time step 2
time = 90 # Total time
# Define concentration and temperature equilibrium points
ceq = [8.57, 6.9275, 5.2850, 3.6425, 2]
teq = [311.2612, 327.9968, 341.1084, 354.7246, 373.1311]
# Interpolate concentration and temperature based on current time
C = interpolate.interp1d([0, p1, p2, time], [8.57, 8.57, 2, 2])
T_ = interpolate.interp1d([0, p1, p2, time], [311.2612, 311.2612, 373.1311, 373.1311])
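            # Reference schedule: hold the initial equilibrium until t = p1,
            # ramp down linearly between p1 and p2, then hold the final
            # equilibrium (Cref = 2) for the remainder of the episode.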
if t_now < p1:
return tvp_temp_1
elif p1 <= t_now < p2:
y = float(C(t_now))
tvp_temp_3['_tvp', :] = np.array([y])
return tvp_temp_3
else:
return tvp_temp_2
mpc.set_tvp_fun(tvp_fun)
# Finalize MPC setup
mpc.setup()
# --- ESTIMATOR SETUP ---
estimator = do_mpc.estimator.StateFeedback(model)
# --- SIMULATOR SETUP ---
simulator = do_mpc.simulator.Simulator(model)
params_simulator = {
't_step': Δt # Time step (seconds)
}
simulator.set_param(**params_simulator)
# Define templates for simulator parameters
p_num = simulator.get_p_template()
tvp_num = simulator.get_tvp_template()
# Define functions for TVPs and uncertain parameters in the simulator
def tvp_fun_sim(t_now):
return tvp_num
def p_fun_sim(t_now):
return p_num
simulator.set_tvp_fun(tvp_fun_sim)
simulator.set_p_fun(p_fun_sim)
# Finalize simulator setup
simulator.setup()
# --- INITIAL STATE SETUP ---
# Initialize states for MPC, simulator, and estimator
x0 = simulator.x0
x0['Ca'] = Ca0
x0['T'] = T0
u0 = simulator.u0
u0['Tc'] = Tc0
mpc.x0 = x0
simulator.x0 = x0
estimator.x0 = x0
mpc.u0 = u0
simulator.u0 = u0
estimator.u0 = u0
# Set initial guess for MPC
mpc.set_initial_guess()
# --- MPC CONTROL LOOP ---
# Simulate N steps (currently set to 1)
u0_old = 0
time_steps = 1
for k in range(time_steps):
if k > 1:
u0_old = u0[0][0]
# Make a control step using MPC
u0 = mpc.make_step(x0)
# Enforce a maximum change of ±10 on the control input
if k > 1:
if u0[0][0] - u0_old > 10:
u0 = np.array([[u0_old + 10]])
elif u0[0][0] - u0_old < -10:
u0 = np.array([[u0_old - 10]])
else:
if u0[0][0] - Tc0 >= 10:
u0 = np.array([[Tc0 + 10]])
elif u0[0][0] - Tc0 <= -10:
u0 = np.array([[Tc0 - 10]])
# Add Gaussian noise to the measurements
error_var = noise
σ_max1 = error_var * (8.5698 - 2)
σ_max2 = error_var * (373.1311 - 311.2612)
mu = 0
v0 = np.array([
mu + σ_max1 * np.random.randn(1, 1)[0],
mu + σ_max2 * np.random.randn(1, 1)[0]
])
# Simulate the next step with the control input and noise
y_next = simulator.make_step(u0, v0=v0) # MPC simulation step
# Reshape state values for consistency
state_ops = y_next.reshape((1, 2))
            # --- BENCHMARK SETUP ---
            p1 = 22
            p2 = 74
            total_time = 90  # total episode time, matching tvp_fun above
            ceq = [8.57, 6.9275, 5.2850, 3.6425, 2]
            teq = [311.2612, 327.9968, 341.1084, 354.7246, 373.1311]
            # Interpolate reference concentration and temperature over the full episode
            C = interpolate.interp1d([0, p1, p2, total_time], [8.57, 8.57, 2, 2])
            T_ = interpolate.interp1d([0, p1, p2, total_time], [311.2612, 311.2612, 373.1311, 373.1311])
# Update reference concentrations and temperatures based on current step
if k < p1:
Cref = 8.5698
Tref = 311.2612
elif p1 <= k < p2:
y = float(C(k))
y2 = float(T_(k))
Cref = y
Tref = y2
else:
Cref = 2
Tref = 373.1311
# Update the estimator with the new measurements
x0 = estimator.make_step(y_next) # Update state estimates
# Increment the action counter
self.count += 1
# Compute the change in Tc (ΔTc) based on the new control input
newTc = u0[0][0]
dTc = float(newTc) - float(obs['Tc'])
# Return the computed ΔTc as a list
return [dTc]
async def transform_sensors(self, obs):
"""
Process and potentially modify sensor observations before they are used.
Args:
obs (dict): Current sensor observations.
Returns:
dict: Transformed sensor observations.
Note:
- Currently, this method returns the observations unchanged.
- This can be customized to apply transformations if needed.
"""
# Currently, no transformation is applied to sensors
return obs
    async def filtered_sensor_space(self):
        """
        Define which sensors are relevant for this skill.
        Returns:
            list: Names of the sensors to be used.
        Note:
            - Specifies a list of sensor names that this controller will utilize.
            - Helps focus the controller's operations on relevant data.
        """
        # Specify the sensors that this controller will use
        return ['T', 'Tc', 'Ca', 'Cref', 'Tref', 'Conc_Error', 'Eps_Yield', 'Cb_Prod']
async def compute_success_criteria(self, transformed_obs, action):
"""
Determine whether the success criteria have been met.
Args:
transformed_obs (dict): Transformed sensor observations.
action: The action taken.
Returns:
bool: True if success criteria are met, False otherwise.
Behavior:
- Currently always returns False.
- Can be implemented with logic to check if certain conditions are satisfied.
"""
# Placeholder for success criteria logic
return False
async def compute_termination(self, transformed_obs, action):
"""
Determine whether the training episode should terminate.
Args:
transformed_obs (dict): Transformed sensor observations.
action: The action taken.
Returns:
bool: True if the episode should terminate, False otherwise.
Behavior:
- Currently always returns False.
- Can be implemented with logic to terminate based on certain conditions.
"""
# Placeholder for termination condition logic
return False
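If you want to sanity-check the controller outside the Agent Builder, you can instantiate it directly and call compute_action with a sample observation. The snippet below is a minimal, hypothetical smoke test, assuming it runs in the same module as the Controller class above and that the skill's dependencies (do_mpc, casadi, scipy) are installed; the observation values are illustrative, not taken from a real run.

import asyncio

async def main():
    controller = Controller()
    # Illustrative sensor values near the initial equilibrium point
    obs = {'T': 311.0, 'Tc': 292.0, 'Ca': 8.56, 'Cref': 8.57, 'Tref': 311.26}
    dTc = await controller.compute_action(obs, action=0.0)
    print(f"Recommended change in cooling temperature: {dTc[0]:.3f} K")

asyncio.run(main())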
2. Build the Plan-Execute Pattern Agent in the Agent Builder UI
First, drag the control_full_reaction skill from the left-hand side of the page to the skill container. Once it's there, drag in mpc-skill-group and make sure it is dropped below the control_full_reaction skill, not beside it. Stacking the second skill below the first creates a skill group, so the output of control_full_reaction passes to mpc-skill-group.
3. Run Your Training Session
Now you are ready to train your agent and see the results. We suggest running 50 training cycles.
4. View Results
When training is complete, you can view your results in the training sessions tab in the UI, which shows how well the agent is learning.
Analyzing the Plan-Execute Pattern Agent’s Performance
We tested this fully trained agent and plotted the results.
Conversion rate: 95%
Thermal runaway risk: Very low
This agent is the best performer of the group. Combining two imperfect technologies with machine teaching produces much better results than either technology achieves alone.