数据同步

作者:iohannes 发布时间: 2025-07-11 阅读量:14

功能类SyncedMessage

#pragma once

#include <sensor_msgs/Imu.h>
#include <sensor_msgs/PointCloud2.h>
#include <queue>
#include <vector>
#include <algorithm>
#include <functional>
#include "slam_lidar/VideoPath.h"
#include <geometry_msgs/QuaternionStamped.h>
#include <geometry_msgs/PointStamped.h>
#include "slam_lidar/synced_message.h"
#include <ros/time.h>
#include <ros/ros.h>

namespace slam_lidar
{

    typedef struct _synced_message_t
    {
        geometry_msgs::QuaternionStampedConstPtr imu_ori;
        geometry_msgs::TwistStampedConstPtr imu_twi;
        geometry_msgs::PointStampedConstPtr gnss_pos;
        sensor_msgs::PointCloud2ConstPtr lidar_point;
        slam_lidar::VideoPathConstPtr video_img;

    } synced_message_t;

    class SyncedMessage
    {
    public:
        using CallbackFunc = std::function<void(synced_message_t)>;

        void putImuOri(const geometry_msgs::QuaternionStampedConstPtr &imu_ori_msg)
        {
            if (imu_ori_msg == nullptr)
            {
                std::cerr << "imu_ori_msg is null" << std::endl;
                return;
            }

            // ROS_INFO("got imu ori msg");
            geometry_msgs::QuaternionStampedConstPtr temp = boost::make_shared<geometry_msgs::QuaternionStamped>(*imu_ori_msg);
            all_imu_ori_msg_.push(temp);
            // update();
        }

        void putImuTwi(const geometry_msgs::TwistStampedConstPtr &imu_twi_msg)
        {
            if (imu_twi_msg == nullptr)
            {
                std::cerr << "imu_twi_msg is null" << std::endl;
                return;
            }

            // ROS_INFO("got imu twi msg");
            geometry_msgs::TwistStampedConstPtr temp(new geometry_msgs::TwistStamped(*imu_twi_msg));
            all_imu_twi_msg_.push(temp);
            // update();
        }

        void putGnssPos(const geometry_msgs::PointStampedConstPtr &gnss_pos_msg)
        {
            if (gnss_pos_msg == nullptr)
            {
                std::cerr << "gnss_pos_msg is null" << std::endl;
                return;
            }

            // ROS_INFO("got gnss pos msg");
            geometry_msgs::PointStampedConstPtr temp(new geometry_msgs::PointStamped(*gnss_pos_msg));
            all_gnss_pos_msg_.push(temp);
            // update();
        }

        void putLidarPoint(const sensor_msgs::PointCloud2ConstPtr &lidar_point_msg)
        {
            if (lidar_point_msg == nullptr)
            {
                std::cerr << "lidar_point_msg is null" << std::endl;
                return;
            }

            // ROS_INFO("got lidar point msg");
            sensor_msgs::PointCloud2ConstPtr temp(new sensor_msgs::PointCloud2(*lidar_point_msg));
            all_lidar_point_msg_.push(temp);
            update();
        }

        void putVideoImg(const slam_lidar::VideoPathConstPtr &video_img_msg)
        {
            if (video_img_msg == nullptr)
            {
                return;
            }

            // ROS_INFO("got video img msg");
            slam_lidar::VideoPathConstPtr temp(new slam_lidar::VideoPath(*video_img_msg));
            all_video_img_msg_.push(temp);
            // update();
        }

        void setCallback(CallbackFunc callback_func)
        {
            callback_func_ = callback_func;
        }

    private:
        std::queue<geometry_msgs::QuaternionStampedConstPtr> all_imu_ori_msg_;
        std::queue<geometry_msgs::TwistStampedConstPtr> all_imu_twi_msg_;
        std::queue<geometry_msgs::PointStampedConstPtr> all_gnss_pos_msg_;
        std::queue<sensor_msgs::PointCloud2ConstPtr> all_lidar_point_msg_;
        std::queue<slam_lidar::VideoPathConstPtr> all_video_img_msg_;
        CallbackFunc callback_func_ = nullptr;

        const int BUFF_SIZE = 10;

        struct HeapNode
        {
            ros::Time time;
            int queueIndex;

            bool operator>(const HeapNode &other) const
            {
                return time > other.time;
            }
        };

        bool dataReady()
        {
            // all_imu_ori_msg_
            if (all_imu_ori_msg_.size() < BUFF_SIZE)
            {
                return false;
            }

            // all_imu_twi_msg_
            if (all_imu_twi_msg_.size() < BUFF_SIZE)
            {
                return false;
            }

            // all_gnss_pos_msg_
            if (all_gnss_pos_msg_.size() < BUFF_SIZE)
            {
                return false;
            }

            // all_lidar_point_msg_
            if (all_lidar_point_msg_.size() < BUFF_SIZE)
            {
                return false;
            }

            // all_video_img_msg_
            if (all_video_img_msg_.size() < BUFF_SIZE)
            {
                return false;
            }
            return true;
        }

        void update()
        {

            try
            {
                if (!dataReady())
                {
                    // std::cerr << "Data not ready" << std::endl;
                    return;
                }
                // std::cout << "Data ready" << std::endl;
                synced_message_t result = findClosestValues();

                // 调用回调函数
                if (callback_func_)
                {
                    callback_func_(result);
                }
                // ROS_INFO("imu_ori size: %ld", all_imu_ori_msg_.size());
                // ROS_INFO("imu_twi size: %ld", all_imu_twi_msg_.size());
                // ROS_INFO("gnss_pos size: %ld", all_gnss_pos_msg_.size());
                // ROS_INFO("lidar_point size: %ld", all_lidar_point_msg_.size());
                // ROS_INFO("video_img size: %ld", all_video_img_msg_.size());
            }
            catch (...)
            {
                ROS_ERROR("syn msg update error");
            }
        }

        synced_message_t findClosestValues()
        {
            ros::Duration lastDiff = ros::Duration(0);  // 用于记录上一次的时间差
            ros::Duration minDiff = ros::Duration(1e9); // 初始化为一个很大的值

            std::priority_queue<HeapNode, std::vector<HeapNode>, std::greater<HeapNode>> minHeap;
            ros::Time maxTime = ros::Time(0);

            synced_message_t result;

            // 初始化堆
            if (!all_imu_ori_msg_.empty())
            {
                auto item = all_imu_ori_msg_.front();
                ros::Time stamp = item->header.stamp;
                minHeap.push({stamp, 0});
                maxTime = std::max(maxTime, stamp);
            }
            if (!all_imu_twi_msg_.empty())
            {
                auto item = all_imu_twi_msg_.front();
                ros::Time stamp = item->header.stamp;
                minHeap.push({stamp, 1});
                maxTime = std::max(maxTime, stamp);
            }
            if (!all_gnss_pos_msg_.empty())
            {
                auto item = all_gnss_pos_msg_.front();
                ros::Time stamp = item->header.stamp;
                minHeap.push({stamp, 2});
                maxTime = std::max(maxTime, stamp);
            }
            if (!all_lidar_point_msg_.empty())
            {
                auto item = all_lidar_point_msg_.front();
                ros::Time stamp = item->header.stamp;
                minHeap.push({stamp, 3});
                maxTime = std::max(maxTime, stamp);
            }
            if (!all_video_img_msg_.empty())
            {
                auto item = all_video_img_msg_.front();
                ros::Time stamp = item->header.stamp;
                minHeap.push({stamp, 4});
                maxTime = std::max(maxTime, stamp);
            }

            // 如果堆为空,说明没有消息可以处理
            if (minHeap.empty())
            {
                ROS_WARN("No messages available to sync.");
                return result;
            }

            // 循环处理
            while (!minHeap.empty())
            {
                HeapNode current = minHeap.top();
                minHeap.pop();

                ros::Duration diff = maxTime - current.time;

                // 如果当前时间差大于上一次的最小时间差,则认为找到最佳匹配
                if (diff.toNSec() > lastDiff.toNSec() && lastDiff.toNSec() != 0)
                {
                    break;
                }

                // 更新最小差值
                if (diff.toNSec() < minDiff.toNSec()
                    && !all_imu_ori_msg_.empty() && !all_imu_twi_msg_.empty()
                    && !all_gnss_pos_msg_.empty() && !all_lidar_point_msg_.empty()
                    && !all_video_img_msg_.empty())
                {
                    minDiff = diff;
                    result.imu_ori = all_imu_ori_msg_.front();
                    result.imu_twi = all_imu_twi_msg_.front();
                    result.gnss_pos = all_gnss_pos_msg_.front();
                    result.lidar_point = all_lidar_point_msg_.front();
                    result.video_img = all_video_img_msg_.front();
                }

                lastDiff = diff; // 更新上一次的时间差

                // 更新堆
                switch (current.queueIndex)
                {
                case 0:
                {
                    all_imu_ori_msg_.pop();
                    if (!all_imu_ori_msg_.empty())
                    {
                        auto item = all_imu_ori_msg_.front();
                        ros::Time stamp = item->header.stamp;
                        minHeap.push({stamp, 0});
                        maxTime = std::max(maxTime, stamp);
                    }
                }
                break;
                case 1:
                {
                    all_imu_twi_msg_.pop();
                    if (!all_imu_twi_msg_.empty())
                    {
                        auto item = all_imu_twi_msg_.front();
                        ros::Time stamp = item->header.stamp;
                        minHeap.push({stamp, 1});
                        maxTime = std::max(maxTime, stamp);
                    }
                }
                break;
                case 2:
                {
                    all_gnss_pos_msg_.pop();
                    if (!all_gnss_pos_msg_.empty())
                    {
                        auto item = all_gnss_pos_msg_.front();
                        ros::Time stamp = item->header.stamp;
                        minHeap.push({stamp, 2});
                        maxTime = std::max(maxTime, stamp);
                    }
                }
                break;
                case 3:
                {
                    all_lidar_point_msg_.pop();
                    if (!all_lidar_point_msg_.empty())
                    {
                        auto item = all_lidar_point_msg_.front();
                        ros::Time stamp = item->header.stamp;
                        minHeap.push({stamp, 3});
                        maxTime = std::max(maxTime, stamp);
                    }
                }
                break;
                case 4:
                {
                    all_video_img_msg_.pop();
                    if (!all_video_img_msg_.empty())
                    {
                        auto item = all_video_img_msg_.front();
                        ros::Time stamp = item->header.stamp;
                        minHeap.push({stamp, 4});
                        maxTime = std::max(maxTime, stamp);
                    }
                }
                break;
                default:
                    break;
                }
            }

            return result;
        }
    };
}

使用

SyncedMessage synced_message_;
synced_message_.setCallback(std::bind(&SlamNodelet::HandleSynData, this, std::placeholders::_1));

void HandleSynData(const synced_message_t &synced_message)
{
    try
    {
        
    }
    catch (...)
    {
        ROS_ERROR("HandleSynData error.");
    }
}