ROSA Interpreter
Warnings and Notes
-
Rosa does support concurrent tasks, however only one Rosa can be instantiated at the time, this is due to ROS2 dependency. Don't worry about initing with
RosaInterpreter()
orRosa()
multiple times, it is programmed as a singleton, but don't try to instantiate various ROSA in multiple process. -
Human in the loop or waitings in ROSA can be easily handled with the tools you pass. Just communicate your functions by using Feedback Messages.
-
ROSA only ensures "RUNINNG" and "FINISH" feedback messages will be sent to the callbacks. The other ones will depend on the code inside the tools.
-
You can use the
gureg
command in the shell to debug in live and look what messages are being passed inside ROSA. Also you can use ROS2 topic tools or change theLOG.LEVEL
inside the constants
Supported Properties
Property | Default Value | Notes |
---|---|---|
stop | Yes | |
hard_stop | Yes | (Use with caution) |
sync_calls | Yes | |
async_calls | Yes | Recommended |
wait_for | Yes | |
callbacks | Yes | Full control implemented with Feedback Messages |
feedback | Yes | |
callback_keywords | ("RUNNING", "STEP", "SUCCESS", "ABORT", "FINISH", "SWITCH") |
RUNNING and FINISH are ensured for every task |
types | "Sequential" , "ControlledSequence" |
On develop the implementation of Parallel/Conditional tasks |
References: - How to use Feedback Messages - Rosa Internals
RosaInterpreter Attributes
Attribute | Type | Description |
---|---|---|
type_dict |
dict[str, SequenceType] |
Maps sequence types to their implementations. |
callback_dict |
dict[str, ExecutionStatus] |
Maps callback keywords to their execution statuses. |
Examples
Initing and kill
from ecm.mediator.rosa_interpreter import RosaInterpreter
interpreter = RosaInterpreter()
interpreter.kill()
Running
import ecm.exelent.parser as parser
my_task = parser.parse("path_to_exln_file")
interpreter.run(my_task) # Will return at finish
interpreter.arun(my_task) # Returns immediatly
Waiting:
# Be careful with waiting, you must be sure the call will be raised or you could end blocked.
# You can safely wait for FINISH, but for messages as STEP, SUCCESS or ABORT is better to use callbacks.
interpreter.arun(my_task)
interpreter.wait_for(my_task.name, "FINISH") # Equivalent to interpreter.run()
Stopping:
interpreter.arun(my_task)
interpreter.stop(my_task.name)