ros与mqtt相互转换

vda5050

VDA5050协议介绍 和 详细翻译-CSDN博客

ros与mqtt相互转换

如何转换的,通过某个中转包,获取ros的消息然后以需要的格式转换为mqtt

需要的参数

ros相关

parameters=[
('ros_subscriber_type', 'vda5050_msgs/NodeState'),
('ros_subscriber_queue', 1),
]
        

mqtt相关

parameters=[
('interface_name', 'uagv'),
('major_version', 'v2'),
('manufacturer', 'RobotCompany'),
('serial_number', 'carter01'),
('mqtt_client_name', 'RosToMqttBridge'),
('mqtt_host_name', 'localhost'),
('mqtt_port', 1883),
('mqtt_transport', 'tcp'),
('mqtt_ws_path', ''),
('mqtt_keep_alive', 60),
('convert_snake_to_camel', True),
('reconnect_period', 5),
('retry_forever', False),
('num_retries', 10)
]
        

ros需要的内容

订阅了需要转换为mqtt的ros订阅者

self.subscription = self.create_subscription(
            ros_loader.get_message_class(
                self.get_parameter('ros_subscriber_type').value),
            'ros_sub_topic', self.__ros_subscriber_callback,
            self.get_parameter('ros_subscriber_queue').value)

mqtt需要的内容

mqtt客户端

self.mqtt_client = mqtt.Client(
            self.get_parameter('mqtt_client_name').value,
            transport=self.get_parameter('mqtt_transport').value)

mqtt配置项

if self.get_parameter('mqtt_transport').value == 'websockets' and \
                self.get_parameter('mqtt_ws_path').value != '':
            self.mqtt_client.ws_set_options(path=self.get_parameter('mqtt_ws_path').value)

        self.interface_name = self.get_parameter('interface_name').value
        self.major_version = self.get_parameter('major_version').value
        self.manufacturer = self.get_parameter('manufacturer').value
        self.serial_number = self.get_parameter('serial_number').value
        self.mqtt_topic_prefix = \
            f'{self.interface_name}/{self.major_version}/{self.manufacturer}/{self.serial_number}'

mqtt状态检查

基于次数、或bool值,结合try-except来实现对mqtt_host_name、mqtt_port、mqtt_keep_alive的检查

max_retries = self.get_parameter('num_retries').value
        retries = 0
        connected = False
        retry_forever = self.get_parameter('retry_forever').value
        while retries < max_retries or retry_forever:
            try:
                self.mqtt_client.connect(
                    self.get_parameter('mqtt_host_name').value,
                    self.get_parameter('mqtt_port').value,
                    self.get_parameter('mqtt_keep_alive').value)
                connected = True
                break
            except ConnectionRefusedError as e:
                self.get_logger().error(f'Connection Error: {e}. Please check the mqtt_host_name.')
                time.sleep(self.get_parameter('reconnect_period').value)
                retries += 1
            except socket.timeout as e:
                self.get_logger().error(f'Connection Error: {e}. Please check the mqtt_host_name'
                                        ' and make sure it is reachable.')
                time.sleep(self.get_parameter('reconnect_period').value)
                retries += 1
            except socket.gaierror as e:
                self.get_logger().error(f'Connection Error: {e}. Could not resolve mqtt_host_name')
                time.sleep(self.get_parameter('reconnect_period').value)
                retries += 1
        if connected:
            self.mqtt_client.loop_start()
        else:
            self.get_logger().error('Failed to connect to MQTT broker, ending retries.')

ros话题转换mqtt

需要使用的包

from rosbridge_library.internal import message_conversion
import json

使用message_conversion提取话题数据,利用json将提取的数据转换并发布到mqtt客户端中。

 def __ros_subscriber_callback(self, msg):
        try:
            extracted = message_conversion.extract_values(msg)
            if self.get_parameter('convert_snake_to_camel').value:
                self.mqtt_client.publish(
                    f'{self.mqtt_topic_prefix}/state',
                    json.dumps(convert_dict_keys(extracted, 'snake_to_dromedary')))
            else:
                self.mqtt_client.publish(f'{self.mqtt_topic_prefix}/state', json.dumps(extracted))
        except (message_conversion.FieldTypeMismatchException,
                json.decoder.JSONDecodeError) as e:
            self.get_logger().info(repr(e))

mqtt转换ros

使用包

from rosbridge_library.internal import ros_loader
from rosbridge_library.internal import message_conversion
import json

用ros_loader.get_message_instance创建ros消息

接收mqtt的msg,由json加载为ros消息键值,再用message_conversion将ros消息键值对转换为ros_msg并利用发布者发布。

def on_mqtt_message(client, userdata, msg):
            try:
                publisher = None
                self.get_logger().info(f'From {msg.topic}: {str(msg.payload)}')
                if msg.topic.endswith('order'):
                    ros_msg = ros_loader.get_message_instance(
                        self.get_parameter('ros_publisher_type').value)
                    publisher = self.publisher
                if msg.topic.endswith('instantActions'):
                    ros_msg = ros_loader.get_message_instance('vda5050_msgs/InstantActions')
                    publisher = self.instant_actions_publisher

                if self.get_parameter('convert_camel_to_snake').value:
                    message_dict = json.loads(str(msg.payload, 'utf-8'))
                    converted_message_dict = convert_dict_keys(
                        message_dict, 'camel_to_snake')
                    message_conversion.populate_instance(
                        converted_message_dict, ros_msg)
                else:
                    message_conversion.populate_instance(
                        json.loads(str(msg.payload, 'utf-8')), ros_msg)
                if publisher:
                    publisher.publish(ros_msg)
            except (message_conversion.FieldTypeMismatchException,
                    json.decoder.JSONDecodeError) as e:
                self.get_logger().info(repr(e))
                error_msg = String()
                error_msg.data = repr(e)
                self.error_publisher.publish(error_msg)

源码

如下页面

isaac_ros_mission_client — isaac_ros_docs documentation

git clone

git clone -b release-3.1 https://github.com/NVIDIA-ISAAC-ROS/isaac_ros_mission_client.git isaac_ros_mission_client
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/907049.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一些硬件知识【2024/11/2】

当需要提供功率型的输出信号的时候&#xff0c;可以在信号发生器外接功率放大器&#xff0c;这样可以提高输出功率 信号的调幅&#xff08;AM&#xff09;、调频&#xff08;FM&#xff09;与调相&#xff08;PM&#xff09;&#xff1a; 调制信号&#xff1a;控制高频振荡的低…

YOLO即插即用---PKIBlock

Poly Kernel Inception Network for Remote Sensing Detection 论文地址 1. 解决的问题 2. 解决方案 3. 解决问题的具体方法 4. 模块的应用 5. 在目标检测任务中的添加位置 6.即插即用代码 论文地址 2403.06258https://arxiv.org/pdf/2403.06258 1. 解决的问题 遥感图…

丝杆支撑座的更换与细节注意事项

丝杆支撑座是支撑连接丝杆和电机的轴承支撑座&#xff0c;分固定侧和支撑侧&#xff0c;它们都有用预压调整的JIS5级的交界处球轴承。在自动化设备中是常用的传动装置&#xff0c;作为核心部件&#xff0c;对设备精度、稳定性和生产效率产生直接影响。在长时间运行中&#xff0…

3D Gaussian Splatting代码详解(一):模型训练、数据加载

1 模型训练 这段代码实现了一个 3D 高斯模型的训练循环&#xff0c;旨在通过逐步优化模型参数&#xff0c;使其能够精确地渲染特定场景。以下是代码的详细解析&#xff1a; def training(dataset, opt, pipe, testing_iterations, saving_iterations, checkpoint_iterations,…

Docker-微服务项目部署

环境准备 1.微服务项目 参考&#xff1a;通过网盘分享的文件&#xff1a;wolf2w_cloud.zip 链接: https://pan.baidu.com/s/1Lr4k6LPIJ59gVNA_DgKM_Q?pwdkjxt 提取码: kjxt 前端项目&#xff1a;trip-mgrsite-ui&#xff0c;trip-website-ui&#xff0c;trip-wenda-ui 服务项…

设计模式讲解01-建造者模式(Builder)

1. 概述 建造者模式也称为&#xff1a;生成器模式 定义&#xff1a;建造者模式是一种创建型设计模式&#xff0c;它允许你将创建复杂对象的步骤与表示方式相分离。 解释&#xff1a;建造者模式就是将复杂对象的创建过程拆分成多个简单对象的创建过程&#xff0c;并将这些简单…

HTML 基础标签——文本内容标签 <ul>、<ol>、<blockquote> 、<code> 等标签的用法详解

文章目录 1. 标题标签2. 段落标签3. 文本格式化标签4. 列表标签4.1 无序列表 `<ul>`4.2 有序列表 `<ol>`5. 引用标签5.1 块引用 `<blockquote>`5.2 行内引用 `<q>`5.3 作品引用 `<cite>`6. 代码和预格式文本标签6.1 代码标签 `<code>`6.2 …

(51)MATLAB迫零均衡器系统建模与性能仿真

文章目录 前言一、迫零均衡器性能仿真说明二、迫零均衡器系统建模与性能仿真代码1.仿真代码2.代码说明3.迫零均衡器zf_equalizer的MATLAB源码 三、仿真结果1.信道的冲击响应2.频率响应3.迫零均衡器的输入和输出 前言 使用MATLAB对迫零均衡器系统进行建模仿真&#xff0c;完整的…

前端请求后端接口报错(blocked:mixed-content),以及解决办法

报错原因&#xff1a;被浏览器拦截了&#xff0c;因为接口地址不是https的。 什么是混合内容&#xff08;Mixed Content&#xff09; 混合内容是指在同一页面中同时包含安全&#xff08;HTTPS&#xff09;和非安全&#xff08;HTTP&#xff09;资源的情况。当浏览器试图加载非…

python 包和模块

一、模块 一个.py 文件就是一个模块&#xff0c;模块是含有一系列数据&#xff0c;函数&#xff0c;类等的程序。 1、模块导入 1.1、impotrt 模块名称 [ as 别名] import nunpy as np 1.2、form 模块名 import 模块内属性名 [ as 别名] from datetime import datetime as d…

Git下载-连接码云-保姆级教学(连接Gitee失败的解决)

Git介绍 码云连接 一、Git介绍 二、Git的工作机制 下载链接&#xff1a;Git - 下载软件包 三、使用步骤 创建一个wss的文件夹&#xff0c;作为‘工作空间’ 四、连接码云账号 五、连接Gitee失败的解决方法 一、Git介绍 Git是一个免费的、开源的分布式版本控制…

https和http的区别,及HTTPS的工作流程

HTTP&#xff08;HyperText Transfer Protocol&#xff09;和HTTPS&#xff08;HyperText Transfer Protocol Secure&#xff09;都是超文本传输协议&#xff0c;但它们之间的关键区别在于安全性。 安全性&#xff1a; HTTP&#xff1a;数据以明文传输&#xff0c;没有加密&…

【Python · Pytorch】人工神经网络 ANN(上)

【Python Pytorch】人工神经网络 ANN&#xff08;上&#xff09; 0. 生物神经网络1. 人工神经网络定义2. 人工神经网络结构2.1 感知机2.2 多层感知机2.3 全连接神经网络2.4 深度神经网络 2. 训练流程※ 数据预处理 (Data Preprocessing) 3. 常见激活函数3.1 Sigmoid / Logisti…

基本查询【MySQL】

文章目录 基本查询插入时是否更新替换查询指定列查询查询字段为表达式为查询结果指定别名结果去重where条件NULL 的查询 结果排序筛选分页结果UpdateDelete截断表聚合函数分组(group by)having && where 基本查询 建表 mysql> create table Student (-> id int…

pandas——数据结构

一、series &#xff08;一&#xff09;创建series import pandas as pd#1.使用列表或数组创建Series # 使用列表创建Series&#xff0c;索引默认从0开始 s1 pd.Series([1, 2, 3]) print(s1) # 使用列表和自定义索引创建Series s2 pd.Series([1, 2, 3], index[a, b, c]) pr…

算法妙妙屋-------1.递归的深邃回响:C++ 算法世界的优雅之旅

前言&#xff1a; 递归是一种在算法中广泛应用的思想&#xff0c;其主体思想是通过将复杂的问题分解为更简单的子问题来求解。具体而言&#xff0c;递归通常包括以下几个要素&#xff1a; 基本情况&#xff08;Base Case&#xff09;&#xff1a;每个递归算法必须有一个或多个…

禾川HCQ1控制器程序编译报错如何解决

1、第一次打开用户程序 2、提示库未安装 3、安装库文件 4、脉冲轴库未安装 5、没有错误 去禾川自动化官网,把可以安装的包和库都安装下,程序编译就没有错误了。 6、下载相关包文件

HarmonyOS:@Watch装饰器:状态变量更改通知

Watch应用于对状态变量的监听。如果开发者需要关注某个状态变量的值是否改变&#xff0c;可以使用Watch为状态变量设置回调函数。 说明 从API version 9开始&#xff0c;该装饰器支持在ArkTS卡片中使用。 从API version 11开始&#xff0c;该装饰器支持在元服务中使用。 一、概…

Windows如何查看自己网卡的MAC地址?

本章教程&#xff0c;主要介绍如何在Windows查看自己的网卡mac地址。 一、查询MAC地址方法 打开使用PowerShell&#xff0c;运行以下命令即可查询到自己的网卡MAC地址。 Get-NetAdapter | Select-Object Name, MacAddress二、MAC地址是什么 MAC地址&#xff08;Media Access Co…

Unknown at rule @tailwindscss(unknownAtRules)

一、前言 整合 tailwindcss 后&#xff0c;发现指令提示警告 Unknown at rule tailwindscss(unknownAtRules)&#xff0c;其实是 vscode 无法识别 tailwindscss 指令&#xff0c;不影响使用&#xff0c;但是对于我这种有编程洁癖的人来说&#xff0c;有点膈应。 二、解决方案…