ROS2话题、服务、动作通讯

ROS2话题、服务、动作通讯 前面一篇博客 介绍了如何在工作空间中创建包并在包中创建一个或多个可执行程序程序里定义了单个或多个节点类对象以便可执行程序运行起来的时候类对象能够执行动作干一些事情。那多个可执行程序运行起来时他们之间靠什么来通讯呢常规是走tcp/ip通讯、共享内存等其它但在ROS2中已定定义了一些现成的方式封装好了三种标准通信方式话题服务动作通讯。ROS2 把底层的 TCP/UDP/DDS/ 共享内存 全部封装起来你只需要用话题、服务、动作 这三种高级方式通信不需要关心网络底层。不用自己写了简化了很多操作。注话题服务动作通讯其实是在填充节点对象的功能后面的节点对象就不再像前面博客演示那样只打印几句话而已。一. 话题通讯为了方便博主这边就不再下命令去创建包了直接复用上一篇博客的包通过新增一个可执行程序方式去拓展。1.新建一个topic_demo1_publisher_side.py文件在里面实现话题的发布代码如下# 导入rclpy库 import rclpy from rclpy.node import Node # 导入String字符串消息 from std_msgs.msg import String # 创建一个继承于Node基类的子类 class Topic_Pub(Node): def __init__(self, name): super().__init__(name) # 参数含义 消息类型, 话题名, 消息队列长度 self.pub self.create_publisher(String, /topic_demo1, 1) #self.create_timer(1.0, self.timer_callback) #意思每隔1秒执行一次timer_callback self.timer self.create_timer(1, self.pub_msg) def pub_msg(self): msg String() # 创建一个String类型的变量msg msg.data Hi,I publish a message. # 给msg里边的data赋值 self.pub.publish(msg) # 发布话题数据 # 主函数 def main(): rclpy.init() # 初始化 pub_demo Topic_Pub(topic_publisher_node) rclpy.spin(pub_demo) pub_demo.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口2. 再创建一个名为topic_demo1_subscriber_side.py的文件在里面实现话题的订阅#导入相关的库 import rclpy from rclpy.node import Node from std_msgs.msg import String class Topic_Sub(Node): def __init__(self,name): super().__init__(name) #self.create_subscription( # msg_type, # 1. 消息类型必须 # topic, # 2. 话题名称必须 # callback, # 3. 收到消息后的回调函数必须 # qos_profile, # 4. 队列长度必须 # callback_groupNone, # 5. 回调组可选 #) # 4个必须参数你写代码必写 # msg_type 消息类型如String、Image、LaserScan # topic 话题名字字符串如my_topic # callback 收到消息后自动调用的函数 # qos_profile队列长度一般写10 self.sub self.create_subscription(String,/topic_demo1,self.sub_callback,1) def sub_callback(self,msg): # print(msg.data,flushTrue) self.get_logger().info(msg.data) def main(): rclpy.init() sub_demo Topic_Sub(topic_subscriber_node) # 创建对象并进行初始化 rclpy.spin(sub_demo) sub_demo.destroy_node() #销毁节点对象 rclpy.shutdown() #关闭ROS2 Python接口3. 再继续编辑配置文件setup.py, 如下4. 完毕后编译功能包5. 接下来我们验证下是否设置成功先运行含有发布话题的节点的可执行程序ros2 run pythonpackagedemo1 topic_demo1_publisher_side注1 如下语句可查下当前在可执行程序里运行起来的节点节点类对象的名字ros2 node list注2 我们还可以通过命令查看节点的信息ros2 node info /topic_publisher_node注3若想看下当前发布的话题可使用如下命令ros2 topic list注4 如果想看下话题发布的数据可执行如下语句ros2 topic echo /topic_demo1注5如果想看话题相关的信息可执行如下语句ros2 topic info /topic_demo16. 再运行含有订阅话题的节点的可执行程序ros2 run pythonpackagedemo1 topic_demo1_subscriber_side可看到订阅者能够正常收到发布者发布的信息了且收到了发布者传过来的String类型参数注订阅者中有对应的收到发布的话题数据后要执行的函数。二. 服务通讯同样直接复用之前创建的包通过新增一个可执行程序方式去拓展。1.新建一个service_demo1_server_side.py文件在里面创建服务端代码如下#导入相关的库文件 import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class Service_Server(Node): def __init__(self,name): super().__init__(name) #作用是注册一个服务监听客户端的请求并调用回调函数处理请求。 self.srv self.create_service(AddTwoInts, /add_two_ints, self.Add2Ints_callback) #参数 作用 #AddTwoInts 服务接口类型ROS2预定义的求和服务,常见的还有Setbool, Trigger, Empty, 后面讲自定义服务 #/add_two_ints 服务名称客户端必须用这个名字才能找到服务可自定义名字必须以 / 开头只能用 字母、数字、下划线 _大小写敏感/Add 和 /add 是两个不同服务 #self.Add2Ints_callback 回调函数收到请求后自动执行的处理逻辑 def Add2Ints_callback(self,request,response): response.sum request.a request.b print(response.sum ,response.sum) return response # ROS2规定服务回调函数必须有两个参数 # request客户端发来的请求数据这里是两个整数a、b # response服务端返回给客户端的响应数据这里是求和结果sum # 函数最后必须return response。 # 话题 聊天、广播、持续发消息单向 # 服务 打电话、请求 - 响应、干完就挂双向 # 话题Topic 大喇叭广播 # 老师拿着喇叭一直喊“现在温度 25 度现在速度 1m/s” # 学生们随便听想听就听不想听就不听 # 不用回复老师只管喊 # 服务Service 打电话 # 你打给朋友“帮我算一下 35 等于几”请求 # 朋友算完“等于 8”响应 # 必须一问一答打完电话就结束 # 特性 话题 (Topic) 服务 (Service) # 通信模式 发布 → 订阅单向 请求 → 响应双向 # 是否需要回复 不需要 必须等待回复 # 使用场景 传感器数据、持续状态 执行任务、计算、开关 # 频率 高频几十几百 Hz 低频偶尔调用 # 连接方式 多对多 一对一 # 阻塞 不阻塞 会阻塞等结果 # 只要是持续不断、实时传输的数据都用话题 # 摄像头图像 # 激光雷达数据 # 机器人位置、速度 # 温度、气压 # 遥控器指令 # 特点只管发不管对方收没收到。 # 只要是需要执行一个动作、需要结果、需要确认的都用服务 # 让机器人移动到某个点 # 计算两个数的和 # 打开 / 关闭传感器 # 保存一张图片 # 重置系统 # 查询机器人当前状态 # 特点我叫你做你必须做完告诉我结果。 # 最关键的区别有没有响应 # 数据流用话题任务调用用服务 def main(): rclpy.init() server_demo Service_Server(service_server_node) rclpy.spin(server_demo) server_demo.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口2. 再创建一个名为service_demo1_client_side.py的文件在里面实现客户端# 导入相关的库 import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts # ROS2 天生支持 # 一个服务端Server 多个客户端Client 同时通信。 class Service_Client(Node): def __init__(self, name): super().__init__(name) # 创建客户端绑定服务名必须和服务端一模一样 self.client self.create_client(AddTwoInts, /add_two_ints) # 等待服务上线防止服务没启动就调用 while not self.client.wait_for_service(timeout_sec1.0): print(service not available, waiting again...) # 创建请求对象 self.request AddTwoInts.Request() def send_request(self): self.request.a 10 self.request.b 90 # 异步发送请求非阻塞 self.future self.client.call_async(self.request) print(等待服务端为我解惑....) def main(): rclpy.init() # 节点初始化 service_client Service_Client(service_client_node) # 创建对象 service_client.send_request() # 发送服务请求 while rclpy.ok(): rclpy.spin_once(service_client) # 判断数据是否处理完成 if service_client.future.done(): try: # 获得服务反馈的信息并且打印 response service_client.future.result() print(service_client.request.a , service_client.request.a) print(service_client.request.b , service_client.request.b) print(Result , response.sum) except Exception as e: service_client.get_logger().info(Service call failed %r % (e,)) break service_client.destroy_node() rclpy.shutdown()3. 再继续编辑配置文件setup.py, 如下4. 完毕后编译功能包colcon build5. 接下来我们验证下是否设置成功先运行含有服务服务器端的节点的可执行程序ros2 run pythonpackagedemo1 service_demo1_server_side注1查看当前所有运行的服务ros2 service list注2查看某个运行服务所对应的服务类型ros2 service type /add_two_ints注意service没有info指令注3查看服务类型的详细结构请求 / 响应有哪些字段如下是查看ROS2自带的服务类型也可以查看自定义的服务类型后面再说ros2 interface show example_interfaces/srv/AddTwoInts注4直接用命令行调用服务超级常用6. 再运行客户端程序ros2 run pythonpackagedemo1 service_demo1_client_side运行之后可看到结果如下三. 动作通讯开始之前我们先回答下为什么要有动作它和话题服务的区别是什么如下是三者的核心区别。特性话题 (Topic)服务 (Service)动作 (Action)通信方式单向发布双向请求 / 响应双向目标 反馈 结果耗时长短一直持续极快毫秒级很慢秒级 / 分钟级能否取消不能不能能中途取消有无进度无无有实时进度反馈阻塞吗不阻塞会阻塞等待不阻塞后台运行典型场景摄像头、雷达、速度开关、查询、计算导航、机械臂移动、充电可看到针对不同的场景需要使用不同的方式。1. 我们首先要去自定义接口这个要自己手动创建必须先定义接口Goal/Feedback/Result 三部分这是最关键的一步。这个要如创建C类型包ros2 pkg create --build-type ament_cmake action_interfaces_demo2. 接着在这个文件夹下新建Progress.action的文件记得第一个字母大写文件内容如下# 目标 (Goal)客户端发送 int64 mytarget --- # 结果 (Result)服务端最后返回: 1~target中每个数值*2的值 int64[] myresult --- # 反馈 (Feedback)服务端实时发送比如target值为100当前算到45那么返回1~45各值*2的值 int64[] myfeedback3. 在package.xml中需要添加一些依赖包具体内容如下buildtool_dependrosidl_default_generators/buildtool_depend exec_dependrosidl_default_runtime/exec_depend dependaction_msgs/depend member_of_grouprosidl_interface_packages/member_of_group4. 在CMakeLists.txt中添加如下配置find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} action/Progress.action )5. 编译下包colcon build输入如下命令可查看文件定义及编译是否正常ros2 interface show action_interfaces_demo/action/Progress6. 有了动作接口后接下来编写动作服务端使用python。这边还是复用之前创建的包pythonpackagedemo1新建一个action_demo1_server_side.py文件在里面创建服务端代码如下import rclpy from rclpy.action import ActionServer from rclpy.node import Node import time # 导入我们定义的动作接口 from action_interfaces_demo.action import Progress class Action_Server(Node): def __init__(self): super().__init__(action_server) # 创建动作服务端 self._action_server ActionServer( self, #当前节点 Progress, #动作接口类型也就是前面你定义的action下的数据格式 multicase, # 动作名称客户端要对应客户端必须用相同名字才能连上 self.execute_callback) #收到目标后自动调用这个函数 self.get_logger().info(动作服务端已启动等待客户端请求...) # 执行任务的回调函数核心 def execute_callback(self, goal_handle): self.get_logger().info(开始执行任务...) # 获取客户端发送的目标 target goal_handle.request.mytarget #来自你定义的action中的变量名 sequence [] # 循环生成数列模拟长时间任务 feedback_msg Progress.Feedback() for i in range(1, target): sequence.append(i * 2) self.get_logger().info(f反馈{sequence}) feedback_msg.myfeedback sequence goal_handle.publish_feedback(feedback_msg) # 模拟任务耗时 time.sleep(1) # 任务完成 goal_handle.succeed() result Progress.Result() result.myresult sequence self.get_logger().info(f任务完成结果{sequence}) return result def main(argsNone): rclpy.init(argsargs) server Action_Server() rclpy.spin(server) if __name__ __main__: main()7. 再创建一个名为action_demo1_client_side.py的文件在里面实现客户端代码如下import rclpy from rclpy.action import ActionClient from rclpy.node import Node from action_interfaces_demo.action import Progress class Action_Client(Node): def __init__(self): super().__init__(action_client) # 创建动作客户端 #参数如下 # 当前节点 # 动作接口类型也就是前面你定义的action下的数据格式 # 动作名称和服务器端要对应客户端必须用相同名字才能连上 self._action_client ActionClient(self, Progress, multicase) def send_goal(self, tatget): # 发送目标 # 等待服务端上线 self._action_client.wait_for_server() # 构造目标 goal_msg Progress.Goal() goal_msg.mytarget tatget # 异步发送目标 self._send_goal_future self._action_client.send_goal_async( goal_msg, feedback_callbackself.feedback_callback # 绑定反馈回调 ) self._send_goal_future.add_done_callback(self.goal_response_callback)#当一个异步任务做完了自动调用你指定的函数。 def goal_response_callback(self, future): # 处理目标发送后的反馈 # 收到服务端响应接受/拒绝目标 goal_handle future.result() if not goal_handle.accepted: self.get_logger().info(目标被拒绝) return self.get_logger().info(目标已接受) self._goal_handle goal_handle # 等待最终结果 self._get_result_future goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def feedback_callback(self, feedback_msg): # 处理连续反馈 # 接收实时反馈 feedback feedback_msg.feedback self.get_logger().info(f收到反馈{feedback.myfeedback}) def get_result_callback(self, future): # 处理最终响应。 # 接收最终结果 result future.result().result self.get_logger().info(f最终结果{result.myresult}) rclpy.shutdown() def main(argsNone): rclpy.init(argsargs) client Action_Client() client.send_goal(10) # rclpy.spin(client) if __name__ __main__: main()8. 再继续编辑配置文件setup.py, 如下9. 然后编译10. 接下来我们验证下是否设置成功先运行动作服务器端程序ros2 run pythonpackagedemo1 action_demo1_server_side注1如下是一些常用的查询命令和命令行调用语句这边就不再像前面话题服务那样一一文字阐述了直接截图action有info指令注2如果想用命令行方式调用则可使用如下格式语句ros2 action send_goal 动作名 动作类型 {参数}ros2 action send_goal /multicase action_interfaces_demo/action/Progress {mytarget: 5}11. 再运行客户端程序ros2 run pythonpackagedemo1 action_demo1_client_side运行之后可看到结果如下可看到如效果如自己所希望的由于遍历每一个元素且将元素值*2,(循环中加了睡眠时间这个完整过程较为耗时需要闭环就像人一样事事有回响中间不断地再反馈结果直到结束。注1好多话题/服务/动作全都是路径格式用/分层比如/turtle1/cmd_vel中turtle1是全局命名空间cmd_vel是话题名/ 根所有节点、话题都从这里开始/turtle1 命名空间区分第 1 只乌龟/cmd_vel 话题名速度控制而ros2 interface show也有/符号则不是路径分层是固定格式【包】 / 【msg/srv/action】 / 【接口名】注意节点、话题/服务/动作、接口的区别。节点可以认为是在可执行程序里定义的节点对象名话题/服务/动作可认为是在节点中实现的通讯方式名而接口则是通讯的数据格式注2如下是一些常用命令在上文博客内也都有用过ros2 pkg create pythonpackagedemo1 --build-type ament_python --dependencies rclpy --node-name pythonexecute1 #创建python包 ros2 pkg create cpp_packagedemo1 --build-type ament_cmake --dependencies rclcpp --node-name cppexecute1 #创建c包 colcon build #编译包 ros2 pkg list #例举ROS2已安装或编译的包 ros2 pkg executables 包名 #查指定包里的可执行程序 ros2 pkg executables #查所有可执行程序 # 先启动小乌龟 ros2 run turtlesim turtlesim_node #run后面跟 包名 可执行文件 ros2 run turtlesim turtle_teleop_key #run后面跟 包名 可执行文件 # 查看节点列表 ros2 node list # 输出大概是/turtlesim /teleop_turtle # 查看某个节点详细 info ros2 node info /turtlesim # 查看所有话题 ros2 topic list # 查看某个话题详情类型、发布者、订阅者 ros2 topic info /turtle1/cmd_vel #注意也可以直接是ros2 topic info /topic_demo1这种单分层形式不用全局命名空间 # 查看话题发布的数据 ros2 topic echo /turtle1/cmd_vel # 查看所有服务 ros2 service list # 查看服务类型 ros2 service type /spawn # 查看服务接口定义等价于 ros2 interface show ros2 interface show turtlesim/srv/Spawn # 查看所有动作 ros2 action list # 查看动作详情类型、服务器、客户端 ros2 action info /turtle1/rotate_absolute # 查看动作接口定义 ros2 interface show turtlesim/action/RotateAbsolute下一篇博客我们再继续深入此篇暂就到这儿。