#!/usr/bin/env python # -*- coding: utf-8 -*- #********************************************# # ROS2 Topic, Service and Action Class Library # Creted for: # ROS2 Workshop 2020 # Roverentwicklung für Explorationsaufgaben # Institute for Space Systems # University of Stuttgart # Created by Patrick Winterhalder # IRS, University of Stuttgart # Updated by Christopher Herr # Added Actions (March 2023) #********************************************# import rclpy from rclpy.node import Node from rclpy.action import ActionClient from rclpy.action import ActionServer, CancelResponse import time # How to use, please refer to: # Topic: # - topic_talker.py # - topic_listener.py # Service: # - service_host.py # - service_client.py # Action: # - action_server_example.py # - action_client_example.py # Definition of Parent Classes #******************************************************************************# # TOPIC #******************************************************************************# class MinimalPublisher(Node): def __init__(self, NODE_NAME, TOPIC_NAME, MSG_TYPE, MSG_PERIOD): self.NODE_NAME= NODE_NAME self.TOPIC_NAME = TOPIC_NAME self.CUSTOM_MSG = MSG_TYPE self.timer_period = MSG_PERIOD # [seconds] # Init above laying class Node super().__init__(self.NODE_NAME) print("\t- " + str(TOPIC_NAME) + "\n") self.publisher_ = self.create_publisher( self.CUSTOM_MSG, self.TOPIC_NAME, 10) self.new_msg = False # Define Node Frequency, equivalent to ~rospy.rate() self.timer = self.create_timer(self.timer_period, self.publisher_timer) return def publisher_timer(self): if self.new_msg is True: try: #self.get_logger().info('Pub:') self.publisher_.publish(self.msg) self.new_msg = False except TypeError: print("[ERROR] Msg-Data-Types do not match") return # Publish using Timer def timer_publish(self, msg): self.msg=msg self.new_msg = True return # Publish directly without Timer def direct_publish(self, msg): try: #self.get_logger().info('Pub:') self.publisher_.publish(msg) except TypeError: print("[ERROR] Msg-Data-Types do not match") return #******************************************************************************# class MinimalSubscriber(Node): def __init__(self, NODE_NAME, TOPIC_NAME, MSG_TYPE, NUM_MSGS): self.NODE_NAME= NODE_NAME self.TOPIC_NAME = TOPIC_NAME self.CUSTOM_MSG = MSG_TYPE self.NUM_MSGS = NUM_MSGS self.topic_received = False self.overflow = False self.watchdog = 0 self.trust = 0 self.msg = None super().__init__(self.NODE_NAME) self.subscription = self.create_subscription( self.CUSTOM_MSG, # Message Type self.TOPIC_NAME, # Topic Name self.listener_callback, # Callback Function self.NUM_MSGS) # List of saved messages self.subscription # prevent unused variable warning print("MinimalSubscriber `%s` created"%(self.NODE_NAME)) return def listener_callback(self, msg): #self.get_logger().info('I heard: "%s"' % msg.data) self.last_msg_time = time.time() self.topic_received = True self.msg = msg return def has_msg(self): # Check if a maessage has been received return self.msg is not None def return_msg(self): return self.msg def timer_overflow(self, time_limit=0, number_of_timer_overflows=10, trust_factor=1): """Watchdog for Topic Publisher Messages\n ture = timer_overflow >> error false = no timer_overflow >> no error""" self.overflow = False if time_limit > 0 and self.has_msg(): if time.time() <= self.last_msg_time + time_limit: if self.watchdog > 0: self.trust += 1 if self.trust > number_of_timer_overflows * trust_factor: self.watchdog = 0 if time.time() > self.last_msg_time + time_limit: self.watchdog += 1 self.trust = 0 self.get_logger().warn(f"Message-Watchdog triggered {self.watchdog} times") if self.watchdog > number_of_timer_overflows: self.overflow = True return self.overflow #******************************************************************************# # SERVICE #******************************************************************************# class MinimalServiceProvider(Node): """ Minimal Service Provider Class Inputs: * NODE_NAME: string * SRV_NAME: string * SRV_TYPE: service type class * srv_callback_fct: service callback function srv_callback_fct - Inputs: * request: class of type srv request * response: class of type srv response Return: * response: class of type srv response """ def __init__(self, NODE_NAME, SRV_NAME, SRV_TYPE, srv_callback_fct=None): self.NODE_NAME = NODE_NAME self.SRV_NAME = SRV_NAME self.SRV_TYPE = SRV_TYPE if srv_callback_fct == None: self.srv_callback = self.std_srv_callback else: self.srv_callback = srv_callback_fct # Init above laying class "Node" super().__init__(self.NODE_NAME) print("\tStarting Service as Host:\t%s"%(self.SRV_NAME)) #self.srv_host = self.create_service(self.SRV_TYPE, self.SRV_NAME, self.service_callback) self.srv_host = self.create_service(self.SRV_TYPE, self.SRV_NAME, self.srv_callback) #def service_callback(self, request, response): def std_srv_callback(self, request, response): """ * request: first half of srv-file * response: second half of srv-file access values using class access: * request.var_name * response.var_name """ self.get_logger().info("[srv: %s] Service Call Received"%(self.SRV_NAME)) # Abstraction layer: response = self.user_defined(request, response) # Function defined below return response # Insert your callback code here def user_defined(self, request, response): """ Write your user defined code here which will be run at every service call Input: * request: srv request class Return: * response: srv response class Access these variables using class access, eg. request.var_name """ # Write user defined code here # Adapt to fit your service type # You could overwrite this fdunctions with a new function when initializing this class response.success = True # Change this to fit your service type return response #******************************************************************************# class MinimalServiceClientAsync(Node): def __init__(self, NODE_NAME, SRV_NAME, SRV_TYPE): self.NODE_NAME = NODE_NAME self.SRV_NAME = SRV_NAME self.SRV_TYPE = SRV_TYPE # Init above laying class "Node" (super class) super().__init__(self.NODE_NAME) print("Starting Service Client:\t%s"%(self.SRV_NAME)) self.srv_client = self.create_client(self.SRV_TYPE, self.SRV_NAME) while not self.srv_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('service not available, waiting...') self.request = self.SRV_TYPE.Request() self.done_was_true = False def send_request(self, request): """ Feed request of type "SRV_TYPE.Request()" Access variable using * request = SRV_TYPE.Request() * request. = .... """ self.request = request self.done_was_true = False self.future = self.srv_client.call_async(self.request) def check_if_service_complete(self): """ Checks if service call was answered by host. Returns tuple [done, response]: * done: Bool (service call complete) * response: msg class from srv type """ if self.future.done(): try: response = self.future.result() self.done_was_true = True except Exception as e: self.get_logger().info('Service call failed %r' % (e,)) else: pass # Adapt to fit your custom service type # self.get_logger().info('Result of add_three_ints: for %d + %d + %d = %d' %(minimal_client.req.a, minimal_client.req.b, minimal_client.req.c, response.sum)) return self.future.done(), response # response is of type ".Response()" else: return self.future.done(), None #******************************************************************************# # ACTIONS #******************************************************************************# class MinimalActionServer(Node): def __init__(self, NODE_NAME, ACT_NAME, ACT_TYPE, execute_callback=None, cancel_callback=None): """MinimalActionCLien Input:\n NODE_NAME: name of the Node ACT_NAME: name of the action ACT_TYPE: type of the action (custom action file) execute_callback=None: user defined function to execute the action goal cancel_callback=None: user defined function to cancel the goal during the action\n\n Best practice is to inherit this class and then override the 'get_result_callback' and 'feedback_callback' method in your own class with the same name.""" self.NODE_NAME = NODE_NAME self.ACT_NAME = ACT_NAME self.ACT_TYPE = ACT_TYPE self.cancel = False self.goal_handle = None if execute_callback == None: self.execute_callback = self.execute_callback else: self.execute_callback = execute_callback if cancel_callback == None: self.cancel_callback = self.cancel_callback else: self.cancel_callback = cancel_callback super().__init__(self.NODE_NAME) self._action_server = ActionServer(self, action_type=self.ACT_TYPE, action_name=self.ACT_NAME, execute_callback=self.execute_callback, cancel_callback=self.cancel_callback) def execute_callback(self, goal_handle): self.get_logger().info('Executing goal...') self.goal_handle = goal_handle time.sleep(1) # initialize a feedback self.get_logger().info('Sending feedback with vlaue: 34') self.feedback_msg = self.ACT_TYPE.Feedback() self.feedback_msg.current_number = 34 self.send_feedback(self.feedback_msg) time.sleep(1) # initzialize a result self.get_logger().info('Sending result...') result = self.ACT_TYPE.Result() result.success = True self.send_response_result() return result def send_feedback(self, feedback_msg): """Method to send feedback""" if self.goal_handle is not None: self.get_logger().info('Sending Feedback...') self.feedback_msg = feedback_msg self.goal_handle.publish_feedback(self.feedback_msg) def cancel_callback(self, goal): """Accept or reject a client request to cancel an action.""" self.get_logger().info('Received cancel request') self.cancel = True return CancelResponse.ACCEPT #******************************************************************************# class MinimalActionClient(Node): def __init__(self, NODE_NAME, ACT_NAME, ACT_TYPE, get_result_callback=None, feedback_callback=None): """MinimalActionCLien Input:\n NODE_NAME: name of the Node ACT_NAME: name of the action ACT_TYPE: type of the action (custom action file) get_result_callback=None: user defined function to receive the result of the action request feedback_callback=None: user defined function to receive the feedback during the action\n\n Best practice is to inherit this class and then override the 'get_result_callback' and 'feedback_callback' method in your own class with the same name.""" self.NODE_NAME = NODE_NAME self.ACT_NAME = ACT_NAME self.ACT_TYPE = ACT_TYPE self._goal_handle = None if get_result_callback == None: self.result_callback = self.get_result_callback else: self.result_callback = get_result_callback if feedback_callback == None: self.feedback = self.feedback_callback else: self.feedback = feedback_callback super().__init__(self.NODE_NAME) self._action_client = ActionClient(self, action_type=self.ACT_TYPE, action_name=self.ACT_NAME) def send_action_goal(self, goal_msg=None): """ * request: first part of the action file access values using class access: * goal.var_name """ self.goal = self.ACT_TYPE.Goal() self.goal_msg = goal_msg #print(self.goal_msg.number) # wait for server to be realdy self._action_client.wait_for_server() # send goal with feedback clallback self._send_goal_future = self._action_client.send_goal_async(self.goal_msg, feedback_callback=self.feedback) # get a response callback in the future self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): self.goal_handle = future.result() # chack if goal request was accepted if not self.goal_handle.accepted: self.get_logger().info('Goal rejected :(') return self._goal_handle = self.goal_handle self.get_logger().info('Goal accepted :)') # get result callback once request is done self._get_result_future = self.goal_handle.get_result_async() self._get_result_future.add_done_callback(self.result_callback) def get_result_callback(self, future): """overwrite me * result: second part of the action file""" # get result # ----------- # this is how to get the result: #result = future.result().result #sself.get_logger().info(f'Result: {result.success}') self.get_logger().info('Get result function has been reached') def feedback_callback(self, feedback_msg): """overwrite me""" # get feedback # ----------- # this is how to get the feedback: #feedback = feedback_msg.feedback #self.get_logger().info(f'Received feedback: {feedback.current_number}') self.get_logger().info('Feedback function has been reached') def cancel_done(self, future): # get result of cancel request cancel_response = future.result() if len(cancel_response.goals_canceling) > 0: self.get_logger().info('Goal successfully canceled') else: self.get_logger().info('Goal failed to cancel') def cancel_goal(self): """Method to cancel the goal""" if self._goal_handle is not None: # send request to cancel goal self.get_logger().info('Canceling goal') # Cancel the goal future = self._goal_handle.cancel_goal_async() future.add_done_callback(self.cancel_done)