Source code for mex_sentinel

#! /usr/bin/env python

The MEx Sentinel ROS node keeps track of all information about the Mobile Executors (MExs). It provides services for retrieving and editing the MEx information.
More information on each (callback) function can be found below.

import rospy
from rooster_fleet_manager.srv import AssignJobToMex, AssignJobToMexResponse, UnassignJobFromMex, UnassignJobFromMexResponse, ChangeMexStatus, ChangeMexStatusResponse, GetMexStatus, GetMexStatusResponse, GetMexList, GetMexListResponse
from rooster_fleet_manager.msg import MexInfo, MexListInfo
from JobManager.MobileExecutor import MExStatus, MobileExecutor

# Make mex_list global
mex_list = []

[docs]def mobile_executor_initialization(): """ Function which is called once to initialize MobileExecutors and add them the MEx list based on the ROS parameter server. """ # Read robot_list from ROS parameter server. Robotnames are strings. robot_namespaces = rospy.get_param("/robot_list") # Go through the robot list, extract each name and create MobileExecutor object with that name, then append to the MEx list. for i in robot_namespaces: robot_name = i robot_instance = MobileExecutor(robot_name) mex_list.append(robot_instance)
#service to provide all mex with status
[docs]def get_mex_list(request): """ GetMexList service callback function. Takes in a empty request. Returns a GetMexListResponse response with a list of MexInfo objects for each MExs and a flag if the call was handled succesfully. """ response = GetMexListResponse() # Initialize mex list and bool success to return. # Making new list of MexInfo objects by copying each mex from original mex_info list. for i in mex_list: mex = MexInfo() = mex.status = str( # Return the status as the Enum's name (string) mex.job_id = str(i.job_id) response.mex_list.append(mex) # Add current object to the response list. if len(response.mex_list) == len(mex_list): response.success = True else: response.success = False return response
[docs]def assign_job(request): """ AssignJob service callback function. Takes in a MEx ID and Job ID and assigns the Job to the MEx, updating it's information. Returns a flag if the call was handled succesfully. """ assignment = False #flag for i in mex_list: if == request.mex_id: i.job_id = request.job_id i.status = MExStatus.ASSIGNED assignment = True if assignment == True: response = AssignJobToMexResponse() response.success = True else: response = AssignJobToMexResponse() response.success = False return response
[docs]def unassign_job(request): """ UnassignJob service callback function. Takes in a MEx ID and Job ID and unassigns the Job from the MEx, updating it's information. Returns a flag if the call was handled succesfully. """ unassignment = False #flag for i in mex_list: if == request.mex_id: i.job_id = None i.status = MExStatus.STANDBY unassignment = True if unassignment == True: response = UnassignJobFromMexResponse() response.success = True else: response = UnassignJobFromMexResponse() response.success = False return response
#callback function for service that return status of a certain MEx
[docs]def get_mex_status(request): """ GetMexStatus service callback function. Takes in a MEx ID. Returns a response with the MEx status and Job ID, and a flag if the call was handled succesfully. """ flag = False #flag that is used for indicating successful search of MEx in MEx list response = GetMexStatusResponse() #create response object for i in mex_list: #search for MEx in MEx list if == request.mex_id: response.mex_status = #MEx status query 'name' method from enum object returns text response.job_id = str(i.job_id) flag = True if flag == True: #if we found requested Mex in the list, together with status and job id we return bool success response.success = True return response else: response.success = False return response
[docs]def change_mex_status(request): """ ChangeMexStatus service callback function. Takes in a MEx ID and status. Updates status of the MobileExecutor in the MEx list matching the provided ID. Returns a response with a flag if the call was handled succesfully. """ flag = None for i in mex_list: if == request.mex_id: i.status = MExStatus(request.mex_new_status) flag = True if flag == True: response = ChangeMexStatusResponse() response.success = True else: response = ChangeMexStatusResponse() response.success = False return response
[docs]def mex_list_info(): """ MexListInfo publish function. Sets up a published on the local /mex_list_info topic, publishing information on all MobileExecutors in the MEx list at a rate of 1 Hz. """ pub = rospy.Publisher('~mex_list_info', MexListInfo, queue_size=10) rate = rospy.Rate(1) # 1hz while not rospy.is_shutdown(): # Loop here, publishing the MexListInfo message with a list of MexInfo objects at the specified rate. mexlistinfo = MexListInfo() mexlistinfo.stamp = mexlistinfo.total_mex_number = len(mex_list) mexlistinfo.standby = 0 mexlistinfo.charging = 0 mexlistinfo.assigned = 0 mexlistinfo.executing_task = 0 mexlistinfo.error = 0 for i in mex_list: mex_info = MexInfo() mex_info.status = = mex_info.job_id = str(i.job_id) mexlistinfo.mex_list_info_array.append(mex_info) if == mexlistinfo.standby += 1 elif == mexlistinfo.charging += 1 elif == mexlistinfo.assigned += 1 elif == mexlistinfo.executing_task += 1 elif == mexlistinfo.error += 1 pub.publish(mexlistinfo) rate.sleep()
if __name__ == '__main__': try: # Initialize mobile executors mobile_executor_initialization() # Initialize node. rospy.init_node('mex_sentinel') # Initialize services. get_mex_list_service = rospy.Service('~get_mex_list', GetMexList, get_mex_list) assign_job_to_mex_service = rospy.Service('~assign_job_to_mex', AssignJobToMex, assign_job) unassign_job_from_mex_service = rospy.Service('~unassign_job_from_mex', UnassignJobFromMex, unassign_job) get_mex_status_service = rospy.Service('~get_mex_status', GetMexStatus, get_mex_status) change_mex_status_service = rospy.Service('~change_mex_status', ChangeMexStatus, change_mex_status) # Initialise mex list publisher mex_list_info() # Maintain the services open. rospy.spin() except rospy.ROSInterruptException: # Handle InterruptExceptions without crashing. pass