Skip to content

ROSA Interpreter

Warnings and Notes

  • Rosa does support concurrent tasks, however only one Rosa can be instantiated at the time, this is due to ROS2 dependency. Don't worry about initing with RosaInterpreter() or Rosa() multiple times, it is programmed as a singleton, but don't try to instantiate various ROSA in multiple process.

  • Human in the loop or waitings in ROSA can be easily handled with the tools you pass. Just communicate your functions by using Feedback Messages.

  • ROSA only ensures "RUNINNG" and "FINISH" feedback messages will be sent to the callbacks. The other ones will depend on the code inside the tools.

  • You can use the gureg command in the shell to debug in live and look what messages are being passed inside ROSA. Also you can use ROS2 topic tools or change the LOG.LEVEL inside the constants

Supported Properties

Property Default Value Notes
stop Yes
hard_stop Yes (Use with caution)
sync_calls Yes
async_calls Yes Recommended
wait_for Yes
callbacks Yes Full control implemented with Feedback Messages
feedback Yes
callback_keywords ("RUNNING", "STEP", "SUCCESS", "ABORT", "FINISH", "SWITCH") RUNNING and FINISH are ensured for every task
types "Sequential", "ControlledSequence" On develop the implementation of Parallel/Conditional tasks

References: - How to use Feedback Messages - Rosa Internals

RosaInterpreter Attributes

Attribute Type Description
type_dict dict[str, SequenceType] Maps sequence types to their implementations.
callback_dict dict[str, ExecutionStatus] Maps callback keywords to their execution statuses.

Examples

Initing and kill

from ecm.mediator.rosa_interpreter import RosaInterpreter
interpreter = RosaInterpreter()
interpreter.kill()

Running

import ecm.exelent.parser as parser
my_task = parser.parse("path_to_exln_file")

interpreter.run(my_task) # Will return at finish
interpreter.arun(my_task) # Returns immediatly

Waiting:

# Be careful with waiting, you must be sure the call will be raised or you could end blocked.
# You can safely wait for FINISH, but for messages as STEP, SUCCESS or ABORT is better to use callbacks.

interpreter.arun(my_task)
interpreter.wait_for(my_task.name, "FINISH") # Equivalent to interpreter.run()

Stopping:

interpreter.arun(my_task)
interpreter.stop(my_task.name)