Along with the strategy pattern and the perception pattern, the plan-execute pattern is one of the major design patterns of machine teaching. In this pattern, the skills work together in a skill group: the first skill determines what the action should be, and the second skill determines how to achieve it.
What makes this agent special is that it combines DRL and MPC, the technologies from the two single-skill agents (the worst performers of the group), to create the best-performing agent.
In this example, the DRL skill first uses its powers of learning and experimentation to determine the goal temperature for the cooling jacket — the set point. It then passes this information on to the MPC skill, which uses its powers of control and execution to direct the agent on what action to take to achieve the desired temperature.
Remember how the strategy pattern is like a math class where each student solves the problems they are best at, as assigned by the teacher? In the plan-execute pattern, the students work in groups to solve problems together. Let’s say Student A is good at translating word problems into equations, while Student B is good at solving equations. Student A works on each problem first, and then passes it over to Student B, who produces the solution. No teacher is needed here, because the students divide each problem the same way.
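Before configuring the real agent, it may help to see the hand-off in miniature. The sketch below is plain Python with hypothetical stub classes, not the Composabl API; it only illustrates the division of labor, with the planner deciding what the goal is and the executor deciding how to move toward it.

```python
class DRLPlannerStub:
    """Stands in for the trained DRL skill: decides WHAT the goal should be."""

    def choose_setpoint(self, obs: dict) -> float:
        # A real DRL skill would infer the goal from its learned policy;
        # here we simply read the reference temperature from the sensors.
        return obs["Tref"]


class MPCExecutorStub:
    """Stands in for the MPC skill: decides HOW to reach the goal."""

    def step_toward(self, obs: dict, setpoint: float) -> float:
        # A real MPC skill solves an optimal control problem; here we just
        # nudge the cooling temperature toward the set point, capped at ±10 K.
        return max(-10.0, min(10.0, setpoint - obs["Tc"]))


obs = {"Tc": 292.0, "Tref": 311.26}
planner, executor = DRLPlannerStub(), MPCExecutorStub()
setpoint = planner.choose_setpoint(obs)         # first skill: the "what"
delta_tc = executor.step_toward(obs, setpoint)  # second skill: the "how"
print(f"ΔTc = {delta_tc:.2f} K")                # -> ΔTc = 10.00 K
```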
Let's get started configuring this agent!
1. Publish the MPC Skill to Your Project
This agent has two skills, control_full_reaction and mpc-skill-group. We have already published control_full_reaction to our project, so we only need to publish mpc-skill-group to build this agent in the Agent Builder UI. To publish mpc-skill-group to your project, open your favorite code editor and terminal. In the terminal, navigate to the skills folder of the Industrial Mixer repo and run this command with the Composabl CLI:
```
composabl skill publish mpc-skill-group
```
Return to the Agent Builder UI and refresh the page. The skills will appear in the skills menu on the left side of the page.
Explore the Code Files
All skills, perceptors, and selectors contain a minimum of two files:

- pyproject.toml, a config file with the skill's configuration information (a hypothetical sketch follows below).
- A Python file containing the code that the agent will use. For this agent, the MPC skill is implemented as a controller class, shown after the config sketch with explanations in inline comments.
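Since the exact contents live in the repo, here is only a minimal, hypothetical sketch of what a skill's pyproject.toml might look like; the field values and dependency list are assumptions based on what the Python file imports, and the actual file in the Industrial Mixer repo is authoritative:

```toml
# Illustrative sketch only -- names, versions, and any Composabl-specific
# fields are assumptions; consult the actual file in the repo.
[project]
name = "mpc-skill-group"
version = "0.1.0"
description = "MPC controller skill for the CSTR cooling jacket"
dependencies = [
    "composabl-core",
    "do-mpc",
    "casadi",
    "numpy",
    "scipy",
]
```

The controller class for the MPC skill follows.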
```python
from random import randint
from typing import Dict, List

from composabl_core import SkillController

import os
import math
import numpy as np
import do_mpc
from casadi import *
from scipy import interpolate
from math import exp

# time step (seconds) between state updates
Δt = 1
π = math.pi


class Controller(SkillController):
    def __init__(self, *args, **kwargs):
        """
        Initialize the Controller skill with default values.

        Args:
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        # Initialize a counter to track the number of actions computed
        self.count = 0

    async def compute_action(self, obs, action):
        """
        Compute the control action (ΔTc) based on current observations
        using Model Predictive Control (MPC).

        Args:
            obs (list or dict): Current sensor observations. If list, expected
                order: ['T', 'Tc', 'Ca', 'Cref', 'Tref']
            action: The previous action taken (not directly used but
                considered for ΔTc calculation).

        Returns:
            list: A list containing the computed change in Tc (ΔTc).
                Example: [ΔTc]
        """
        # Convert observations to a dictionary if they are provided as a list
        if type(obs) == list:
            obs = {'T': obs[0], 'Tc': obs[1], 'Ca': obs[2],
                   'Cref': obs[3], 'Tref': obs[4]}
        # else:
        #     # Uncomment if you need to ensure all values are floats
        #     for key, value in obs.items():
        #         obs[key] = float(value)

        # Handle action input: ensure it's a float value
        if type(action) == list or type(action) == np.ndarray:
            action = action[0]
        elif type(action) == dict:
            assert type(action['action']) == float
            action = float(action['action'])
        else:
            action = float(action)

        # Initialize noise variable (currently 0; can be modified for stochasticity)
        noise = 0

        # Extract and convert sensor readings to float
        CrSP = float(obs['Cref'])        # Reference concentration
        Ca0 = float(obs['Ca'])           # Actual concentration at current step
        T0 = float(obs['T'])             # Temperature at current step
        Tc0 = float(obs['Tc']) + action  # Cooling liquid temperature adjusted by action

        # Define constants for the CSTR model
        F = 1          # Volumetric flow rate (m³/h)
        V = 1          # Reactor volume (m³)
        k0 = 34930800  # Pre-exponential nonthermal factor (1/h)
        E = 11843      # Activation energy per mole (kcal/kmol)
        R = 1.985875   # Boltzmann's ideal gas constant (kcal/(kmol·K))
        ΔH = -5960     # Heat of reaction per mole (kcal/kmol)
        phoCp = 500    # Density multiplied by heat capacity (kcal/(m³·K))
        UA = 150       # Overall heat transfer coefficient times tank area (kcal/(K·h))
        Cafin = 10     # Inlet concentration (kmol/m³)
        Tf = 298.2     # Feed temperature (K)

        # --- MPC MODEL SETUP ---
        model_type = 'continuous'  # Model type: 'discrete' or 'continuous'
        model = do_mpc.model.Model(model_type)

        # Define state variables
        Ca = model.set_variable(var_type='_x', var_name='Ca', shape=(1, 1))  # Concentration
        T = model.set_variable(var_type='_x', var_name='T', shape=(1, 1))    # Temperature

        # Define measurements with optional measurement noise
        model.set_meas('Ca', Ca, meas_noise=True)
        model.set_meas('T', T, meas_noise=True)

        # Define control input
        Tc = model.set_variable(var_type='_u', var_name='Tc')  # Cooling liquid temperature

        # Define time-varying parameters (TVPs)
        model.set_variable(var_type='_tvp', var_name='Caf')   # Inlet concentration (kmol/m³)
        model.set_variable(var_type='_tvp', var_name='Tref')  # Reference temperature (K)

        # Define model equations (right-hand side)
        model.set_rhs('Ca', (F / V * (Cafin - Ca)) - (k0 * exp(-E / (R * T)) * Ca))
        model.set_rhs('T', (F / V * (Tf - T))
                      - ((ΔH / phoCp) * (k0 * exp(-E / (R * T)) * Ca))
                      - ((UA / (phoCp * V)) * (T - Tc)))

        # Finalize model setup
        model.setup()

        # --- CONTROLLER SETUP ---
        mpc = do_mpc.controller.MPC(model)
        setup_mpc = {
            'n_horizon': 20,             # Prediction horizon
            'n_robust': 1,               # Number of robust steps
            'open_loop': 0,              # Open-loop setting
            't_step': Δt,                # Time step (seconds)
            'store_full_solution': True  # Store full solution
        }
        mpc.set_param(**setup_mpc)

        # Suppress IPOPT solver output for cleaner logs
        surpress_ipopt = {'ipopt.print_level': 0, 'ipopt.sb': 'yes', 'print_time': 0}
        mpc.set_param(nlpsol_opts=surpress_ipopt)

        # Scaling for states and inputs to improve numerical stability
        mpc.scaling['_x', 'T'] = 100
        mpc.scaling['_u', 'Tc'] = 100

        # --- OBJECTIVE FUNCTION ---
        _x = model.x
        _tvp = model.tvp
        _u = model.u

        # Define terminal and stage cost
        mterm = ((_x['Ca'] - CrSP)) ** 2  # Terminal cost
        lterm = ((_x['Ca'] - CrSP)) ** 2  # Stage cost
        mpc.set_objective(mterm=mterm, lterm=lterm)

        # Define control input penalties to discourage large control actions
        mpc.set_rterm(Tc=1.5)  # Input penalty for Tc

        # --- CONSTRAINTS ---
        # Bounds for state variables
        mpc.bounds['lower', '_x', 'Ca'] = 0.1  # Minimum concentration
        mpc.bounds['upper', '_x', 'Ca'] = 12   # Maximum concentration
        mpc.bounds['upper', '_x', 'T'] = 400   # Maximum temperature
        mpc.bounds['lower', '_x', 'T'] = 100   # Minimum temperature

        # Bounds for control inputs
        mpc.bounds['lower', '_u', 'Tc'] = 273  # Minimum cooling temperature (K)
        mpc.bounds['upper', '_u', 'Tc'] = 322  # Maximum cooling temperature (K)

        # --- TIME-VARYING PARAMETERS (TVPs) SETUP ---
        # Define templates for TVPs
        tvp_temp_1 = mpc.get_tvp_template()
        tvp_temp_1['_tvp', :] = np.array([8.5698])
        tvp_temp_2 = mpc.get_tvp_template()
        tvp_temp_2['_tvp', :] = np.array([2])
        tvp_temp_3 = mpc.get_tvp_template()
        tvp_temp_3['_tvp', :] = np.array([2])

        # Define a function to update TVPs based on current time
        def tvp_fun(t_now):
            p1 = 22    # Time step 1
            p2 = 74    # Time step 2
            time = 90  # Total time

            # Concentration and temperature equilibrium points
            ceq = [8.57, 6.9275, 5.2850, 3.6425, 2]
            teq = [311.2612, 327.9968, 341.1084, 354.7246, 373.1311]

            # Interpolate concentration and temperature based on current time
            C = interpolate.interp1d([0, p1, p2, time], [8.57, 8.57, 2, 2])
            T_ = interpolate.interp1d([0, p1, p2, time],
                                      [311.2612, 311.2612, 373.1311, 373.1311])

            if t_now < p1:
                return tvp_temp_1
            elif p1 <= t_now < p2:
                y = float(C(t_now))
                tvp_temp_3['_tvp', :] = np.array([y])
                return tvp_temp_3
            else:
                return tvp_temp_2

        mpc.set_tvp_fun(tvp_fun)

        # Finalize MPC setup
        mpc.setup()

        # --- ESTIMATOR SETUP ---
        estimator = do_mpc.estimator.StateFeedback(model)

        # --- SIMULATOR SETUP ---
        simulator = do_mpc.simulator.Simulator(model)
        params_simulator = {
            't_step': Δt  # Time step (seconds)
        }
        simulator.set_param(**params_simulator)

        # Define templates for simulator parameters
        p_num = simulator.get_p_template()
        tvp_num = simulator.get_tvp_template()

        # Define functions for TVPs and uncertain parameters in the simulator
        def tvp_fun_sim(t_now):
            return tvp_num

        def p_fun_sim(t_now):
            return p_num

        simulator.set_tvp_fun(tvp_fun_sim)
        simulator.set_p_fun(p_fun_sim)

        # Finalize simulator setup
        simulator.setup()

        # --- INITIAL STATE SETUP ---
        # Initialize states for MPC, simulator, and estimator
        x0 = simulator.x0
        x0['Ca'] = Ca0
        x0['T'] = T0
        u0 = simulator.u0
        u0['Tc'] = Tc0

        mpc.x0 = x0
        simulator.x0 = x0
        estimator.x0 = x0
        mpc.u0 = u0
        simulator.u0 = u0
        estimator.u0 = u0

        # Set initial guess for MPC
        mpc.set_initial_guess()

        # --- MPC CONTROL LOOP ---
        # Simulate N steps (currently set to 1)
        u0_old = 0
        time_steps = 1
        for k in range(time_steps):
            if k > 1:
                u0_old = u0[0][0]

            # Make a control step using MPC
            u0 = mpc.make_step(x0)

            # Enforce a maximum change of ±10 on the control input
            if k > 1:
                if u0[0][0] - u0_old > 10:
                    u0 = np.array([[u0_old + 10]])
                elif u0[0][0] - u0_old < -10:
                    u0 = np.array([[u0_old - 10]])
            else:
                if u0[0][0] - Tc0 >= 10:
                    u0 = np.array([[Tc0 + 10]])
                elif u0[0][0] - Tc0 <= -10:
                    u0 = np.array([[Tc0 - 10]])

            # Add Gaussian noise to the measurements
            error_var = noise
            σ_max1 = error_var * (8.5698 - 2)
            σ_max2 = error_var * (373.1311 - 311.2612)
            mu = 0
            v0 = np.array([
                mu + σ_max1 * np.random.randn(1, 1)[0],
                mu + σ_max2 * np.random.randn(1, 1)[0]
            ])

            # Simulate the next step with the control input and noise
            y_next = simulator.make_step(u0, v0=v0)  # MPC simulation step

            # Reshape state values for consistency
            state_ops = y_next.reshape((1, 2))

            # --- BENCHMARK SETUP ---
            p1 = 22
            p2 = 74
            ceq = [8.57, 6.9275, 5.2850, 3.6425, 2]
            teq = [311.2612, 327.9968, 341.1084, 354.7246, 373.1311]

            # Interpolate reference concentration and temperature
            C = interpolate.interp1d([0, p1, p2, time_steps], [8.57, 8.57, 2, 2])
            T_ = interpolate.interp1d([0, p1, p2, time_steps],
                                      [311.2612, 311.2612, 373.1311, 373.1311])

            # Update reference concentration and temperature based on current step
            if k < p1:
                Cref = 8.5698
                Tref = 311.2612
            elif p1 <= k < p2:
                y = float(C(k))
                y2 = float(T_(k))
                Cref = y
                Tref = y2
            else:
                Cref = 2
                Tref = 373.1311

            # Update the estimator with the new measurements
            x0 = estimator.make_step(y_next)  # Update state estimates

        # Increment the action counter
        self.count += 1

        # Compute the change in Tc (ΔTc) based on the new control input
        newTc = u0[0][0]
        dTc = float(newTc) - float(obs['Tc'])

        # Return the computed ΔTc as a list
        return [dTc]

    async def transform_sensors(self, obs):
        """
        Process and potentially modify sensor observations before they are used.

        Args:
            obs (dict): Current sensor observations.

        Returns:
            dict: Transformed sensor observations.

        Note:
            - Currently, this method returns the observations unchanged.
            - This can be customized to apply transformations if needed.
        """
        # Currently, no transformation is applied to the sensors
        return obs

    async def filtered_sensor_space(self):
        """
        Define which sensors are relevant for this skill.

        Returns:
            list: Names of the sensors to be used.

        Note:
            - Specifies a list of sensor names that this skill will utilize.
            - Helps focus the skill's operations on relevant data.
        """
        # Specify the sensors that this skill will use
        return ['T', 'Tc', 'Ca', 'Cref', 'Tref', 'Conc_Error', 'Eps_Yield', 'Cb_Prod']

    async def compute_success_criteria(self, transformed_obs, action):
        """
        Determine whether the success criteria have been met.

        Args:
            transformed_obs (dict): Transformed sensor observations.
            action: The action taken.

        Returns:
            bool: True if success criteria are met, False otherwise.

        Behavior:
            - Currently always returns False.
            - Can be implemented with logic to check if certain conditions are satisfied.
        """
        # Placeholder for success criteria logic
        return False

    async def compute_termination(self, transformed_obs, action):
        """
        Determine whether the training episode should terminate.

        Args:
            transformed_obs (dict): Transformed sensor observations.
            action: The action taken.

        Returns:
            bool: True if the episode should terminate, False otherwise.

        Behavior:
            - Currently always returns False.
            - Can be implemented with logic to terminate based on certain conditions.
        """
        # Placeholder for termination condition logic
        return False
```
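As a quick sanity check, you can also exercise compute_action directly, outside the platform. The snippet below is a hypothetical usage sketch: it assumes the Controller class above is importable (or defined in the same file), that do-mpc and its dependencies are installed, and the sensor values are made up. In a deployed agent, the Composabl runtime calls these methods for you.

```python
import asyncio

# Hypothetical sensor reading: temperatures in K, concentrations in kmol/m³
obs = {'T': 311.26, 'Tc': 292.0, 'Ca': 8.57, 'Cref': 8.57, 'Tref': 311.26}

controller = Controller()
dTc = asyncio.run(controller.compute_action(obs, action=0.0))
print(f"Suggested change in cooling jacket temperature: {dTc[0]:.2f} K")
```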
2. Build the Plan-Execute Pattern Agent in the Agent Builder UI
First, drag the skill control_full_reaction from the left-hand side of the page to the skill container. Once it's there, drag in mpc-skill-group and make sure that it is dropped below the control_full_reaction skill and not beside it, so the two skills form a skill group.
3. Run Your Training Session
Now you are ready to train your agent and see the results. We suggest running 50 training cycles.
4. View Results
When training is complete, you can view your results in the training sessions tab in the UI, which shows information about how well the agent is learning.
Analyzing the Plan-Execute Pattern Agent’s Performance
Conversion rate: 95%
Thermal runaway risk: Very low
We tested this fully trained agent and plotted the results.
This agent is the best performer of the group. Combining two imperfect technologies with machine teaching produces much better results than either technology achieves alone.