ROS2数据录制实战:手把手教你用ros2 bag记录Duckiebot图像数据(附常见错误排查)
ROS2数据录制实战从Duckiebot仿真到真实场景的全流程指南在机器人开发过程中数据记录与分析是算法验证和系统调试的关键环节。ROS2提供的ros2 bag工具链为开发者提供了强大的数据采集能力但实际应用中往往会遇到各种意料之外的问题。本文将带您从零开始构建一个完整的Duckiebot数据记录方案涵盖仿真环境搭建、命令行操作、Python编程实现以及常见问题排查。1. 环境准备与Duckiebot仿真启动1.1 基础环境配置在开始数据记录前需要确保系统已安装以下组件# 安装ROS2 Rolling发行版以Ubuntu 22.04为例 sudo apt install ros-rolling-desktop # 安装Duckietown仿真环境 pip install duckietown-gym-daffy # 安装rosbag2相关组件 sudo apt install ros-rolling-rosbag2 ros-rolling-rosbag2-storage-default-plugins验证安装是否成功source /opt/ros/rolling/setup.bash python3 -c import gym_duckietown; print(Duckietown环境可用)1.2 启动Duckiebot仿真节点创建一个名为duckiebot_sim.launch.py的启动文件from launch import LaunchDescription from launch_ros.actions import Node def generate_launch_description(): return LaunchDescription([ Node( packageduckietown_simulator, executableduckietown_env, nameduckiebot, outputscreen, parameters[{map_name: udem1}] ) ])启动仿真环境ros2 launch duckiebot_sim.launch.py2. ros2 bag命令行实战技巧2.1 基础录制操作查看当前可用话题ros2 topic list录制单个图像话题ros2 bag record /duckiebot/camera_node/image/compressed常用参数组合参数说明示例-o指定输出文件名ros2 bag record -o duckie_data /image_raw-s设置存储格式ros2 bag record -s mcap /image_raw--max-bag-size限制文件大小(MB)ros2 bag record --max-bag-size 1024 /image_raw2.2 高级录制策略录制多个话题并添加元数据ros2 bag record -o full_session \ /duckiebot/camera_node/image/compressed \ /duckiebot/velocity_to_pose_node/pose \ --qos-profile-overrides-path qos_overrides.yaml其中qos_overrides.yaml内容示例/topic_name: history: keep_last depth: 10 reliability: reliable durability: transient_local2.3 数据验证与回放检查录制内容ros2 bag info full_session回放数据时同步显示图像ros2 bag play full_session ros2 run image_view image_view --ros-args -r /image:/duckiebot/camera_node/image/compressed3. Python编程实现高级录制逻辑3.1 基础录制节点实现创建advanced_recorder.pyimport rclpy from rclpy.node import Node from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions from rclpy.serialization import serialize_message from sensor_msgs.msg import CompressedImage import os import time class AdvancedRecorder(Node): def __init__(self): super().__init__(advanced_recorder) self.bag_dir fduckie_data_{int(time.time())} os.makedirs(self.bag_dir, exist_okTrue) self.writer SequentialWriter() storage_options StorageOptions( uriself.bag_dir, storage_idsqlite3) converter_options ConverterOptions() self.writer.open(storage_options, converter_options) self.image_sub self.create_subscription( CompressedImage, /duckiebot/camera_node/image/compressed, self.image_callback, 10) topic_info { name: /duckiebot/camera_node/image/compressed, type: sensor_msgs/msg/CompressedImage, serialization_format: cdr } self.writer.create_topic(topic_info) def image_callback(self, msg): self.writer.write( /duckiebot/camera_node/image/compressed, serialize_message(msg), self.get_clock().now().nanoseconds) def main(argsNone): rclpy.init(argsargs) recorder AdvancedRecorder() try: rclpy.spin(recorder) except KeyboardInterrupt: recorder.get_logger().info(录制完成数据保存在: recorder.bag_dir) finally: recorder.destroy_node() rclpy.shutdown() if __name__ __main__: main()3.2 带条件触发的智能录制扩展录制逻辑添加运动检测触发from geometry_msgs.msg import Twist class SmartRecorder(AdvancedRecorder): def __init__(self): super().__init__() self.cmd_sub self.create_subscription( Twist, /duckiebot/cmd_vel, self.cmd_callback, 10) self.is_moving False self.last_active 0 def cmd_callback(self, msg): if abs(msg.linear.x) 0.01 or abs(msg.angular.z) 0.01: self.is_moving True self.last_active time.time() elif time.time() - self.last_active 5.0: self.is_moving False def image_callback(self, msg): if self.is_moving: super().image_callback(msg)4. 常见问题排查与性能优化4.1 典型错误解决方案问题1ImportError: cannot import name SequentialWriter from rosbag2_py解决方案# 确保安装正确版本的rosbag2 sudo apt install ros-rolling-rosbag2-py source /opt/ros/rolling/setup.bash问题2录制文件过大过快优化策略使用压缩存储格式降低录制频率过滤不必要的话题# 在回调函数中添加节流逻辑 self.last_record 0 def image_callback(self, msg): now time.time() if now - self.last_record 0.1: # 10Hz采样 self.writer.write(...) self.last_record now4.2 存储格式对比格式压缩率读写速度兼容性适用场景SQLite3低快高小规模数据MCAP中中中通用场景Zstd高慢低长期存储4.3 多机同步录制方案对于分布式系统需要时间同步# 主节点 ros2 bag record -o master_data /master/camera # 从节点同步时间戳 ros2 bag record -o slave_data /slave/camera --use-sim-time在后期处理时对齐时间戳import pandas as pd master_df pd.read_parquet(master_data/metadata.yaml) slave_df pd.read_parquet(slave_data/metadata.yaml) aligned_data pd.merge_asof( master_df.sort_values(timestamp), slave_df.sort_values(timestamp), ontimestamp, directionnearest)5. 实战构建完整的数据处理流水线5.1 自动化录制脚本创建auto_recorder.sh#!/bin/bash SESSION_ID$(date %s) BAG_DIRsession_$SESSION_ID # 启动仿真环境 ros2 launch duckietown_simulator duckiebot.launch.py SIM_PID$! # 等待话题出现 while ! ros2 topic list | grep -q /duckiebot/camera_node/image/compressed; do sleep 1 done # 开始录制 ros2 bag record -o $BAG_DIR \ /duckiebot/camera_node/image/compressed \ /duckiebot/velocity_to_pose_node/pose RECORD_PID$! # 运行控制节点 ros2 run duckietown_control control_node # 结束录制 kill -INT $RECORD_PID wait $RECORD_PID # 关闭仿真 kill $SIM_PID # 生成报告 ros2 bag info $BAG_DIR report_$SESSION_ID.txt5.2 数据后处理示例使用Python分析录制数据import rosbag2_py from rclpy.serialization import deserialize_message from sensor_msgs.msg import CompressedImage import cv2 reader rosbag2_py.SequentialReader() storage_options rosbag2_py.StorageOptions(urisession_1234567890, storage_idsqlite3) converter_options rosbag2_py.ConverterOptions() reader.open(storage_options, converter_options) while reader.has_next(): topic, data, timestamp reader.read_next() if topic /duckiebot/camera_node/image/compressed: msg deserialize_message(data, CompressedImage) cv_img cv2.imdecode(msg.data, cv2.IMREAD_COLOR) # 进行图像处理...在实际项目中我们发现Duckiebot的图像数据往往需要配合车辆状态信息才能发挥最大价值。建议开发者建立统一的时间索引将传感器数据与控制指令关联分析。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2452661.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!