ROS2_pubsub/src/pubsub/pubsub/pubsub_library_v3.py

312 lines
10 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#********************************************#
# Publisher/Subscriber Class Library
# Creted for:
# ROS2 Workshop 2020
# Roverentwicklung für Explorationsaufgaben
# Institute for Space Systems
# University of Stuttgart
# Created by Patrick Winterhalder
# IRS, University of Stuttgart
#********************************************#
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
# How to use, please refer to:
# Topic:
# - topic_talker.py
# - topic_listener.py
# Service:
# - service_host.py
# - service_client.py
# Action:
# - action_server.py
# - action_client.py
# Definition of Parent Classes
#******************************************************************************#
# TOPIC
#******************************************************************************#
class MinimalPublisher(Node):
def __init__(self, NODE_NAME, TOPIC_NAME, MSG_TYPE, MSG_PERIOD):
self.NODE_NAME= NODE_NAME
self.TOPIC_NAME = TOPIC_NAME
self.CUSTOM_MSG = MSG_TYPE
self.timer_period = MSG_PERIOD # [seconds]
# Init above laying class Node
super().__init__(self.NODE_NAME)
print("\t- " + str(TOPIC_NAME) + "\n")
self.publisher_ = self.create_publisher(
self.CUSTOM_MSG,
self.TOPIC_NAME,
10)
self.new_msg = False
# Define Node Frequency, equivalent to ~rospy.rate()
self.timer = self.create_timer(self.timer_period, self.publisher_timer)
return
def publisher_timer(self):
if self.new_msg is True:
try:
#self.get_logger().info('Pub:')
self.publisher_.publish(self.msg)
self.new_msg = False
except TypeError:
print("[ERROR] Msg-Data-Types do not match")
return
# Publish using Timer
def timer_publish(self, msg):
self.msg=msg
self.new_msg = True
return
# Publish directly without Timer
def direct_publish(self, msg):
try:
#self.get_logger().info('Pub:')
self.publisher_.publish(msg)
except TypeError:
print("[ERROR] Msg-Data-Types do not match")
return
#******************************************************************************#
class MinimalSubscriber(Node):
def __init__(self, NODE_NAME, TOPIC_NAME, MSG_TYPE, NUM_MSGS):
self.NODE_NAME= NODE_NAME
self.TOPIC_NAME = TOPIC_NAME
self.CUSTOM_MSG = MSG_TYPE
self.NUM_MSGS = NUM_MSGS
self.topic_received = False
# Init above laying class "Node"
super().__init__(self.NODE_NAME)
self.subscription = self.create_subscription(
self.CUSTOM_MSG, # Message Type
self.TOPIC_NAME, # Topic Name
self.listener_callback, # Callback Function
self.NUM_MSGS) # List of saved messages
self.subscription # prevent unused variable warning
print("MinimalSubscriber `%s` created"%(self.NODE_NAME))
return
def listener_callback(self, msg):
#self.get_logger().info('I heard: "%s"' % msg.data)
self.topic_received = True
self.msg = msg
return
def return_msg(self):
return self.msg
#******************************************************************************#
# SERVICE
#******************************************************************************#
class MinimalServiceProvider(Node):
""" Minimal Service Provider Class
Inputs:
* NODE_NAME: string
* SRV_NAME: string
* SRV_TYPE: service type class
* srv_callback_fct: service callback function
srv_callback_fct - Inputs:
* request: class of type srv request
* response: class of type srv response
Return:
* response: class of type srv response
"""
def __init__(self, NODE_NAME, SRV_NAME, SRV_TYPE, srv_callback_fct=None):
self.NODE_NAME = NODE_NAME
self.SRV_NAME = SRV_NAME
self.SRV_TYPE = SRV_TYPE
if srv_callback_fct == None:
self.srv_callback = self.std_srv_callback
else:
self.srv_callback = srv_callback_fct
# Init above laying class "Node"
super().__init__(self.NODE_NAME)
print("\tStarting Service as Host:\t%s"%(self.SRV_NAME))
#self.srv_host = self.create_service(self.SRV_TYPE, self.SRV_NAME, self.service_callback)
self.srv_host = self.create_service(self.SRV_TYPE, self.SRV_NAME, self.srv_callback)
#def service_callback(self, request, response):
def std_srv_callback(self, request, response):
"""
* request: first half of srv-file
* response: second half of srv-file
access values using class access:
* request.var_name
* response.var_name
"""
self.get_logger().info("[srv: %s] Service Call Received"%(self.SRV_NAME))
# Abstraction layer:
response = self.user_defined(request, response) # Function defined below
return response
# Insert your callback code here
def user_defined(self, request, response):
""" Write your user defined code here which will be run at every service call
Input:
* request: srv request class
Return:
* response: srv response class
Access these variables using class access, eg. request.var_name
"""
# Write user defined code here
# Adapt to fit your service type
# You could overwrite this fdunctions with a new function when initializing this class
response.success = True # Change this to fit your service type
return response
#******************************************************************************#
class MinimalServiceClientAsync(Node):
def __init__(self, NODE_NAME, SRV_NAME, SRV_TYPE):
self.NODE_NAME = NODE_NAME
self.SRV_NAME = SRV_NAME
self.SRV_TYPE = SRV_TYPE
# Init above laying class "Node" (super class)
super().__init__(self.NODE_NAME)
print("Starting Service Client:\t%s"%(self.SRV_NAME))
self.srv_client = self.create_client(self.SRV_TYPE, self.SRV_NAME)
while not self.srv_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting...')
self.request = self.SRV_TYPE.Request()
self.done_was_true = False
def send_request(self, request):
"""
Feed request of type "SRV_TYPE.Request()"
Access variable using
* request = SRV_TYPE.Request()
* request.<var_name> = ....
"""
self.request = request
self.done_was_true = False
self.future = self.srv_client.call_async(self.request)
def check_if_service_complete(self):
""" Checks if service call was answered by host.
Returns tuple [done, response]:
* done: Bool (service call complete)
* response: msg class from srv type
"""
if self.future.done():
try:
response = self.future.result()
self.done_was_true = True
except Exception as e:
self.get_logger().info('Service call failed %r' % (e,))
else:
pass
# Adapt to fit your custom service type
# self.get_logger().info('Result of add_three_ints: for %d + %d + %d = %d' %(minimal_client.req.a, minimal_client.req.b, minimal_client.req.c, response.sum))
return self.future.done(), response # response is of type "<service_type_name>.Response()"
else:
return self.future.done(), None
#******************************************************************************#
# ACTIONS
#******************************************************************************#
class MinimalActionServer(Node):
""" Minimal Action Server Class
Inputs:
* NODE_NAME: string
* ACT_NAME: string
* ACT_TYPE: action type class
* act_callback_input: action callback function
Return:
*
"""
def __init__(self, NODE_NAME, ACT_NAME, ACT_TYPE, act_callback_input=None):
self.NODE_NAME = NODE_NAME
self.ACT_NAME = ACT_NAME
self.ACT_TYPE = ACT_TYPE
# Check if user passed callback function
if act_callback_input == None:
self.execute_callback= self.std_act_callback
else:
self.execute_callback = act_callback_input
# Init above laying class "Node" (super class)
super().__init__(self.NODE_NAME)
print("\tStarting Action Server:\t%s"%(self.ACT_NAME))
self._action_server = ActionServer(self, self.ACT_TYPE, self.ACT_NAME, self.execute_callback)
# standard action callback
def std_act_callback(self, goal_handle):
""" Standard Action Callback Method
Replace this method by inputting a custom callback method using 'act_callback_input'.
This method MUST return a variable of the type 'action result',
eg. result = ACTION_TYPE.Result()
"""
self.get_logger().info('Executing goal...')
#################
# Action Method #
# here #
#################
result = self.ACT_TYPE.Result()
return result
#******************************************************************************#
class MinimalActionClient(Node):
""" Minimal Action Client
Inputs:
* NODE_NAME: string
* ACT_NAME: string
* ACT_TYPE: action type class
* act_callback_input: action callback function
Return:
*
"""
def __init__(self, NODE_NAME, ACT_NAME, ACT_TYPE, act_callback_input=None):
self.NODE_NAME = NODE_NAME
self.ACT_NAME = ACT_NAME
self.ACT_TYPE = ACT_TYPE
# Init above laying class "Node" (super class)
super().__init__(self.NODE_NAME)
print("\tStarting Action Client for:\t%s"%(self.ACT_NAME))
self._action_client = ActionClient(self, self.ACT_TYPE, self.ACT_NAME)
def send_goal(self, input):
return
def abort_goal(self):
return