Plan-Execute Pattern

Along with the strategy pattern, and the perception pattern, the plan-execute pattern is one of the major of machine teaching. In this pattern, the skill agents work together in a skill group, with the first skill agent determining what the action should be and the second skill agent determining how to achieve it.

What is special about this agent is that it combines DRL and MPC, the technologies from the two single-skilled agent systems — the worst performers — to create the best performing agent.

In this example, the DRL skill agent first uses its powers of learning and experimentation to determine the goal temperature for the cooling jacket — the set point. It then passes this information on to the MPC skill agent, which uses its powers of control and execution to direct the agent on what action to take to achieve the desired temperature.

This tutorial will show you how to publish the MPC controller to the platform using the data science workflow and then use it to create a multi-agent system using the plan-execute pattern.

Remember how the strategy pattern is like a math class where each student solves the problems they are best at, as assigned by the teacher? In the plan-execute pattern, the students work in groups to solve problems together. Let’s say Student A is good at translating word problems into equations, while Student B is good at solving equations. Student A works on each problem first, and then passes it over to Student B, who produces the solution. No teacher is needed here, because the students divide each problem the same way.

Let's get started configuring this agent!

1. Publish the MPC Skill Agent to Your Project

This agent has two skill agents called control_full_reaction and mpc-skill-group. We have already created control_full_reaction in our project, so we only need to publish mpc-skill-group to build this agent in the Agent Builder UI. To publish mpc-skill-group to your use case you will need to open up your favorite code editor and terminal. In your terminal, navigate to the skills folder of the Industrial Mixer Repo and use this command with the Composabl CLI.

composabl skill publish mpc-skill-group

Return to the agent orchestration studio and refresh the page. The skill agent will appear in the skills menu on the left of your page.

Explore the Code Files

All skill agents, perceptors, and orchestrators have at least two files in them. A Python file contains the code the skill agent will use, and a config file.

  1. pyproject.toml, a config file with the following information.

  2. A Python file. For this skill agent, we use controller class with the following code and explanations in comments inline.

File Structure

See the Code

MPC Skill Group Controller Skill Agent

pyproject.toml

See the code
[project]
name = "MPC Skill Group"
version = "0.1.0"
description = "MPC prepared for Skill Group"
authors = [{ name = "John Doe", email = "john.doe@composabl.com" }]
dependencies = [
    "composabl-core",
    "scipy",
    "casadi==3.6.6",
    "do_mpc==4.6.5"
]

[composabl]
type = "skill-controller"
entrypoint = "mpc_skill_group.controller:Controller"

controller.py

See the code
from random import randint
from typing import Dict, List

from composabl_core import SkillController
######
import os
import math
import numpy as np
import do_mpc
import numpy as np
from casadi import *
from scipy import interpolate
from math import exp

# time step (seconds) between state updates
Δt = 1

Ï€ = math.pi

class Controller(SkillController):
    def __init__(self, *args, **kwargs):
        """
        Initialize the Controller skill with default values.
        
        Args:
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        # Initialize a counter to track the number of actions computed
        self.count = 0

    async def compute_action(self, obs, action):
        """
        Compute the control action (ΔTc) based on current observations using Model Predictive Control (MPC).
        
        Args:
            obs (list or dict): Current sensor observations.
                If list, expected order: ['T', 'Tc', 'Ca', 'Cref', 'Tref']
            action: The previous action taken (not directly used but considered for ΔTc calculation).
        
        Returns:
            list: A list containing the computed change in Tc (ΔTc).
                  Example: [ΔTc]
        """
        # Convert observations to dictionary if they are provided as a list
        if type(obs) == list:
            obs = {
                'T': obs[0],
                'Tc': obs[1],
                'Ca': obs[2],
                'Cref': obs[3],
                'Tref': obs[4]
            }
        # else:
        #     # Uncomment if you need to ensure all values are floats
        #     for key, value in obs.items():
        #         obs[key] = float(value)

        # Handle action input: ensure it's a float value
        if type(action) == list or type(action) == np.ndarray:
            action = action[0]
        elif type(action) == dict:
            assert type(action['action']) == float
            action = float(action['action'])
        else:
            action = float(action)

        # Initialize noise variable (currently set to 0; can be modified for stochasticity)
        noise = 0

        # Extract and convert sensor readings to float
        CrSP = float(obs['Cref'])    # Reference concentration
        Ca0 = float(obs['Ca'])       # Actual concentration at current step
        T0 = float(obs['T'])         # Temperature at current step
        Tc0 = float(obs['Tc']) + action  # Cooling liquid temperature adjusted by action

        # Define constants for the CSTR model
        F = 1          # Volumetric flow rate (m³/h)
        V = 1          # Reactor volume (m³)
        k0 = 34930800  # Pre-exponential nonthermal factor (1/h)
        E = 11843      # Activation energy per mole (kcal/kmol)
        R = 1.985875   # Boltzmann's ideal gas constant (kcal/(kmol·K))
        ΔH = -5960     # Heat of reaction per mole (kcal/kmol)
        phoCp = 500    # Density multiplied by heat capacity (kcal/(m³·K))
        UA = 150       # Overall heat transfer coefficient multiplied by tank area (kcal/(K·h))
        Cafin = 10     # Inlet concentration (kmol/m³)
        Tf = 298.2     # Feed temperature (K)

        # --- MPC MODEL SETUP ---
        model_type = 'continuous'  # Define model type: 'discrete' or 'continuous'
        model = do_mpc.model.Model(model_type)

        # Define state variables
        Ca = model.set_variable(var_type='_x', var_name='Ca', shape=(1,1))  # Concentration
        T = model.set_variable(var_type='_x', var_name='T', shape=(1,1))    # Temperature

        # Define measurements (if any) with optional measurement noise
        model.set_meas('Ca', Ca, meas_noise=True)
        model.set_meas('T', T, meas_noise=True)

        # Define control input
        Tc = model.set_variable(var_type='_u', var_name='Tc')  # Cooling liquid temperature

        # Define time-varying parameters (TVPs)
        model.set_variable(var_type='_tvp', var_name='Caf')   # Inlet concentration (kmol/m³)
        model.set_variable(var_type='_tvp', var_name='Tref')  # Reference temperature (K)

        # Define model equations (right-hand side)
        model.set_rhs('Ca', (F/V * (Cafin - Ca)) - (k0 * exp(-E/(R*T))*Ca))
        model.set_rhs('T', (F/V *(Tf-T)) - ((ΔH/phoCp)*(k0 * exp(-E/(R*T))*Ca)) - ((UA /(phoCp*V)) *(T-Tc)))

        # Finalize model setup
        model.setup()

        # --- CONTROLLER SETUP ---
        mpc = do_mpc.controller.MPC(model)
        setup_mpc = {
            'n_horizon': 20,       # Prediction horizon
            'n_robust': 1,         # Number of robust steps
            'open_loop': 0,        # Open-loop setting
            't_step': Δt,          # Time step (seconds)
            'store_full_solution': True  # Store full solution
        }

        mpc.set_param(**setup_mpc)

        # Suppress IPOPT solver output for cleaner logs
        surpress_ipopt = {'ipopt.print_level':0, 'ipopt.sb': 'yes', 'print_time':0}
        mpc.set_param(nlpsol_opts = surpress_ipopt)

        # Scaling for states and inputs to improve numerical stability
        mpc.scaling['_x', 'T'] = 100
        mpc.scaling['_u', 'Tc'] = 100

        # --- OBJECTIVE FUNCTION ---
        _x = model.x
        _tvp = model.tvp
        _u = model.u

        # Define terminal and stage cost
        mterm = ((_x['Ca'] - CrSP))**2  # Terminal cost
        lterm = ((_x['Ca'] - CrSP))**2  # Stage cost

        mpc.set_objective(mterm=mterm, lterm=lterm)

        # Define control input penalties to discourage large control actions
        mpc.set_rterm(Tc=1.5)  # Input penalty for Tc

        # --- CONSTRAINTS ---
        # Bounds for state variables
        mpc.bounds['lower', '_x', 'Ca'] = 0.1   # Minimum concentration
        mpc.bounds['upper', '_x', 'Ca'] = 12    # Maximum concentration

        mpc.bounds['upper', '_x', 'T'] = 400    # Maximum temperature
        mpc.bounds['lower', '_x', 'T'] = 100    # Minimum temperature

        # Bounds for control inputs
        mpc.bounds['lower', '_u', 'Tc'] = 273   # Minimum cooling temperature (K)
        mpc.bounds['upper', '_u', 'Tc'] = 322   # Maximum cooling temperature (K)

        # --- TIME-VARYING PARAMETERS (TVPs) SETUP ---
        # Define templates for TVPs
        tvp_temp_1 = mpc.get_tvp_template()
        tvp_temp_1['_tvp', :] = np.array([8.5698])

        tvp_temp_2 = mpc.get_tvp_template()
        tvp_temp_2['_tvp', :] = np.array([2])

        tvp_temp_3 = mpc.get_tvp_template()
        tvp_temp_3['_tvp', :] = np.array([2])

        # Define a function to update TVPs based on current time
        def tvp_fun(t_now):
            p1 = 22    # Time step 1
            p2 = 74    # Time step 2
            time = 90  # Total time

            # Define concentration and temperature equilibrium points
            ceq = [8.57, 6.9275, 5.2850, 3.6425, 2]
            teq = [311.2612, 327.9968, 341.1084, 354.7246, 373.1311]

            # Interpolate concentration and temperature based on current time
            C = interpolate.interp1d([0, p1, p2, time], [8.57, 8.57, 2, 2])
            T_ = interpolate.interp1d([0, p1, p2, time], [311.2612, 311.2612, 373.1311, 373.1311])

            if t_now < p1:
                return tvp_temp_1
            elif p1 <= t_now < p2:
                y = float(C(t_now))
                tvp_temp_3['_tvp', :] = np.array([y])
                return tvp_temp_3
            else:
                return tvp_temp_2

        mpc.set_tvp_fun(tvp_fun)

        # Finalize MPC setup
        mpc.setup()

        # --- ESTIMATOR SETUP ---
        estimator = do_mpc.estimator.StateFeedback(model)

        # --- SIMULATOR SETUP ---
        simulator = do_mpc.simulator.Simulator(model)
        params_simulator = {
            't_step': Δt  # Time step (seconds)
        }

        simulator.set_param(**params_simulator)

        # Define templates for simulator parameters
        p_num = simulator.get_p_template()
        tvp_num = simulator.get_tvp_template()

        # Define functions for TVPs and uncertain parameters in the simulator
        def tvp_fun_sim(t_now):
            return tvp_num

        def p_fun_sim(t_now):
            return p_num

        simulator.set_tvp_fun(tvp_fun_sim)
        simulator.set_p_fun(p_fun_sim)

        # Finalize simulator setup
        simulator.setup()

        # --- INITIAL STATE SETUP ---
        # Initialize states for MPC, simulator, and estimator
        x0 = simulator.x0
        x0['Ca'] = Ca0
        x0['T'] = T0

        u0 = simulator.u0
        u0['Tc'] = Tc0

        mpc.x0 = x0
        simulator.x0 = x0
        estimator.x0 = x0

        mpc.u0 = u0
        simulator.u0 = u0
        estimator.u0 = u0

        # Set initial guess for MPC
        mpc.set_initial_guess()

        # --- MPC CONTROL LOOP ---
        # Simulate N steps (currently set to 1)
        u0_old = 0
        time_steps = 1
        for k in range(time_steps):
            if k > 1:
                u0_old = u0[0][0]

            # Make a control step using MPC
            u0 = mpc.make_step(x0)

            # Enforce a maximum change of ±10 on the control input
            if k > 1:
                if u0[0][0] - u0_old > 10:
                    u0 = np.array([[u0_old + 10]])
                elif u0[0][0] - u0_old < -10:
                    u0 = np.array([[u0_old - 10]])
            else:
                if u0[0][0] - Tc0 >= 10:
                    u0 = np.array([[Tc0 + 10]])
                elif u0[0][0] - Tc0 <= -10:
                    u0 = np.array([[Tc0 - 10]])

            # Add Gaussian noise to the measurements
            error_var = noise
            σ_max1 = error_var * (8.5698 - 2)
            σ_max2 = error_var * (373.1311 - 311.2612)
            mu = 0
            v0 = np.array([
                mu + σ_max1 * np.random.randn(1, 1)[0],
                mu + σ_max2 * np.random.randn(1, 1)[0]
            ])

            # Simulate the next step with the control input and noise
            y_next = simulator.make_step(u0, v0=v0)  # MPC simulation step

            # Reshape state values for consistency
            state_ops = y_next.reshape((1, 2))

            # --- BENCHMARK SETUP ---
            p1 = 22
            p2 = 74
            ceq = [8.57, 6.9275, 5.2850, 3.6425, 2]
            teq = [311.2612, 327.9968, 341.1084, 354.7246, 373.1311]

            # Interpolate reference concentration and temperature
            C = interpolate.interp1d([0, p1, p2, time_steps], [8.57, 8.57, 2, 2])
            T_ = interpolate.interp1d([0, p1, p2, time_steps], [311.2612, 311.2612, 373.1311, 373.1311])

            # Update reference concentrations and temperatures based on current step
            if k < p1:
                Cref = 8.5698
                Tref = 311.2612
            elif p1 <= k < p2:
                y = float(C(k))
                y2 = float(T_(k))
                Cref = y
                Tref = y2
            else:
                Cref = 2
                Tref = 373.1311

            # Update the estimator with the new measurements
            x0 = estimator.make_step(y_next)  # Update state estimates

        # Increment the action counter
        self.count += 1

        # Compute the change in Tc (ΔTc) based on the new control input
        newTc = u0[0][0]
        dTc = float(newTc) - float(obs['Tc'])

        # Return the computed ΔTc as a list
        return [dTc]

    async def transform_sensors(self, obs):
        """
        Process and potentially modify sensor observations before they are used.

        Args:
            obs (dict): Current sensor observations.

        Returns:
            dict: Transformed sensor observations.

        Note:
            - Currently, this method returns the observations unchanged.
            - This can be customized to apply transformations if needed.
        """
        # Currently, no transformation is applied to sensors
        return obs

    async def filtered_sensor_space(self):
        """
        Define which sensors are relevant for this perceptor.

        Returns:
            list: Names of the sensors to be used.

        Note:
            - Specifies a list of sensor names that this perceptor will utilize.
            - Helps in focusing the perceptor's operations on relevant data.
        """
        # Specify the sensors that this perceptor will use
        return ['T', 'Tc', 'Ca', 'Cref', 'Tref', 'Conc_Error', 'Eps_Yield', 'Cb_Prod']

    async def compute_success_criteria(self, transformed_obs, action):
        """
        Determine whether the success criteria have been met.

        Args:
            transformed_obs (dict): Transformed sensor observations.
            action: The action taken.

        Returns:
            bool: True if success criteria are met, False otherwise.

        Behavior:
            - Currently always returns False.
            - Can be implemented with logic to check if certain conditions are satisfied.
        """
        # Placeholder for success criteria logic
        return False

    async def compute_termination(self, transformed_obs, action):
        """
        Determine whether the training episode should terminate.

        Args:
            transformed_obs (dict): Transformed sensor observations.
            action: The action taken.

        Returns:
            bool: True if the episode should terminate, False otherwise.

        Behavior:
            - Currently always returns False.
            - Can be implemented with logic to terminate based on certain conditions.
        """
        # Placeholder for termination condition logic
        return False

2. Build the Plan Execute Pattern Agent System in the Agent Orchestrator UI

First, drag the skill agent control_full_reaction from the hand side of the page to the skill layer. Once it's there, drag over the mpc-skill-group and make sure that it is dropped below the control_full_reaction skill agent and not beside it.

3. Run Your Training Session

We are ready to train your agent system and see the results. Select the cluster you want to use and the number of training cycles. We suggest you run 50 training cycles. You will see the skill agents training one at a time, and you assign the number of cycles you want each skill agent to use. It will automatically assign an equal number of training sessions for each skill agent, but in some agent system designs, some skill agents might require more training than others.

4. View Results

When the training has been completed, you can view your results in the training sessions tab in the UI. This will show you information on how well the agent is learning.

Analyzing the Plan-Execute Pattern Agent’s Performance

Conversion rate: 95% Thermal runaway risk: Very low

We tested this fully trained agent and plotted the results.

Graph of plan-execute agent performance, showing the agent sticking closely to the reference line

This agent is the best performer of the group. Combining two imperfect technologies together with Machine Teaching produces much better results than either technology achieves alone.

Last updated