50 lines
1.6 KiB
Python
50 lines
1.6 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
|
|
# Service Type Files
|
|
from custom_interfaces.srv import CustomSrv1
|
|
|
|
# Service Minimal Host and Client
|
|
from pubsub.pubsub_library_v3 import MinimalServiceClientAsync
|
|
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
|
|
# Create Service Client
|
|
#minimal_client = lib.MinimalServiceClientAsync(NODE_NAME="srv_client_node", SRV_NAME="test_srv", SRV_TYPE=CustomSrv1)
|
|
minimal_client = MinimalServiceClientAsync(NODE_NAME="srv_client_node", SRV_NAME="test_srv", SRV_TYPE=CustomSrv1)
|
|
|
|
# Create Service Request
|
|
srv_request = CustomSrv1.Request()
|
|
srv_request.command = "Hallo"
|
|
|
|
# Send Service Request to host
|
|
print("Service requested...")
|
|
minimal_client.done_was_true = False # set service as not done
|
|
minimal_client.send_request(srv_request)
|
|
|
|
while rclpy.ok():
|
|
try:
|
|
# Loop checks if service is complete
|
|
rclpy.spin_once(minimal_client, timeout_sec=0.1)
|
|
|
|
# check if service was already done before this loop
|
|
if not minimal_client.done_was_true:
|
|
done, response = minimal_client.check_if_service_complete() # returns tuple
|
|
if done:
|
|
print("Service done !")
|
|
print("Service Response:\t%s"%(response.success))
|
|
minimal_client.done_was_true = True # set service as done
|
|
else:
|
|
print("Service not complete")
|
|
|
|
except (KeyboardInterrupt, SystemExit):
|
|
print("\n\nShutting down...")
|
|
minimal_client.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|