我们知道在ROS中,由很多互不相干的节点组成了一个复杂的系统,单个的节点看起来是没起什么作用,但是节点之间进行了通信之后,相互之间能够交互信息和数据的时候,就变得很有意思了。
节点之间进行通信的一个常用方法就是使用话题(topic),话题表示的是一个定义了类型的消息流,比方说,摄像机产生的数据可能就会被发送到一个叫做image的话题上,类型是Image的图片类型。
1、话题概述
话题是一种通过发布(publish)与订阅(subscribe)进行通信的机制,这在分布式系统中是一种很常见的数据交换方式,节点在发送数据到话题上之前,需要先声明(advertise)话题名和消息类型,然后与roscore建立一个连接,roscore将共享它的订阅者与分享者列表,接着就可以发布数据到这个话题上了。对于想要从话题上接收消息的节点来说,需要通过向roscore发出请求来订阅这个话题,订阅之后,该话题上所有的消息都会被转发到这个请求的节点上,这样发布者跟订阅者就建立起了直接的连接。
它们之间的连接在 Ubuntu18.04版本安装ROS及出现错误的处理方法 这篇文章里面有更详细地讲解,有兴趣的可以去了解下。
需要注意的是,同一个话题上的所有消息必须是同一类型的。另外话题的名字最好是能够体现发送的是什么消息,比方说,在PR2机器人上面 /wide_stereo/right/image_color 的这个话题,可以直观的了解到,发送的右边相机的彩色图像数据。
2、话题声明
2.1、准备工作
在声明话题之前,需要先有一个工作区,对于初次接触的朋友,更多初始化工作区与包的详细操作,可以查阅:ROS新建工作区(workspace)与包(package)编译的实践(C++示例)
我们创建一个test包
catkin_create_pkg test rospy
接着在src目录里面新建一个topic_publisher.py文件,代码如下:
import rospy
from std_msgs.msg import Int32 # ROS标准消息包
rospy.init_node('topic_publisher') # 初始化节点
pub = rospy.Publisher('counter',Int32) # 发布前先声明话题名称与消息类型
rate = rospy.Rate(2) # 速率(Hz)
count = 0
while not rospy.is_shutdown():
pub.publish(count)
count += 1
rate.sleep()
代码很好理解,初始化节点和声明话题,然后就是在rospy没有关闭前,每秒钟发布2次数据,其中rate.sleep()就是休息指定频率的间隔时间。
由于我们需要运行这个文件,所以需要能够执行的权限
chmod u+x topic_publisher.py
查看下文件,可以看到有了可执行权限(-rwxrw-r--):
ls -l topic_publisher.py
我们从from std_msgs.msg import Int32可以知道,有了一个标准消息包std_msgs,所以我们需要告诉ROS的构建系统,在package.xml文件里面添加依赖项(dependency):
cd ~/catkin_ws/src/test
gpedit package.xml
<depend package="std_msgs" />
添加了依赖项之后,来到工作区根目录进行编译:
cd ~/catkin_ws
catkin_make
这样就将test包以及话题发布的Python文件构建好了。
2.2、发布话题
前面有介绍这个rostopic话题命令的使用,在使用命令前,先启动节点管理器roscore,直接输入roscore命令回车即可
重新开一个终端,运行这个节点
rosrun test topic_publisher.py
出现错误:
import-im6.q16: not authorized `rospy' @ error/constitute.c/WriteImage/1037.
from: can't read /var/mail/std_msgs.msg
/home/yahboom/catkin_ws/src/test/src/topic_publisher.py: line 4: syntax error near unexpected token `'topic_publisher''
/home/yahboom/catkin_ws/src/test/src/topic_publisher.py: line 4: `rospy.init_node('topic_publisher')'
看起来上面Python的代码有误,是没有被授权的意思,实质是没有指定解释器,这里需要告知ROS系统,这是一个Python文件
cd ~/catkin_ws/src/test/src
gedit topic_publisher.py
在代码最上面添加:#!/usr/bin/env python
然后再次运行命令即可
rosrun test topic_publisher.py
我们来看下rostopic在测试和维护当中会经常使用到的命令的用法。
2.2.1、rostopic list
再开一个终端,输入命令:rostopic list,可以看到多出了一个counter话题
/counter
/rosout
/rosout_agg
查看话题上面发布的消息:rostopic echo counter -n 5
data: 527
---
data: 528
---
data: 529
---
data: 530
---
data: 531
如果不加-n 5就是一直打印下去,直到按下Ctrl+C终止
2.2.2、rostopic hz
检验它是否按照我们期望的速率来发布消息: rostopic hz counter
subscribed to [/counter]
average rate: 2.003
min: 0.499s max: 0.499s std dev: 0.00000s window: 2
average rate: 1.997
min: 0.499s max: 0.502s std dev: 0.00098s window: 4
average rate: 2.000
min: 0.498s max: 0.502s std dev: 0.00140s window: 6
average rate: 2.000
min: 0.498s max: 0.502s std dev: 0.00118s window: 8
2.2.3、rostopic info
查看已声明的话题:rostopic info counter
Type: std_msgs/Int32
Publishers:
* /topic_publisher (http://YAB:45599/)Subscribers: None
这表明counter话题承载的类型是std_msgs/Int32,并且由topic_publisher声明,运行在YAB主机上,并且通过TCP端口45599来通信,Subscribers: None表示当前还没有订阅者。
2.2.4、rostopic find
通过类型查询所有话题:rostopic find std_msgs/Int32
/counter
类型需要给出包名(std_msgs)/消息类型(Int32)
更多rostopic命令的用法,可以查看帮助:rostopic -h
rostopic is a command-line tool for printing information about ROS Topics.
Commands:
rostopic bw display bandwidth used by topic 显示按话题使用的带宽
rostopic delay display delay of topic from timestamp in header 显示标题中时间戳的话题延迟
rostopic echo print messages to screen 将消息打印到屏幕
rostopic find find topics by type 按类型查找话题
rostopic hz display publishing rate of topic 显示话题的发布频率
rostopic info print information about active topic 打印有关活动话题的信息
rostopic list list active topics 列出活动话题
rostopic pub publish data to topic 向话题发布数据
rostopic type print topic or field type 打印话题或字段类型Type rostopic <command> -h for more detailed usage, e.g. 'rostopic echo -h'
3、订阅话题
有了发布话题之后,现在来写一个订阅的话题,topic_subscriber.py代码如下:
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32 # ROS标准消息包
# 定义个回调函数
def callback(msg):
print(msg.data)
rospy.init_node('topic_subscriber') # 初始化节点
pub = rospy.Subscriber('counter',Int32,callback) # 多一个回调函数
rospy.spin()
代码可以看到,大同小异,关键区别在于订阅的代码使用了回调函数,因为ROS是一个事件驱动的系统,所以在ROS中将会看到大量使用回调函数。这里的订阅,只要每次有消息来时,就会调用相应的回调函数,并使用接收到的消息作为它的参数。这里使用rospy.spin()将程序的运行交给ROS,避免使用上面发布话题中的while循环的一种捷径,ROS并不是必须要接管程序的主线程。
同样的,这个文件因为要去执行它,所以添加一个可执行权限:chmod u+x topic_subscriber.py
rosrun test topic_subscriber.py
850
851
852
853
854
855
...
可以正常的接收到发布者发布的消息。然后我们使用可视化工具:rqt_graph来看下它们之间的关系,如下图:
可以看到发布者(/topic_publisher)通过话题(/counter)将消息传给订阅者(/topic_subscriber)
也可以使用命令行进行消息发布:rostopic pub counter std_msgs/Int32 9999999
这样就插播一条值为9999999的消息,传给了订阅者。
现在我们来看下话题的状态信息:rostopic info counter
Type: std_msgs/Int32
Publishers:
* /topic_publisher (http://YAB:34725/)
* /rostopic_3918_1692005474115 (http://YAB:45641/)Subscribers:
* /topic_subscriber (http://YAB:42075/)
可以看到这里出现两个发布者,其中一个就是命令行的,也出现了一个订阅者,恩,没有问题!
4、锁存话题
在ROS中,消息是短暂的,意味着订阅可能存在丢失消息的问题,当然上面这个例子发送频率比较高,可能没什么影响,因为马上有新的消息被发送过来,不过对于有些场景,频繁发送消息也不是一个好主意。
比方说,一个节点通过map话题发送地图(数据类型:nav_msgs/OccupancyGrid),一般来说地图在一段时间内是不会发生改变,并且只会在节点加载完地图之后发送一次,如果另外一个节点需要这个地图就永远都无法获取到了。
因为地图通常很大,我们也不想经常发送,解决办法可以将频率调低,不过这里的频率应该设置多大,也是一个技巧性的问题。
所以这里提供了一个不错的解决方案,使用锁存,也就是将话题标记为锁存的时候,订阅者在完成订阅之后将会自动获取话题上最后一条消息。代码很简单,添加一个参数即可:
pub = rospy.Publisher('counter',Int32,latch=True)
在低版本上的参数是latched,所以在新的版本上就会报下面这样的错误:
Traceback (most recent call last):
File "/home/yahboom/catkin_ws/src/test/src/topic_one.py", line 6, in <module>
pub = rospy.Publisher('counter',Int32,latched=True)
TypeError: __init__() got an unexpected keyword argument 'latched'
需要将latched更改成latch。
5、自定义消息类型
ROS有丰富的内置消息类型,比如上面的std_msgs包定义了一些基本的类型,这些类型数据构成的定长或变长数组在Python中经过底层的通信反序列化处理,得到元组(tuple)并且可以设置成元组或列表(list)。
5.1、基本消息类型
ROS中C++与Python的基本消息类型,以及如何序列化,表格如下:
ROS消息类型 | 序列化结果 | C++类型 | C++类型 | 备注 |
bool | Unsigned 8-bit integer | uint8_t | bool | |
int8 | Signed 8-bit integer | int8_t | int | |
uint8 | Unsigned 8-bit integer | uint8_t | uint8_t | uint8[]在Python中是string表示 |
int16 | Signed 16-bit integer | int16_t | int | |
uint16 | Unsigned 16-bit integer | uint16_t | int | |
int32 | Signed 32-bit integer | int32_t | int | |
uint32 | Unsigned 32-bit integer | uint32_t | int | |
int64 | Signed 64-bit integer | int64_t | long | |
uint64 | Unsigned 64-bit integer | uint64_t | long | |
float32 | 32-bit IEEE float | float | float | |
float64 | 64-bit IEEE float | double | float | |
string | ASCII string | std::string | string | ROS不支持Unicode,请使用UTF-8 |
time | secs/nsecs unsigned 32-bit ints | ros::Time | rospy.Time | duration |
可以看到C++比Python有更多的原生数据类型,需要注意的是:C++节点和Python节点之间交换数据的时候,需要注意一些取值范围的问题,比如说,C++中的Uint8表示无符号整数,当Python中的小于0或大于255的数值传过来时,就解释成一个8位无符号整数,这就会导致一些无法预测的bug,所以说在Python中使用范围受限的ROS类型时一定要小心。
有的时候,这些内置消息类型不够用,我们需要自己定义一些消息,这些自定义的数据类型在ROS同样是“一等公民”,跟内置消息数据类型并无差别对待。
5.2、定义新消息
在ROS中自定义消息,是用ROS包中的msg目录中的消息定义文件来说明,然后catkin_make编译成与语言有关的实现版本,这样就可以在代码中使用自定义类型了,先来看一个定义复数的一个例子,看下具体是怎么操作的:
5.2.1、msg文件
消息文件的定义很简单直观,类型 名称
cd ~/catkin_ws/src/test
mkdir msg
cd msg
gedit Complex.msg
定义实部和虚部,内容如下:
float32 real
float32 imaginary
5.2.2、修改package.xml
为了让ROS生成语言相关的消息代码,我们需要告知构建系统新消息的定义,所以需要对package.xml和CMakeLists.txt文件进行修改。
编辑package.xml文件
cd ~/catkin_ws/src/test
gedit package.xml
添加以下两个节点:
<build_depend>message_generation</build_depend>
<run_depend>message_runtime</run_depend>
后面编译的时候如果出错:
CMake Error at /opt/ros/melodic/share/catkin/cmake/catkin_package.cmake:224 (message):
catkin_package() DEPENDS on the catkin package 'message_runtime' which must
therefore be listed as a run dependency in the package.xml
属于版本问题,将节点名称<run_depend>修改为<exec_depend>即可
5.2.3、修改CMakeLists.txt
接着编辑CMakeLists.txt文件:gedit CMakeLists.txt
添加message_generation,让catkin能够找到message_generation包:
find_package(catkin REQUIRED COMPONENTS rospy std_msgs message_generation)
运行时使用消息:
catkin_package(CATKIN_DEPENDS message_runtime)
告诉catkin我们想要编译它们:
add_message_files(FILES Complex.msg)
以及将下面这个注释去掉:
generate_messages(
DEPENDENCIES
std_msgs
)
5.2.4、编译
定义好了之后,catkin就知道了如何编译了,接下来就回到工作区根目录进行编译
cd ~/catkin_ws
catkin_make
编译成功,没有问题。我们也可以来看下这个消息编译后的代码,当然文件所在路径,不同点取决于你的Python版本:
cd ~/catkin_ws/devel/lib/python2.7/dist-packages/test/msg
cat _Complex.py
代码如下:
# This Python file uses the following encoding: utf-8
"""autogenerated by genpy from test/Complex.msg. Do not edit."""
import codecs
import sys
python3 = True if sys.hexversion > 0x03000000 else False
import genpy
import struct
class Complex(genpy.Message):
_md5sum = "54da470dccf15d60bd273ab751e1c0a1"
_type = "test/Complex"
_has_header = False # flag to mark the presence of a Header object
_full_text = """float32 real
float32 imaginary
"""
__slots__ = ['real','imaginary']
_slot_types = ['float32','float32']
def __init__(self, *args, **kwds):
"""
Constructor. Any message fields that are implicitly/explicitly
set to None will be assigned a default value. The recommend
use is keyword arguments as this is more robust to future message
changes. You cannot mix in-order arguments and keyword arguments.
The available fields are:
real,imaginary
:param args: complete set of field values, in .msg order
:param kwds: use keyword arguments corresponding to message field names
to set specific fields.
"""
if args or kwds:
super(Complex, self).__init__(*args, **kwds)
# message fields cannot be None, assign default values for those that are
if self.real is None:
self.real = 0.
if self.imaginary is None:
self.imaginary = 0.
else:
self.real = 0.
self.imaginary = 0.
def _get_types(self):
"""
internal API method
"""
return self._slot_types
def serialize(self, buff):
"""
serialize message into buffer
:param buff: buffer, ``StringIO``
"""
try:
_x = self
buff.write(_get_struct_2f().pack(_x.real, _x.imaginary))
except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))
def deserialize(self, str):
"""
unpack serialized message in str into this message instance
:param str: byte array of serialized message, ``str``
"""
if python3:
codecs.lookup_error("rosmsg").msg_type = self._type
try:
end = 0
_x = self
start = end
end += 8
(_x.real, _x.imaginary,) = _get_struct_2f().unpack(str[start:end])
return self
except struct.error as e:
raise genpy.DeserializationError(e) # most likely buffer underfill
def serialize_numpy(self, buff, numpy):
"""
serialize message with numpy array types into buffer
:param buff: buffer, ``StringIO``
:param numpy: numpy python module
"""
try:
_x = self
buff.write(_get_struct_2f().pack(_x.real, _x.imaginary))
except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))
def deserialize_numpy(self, str, numpy):
"""
unpack serialized message in str into this message instance using numpy for array types
:param str: byte array of serialized message, ``str``
:param numpy: numpy python module
"""
if python3:
codecs.lookup_error("rosmsg").msg_type = self._type
try:
end = 0
_x = self
start = end
end += 8
(_x.real, _x.imaginary,) = _get_struct_2f().unpack(str[start:end])
return self
except struct.error as e:
raise genpy.DeserializationError(e) # most likely buffer underfill
_struct_I = genpy.struct_I
def _get_struct_I():
global _struct_I
return _struct_I
_struct_2f = None
def _get_struct_2f():
global _struct_2f
if _struct_2f is None:
_struct_2f = struct.Struct("<2f")
return _struct_2f
可以看到生成的这个消息里面的函数主要就是做序列化和反序列化操作,序列化(封包)消息到缓冲区,然后反序列化(解包)到消息实例中。其他一些相关信息也解释下:
_md5sum = "54da470dccf15d60bd273ab751e1c0a1"
MD5校验和,这个主要是ROS用它来确保消息是正确版本。每次修改消息文件,可能都需要重新catkin_make编译,因为可能存在其他文件引用了此消息类型,比如C++就是将这个MD5校验和编译进了可执行文件,所以需要再次编译,对于Python的字节码文件(.pyc)也需要再次编译,保证它们的校验和能够匹配起来。
_type = "test/Complex"
类型定义
_full_text = """float32 real
float32 imaginary
"""
__slots__ = ['real','imaginary']
_slot_types = ['float32','float32']
这里就是实部与虚部定义的类型。
6、使用自定义消息
上面的消息被定义好了之后,接下来我们来看下如何使用它。
6.1、消息发布
cd ~/catkin_ws/src/test/src
gedit message_publisher.py
#!/usr/bin/env python
import rospy
from test.msg import Complex
from random import random
rospy.init_node('message_publisher')
pub = rospy.Publisher('complex',Complex)
rate = rospy.Rate(2)
while not rospy.is_shutdown():
msg = Complex()
msg.real = random()
msg.imaginary = random()
pub.publish(msg)
rate.sleep()
代码跟原来差不多,没什么区别,将自定义的Complex类型跟导入其他标准类型一样导入,然后就可以创建消息类的实例了,有了实例就可以给里面的字段赋值了。
6.2、消息订阅
有了消息发布的代码之后,来写一个订阅的,跟前面的方法一样,也是使用回调函数来触发:
cd ~/catkin_ws/src/test/src
gedit message_subscriber.py
#!/usr/bin/env python
import rospy
from test.msg import Complex
def callback(msg):
print('Real:',msg.real)
print('Imaginary:',msg.imaginary)
rospy.init_node('message_subscriber')
sub = rospy.Subscriber('complex',Complex,callback)
rospy.spin()
最后也添加执行权限:chmod u+x message_subscriber.py
然后开三个终端分别输入以下命令:
roscore
rosrun test message_publisher.py
rosrun test message_subscriber.py
输出结果如下:
('Real:', 0.7348452806472778)
('Imaginary:', 0.24286778271198273)
('Real:', 0.8839530348777771)
('Imaginary:', 0.4728539288043976)
('Real:', 0.45228102803230286)
('Imaginary:', 0.9694715142250061)
('Real:', 0.7523669004440308)
('Imaginary:', 0.17689673602581024)
6.3、rosmsg命令
先来看下这条消息有关的命令的帮助:rosmsg -h
rosmsg is a command-line tool for displaying information about ROS Message types.
Commands:
rosmsg show Show message description 显示消息定义
rosmsg info Alias for rosmsg show 是show的别名
rosmsg list List all messages 列出所有消息
rosmsg md5 Display message md5sum 显示MD5校验和
rosmsg package List messages in a package 列出包中所有消息
rosmsg packages List packages that contain messages 列出包含消息的所有包Type rosmsg <command> -h for more detailed usage
查看这个Complex消息内容:rosmsg show Complex
[test/Complex]:
float32 real
float32 imaginary
验证MD5校验和的值: rosmsg md5 Complex
[test/Complex]: 54da470dccf15d60bd273ab751e1c0a1
恩,没有问题,可以看到输出的内容跟代码里面的是一样的。
如果一个消息包含另一个消息,也可以递归显示出来,比如:rosmsg show PointStamped
[geometry_msgs/PointStamped]:
std_msgs/Header header
uint32 seq
time stamp
string frame_id
geometry_msgs/Point point
float64 x
float64 y
float64 z
包含header和point,都是ROS类型
列举定义了消息的所有包:rosmsg packages
actionlib
actionlib_msgs
actionlib_tutorials
bond
control_msgs
controller_manager_msgs
diagnostic_msgs
dynamic_reconfigure
gazebo_msgs
geometry_msgs
map_msgs
nav_msgs
pcl_msgs
roscpp
rosgraph_msgs
rospy_tutorials
sensor_msgs
shape_msgs
smach_msgs
std_msgs
stereo_msgs
test
tf
tf2_msgs
theora_image_transport
trajectory_msgs
turtle_actionlib
turtlesim
visualization_msgs
列举test包定义的消息:rosmsg package test
test/Complex
列举传感器包定义的消息:rosmsg package sensor_msgs
sensor_msgs/BatteryState
sensor_msgs/CameraInfo
sensor_msgs/ChannelFloat32
sensor_msgs/CompressedImage
sensor_msgs/FluidPressure
sensor_msgs/Illuminance
sensor_msgs/Image
sensor_msgs/Imu
sensor_msgs/JointState
sensor_msgs/Joy
sensor_msgs/JoyFeedback
sensor_msgs/JoyFeedbackArray
sensor_msgs/LaserEcho
sensor_msgs/LaserScan
sensor_msgs/MagneticField
sensor_msgs/MultiDOFJointState
sensor_msgs/MultiEchoLaserScan
sensor_msgs/NavSatFix
sensor_msgs/NavSatStatus
sensor_msgs/PointCloud
sensor_msgs/PointCloud2
sensor_msgs/PointField
sensor_msgs/Range
sensor_msgs/RegionOfInterest
sensor_msgs/RelativeHumidity
sensor_msgs/Temperature
sensor_msgs/TimeReference
查看其中相机的消息定义:rosmsg show sensor_msgs/CameraInfo
std_msgs/Header header
uint32 seq
time stamp
string frame_id
uint32 height
uint32 width
string distortion_model
float64[] D
float64[9] K
float64[9] R
float64[12] P
uint32 binning_x
uint32 binning_y
sensor_msgs/RegionOfInterest roi
uint32 x_offset
uint32 y_offset
uint32 height
uint32 width
bool do_rectify
7、发布者与订阅者协同工作
前面是单个的发布者和订阅者,但是节点也可以同时是一个发布者和订阅者,或者拥有多个订阅和发布。实际上在ROS中,节点最常做的事情就是传递消息,并在消息上进行计算,一起来看一个例子
cd ~/catkin_ws/src/test/src
#发布者:
gedit doubler_pub.py
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
import random
rospy.init_node('doubler_pub')
pub = rospy.Publisher('number',Int32)
rate = rospy.Rate(2)
while not rospy.is_shutdown():
num = random.randint(1,9)
pub.publish(num)
rate.sleep()
添加执行权限:chmod u+x doubler_pub.py
运行:rosrun test doubler_pub.py
这里就订阅上面的发布,然后做加倍处理之后再发布,也就是同时拥有了订阅和发布的功能
cd ~/catkin_ws/src/test/src
#加倍操作:
gedit doubler.py
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
rospy.init_node('doubler')
def callback(msg):
print(msg.data)
doubled = msg.data * 2
pub.publish(doubled)
sub = rospy.Subscriber('number',Int32,callback)
pub = rospy.Publisher('doubled',Int32)
rospy.spin()
添加执行权限:chmod u+x doubler.py
运行:rosrun test doubler.py
订阅者:gedit doubler_sub.py
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32
def callback(msg):
print(msg.data)
rospy.init_node('doubler_sub')
sub = rospy.Subscriber('doubled',Int32,callback)
rospy.spin()
添加执行权限:chmod u+x doubler_sub.py
运行:rosrun test doubler_sub.py
如下图:
这里的加倍操作,就是在接收到新数据之后才发布,也就是说这个节点就是起到一个信息计算处理的一个作用,然后将加倍的数据发布出去。
最后做一个订阅者查看下结果,输出比对发现确实是将数据做了加倍处理。