69 lines
1.6 KiB
Python
69 lines
1.6 KiB
Python
import rclpy
|
|
from rclpy.action import ActionServer, CancelResponse, GoalResponse
|
|
from rclpy.node import Node
|
|
import time
|
|
|
|
from pubsub.pubsub_library_v4 import MinimalActionServer
|
|
|
|
from custom_interfaces.action import TestACT
|
|
|
|
|
|
class Server(MinimalActionServer):
|
|
|
|
def execute_callback(self, goal_handle):
|
|
self.get_logger().info('Executing goal...')
|
|
self.goal_handle = goal_handle
|
|
|
|
# Use the goal request
|
|
i = self.goal_handle.request.number
|
|
print(i)
|
|
time.sleep(2)
|
|
|
|
|
|
# initialize a feedback
|
|
self.feedback_msg = TestACT.Feedback()
|
|
|
|
|
|
# do the function with the requested input
|
|
while i < 100:
|
|
self.spin_once()
|
|
if self.cancel == True:
|
|
self.cancel = False
|
|
break
|
|
i += 1
|
|
|
|
# wrrte feedback
|
|
self.feedback_msg.current_number = i
|
|
self.get_logger().info(f'Feedback message: {self.feedback_msg.current_number}')
|
|
|
|
# publish feedback
|
|
Versuch.send_feedback(self.feedback_msg)
|
|
time.sleep(.5)
|
|
|
|
# genrate a respones/result
|
|
self.goal_handle.succeed()
|
|
self.result = TestACT.Result()
|
|
self.result.success = True
|
|
self.get_logger().info('Result sent.')
|
|
return self.result
|
|
|
|
|
|
|
|
def spin_once(self):
|
|
rclpy.spin_once(self, timeout_sec=0.01)
|
|
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
global Versuch
|
|
|
|
Versuch = Server(NODE_NAME="action_server", ACT_NAME="ActionTest", ACT_TYPE=TestACT)
|
|
|
|
while rclpy.ok():
|
|
Versuch.spin_once()
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|