1.ros2概述
ROS2(Robot Operating System 2)是一个用于机器人应用程序的开源软件框架。它是ROS(Robot Operating System)的下一代版本,旨在改进和扩展原始ROS的特性,以适应更广泛的机器人应用场景和需求。ROS2的发展重点包括提高系统的实时性能、可靠性、可伸缩性和跨平台可移植性。
核心特点:
- 实时性能:ROS2设计考虑了实时系统要求,这对于自动驾驶汽车和工业自动化等应用至关重要。
- 跨平台:ROS2支持多种操作系统,如Linux、macOS、Windows,甚至包括RTOS(实时操作系统)。
- 可伸缩性:ROS2允许在不同规模的项目中使用,从单节点应用到大型分布式系统。
- 组件化:ROS2鼓励更模块化的软件设计,便于重用和替换系统中的各个部分。
- 安全性:ROS2提供了对安全通信的支持,这对于在安全关键型应用中部署至关重要。
主要组件:
- 核心组件:
- 通信:ROS2提供了一套先进的通信库,支持不同的通信模式,如发布/订阅、服务调用和动作通信。
- 构建工具:ament是ROS2的构建系统,类似于ROS1中的catkin。colcon是用于构建ROS2项目的工具,它也可以构建其他基于CMake的项目。
- 中间件:ROS2支持多种中间件,如DDS(Data Distribution Service),以提高系统的灵活性和性能。
-
机器人基础应用组件:这些组件包括感知、规划、控制等机器人功能模块。
-
扩展组件:提供调试、可视化和其他辅助功能,以支持开发和测试。
发展背景:
ROS1自2007年发布以来,在机器人研究社区中得到了广泛的应用。然而,随着机器人技术的进步和新的应用场景(如自动驾驶汽车)的出现,ROS1在实时性、可靠性和可伸缩性方面的局限性变得越来越明显。为了满足这些新的需求,ROS2应运而生。
转向ROS2的原因:
- 性能需求:新的应用场景需要更高的性能和可靠性。
- 标准化:ROS2遵循更现代的软件工程实践,支持更广泛的开发环境和工具。
- 社区发展:为了维持和扩大ROS社区,需要一个能够适应未来发展的新平台。
ROS2的发展代表了机器人操作系统技术的进步,它为机器人开发者提供了一个更加健壮和灵活的平台,以支持从研究到商业应用的转换。
2.创建ros2 包
在dev_ws/src/下创建功能包learn
在dev_ws/下编译colcon build(编译工作空间下的所有)
Source install/local_setuo.bash
3.节点 (编写发布者&订阅者的代码)
节点在机器人系统中的职责就是执行某些具体的任务,从计算机操作系统的角度来看,也叫做进程;
每个节点都是一个可以独立运行的可执行文件,比如执行某一个python程序,或者执行C++编译生成的结果,都算是运行了一个节点;
4.话题
https://book.guyuehome.com/ROS2/2.%E6%A0%B8%E5%BF%83%E6%A6%82%E5%BF%B5/2.4_%E8%AF%9D%E9%A2%98/
不需要写pub了,usb_cam会发布,我们只需要订阅相机的图片,然后处理检测可视化就行。
learning_topic/topic_webcam_pub.py
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from sensor_msgs.msg import Image # 图像消息类型
from cv_bridge import CvBridge # ROS与OpenCV图像转换类
import cv2 # Opencv图像处理库
"""
创建一个发布者节点
"""
class ImagePublisher(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.publisher_ = self.create_publisher(Image, 'image_raw', 10) # 创建发布者对象(消息类型、话题名、队列长度)
self.timer = self.create_timer(0.1, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
self.cap = cv2.VideoCapture(0) # 创建一个视频采集对象,驱动相机采集图像(相机设备号)
self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于稍后将OpenCV的图像转换成ROS的图像消息
def timer_callback(self):
ret, frame = self.cap.read() # 一帧一帧读取图像
if ret == True: # 如果图像读取成功
self.publisher_.publish(
self.cv_bridge.cv2_to_imgmsg(frame, 'bgr8')) # 发布图像消息
self.get_logger().info('Publishing video frame') # 输出日志信息,提示已经完成图像话题发布
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = ImagePublisher("topic_webcam_pub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
learning_topic/topic_webcam_sub.py
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from sensor_msgs.msg import Image # 图像消息类型
from cv_bridge import CvBridge # ROS与OpenCV图像转换类
import cv2 # Opencv图像处理库
import numpy as np # Python数值计算库
lower_red = np.array([0, 90, 128]) # 红色的HSV阈值下限
upper_red = np.array([180, 255, 255]) # 红色的HSV阈值上限
from paxini_object_detection import object_detect # 我们写的在另外一个地方就行
"""
创建一个订阅者节点
"""
class ImageSubscriber(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(
Image, 'image_raw', self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换
def object_detect(self, image):
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型
mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 图像二值化
contours, hierarchy = cv2.findContours(
mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) # 图像中轮廓检测
for cnt in contours: # 去除一些轮廓面积太小的噪声
if cnt.shape[0] < 150:
continue
(x, y, w, h) = cv2.boundingRect(cnt) # 得到苹果所在轮廓的左上角xy像素坐标及轮廓范围的宽和高
cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)# 将苹果的轮廓勾勒出来
cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,
(0, 255, 0), -1) # 将苹果的图像中心点画出来
cv2.imshow("object", image) # 使用OpenCV显示处理后的图像效果
cv2.waitKey(10)
def listener_callback(self, data):
self.get_logger().info('Receiving video frame') # 输出日志信息,提示已进入回调函数
image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8') # 将ROS的图像消息转化成OpenCV图像
self.object_detect(image) # 苹果检测
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = ImageSubscriber("topic_webcam_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
entry_points={
'console_scripts': [
'topic_helloworld_pub = learning_topic.topic_helloworld_pub:main',
'topic_helloworld_sub = learning_topic.topic_helloworld_sub:main',
'topic_webcam_pub = learning_topic.topic_webcam_pub:main',
'topic_webcam_sub = learning_topic.topic_webcam_sub:main',
],
},
5.setup.py
from setuptools import setup
from setuptools import find_packages
PACKAGE_NAME = 'my_package'
DESCRIPTION = 'A short description of my package'
AUTHOR = 'My Name'
AUTHOR_EMAIL = 'my.name@example.com'
LICENSE = 'Apache License, Version 2.0'
URL = 'https://github.com/my_username/my_package'
VERSION = '1.0.0'
setup(
name=PACKAGE_NAME, # 设置软件包名称。
version=VERSION,
description=DESCRIPTION,
author=AUTHOR,
author_email=AUTHOR_EMAIL,
license=LICENSE,
url=URL,
packages=find_packages(exclude=['test']), # 设置将要安装的 Python 包。
install_requires=[ # 设置软件包安装时必需的 Python 包列表。
'setuptools', # 必需的包,即 setuptools。
'numpy', # 必需的包,即 numpy。
'cython'
],
entry_points={ # 设置命令行脚本入口点的字典。
'console_scripts': [ # 设置命令行脚本的类别。
'my_script = my_package.my_script:main' # 命令行脚本名称和对应的 Python 模块和方法名称。
],
},
zip_safe=True # 确定软件包在安装后是否以压缩文件形式分发,如果是,则设为 True,
)
6.rclpy.spin(node)
-
接收到新的消息或服务请求时,会调用回调函数。
-
定期计时器触发时,会调用回调函数。
-
检查是否有调用回调函数等待的期间内新的客户端连接,如果有,会调用回调函数。
7.编译
Colcon build
. install/setup.bash
Ros run pkgname nodename
8.publish rgb 然后yolo目标检测
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from sensor_msgs.msg import Image # 图像消息类型
from cv_bridge import CvBridge # ROS与OpenCV图像转换类
import cv2 # Opencv图像处理库
import numpy as np # Python数值计算库
lower_red = np.array([0, 90, 128]) # 红色的HSV阈值下限
upper_red = np.array([180, 255, 255]) # 红色的HSV阈值上限
import open3d
from paxini_object_detection import paxini_3d_object_detect # 我们写的在另外一个地方就行
"""
创建一个订阅者节点
"""
class ImageSubscriber(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(
Image, 'image_raw', self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换
self.3d_detect = paxini_3d_object_detect()
def object_detect(self, image):
rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型
dep = cv2.cvtColor(dep, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型
# 转点云
# detect
result = self.3d_detect(points,model_weights)
cv2.imshow("object", image) # 使用OpenCV显示处理后的图像效果
cv2.waitKey(10)
def listener_callback(self, data):
self.get_logger().info('Receiving video frame') # 输出日志信息,提示已进入回调函数
rgb = self.cv_bridge.imgmsg_to_cv2(data[0], 'bgr8') # 将ROS的图像消息转化成OpenCV rgb
dep = self.cv_bridge.imgmsg_to_cv2(data[1], 'bgr8') # 将ROS的图像消息转化成OpenCV depth
self.object_detect(rgb ,dep ) # 检测
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = ImageSubscriber("topic_webcam_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
ros订阅深度图
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import std_msgs.msg
from sensor_msgs.msg import Image
import numpy as np
import cv2
class Image_saver(Node):
def __init__(self):
# 订阅了5个主题,4个方向的相机和一个键盘按键
super().__init__('subscriber')
self.count = 0
self.save_path = '/home/robot/test_hyx/calibration_images/'
self.dir_list = ['front/', 'right/', 'left/']
self.subscription1 = self.create_subscription(
Image,
'front_camera_topic',
self.listener_callback1,
10)
self.subscription2 = self.create_subscription(
Image,
'right_camera_topic',
self.listener_callback2,
10)
self.subscription3 = self.create_subscription(
Image,
'left_camera_topic',
self.listener_callback3,
10)
## self.subscription4 = self.create_subscription(
## Image,
## 'back_camera_topic',
## self.listener_callback4,
## 10)
#self.subscription # prevent unused variable warning
self.keyboard = self.create_subscription(
std_msgs.msg.UInt32,
'key_pressed',
self.keyboard_callback,
10)
#self.keyboard
self.cache = []
for _ in range(3):
self.cache.append(None)
def listener_callback1(self, msg):
self.cache[0] = (msg.height, msg.width, msg.data) # 把数据放入cache里面
def listener_callback2(self, msg):
self.cache[1] = (msg.height, msg.width, msg.data)
def listener_callback3(self, msg):
self.cache[2] = (msg.height, msg.width, msg.data)
## def listener_callback4(self, msg):
## self.cache[3] = (msg.height, msg.width, msg.data)
def keyboard_callback(self, msg): # 按下键盘开始保存
if msg.data == 32:
for i in range(3):
save_path = self.save_path + self.dir_list[i]+'%04d.jpg'%self.count
img = np.array(self.cache[i][2],dtype=np.uint8)
img = img.reshape(self.cache[i][0], self.cache[i][1], 3)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
if i == 0:
img = cv2.flip(img, 0)
img = cv2.flip(img, 1)
#print(save_path)
#print(img)
cv2.imwrite(save_path, img)
print('images saved!')
self.count += 1
def main(args=None):
rclpy.init(args=args)
img_saver = Image_saver()
rclpy.spin(img_saver)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
键盘按键的发布者:
import sys
# pynput throws an error if we import it before $DISPLAY is set on LINUX
from pynput.keyboard import KeyCode
if sys.platform not in ('darwin', 'win32'):
import os
os.environ.setdefault('DISPLAY', ':0')
from pynput import keyboard
import rclpy
from rclpy.parameter import Parameter
import std_msgs.msg
class KeystrokeListen:
def __init__(self, name=None):
self.node = rclpy.create_node(name or type(self).__name__)
self.node.declare_parameter('exit_on_esc',True)
self.pub_glyph = self.node.create_publisher(std_msgs.msg.String, 'glyphkey_pressed', 10)
# todo: when ROS2 supports Enums, use them: https://github.com/ros2/rosidl/issues/260
# 上面订阅的是下面的空格键
self.pub_code = self.node.create_publisher(std_msgs.msg.UInt32, 'key_pressed_calib', 10)
if self.exit_on_esc:
self.logger.info('To end this node, press the escape key')
def spin(self):
# keyboard.Listener创建一个键盘监听器,将self.on_press和self.on_release方法分别作为按下和释放键的回调函数
with keyboard.Listener(on_press=self.on_press, on_release=self.on_release) as listener:
# rclpy.spin_once方法以处理ROS2的回调函数,同时检查监听器是否仍在运行。若监听器运行并且ROS2节点正常运行,则继续循环。
while rclpy.ok() and listener.running:
rclpy.spin_once(self.node, timeout_sec=0.1)
@property
def logger(self):
return self.node.get_logger()
@property
def exit_on_esc(self):
param = self.node.get_parameter('exit_on_esc')
if param.type_ != Parameter.Type.BOOL:
new_param = Parameter('exit_on_esc', Parameter.Type.BOOL, True)
self.logger.warn(
'Parameter {}={} is a {} but expected a boolean. Assuming {}.'.format(param.name,
param.value,
param.type_,
new_param.value),
once=True)
self.node.set_parameters([new_param])
param = new_param
value = param.value
assert isinstance(value, bool)
return value
def on_release(self, key):
# todo: implement this
pass
def on_press(self, key):
try:
char = getattr(key, 'char', None)
if isinstance(char, str):
#self.logger.info('pressed ' + char)
self.pub_glyph.publish(self.pub_glyph.msg_type(data=char)) # 发布key字符串
else:
try:
# known keys like spacebar, ctrl
name = key.name
vk = key.value.vk
except AttributeError:
# unknown keys like headphones skip song button
name = 'UNKNOWN'
vk = key.vk
#self.logger.info('pressed {} ({})'.format(name, vk))
# todo: These values are not cross-platform. When ROS2 supports Enums, use them instead
self.pub_code.publish(self.pub_code.msg_type(data=vk)) # 发布空格key?
except Exception as e:
self.logger.error(str(e))
raise
if key == keyboard.Key.esc and self.exit_on_esc:
self.logger.info('stopping listener')
raise keyboard.Listener.StopException
def main(args=None):
rclpy.init(args=args)
KeystrokeListen().spin()
if __name__ == '__main__':
main()
订阅深度图和RGB
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import std_msgs.msg
from sensor_msgs.msg import Image
import numpy as np
import cv2
class Image_saver(Node):
def __init__(self,name):
# 订阅了5个主题,4个方向的相机和一个键盘按键
super().__init__('subscriber')
self.count = 0
self.save_path = '/home/robot/test_hyx/calibration_images/'
self.subscription1 = self.create_subscription(
Image,
'ps_pub_depth_image_01',
self.listener_callback1,
10)
self.subscription2 = self.create_subscription(
Image,
'ps_pub_color_image_01',
self.listener_callback2,
10)
self.keyboard = self.create_subscription(
std_msgs.msg.UInt32,
'key_pressed',
self.keyboard_callback,
10)
#self.keyboard
self.cache = []
for _ in range(2):
self.cache.append(None)
def listener_callback1(self, msg): # dep
self.cache[0] = (msg.height, msg.width, msg.data) # 把数据放入cache里面
def listener_callback2(self, msg): # rgb
self.cache[1] = (msg.height, msg.width, msg.data)
def keyboard_callback(self, msg): # 按下键盘开始保存
if msg.data == 32:
save_dep_path = self.save_path + 'dep/'+'%04d.jpg'%self.count
save_rgb_path = self.save_path + 'rgb/'+'%04d.jpg'%self.count
dep_img = np.array(self.cache[0][2],dtype=np.uint16)
rgb_img = np.array(self.cache[1][2],dtype=np.uint8)
rgb_img = rgb_img .reshape(self.cache[1][0], self.cache[1][1], 3)
dep_img = rgb_img .reshape(self.cache[0][0], self.cache[0][1], 1)
rgb_img = cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(save_rgb_path , rgb_img )
cv2.imwrite(save_dep_path , dep_img )
print('-------images saved!------')
self.count += 1
def main(args=None):
rclpy.init(args=args)
# depth_img_saver = Image_saver('depth_img_saver = Image_saver()')
depth_img_saver = Image_saver()
rclpy.spin(depth_img_saver )
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
entry_points={
'console_scripts': [
'topic_depth_rgb_sub = paxini_robot_topic.topic_depth_rgb_sub:main',
],
},
订阅rgb同时2D监测+6d位姿估计并且发布结果
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from sensor_msgs.msg import Image # 图像消息类型
from cv_bridge import CvBridge # ROS与OpenCV图像转换类
import cv2 # Opencv图像处理库
import numpy as np # Python数值计算库
from std_msgs.msg import Float64MultiArray
import zebra_pose_es # 我们写的在另外一个地方就行
class ImageSubscriber(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(Image, 'image_raw', self.listener_callback,10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换
self.detect = zebra_pose_es()
self.publisher_ = self.create_publisher(Float64MultiArray, 'pose_res', 10) # 创建发布者对象(消息类型,话题名,队列长度)
def publish_result(self, result):
msg = Float64MultiArray()
msg.data = result.flatten() # 将结果展平为一维数组
self.publisher_.publish(msg) # 发布结果消息
def object_detect(self, rgb):
rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型
result = self.detect(rgb)
# cv2.imshow("object", rgb) # 使用OpenCV显示处理后的图像效果
# cv2.waitKey(10)
self.publish_result(result) # 如何处理多个结果
def listener_callback(self, data):
self.get_logger().info('Receiving video frame') # 输出日志信息,提示已进入回调函数
rgb = self.cv_bridge.imgmsg_to_cv2(data[0], 'bgr8') # 将ROS的图像消息转化成OpenCV rgb
self.object_detect(rgb) # 检测
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = ImageSubscriber("topic_webcam_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口