#!/usr/bin/env python
# -*- coding: utf-8 -*-

#********************************************#
# Publisher/Subscriber Class Library
# Created for:
# ROS2 Workshop 2020
# Roverentwicklung für Explorationsaufgaben
# (Rover Development for Exploration Tasks)
# Institute for Space Systems
# University of Stuttgart
# Created by Patrick Winterhalder
# IRS, University of Stuttgart
#********************************************#

import rclpy
from rclpy.node import Node

# How to use:
# from pubsub_library import MinimalPublisher
# from pubsub_library import MinimalSubscriber
# minimal_publisher = MinimalPublisher(NODE_NAME='minimal_pub', TOPIC_NAME='user_controller', MSG_TYPE=Usercontroller, MSG_PERIOD=0.5)
# minimal_subscriber = MinimalSubscriber(NODE_NAME='minimal_sub', TOPIC_NAME='epos_feedback', MSG_TYPE=Eposreturn, NUM_MSGS=10)
# See --> talker.py, listener.py

# Definition of Parent Classes
#******************************************************************************#


class MinimalPublisher(Node):
    """Node that publishes messages of a single type on a single topic.

    Messages can be sent in two ways:

    * ``timer_publish(msg)`` stores the message; the periodic timer callback
      ``publisher_timer`` sends it on its next tick.
    * ``direct_publish(msg)`` sends the message immediately.
    """

    def __init__(self, NODE_NAME, TOPIC_NAME, MSG_TYPE, MSG_PERIOD):
        """Create the node, its publisher and the periodic publish timer.

        Args:
            NODE_NAME: Name handed to the ``Node`` constructor.
            TOPIC_NAME: Topic the publisher is created on.
            MSG_TYPE: Message class handed to ``create_publisher``.
            MSG_PERIOD: Timer period in seconds.
        """
        self.NODE_NAME = NODE_NAME
        self.TOPIC_NAME = TOPIC_NAME
        self.CUSTOM_MSG = MSG_TYPE
        self.timer_period = MSG_PERIOD  # [seconds]

        # Initialise the parent class Node
        super().__init__(self.NODE_NAME)
        print("\t- " + str(TOPIC_NAME) + "\n")

        self.publisher_ = self.create_publisher(
            self.CUSTOM_MSG,
            self.TOPIC_NAME,
            10)

        # Pending-message state; set up before the timer exists so the
        # callback never sees a half-initialised object.
        self.msg = None
        self.new_msg = False

        # Define node frequency, equivalent to ~rospy.rate()
        self.timer = self.create_timer(self.timer_period, self.publisher_timer)

    def publisher_timer(self):
        """Timer callback: publish the pending message, if there is one."""
        if not self.new_msg:
            return
        try:
            self.publisher_.publish(self.msg)
        except TypeError:
            print("[ERROR] Msg-Data-Types do not match")
        # Clear the flag after a failed attempt as well: a message that was
        # rejected with TypeError would be rejected again on every tick, so
        # it is dropped instead of being retried forever.
        self.new_msg = False

    # Publish using Timer
    def timer_publish(self, msg):
        """Store ``msg`` so the next timer tick publishes it.

        A message stored earlier that has not been sent yet is replaced.
        """
        self.msg = msg
        self.new_msg = True

    # Publish directly without Timer
    def direct_publish(self, msg):
        """Publish ``msg`` immediately; a TypeError is reported via print."""
        try:
            self.publisher_.publish(msg)
        except TypeError:
            print("[ERROR] Msg-Data-Types do not match")
#******************************************************************************#


class MinimalSubscriber(Node):
    """Node that subscribes to one topic and keeps the latest message.

    Attributes set by this class:
        topic_received: False until the first message arrives, then True.
            The class never resets it; a caller that wants to detect
            further messages can set it back to False after reading.
        msg: Most recently received message, or None before the first one.
    """

    def __init__(self, NODE_NAME, TOPIC_NAME, MSG_TYPE, NUM_MSGS=10):
        """Create the node and its subscription.

        Args:
            NODE_NAME: Name handed to the ``Node`` constructor.
            TOPIC_NAME: Topic to subscribe to.
            MSG_TYPE: Message class handed to ``create_subscription``.
            NUM_MSGS: Passed as the fourth (QoS) argument of
                ``create_subscription``. Defaults to 10.
        """
        self.NODE_NAME = NODE_NAME
        self.TOPIC_NAME = TOPIC_NAME
        self.CUSTOM_MSG = MSG_TYPE
        self.NUM_MSGS = NUM_MSGS
        self.topic_received = False
        # Defined up front so return_msg() works before the first callback.
        self.msg = None

        # Initialise the parent class Node
        super().__init__(self.NODE_NAME)

        self.subscription = self.create_subscription(
            self.CUSTOM_MSG,         # Message type
            self.TOPIC_NAME,         # Topic name
            self.listener_callback,  # Callback function
            self.NUM_MSGS)           # QoS argument

    def listener_callback(self, msg):
        """Subscription callback: store ``msg`` and flag that it arrived."""
        self.msg = msg
        self.topic_received = True

    def return_msg(self):
        """Return the latest received message, or None if none arrived yet."""
        return self.msg