RK3588中使用Serial转发订阅的话题数据
我们在ROS的使用中常常会通过rostopic echo /***来订阅某个话题数据的输出我想通过串口对其通串口进行转发。#查看ros话题列表 rostopic list找到一个你想要订阅的话题如/IMU_data#订阅话题通过终端查看 rostopic echo /IMU_data就会看到以下这种数据的输出但是我们看不到组包的结构所以我们还需要进一步操作查看 /IMU_data 的组包结构fireflyfirefly:~$ rostopic info /IMU_data Type: sensor_msgs/Imu Publishers: None Subscribers: * /rostopic_11294_1774864513981 (http://firefly:34133/) * /imu_range_serial (http://firefly:42065/) * /ov_msckf (http://firefly:44037/) * /all_serial_forward (http://firefly:34671/)我们可以看到Type sensor_msgs/Imu然后通过rosmsg show sensor_msgs/Imu即可查看其组包结构然后我们就可以开始串口转发该ros话题数据的脚本编写了# 创建空的 test.py 文件 touch test.py # 编辑修改 test.py 文件 gedit test.py # 增加运行权限 chmod x test.py # 运行脚本 ./test.py个人的代码(包含:IMU 激光测距 经纬度 海拔高度#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rospy import serial from sensor_msgs.msg import Range, Imu, NavSatFix # 串口配置 SERIAL_PORT /dev/ttyUSB1 BAUD_RATE 115200 ser None # ------------------- 激光测距 ------------------- def range_callback(msg): distance msg.range rospy.loginfo(f测距: {distance:.3f} m) try: ser.write(frange:{distance:.3f}\r\n.encode(utf-8)) except Exception as e: rospy.logerr(f串口发送失败: {e}) # ------------------- IMU ------------------- def imu_callback(msg): ax msg.linear_acceleration.x ay msg.linear_acceleration.y az msg.linear_acceleration.z gx msg.angular_velocity.x gy msg.angular_velocity.y gz msg.angular_velocity.z qx msg.orientation.x qy msg.orientation.y qz msg.orientation.z qw msg.orientation.w rospy.loginfo(fIMU加速度: [{ax:.2f}, {ay:.2f}, {az:.2f}]) try: send_str fimu:{ax:.2f},{ay:.2f},{az:.2f},{gx:.2f},{gy:.2f},{gz:.2f},{qx:.3f},{qw:.3f}\r\n ser.write(send_str.encode(utf-8)) except Exception as e: rospy.logerr(f串口发送失败: {e}) # ------------------- GPS 经度 纬度 海拔 ------------------- def gps_callback(msg): lat msg.latitude # 纬度 lon msg.longitude # 经度 alt msg.altitude # 海拔 rospy.loginfo(fGPS: 纬度{lat:.6f}, 经度{lon:.6f}, 海拔{alt:.2f}) try: send_str fgps:{lat:.6f},{lon:.6f},{alt:.2f}\r\n ser.write(send_str.encode(utf-8)) except Exception as e: rospy.logerr(f串口发送失败: {e}) # ------------------- 主函数 ------------------- if __name__ __main__: rospy.init_node(all_serial_forward) ser serial.Serial(SERIAL_PORT, BAUD_RATE, timeout0.01) rospy.loginfo(串口打开成功) # 订阅三个话题 rospy.Subscriber(/range, Range, range_callback) rospy.Subscriber(/IMU_data, Imu, imu_callback) rospy.Subscriber(/geovins_gps, NavSatFix, gps_callback) # GPS已加 rospy.spin() ser.close()通过串口接到电脑的串口助手上进行测试baudrate 115200串口转发成功。但是在测试过程中发现无法自动识别USB口和手动断开串口再连接脚本就死掉了所以升级代码新增自己寻找USB设备和支持热插拔#!/usr/bin/env python3 # -*- coding: utf-8 -*- import rospy import serial import serial.tools.list_ports from sensor_msgs.msg import Range, Imu, NavSatFix import time BAUD_RATE 115200 ser None # ------------------- 自动找串口 ------------------- def find_usb_serial(): ports list(serial.tools.list_ports.comports()) for p in ports: if USB in p.description or usb in p.description: return p.device return None # ------------------- 安全发送自动重连核心 ------------------- def safe_send(data): global ser try: # 尝试发送 ser.write(data.encode(utf-8)) except: # 发送失败 → 重新找串口 重新打开 rospy.logwarn(串口断开正在重连...) port find_usb_serial() if port: try: ser serial.Serial(port, BAUD_RATE, timeout0.01) rospy.loginfo(f重连成功{port}) time.sleep(0.1) ser.write(data.encode(utf-8)) except: pass # ------------------- 激光 ------------------- def range_callback(msg): distance msg.range safe_send(frange:{distance:.3f}\r\n) # ------------------- IMU ------------------- def imu_callback(msg): ax msg.linear_acceleration.x ay msg.linear_acceleration.y az msg.linear_acceleration.z gx msg.angular_velocity.x gy msg.angular_velocity.y gz msg.angular_velocity.z qx msg.orientation.x qy msg.orientation.y qz msg.orientation.z qw msg.orientation.w safe_send(fimu:{ax:.2f},{ay:.2f},{az:.2f},{gx:.2f},{gy:.2f},{gz:.2f},{qx:.3f},{qw:.3f}\r\n) # ------------------- GPS ------------------- def gps_callback(msg): lat msg.latitude lon msg.longitude alt msg.altitude safe_send(fgps:{lat:.6f},{lon:.6f},{alt:.2f}\r\n) # ------------------- 主函数 ------------------- if __name__ __main__: rospy.init_node(all_serial_forward) # 第一次打开 port find_usb_serial() while not port: rospy.logwarn(未找到串口等待插入...) time.sleep(1) port find_usb_serial() ser serial.Serial(port, BAUD_RATE, timeout0.01) rospy.loginfo(f串口打开成功{port}) rospy.Subscriber(/range, Range, range_callback) rospy.Subscriber(/IMU_data, Imu, imu_callback) rospy.Subscriber(/geovins_gps, NavSatFix, gps_callback) rospy.spin() ser.close()
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2466974.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!