Create a Custom Controller

In this tutorial, you will learn how to create a custom control backend that receives the current state of the vehicle and its sensors and computes the control inputs to give to the rotors of the drone. The control laws implemented in this tutorial and the math behind them are well described in [PGC21], [MK11].

In this tutorial we will endup with a simulation that does NOT use PX4-Autopilot , and instead will use a custom nonlinear controller to make the vehicle follow a pre-determined trajectory, written in just a few lines of pure Python code.

Note

The Control Backend API is what was used to implement the MAVLink/PX4 backend. This interface is very powerful as it not only allows the user to create custom communication protocols (such as ROS) for controlling the vehicle, as well as defining and testing new control algorithms directly in pure Python code. This is specially usefull for conducting MATLAB-like simulations, as we can implement and test algorithms directly in a 3D environment by writing a few lines of Python code without having to deal with Mavlink, PX4 or ROS like in other simulators.

0. Preparation

This tutorial assumes that you have followed the Your First Simulation section first.

1. Code

The tutorial corresponds to the 4_python_single_vehicle.py example in the examples directory.

  1#!/usr/bin/env python
  2"""
  3| File: 4_python_single_vehicle.py
  4| Author: Marcelo Jacinto and Joao Pinto (marcelo.jacinto@tecnico.ulisboa.pt, joao.s.pinto@tecnico.ulisboa.pt)
  5| License: BSD-3-Clause. Copyright (c) 2023, Marcelo Jacinto. All rights reserved.
  6| Description: This files serves as an example on how to use the control backends API to create a custom controller 
  7for the vehicle from scratch and use it to perform a simulation, without using PX4 nor ROS.
  8"""
  9
 10# Imports to start Isaac Sim from this script
 11import carb
 12from omni.isaac.kit import SimulationApp
 13
 14# Start Isaac Sim's simulation environment
 15# Note: this simulation app must be instantiated right after the SimulationApp import, otherwise the simulator will crash
 16# as this is the object that will load all the extensions and load the actual simulator.
 17simulation_app = SimulationApp({"headless": False})
 18
 19# -----------------------------------
 20# The actual script should start here
 21# -----------------------------------
 22import omni.timeline
 23from omni.isaac.core.world import World
 24
 25# Import the Pegasus API for simulating drones
 26from pegasus.simulator.params import ROBOTS, SIMULATION_ENVIRONMENTS
 27from pegasus.simulator.logic.vehicles.multirotor import Multirotor, MultirotorConfig
 28from pegasus.simulator.logic.interface.pegasus_interface import PegasusInterface
 29
 30# Import the custom python control backend
 31from utils.nonlinear_controller import NonlinearController
 32
 33# Auxiliary scipy and numpy modules
 34from scipy.spatial.transform import Rotation
 35
 36# Use os and pathlib for parsing the desired trajectory from a CSV file
 37import os
 38from pathlib import Path
 39
 40
 41class PegasusApp:
 42    """
 43    A Template class that serves as an example on how to build a simple Isaac Sim standalone App.
 44    """
 45
 46    def __init__(self):
 47        """
 48        Method that initializes the PegasusApp and is used to setup the simulation environment.
 49        """
 50
 51        # Acquire the timeline that will be used to start/stop the simulation
 52        self.timeline = omni.timeline.get_timeline_interface()
 53
 54        # Start the Pegasus Interface
 55        self.pg = PegasusInterface()
 56
 57        # Acquire the World, .i.e, the singleton that controls that is a one stop shop for setting up physics, 
 58        # spawning asset primitives, etc.
 59        self.pg._world = World(**self.pg._world_settings)
 60        self.world = self.pg.world
 61
 62        # Launch one of the worlds provided by NVIDIA
 63        self.pg.load_environment(SIMULATION_ENVIRONMENTS["Curved Gridroom"])
 64
 65        # Get the current directory used to read trajectories and save results
 66        self.curr_dir = str(Path(os.path.dirname(os.path.realpath(__file__))).resolve())
 67
 68        # Create the vehicle 1
 69        # Try to spawn the selected robot in the world to the specified namespace
 70        config_multirotor1 = MultirotorConfig()
 71        config_multirotor1.backends = [NonlinearController(
 72            trajectory_file=self.curr_dir + "/trajectories/pitch_relay_90_deg_2.csv",
 73            results_file=self.curr_dir + "/results/single_statistics.npz",
 74            Ki=[0.5, 0.5, 0.5],
 75            Kr=[2.0, 2.0, 2.0]
 76        )]
 77
 78        Multirotor(
 79            "/World/quadrotor1",
 80            ROBOTS['Iris'],
 81            0,
 82            [2.3, -1.5, 0.07],
 83            Rotation.from_euler("XYZ", [0.0, 0.0, 0.0], degrees=True).as_quat(),
 84            config=config_multirotor1,
 85        )
 86
 87        # Reset the simulation environment so that all articulations (aka robots) are initialized
 88        self.world.reset()
 89
 90    def run(self):
 91        """
 92        Method that implements the application main loop, where the physics steps are executed.
 93        """
 94
 95        # Start the simulation
 96        self.timeline.play()
 97
 98        # The "infinite" loop
 99        while simulation_app.is_running():
100
101            # Update the UI of the app and perform the physics step
102            self.world.step(render=True)
103        
104        # Cleanup and stop
105        carb.log_warn("PegasusApp Simulation App is closing.")
106        self.timeline.stop()
107        simulation_app.close()
108
109def main():
110
111    # Instantiate the template app
112    pg_app = PegasusApp()
113
114    # Run the application loop
115    pg_app.run()
116
117if __name__ == "__main__":
118    main()

The next code section corresponds to the nonlinear_controller.py in the examples/utils directory.

  1#!/usr/bin/env python
  2"""
  3| File: nonlinear_controller.py
  4| Author: Marcelo Jacinto and Joao Pinto (marcelo.jacinto@tecnico.ulisboa.pt, joao.s.pinto@tecnico.ulisboa.pt)
  5| License: BSD-3-Clause. Copyright (c) 2023, Marcelo Jacinto. All rights reserved.
  6| Description: This files serves as an example on how to use the control backends API to create a custom controller 
  7for the vehicle from scratch and use it to perform a simulation, without using PX4 nor ROS. In this controller, we
  8provide a quick way of following a given trajectory specified in csv files or track an hard-coded trajectory based
  9on exponentials! NOTE: This is just an example, to demonstrate the potential of the API. A much more flexible solution
 10can be achieved
 11"""
 12
 13# Imports to be able to log to the terminal with fancy colors
 14import carb
 15
 16# Imports from the Pegasus library
 17from pegasus.simulator.logic.state import State
 18from pegasus.simulator.logic.backends import Backend
 19
 20# Auxiliary scipy and numpy modules
 21import numpy as np
 22from scipy.spatial.transform import Rotation
 23
 24class NonlinearController(Backend):
 25    """A nonlinear controller class. It implements a nonlinear controller that allows a vehicle to track
 26    aggressive trajectories. This controlers is well described in the papers
 27    
 28    [1] J. Pinto, B. J. Guerreiro and R. Cunha, "Planning Parcel Relay Manoeuvres for Quadrotors," 
 29    2021 International Conference on Unmanned Aircraft Systems (ICUAS), Athens, Greece, 2021, 
 30    pp. 137-145, doi: 10.1109/ICUAS51884.2021.9476757.
 31    [2] D. Mellinger and V. Kumar, "Minimum snap trajectory generation and control for quadrotors," 
 32    2011 IEEE International Conference on Robotics and Automation, Shanghai, China, 2011, 
 33    pp. 2520-2525, doi: 10.1109/ICRA.2011.5980409.
 34    """
 35
 36    def __init__(self, 
 37        trajectory_file: str = None, 
 38        results_file: str=None, 
 39        reverse=False, 
 40        Kp=[10.0, 10.0, 10.0],
 41        Kd=[8.5, 8.5, 8.5],
 42        Ki=[1.50, 1.50, 1.50],
 43        Kr=[3.5, 3.5, 3.5],
 44        Kw=[0.5, 0.5, 0.5]):
 45
 46        # The current rotor references [rad/s]
 47        self.input_ref = [0.0, 0.0, 0.0, 0.0]
 48
 49        # The current state of the vehicle expressed in the inertial frame (in ENU)
 50        self.p = np.zeros((3,))                   # The vehicle position
 51        self.R: Rotation = Rotation.identity()    # The vehicle attitude
 52        self.w = np.zeros((3,))                   # The angular velocity of the vehicle
 53        self.v = np.zeros((3,))                   # The linear velocity of the vehicle in the inertial frame
 54        self.a = np.zeros((3,))                   # The linear acceleration of the vehicle in the inertial frame
 55
 56        # Define the control gains matrix for the outer-loop
 57        self.Kp = np.diag(Kp)
 58        self.Kd = np.diag(Kd)
 59        self.Ki = np.diag(Ki)
 60        self.Kr = np.diag(Kr)
 61        self.Kw = np.diag(Kw)
 62
 63        self.int = np.array([0.0, 0.0, 0.0])
 64
 65        # Define the dynamic parameters for the vehicle
 66        self.m = 1.50        # Mass in Kg
 67        self.g = 9.81       # The gravity acceleration ms^-2
 68
 69        # Read the target trajectory from a CSV file inside the trajectories directory
 70        # if a trajectory is provided. Otherwise, just perform the hard-coded trajectory provided with this controller
 71        if trajectory_file is not None:
 72            self.trajectory = self.read_trajectory_from_csv(trajectory_file)
 73            self.index = 0
 74            self.max_index, _ = self.trajectory.shape
 75            self.total_time = 0.0
 76        # Use the built-in trajectory hard-coded for this controller
 77        else:
 78            # Set the initial time for starting when using the built-in trajectory (the time is also used in this case
 79            # as the parametric value)
 80            self.total_time = -5.0
 81            # Signal that we will not used a received trajectory
 82            self.trajectory = None
 83            self.max_index = 1
 84
 85        self.reverse = reverse
 86
 87        # Auxiliar variable, so that we only start sending motor commands once we get the state of the vehicle
 88        self.reveived_first_state = False
 89
 90        # Lists used for analysing performance statistics
 91        self.results_files = results_file
 92        self.time_vector = []
 93        self.desired_position_over_time = []
 94        self.position_over_time = []
 95        self.position_error_over_time = []
 96        self.velocity_error_over_time = []
 97        self.atittude_error_over_time = []
 98        self.attitude_rate_error_over_time = []
 99
100    def read_trajectory_from_csv(self, file_name: str):
101        """Auxiliar method used to read the desired trajectory from a CSV file
102
103        Args:
104            file_name (str): A string with the name of the trajectory inside the trajectories directory
105
106        Returns:
107            np.ndarray: A numpy matrix with the trajectory desired states over time
108        """
109
110        # Read the trajectory to a pandas frame
111        return np.flip(np.genfromtxt(file_name, delimiter=','), axis=0)
112
113
114    def start(self):
115        """
116        Reset the control and trajectory index
117        """
118        self.reset_statistics()
119        
120
121    def stop(self):
122        """
123        Stopping the controller. Saving the statistics data for plotting later
124        """
125
126        # Check if we should save the statistics to some file or not
127        if self.results_files is None:
128            return
129        
130        statistics = {}
131        statistics["time"] = np.array(self.time_vector)
132        statistics["p"] = np.vstack(self.position_over_time)
133        statistics["desired_p"] = np.vstack(self.desired_position_over_time)
134        statistics["ep"] = np.vstack(self.position_error_over_time)
135        statistics["ev"] = np.vstack(self.velocity_error_over_time)
136        statistics["er"] = np.vstack(self.atittude_error_over_time)
137        statistics["ew"] = np.vstack(self.attitude_rate_error_over_time)
138        np.savez(self.results_files, **statistics)
139        carb.log_warn("Statistics saved to: " + self.results_files)
140
141        self.reset_statistics()
142
143    def update_sensor(self, sensor_type: str, data):
144        """
145        Do nothing. For now ignore all the sensor data and just use the state directly for demonstration purposes. 
146        This is a callback that is called at every physics step.
147
148        Args:
149            sensor_type (str): The name of the sensor providing the data
150            data (dict): A dictionary that contains the data produced by the sensor
151        """
152        pass
153
154    def update_state(self, state: State):
155        """
156        Method that updates the current state of the vehicle. This is a callback that is called at every physics step
157
158        Args:
159            state (State): The current state of the vehicle.
160        """
161        self.p = state.position
162        self.R = Rotation.from_quat(state.attitude)
163        self.w = state.angular_velocity
164        self.v = state.linear_velocity
165
166        self.reveived_first_state = True
167
168    def input_reference(self):
169        """
170        Method that is used to return the latest target angular velocities to be applied to the vehicle
171
172        Returns:
173            A list with the target angular velocities for each individual rotor of the vehicle
174        """
175        return self.input_ref
176
177    def update(self, dt: float):
178        """Method that implements the nonlinear control law and updates the target angular velocities for each rotor. 
179        This method will be called by the simulation on every physics step
180
181        Args:
182            dt (float): The time elapsed between the previous and current function calls (s).
183        """
184        
185        if self.reveived_first_state == False:
186            return
187
188        # -------------------------------------------------
189        # Update the references for the controller to track
190        # -------------------------------------------------
191        self.total_time += dt
192        
193
194        # Check if we need to update to the next trajectory index
195        if self.index < self.max_index - 1 and self.total_time >= self.trajectory[self.index + 1, 0]:
196            self.index += 1
197
198        # Update using an external trajectory
199        if self.trajectory is not None:
200            # the target positions [m], velocity [m/s], accelerations [m/s^2], jerk [m/s^3], yaw-angle [rad], yaw-rate [rad/s]
201            p_ref = np.array([self.trajectory[self.index, 1], self.trajectory[self.index, 2], self.trajectory[self.index, 3]])
202            v_ref = np.array([self.trajectory[self.index, 4], self.trajectory[self.index, 5], self.trajectory[self.index, 6]])
203            a_ref = np.array([self.trajectory[self.index, 7], self.trajectory[self.index, 8], self.trajectory[self.index, 9]])
204            j_ref = np.array([self.trajectory[self.index, 10], self.trajectory[self.index, 11], self.trajectory[self.index, 12]])
205            yaw_ref = self.trajectory[self.index, 13]
206            yaw_rate_ref = self.trajectory[self.index, 14]
207        # Or update the reference using the built-in trajectory
208        else:
209            s = 0.6
210            p_ref = self.pd(self.total_time, s, self.reverse)
211            v_ref = self.d_pd(self.total_time, s, self.reverse)
212            a_ref = self.dd_pd(self.total_time, s, self.reverse)
213            j_ref = self.ddd_pd(self.total_time, s, self.reverse)
214            yaw_ref = self.yaw_d(self.total_time, s)
215            yaw_rate_ref = self.d_yaw_d(self.total_time, s)
216
217        # -------------------------------------------------
218        # Start the controller implementation
219        # -------------------------------------------------
220
221        # Compute the tracking errors
222        ep = self.p - p_ref
223        ev = self.v - v_ref
224        self.int = self.int +  (ep * dt)
225        ei = self.int
226
227        # Compute F_des term
228        F_des = -(self.Kp @ ep) - (self.Kd @ ev) - (self.Ki @ ei) + np.array([0.0, 0.0, self.m * self.g]) + (self.m * a_ref)
229
230        # Get the current axis Z_B (given by the last column of the rotation matrix)
231        Z_B = self.R.as_matrix()[:,2]
232
233        # Get the desired total thrust in Z_B direction (u_1)
234        u_1 = F_des @ Z_B
235
236        # Compute the desired body-frame axis Z_b
237        Z_b_des = F_des / np.linalg.norm(F_des)
238
239        # Compute X_C_des 
240        X_c_des = np.array([np.cos(yaw_ref), np.sin(yaw_ref), 0.0])
241
242        # Compute Y_b_des
243        Z_b_cross_X_c = np.cross(Z_b_des, X_c_des)
244        Y_b_des = Z_b_cross_X_c / np.linalg.norm(Z_b_cross_X_c)
245
246        # Compute X_b_des
247        X_b_des = np.cross(Y_b_des, Z_b_des)
248
249        # Compute the desired rotation R_des = [X_b_des | Y_b_des | Z_b_des]
250        R_des = np.c_[X_b_des, Y_b_des, Z_b_des]
251        R = self.R.as_matrix()
252
253        # Compute the rotation error
254        e_R = 0.5 * self.vee((R_des.T @ R) - (R.T @ R_des))
255
256        # Compute an approximation of the current vehicle acceleration in the inertial frame (since we cannot measure it directly)
257        self.a = (u_1 * Z_B) / self.m - np.array([0.0, 0.0, self.g])
258
259        # Compute the desired angular velocity by projecting the angular velocity in the Xb-Yb plane
260        # projection of angular velocity on xB − yB plane
261        # see eqn (7) from [2].
262        hw = (self.m / u_1) * (j_ref - np.dot(Z_b_des, j_ref) * Z_b_des) 
263        
264        # desired angular velocity
265        w_des = np.array([-np.dot(hw, Y_b_des), 
266                           np.dot(hw, X_b_des), 
267                           yaw_rate_ref * Z_b_des[2]])
268
269        # Compute the angular velocity error
270        e_w = self.w - w_des
271
272        # Compute the torques to apply on the rigid body
273        tau = -(self.Kr @ e_R) - (self.Kw @ e_w)
274
275        # Use the allocation matrix provided by the Multirotor vehicle to convert the desired force and torque
276        # to angular velocity [rad/s] references to give to each rotor
277        if self.vehicle:
278            self.input_ref = self.vehicle.force_and_torques_to_velocities(u_1, tau)
279
280        # ----------------------------
281        # Statistics to save for later
282        # ----------------------------
283        self.time_vector.append(self.total_time)
284        self.position_over_time.append(self.p)
285        self.desired_position_over_time.append(p_ref)
286        self.position_error_over_time.append(ep)
287        self.velocity_error_over_time.append(ev)
288        self.atittude_error_over_time.append(e_R)
289        self.attitude_rate_error_over_time.append(e_w)
290
291    @staticmethod
292    def vee(S):
293        """Auxiliary function that computes the 'v' map which takes elements from so(3) to R^3.
294
295        Args:
296            S (np.array): A matrix in so(3)
297        """
298        return np.array([-S[1,2], S[0,2], -S[0,1]])
299    
300    def reset_statistics(self):
301
302        self.index = 0
303        # If we received an external trajectory, reset the time to 0.0
304        if self.trajectory is not None:
305            self.total_time = 0.0
306        # if using the internal trajectory, make the parametric value start at -5.0
307        else:
308            self.total_time = -5.0
309
310        # Reset the lists used for analysing performance statistics
311        self.time_vector = []
312        self.desired_position_over_time = []
313        self.position_over_time = []
314        self.position_error_over_time = []
315        self.velocity_error_over_time = []
316        self.atittude_error_over_time = []
317        self.attitude_rate_error_over_time = []
318
319    # ---------------------------------------------------
320    # Definition of an exponential trajectory for example
321    # This can be used as a reference if not trajectory file is passed
322    # as an argument to the constructor of this class
323    # ---------------------------------------------------
324
325    def pd(self, t, s, reverse=False):
326        """The desired position of the built-in trajectory
327
328        Args:
329            t (float): The parametric value that guides the equation
330            s (float): How steep and agressive the curve is
331            reverse (bool, optional): Choose whether we want to flip the curve (so that we can have 2 drones almost touching). Defaults to False.
332
333        Returns:
334            np.ndarray: A 3x1 array with the x, y ,z desired [m]
335        """
336
337        x = t
338        z = 1 / s * np.exp(-0.5 * np.power(t/s, 2)) + 1.0
339        y = 1 / s * np.exp(-0.5 * np.power(t/s, 2))
340
341        if reverse == True:
342            y = -1 / s * np.exp(-0.5 * np.power(t/s, 2)) + 4.5
343
344        return np.array([x,y,z])
345
346    def d_pd(self, t, s, reverse=False):
347        """The desired velocity of the built-in trajectory
348
349        Args:
350            t (float): The parametric value that guides the equation
351            s (float): How steep and agressive the curve is
352            reverse (bool, optional): Choose whether we want to flip the curve (so that we can have 2 drones almost touching). Defaults to False.
353
354        Returns:
355            np.ndarray: A 3x1 array with the d_x, d_y ,d_z desired [m/s]
356        """
357
358        x = 1.0
359        y = -(t * np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,3)
360        z = -(t * np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,3)
361
362        if reverse == True:
363            y = (t * np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,3)
364
365        return np.array([x,y,z])
366
367    def dd_pd(self, t, s, reverse=False):
368        """The desired acceleration of the built-in trajectory
369
370        Args:
371            t (float): The parametric value that guides the equation
372            s (float): How steep and agressive the curve is
373            reverse (bool, optional): Choose whether we want to flip the curve (so that we can have 2 drones almost touching). Defaults to False.
374
375        Returns:
376            np.ndarray: A 3x1 array with the dd_x, dd_y ,dd_z desired [m/s^2]
377        """
378
379        x = 0.0
380        y = (np.power(t,2)*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,5) - np.exp(-np.power(t,2)/(2*np.power(s,2)))/np.power(s,3)
381        z = (np.power(t,2)*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,5) - np.exp(-np.power(t,2)/(2*np.power(s,2)))/np.power(s,3)
382
383        if reverse == True:
384            y = np.exp(-np.power(t,2)/(2*np.power(s,2)))/np.power(s,3) - (np.power(t,2)*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,5)
385
386        return np.array([x,y,z])
387
388    def ddd_pd(self, t, s, reverse=False):
389        """The desired jerk of the built-in trajectory
390
391        Args:
392            t (float): The parametric value that guides the equation
393            s (float): How steep and agressive the curve is
394            reverse (bool, optional): Choose whether we want to flip the curve (so that we can have 2 drones almost touching). Defaults to False.
395
396        Returns:
397            np.ndarray: A 3x1 array with the ddd_x, ddd_y ,ddd_z desired [m/s^3]
398        """
399        x = 0.0
400        y = (3*t*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,5) - (np.power(t,3)*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,7)
401        z = (3*t*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,5) - (np.power(t,3)*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,7)
402
403        if reverse == True:
404            y = (np.power(t,3)*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,7) - (3*t*np.exp(-np.power(t,2)/(2*np.power(s,2))))/np.power(s,5)
405
406        return np.array([x,y,z])
407
408    def yaw_d(self, t, s):
409        """The desired yaw of the built-in trajectory
410
411        Args:
412            t (float): The parametric value that guides the equation
413            s (float): How steep and agressive the curve is
414            reverse (bool, optional): Choose whether we want to flip the curve (so that we can have 2 drones almost touching). Defaults to False.
415
416        Returns:
417            np.ndarray: A float with the desired yaw in rad
418        """
419        return 0.0
420    
421    def d_yaw_d(self, t, s):
422        """The desired yaw_rate of the built-in trajectory
423
424        Args:
425            t (float): The parametric value that guides the equation
426            s (float): How steep and agressive the curve is
427            reverse (bool, optional): Choose whether we want to flip the curve (so that we can have 2 drones almost touching). Defaults to False.
428
429        Returns:
430            np.ndarray: A float with the desired yaw_rate in rad/s
431        """
432        return 0.0

2. Explanation

To start a pre-programed simulation using a different control backend other than MAVLink we include our custom control backend module written in pure Python.

30# Import the custom python control backend
31from utils.nonlinear_controller import NonlinearController

We now create a multirotor configuration and set the multirotor backend to be our NonlinearController object. That’s it! It is that simple! Instead of using the MAVLink/PX4` control we will use a pure Python controller written by us. The rest of the script is the same as in the previous tutorial.

68        # Create the vehicle 1
69        # Try to spawn the selected robot in the world to the specified namespace
70        config_multirotor1 = MultirotorConfig()
71        config_multirotor1.backends = [NonlinearController(
72            trajectory_file=self.curr_dir + "/trajectories/pitch_relay_90_deg_2.csv",
73            results_file=self.curr_dir + "/results/single_statistics.npz",
74            Ki=[0.5, 0.5, 0.5],
75            Kr=[2.0, 2.0, 2.0]

Now, let’s take a look on how the NonlinearControl class is structured and how you can build your own. We must start by including the Backend class found in the pegasus.simulator.logic.backends API. To explore all the functionalities offered by this class, check the Backend API reference page.

18from pegasus.simulator.logic.backends import Backend

Next, we define a class that inherits the Backend` class. This class must implement a few methods to be compliant with the Multirotor API. In this example we will implement an “hard-coded” nonlinear geometrical controller for the multirotor to track a pre-defined trajectory.

24class NonlinearController(Backend):
25    """A nonlinear controller class. It implements a nonlinear controller that allows a vehicle to track
26    aggressive trajectories. This controlers is well described in the papers
27    
28    [1] J. Pinto, B. J. Guerreiro and R. Cunha, "Planning Parcel Relay Manoeuvres for Quadrotors," 
29    2021 International Conference on Unmanned Aircraft Systems (ICUAS), Athens, Greece, 2021, 
30    pp. 137-145, doi: 10.1109/ICUAS51884.2021.9476757.
31    [2] D. Mellinger and V. Kumar, "Minimum snap trajectory generation and control for quadrotors," 
32    2011 IEEE International Conference on Robotics and Automation, Shanghai, China, 2011, 
33    pp. 2520-2525, doi: 10.1109/ICRA.2011.5980409.
34    """
35
36    def __init__(self, 
37        trajectory_file: str = None, 
38        results_file: str=None, 
39        reverse=False, 
40        Kp=[10.0, 10.0, 10.0],
41        Kd=[8.5, 8.5, 8.5],
42        Ki=[1.50, 1.50, 1.50],
43        Kr=[3.5, 3.5, 3.5],
44        Kw=[0.5, 0.5, 0.5]):

The first method that should be implemented is start() . This method gets invoked by the vehicle when the simulation starts. You should perform any initialization of your algorithm or communication protocol here.

114    def start(self):
115        """
116        Reset the control and trajectory index
117        """

The stop() method gets invoked by the vehicle when the simulation stops. You should perform any cleanup here.

121    def stop(self):
122        """
123        Stopping the controller. Saving the statistics data for plotting later
124        """

The update_sensor(sensor_type: str, data) method gets invoked by the vehicle at every physics step iteration (it is used as a callback) for each sensor that generated output data. It receives as input a string which describes the type of sensor and the data produced by that sensor. It is up to you to decide on how to parse the sensor data and use it. In this case we are not going to use the data produced by the simulated sensors for our controller. Instead we will get the state of the vehicle directly from the simulation and use that to feedback into our control law.

143    def update_sensor(self, sensor_type: str, data):
144        """
145        Do nothing. For now ignore all the sensor data and just use the state directly for demonstration purposes. 
146        This is a callback that is called at every physics step.
147
148        Args:
149            sensor_type (str): The name of the sensor providing the data
150            data (dict): A dictionary that contains the data produced by the sensor
151        """

Note

You can take a look on how the MavlinkBackend is implemented to check on how to parse sensor data produced by IMU, GPS, etc. A simple strategy is to use an if-statement to check for which sensor we are receive the data from and parse it accordingly, for example:

if sensor_type == "imu":
   # do something
elif sensor_type == "gps":
   # do something else
else:
   # some sensor we do not care about, so ignore

Check the Sensors API Reference to check what type of data each sensor generates. For most cases, it will be a dictionary.

The update_state(state: State) method is called at every physics steps with the most up-to-date State of the vehicle. The state object contains:

  • position of the vehicle’s body frame with respect to the inertial frame, expressed in the inertial frame;

  • attitude of the vehicle’s body frame with respect to the inertial frame, expressed in the inertial frame;

  • linear velocity of the vehicle’s body frame, with respect to the inertial frame, expressed in the inertial frame;

  • linear body-velocity of the vehicle’s body frame, with respect to the inertial frame, expressed in the vehicle’s body frame;

  • angular velocity of the vehicle’s body frame, with respect to the inertial frame, expressed in the vehicle’s body frame;

  • linear acceleration of the vehicle’s body frame, with respect to the inertial frame, expressed in the inertial frame.

All of this data is filled adopting a East-North-Down (ENU) convention for the Inertial Reference Frame and a Front-Left-Up (FLU) for the vehicle’s body frame of reference.

Note

In case you need to adopt other conventions, the State class also provides methods to return the state of the vehicle in North-East-Down (NED) and Front-Right-Down (FRD) conventions, so you don’t have to make those conversions manually. Check the State API reference page for more information.

154    def update_state(self, state: State):
155        """
156        Method that updates the current state of the vehicle. This is a callback that is called at every physics step
157
158        Args:
159            state (State): The current state of the vehicle.
160        """

The input_reference() method is called at every physics steps by the vehicle to retrieve from the controller a list of floats with the target angular velocities in [rad/s] to use as reference to apply to each vehicle rotor. The list of floats returned by this method should have the same length as the number of rotors of the vehicle.

Note

In order to have a general controller, you might not want to hard-code the number of rotors of the vehicle in the controller. To take this into account, the Backend object is initialized with a reference to the Vehicle object when this gets instantiated. Therefore, it will access to all of its attributes, such as the number of rotors and methods to convert torques and forces in the body-frame to target rotor angular velocities!

168    def input_reference(self):
169        """
170        Method that is used to return the latest target angular velocities to be applied to the vehicle
171
172        Returns:
173            A list with the target angular velocities for each individual rotor of the vehicle
174        """

The update(dt: float) method gets invoked at every physics step by the vehicle. It receives as argument the amount of time elapsed since the last update (in seconds). This is where you should define your controller/communication layer logic and update the list of reference angular velocities for the rotors.

177    def update(self, dt: float):
178        """Method that implements the nonlinear control law and updates the target angular velocities for each rotor. 
179        This method will be called by the simulation on every physics step
180
181        Args:
182            dt (float): The time elapsed between the previous and current function calls (s).
183        """

3. Execution

Now let’s run the Python script:

ISAACSIM_PYTHON examples/4_python_single_vehicle.py

This should open a stage with a blue ground-plane with an 3DR Iris vehicle model in it. The simulation should start playing automatically and the stage being rendered. The vehicle will automatically start flying and performing a very fast relay maneuvre. If you miss it, you can just hit the stop/play button again to repeat it.

Notice now, that unlike the previous tutorial, if you open QGroundControl you won’t see the vehicle. This is normal, because now we are not using the MAVLink control backend, but instead our custom NonlinearControl backend.