Using ROSA (ROS Agent)

Introduction

ROSA (ROS Agent) is a key component of our system that simplifies interaction with the Task Sequence Protocol (TSP) through a set of easy-to-use, higher-level functions. This guide covers the basics of using ROSA to handle tasks, execute sequences, and manage execution statuses within a test-driven development environment.

Setup ROSA

Before executing any tasks, initialize the ROSA instance. Here's how to set up ROSA within your test environment:

from execution_layer.rosa.interfaces.rosa import ROSA
rosa = ROSA()

Executing a Task

To execute a task, you need to define a SequencePackage with a list of ActionPackage items that specify which actions to perform. In the following example, we will use a function that prints "Waiting..." 3 times and finishes by printing "Hello ROSA!".

For this, we will use the sleep_and_print functions included in the mocks. If you want to use your own functions, you can look at the mocks and use them as templates for your own code.

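from execution_layer.rosa.ros2.tools.packages import ActionPackage, SequencePackage, SequencePriority

# ItemRegistry and sleep_and_print must also be imported;
# sleep_and_print is included in the mocks.

# Action that prints "Waiting..."
waiting = ActionPackage(
    action_id=ItemRegistry.get_id(sleep_and_print), text="Waiting..."
)
# Action that prints the final message
end = ActionPackage(
    action_id=ItemRegistry.get_id(sleep_and_print), text="Hello ROSA!"
)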

Now that we have a set of actions, we must define the behavior and priority of the actions. For this, we will use a SequencePackage. In this object, we will define the following properties:

  • task_id: A name for the task. It is used as an internal identifier for all tasks with the same objective. Usually it should describe the purpose of your sequence, in this case wait_and_print.
  • type: How your set of actions is executed. You can find all types in the types directory, and you can also implement your own types by loading them as described in the basic.py docs. The most relevant types are the following:
      • SimpleSequence: Executes the actions in order. If an action fails, it returns an ABORT code; otherwise it returns a SUCCESS code.
      • ParallelSequence: Not yet implemented.
  • priority: Establishes the order in which the server executes the tasks if multiple tasks with the same id are received. You can look at all priorities in this file.
  • actions: An array with all the actions you want to perform.

from execution_layer.rosa.ros2.types.basic import SimpleSequence

seq = SequencePackage(
    task_id="wait_and_print",
    type=SimpleSequence.get_type(),
    priority=SequencePriority.NORMAL,
    actions=[waiting, end],  # the ActionPackage objects defined earlier
)
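
The SimpleSequence behavior described above (run the actions in order, return ABORT on the first failure, otherwise SUCCESS) can be pictured with a short stand-alone sketch. This is not ROSA's implementation: run_simple_sequence is a hypothetical helper, the return codes are plain strings, and it assumes that a failing action signals its failure by raising an exception.

```python
from typing import Callable, Iterable, List


def run_simple_sequence(actions: Iterable[Callable[[], None]]) -> str:
    """Run actions in order; stop and report ABORT at the first failure."""
    for action in actions:
        try:
            action()
        except Exception:
            # Assumption: a failed action raises. Later actions are skipped.
            return "ABORT"
    return "SUCCESS"


def fail() -> None:
    """Hypothetical action that always fails."""
    raise RuntimeError("action failed")


printed: List[str] = []

# Both actions succeed, so the whole sequence succeeds.
ok_result = run_simple_sequence(
    [lambda: printed.append("Waiting..."), lambda: printed.append("Hello ROSA!")]
)

# The first action fails, so the second one never runs.
bad_result = run_simple_sequence([fail, lambda: printed.append("never reached")])
```

The point of the sketch is that a failure stops the sequence early: the actions listed after the failing one are never executed.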

Now that the sequence is defined, we will create a task in ROSA and, optionally, a function that handles the feedback returned from the server. This feedback manager is a function that is called with each Feedback object received from ROSA. A Feedback object contains two main properties:

  • _exec_status: This value should not be modified. It reports the status of the SequenceType. The most common values are the following:
      • FINISH: Always returned when the SequenceType finishes (whether it succeeds or fails).
      • ABORT: Returned when the SequenceType fails.
      • SUCCESS: Returned when the SequenceType has ended properly.
      • STEP: Returned between the execution of each action.
      • RUNNING: Returned when the feedback has been generated asynchronously by an inner action/function.
  • object: Any object returned from inside the executed functions. Use it however you like.

As an example, we will write the following controller:

from execution_layer.rosa.ros2.tools.feedback import Feedback
from execution_layer.rosa.ros2.tools.feedback import ExecutionStatus

def my_controller(feedback: Feedback):
    if feedback._exec_status == ExecutionStatus.FINISH:
        print("Task has finished!")

Note that if you don't want to use a controller, you can pass ROSA.muted_callback as the argument instead.
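
A controller can also react to each of the status values listed above, not only FINISH. The sketch below runs without ROSA installed: the ExecutionStatus and Feedback classes here are local stand-ins that only mirror the names described in this guide, and make_controller is a hypothetical helper. With the real library you would import Feedback and ExecutionStatus from execution_layer.rosa.ros2.tools.feedback instead.

```python
from enum import Enum, auto
from typing import Any, Callable, List


class ExecutionStatus(Enum):
    """Local stand-in mirroring the status names listed in this guide."""
    FINISH = auto()
    ABORT = auto()
    SUCCESS = auto()
    STEP = auto()
    RUNNING = auto()


class Feedback:
    """Local stand-in exposing the two properties described in this guide."""

    def __init__(self, exec_status: ExecutionStatus, obj: Any = None) -> None:
        self._exec_status = exec_status
        self.object = obj


def make_controller(log: List[str]) -> Callable[[Feedback], None]:
    """Build a callback that records one message per feedback received."""

    def controller(feedback: Feedback) -> None:
        status = feedback._exec_status
        if status == ExecutionStatus.RUNNING:
            log.append(f"progress: {feedback.object}")
        elif status == ExecutionStatus.STEP:
            log.append("step done")
        elif status == ExecutionStatus.ABORT:
            log.append("failed")
        elif status == ExecutionStatus.SUCCESS:
            log.append("succeeded")
        elif status == ExecutionStatus.FINISH:
            log.append("finished")

    return controller


# Simulate the feedback stream of a successful run.
log: List[str] = []
controller = make_controller(log)
for fb in (
    Feedback(ExecutionStatus.RUNNING, "Waiting..."),
    Feedback(ExecutionStatus.STEP),
    Feedback(ExecutionStatus.SUCCESS),
    Feedback(ExecutionStatus.FINISH),
):
    controller(fb)
```

The order of the simulated feedback objects is only an illustration; the order in which the server actually emits them is not specified here.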

Now that everything is set up, we just have to run it all with ROSA. Remember that ROSA uses a non-blocking call, so if you want to wait for the execution to finish before continuing the program, you can use the wait_for function.

# Here self.rosa is the ROSA instance created during setup, stored on the test case
self.rosa.new_task("wait_and_print", feedback_callback=my_controller)
self.rosa.execute(seq)
self.rosa.wait_for("wait_and_print", ExecutionStatus.FINISH)
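
To see why waiting matters with a non-blocking call, here is a generic analogy built only on the Python standard library. It is not how ROSA is implemented: the worker thread stands in for the server running the sequence, and the threading.Event named finished stands in for the FINISH status that wait_for waits on.

```python
import threading
import time
from typing import List

finished = threading.Event()  # stand-in for "FINISH has been reported"
results: List[str] = []


def worker() -> None:
    """Stand-in for the sequence running in the background."""
    time.sleep(0.05)
    results.append("Hello ROSA!")
    finished.set()


thread = threading.Thread(target=worker, daemon=True)
thread.start()  # returns immediately, like a non-blocking execute call

# Without this wait, the code below could run before the work is done.
reached_finish = finished.wait(timeout=2.0)
```

The timeout keeps the caller from blocking forever if the background work never reports completion.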

Stopping a task

Once you have launched a task, you may want to abort its remaining actions. For this reason, ROSA implements two stopping methods: soft-stop and hard-stop.

A soft-stop allows the current sequence to stop gracefully. It raises an event in the SequenceType so that execution stops as soon as a clean exit is possible. This exit usually happens between action steps.

self.rosa.soft_stop("wait_and_print")
self.rosa.wait_for("wait_and_print", ExecutionStatus.FINISH)
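
The idea of stopping at a clean exit point between action steps can be sketched with a cooperative stop flag. This is a minimal illustration under assumptions, not ROSA's code: run_with_soft_stop is a hypothetical helper, and a threading.Event plays the role of the event raised in the SequenceType.

```python
import threading
from typing import Callable, List


def run_with_soft_stop(
    actions: List[Callable[[], None]], stop_requested: threading.Event
) -> int:
    """Run actions in order, checking the stop flag only between steps.

    Returns the number of actions that actually ran.
    """
    executed = 0
    for action in actions:
        if stop_requested.is_set():
            break  # clean exit point: no action is interrupted midway
        action()
        executed += 1
    return executed


stop = threading.Event()
printed: List[str] = []


def first() -> None:
    printed.append("Waiting...")
    stop.set()  # simulate a soft-stop request arriving during this action


def second() -> None:
    printed.append("Hello ROSA!")


count = run_with_soft_stop([first, second], stop)
```

The action that is running when the request arrives still completes; only the actions after it are skipped.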

A hard-stop immediately raises a signal to stop a task, disregarding its current state or any ongoing actions. This is a critical function for emergency situations where a task must be stopped as quickly as possible to prevent undesirable effects or to reset the system rapidly. However, it should not be used unless strictly needed, since it can result in corrupted files or inconsistent executions.

import time

self.rosa._hard_stop("wait_and_print")
time.sleep(1)  # Wait to ensure the process has been terminated

Using Feedback

From anywhere in your Python code, if you want to return feedback to the SequenceType that called you (if any), you can publish an object using the following structure:

from execution_layer.rosa.ros2.tools.feedback import Feedback

f = Feedback()  # The task_id is set automatically
f.publish(my_object)

# Note: You should have built at least one ROSA instance before using feedback objects.

This feedback will then be handled by the ROSA callback manager, if one has been established for the task.

Conclusion

This tutorial provides a foundational understanding of how to interact with ROSA to execute, monitor, and manage tasks within a ROS2-enabled environment. The methods showcased help facilitate effective testing and control of asynchronous task execution, essential for robust software development. Now you are ready to implement and improve your own functionalities with ROSA as an Execution_Layer Algorithm or A_2.