7.5.1 需求及设计
又到了小鱼老师带着做最佳实践项目了。需求:做一个在各个房间不断巡逻并记录图像的机器人。
到达目标点后首先通过语音播放到达目标点信息,
再通过摄像头拍摄一张图片保存到本地。
7.5.2 编写巡检控制节点
在chapt7_ws/src下新建功能包,添加rclpy和nav2_simple_commander依赖。
目录src/autopartol_robot/autopartol_robot/新建文件patrol_node.py
from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
class PatrolNode(BasicNavigator):
def __init__(self,node_name='patrol_node'):
super().__init__(node_name)
self.buffer_ = Buffer()
self.listner_ = TransformListener(self.buffer_,self)
#声明相关参数
self.declare_parameter('initial_point',[0.0, 0.0, 0.0])
self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
self.initial_point_ = self.get_parameter('initial_point').value
self.initial_points_= self.get_parameter('target_points').value
def get_pose_by_xyyaw(self, x, y, yaw):
# 通过x,y,yaw return PoseStamped对象
pose = PoseStamped()
pose.header.frame_id = 'map'
pose.pose.position.x = x
pose.pose.position.y = y
quat = quaternion_from_euler(0,0,yaw)
pose.pose.orientation.x =quat[0]
pose.pose.orientation.y =quat[1]
pose.pose.orientation.z =quat[2]
pose.pose.orientation.w =quat[3]
return pose
def init_robot_pose(self):
#初始化机器人位姿
self.initial_point_ = self.get_parameter('initial_point').value
init_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2])
self.setInitialPose(init_pose)
self.waitUntilNav2Active()#等待导航可用
def get_target_points(self):
#通过参数值获取目标点集合
points = []
self.target_points_ = self.get_parameter('target_points').value
for index in range(int(len(self.target_points_)/3)):
x = self.target_points_[index*3]
y = self.target_points_[index*3+1]
yaw = self.target_points_[index*3+2]
points.append([x,y,yaw])
self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')
return points
def nav_to_pose(self,target_pose):
#导航到指定位姿
self.waitUntilNav2Active()
self.goToPose(target_pose)
while not self.isTaskComplete():
feedback = self.getFeedback()
self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')
reslut = self.getResult()
self.get_logger().info(f'导航结果:{reslut}')
def get_cuurent_pose(self):
#通过TF获取当前位姿
while rclpy.ok():
try:
result = self.buffer_.lookup_transform('map','base_footprint',
rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))
transform = result.transform
self.get_logger().info(f'平移:{transform.translation}')
# self.get_logger().info(f'旋转:{transform.rotation}')
# ratation_euler = euler_from_quaternion([
# transform.rotation.x,
# transform.rotation.y,
# transform.rotation.z,
# transform.rotation.w]
# )
# self.get_logger().info(f'平移rpy:{ratation_euler}')
return transform
except Exception as e:
self.get_logger().warn(f'获取坐标转换异常{str(e)}')
def main():
rclpy.init()
patrol = PatrolNode() #节点
patrol.initial_pose()
while rclpy.ok():
points = patrol.get_target_points()
for point in points:
x,y,yaw = point[0],point[1],point[2]
target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)
patrol.nav_to_pose(target_pose)
rclpy.shutdown()
基本上吧之前的单接口调用综合起来。
为方便参数配置,在src/autopartol_robot/新建目录config,新增配置文件
patrol_config.yaml
patrol_node:
ros__parameters:
initial_point: [0.0, 0.0, 0.0]
target_points: [
0.0,0.0,0.0,
1.0,2.0,3.14,
-4.5,1.5,1.57,
-8.0,-5.0,1.57,
1.0,-5.0,3.14,
]
注意目标点的选取是机器人可达的位置,不能选到障碍物。
启动gazebo模拟器,启动nav2.
再启动patrol_node
ros2 run autopartol_robot patrol_node --ros-args --params-file /home/bohu/chapt7/chapt7_ws/install/autopartol_robot/share/autopartol_robot/config/patrol_config.yaml
会发现等一会开始初始化地图后,开始沿着设定目标点运动。效果如下
也有异常情况,机器人靠墙太近,gazebo看出距离墙还有段距离,但是在rviz里面局部代价地图有一点变形,导致机器以为有障碍物卡住的现象。
7.5.3 添加语音播报功能
在chatpt7_ws/src下新建功能包autopatrol_interfaces,功能包下新建目录srv,srv新新建接口文件
speachText.srv
string text # 合成文字
---
bool result # 合成结果
在CMakeLists.txt注册,并在package.xml添加标签
<member_of_group>rosidl_interface_packages</member_of_group>
重新构建
src/autopartol_robot/autopartol_robot/目录下新建文件speaker.py
import rclpy
from rclpy.node import Node
from autopatrol_interfaces.srv import SpeachText
import espeakng
class Speaker(Node):
def __init__(self):
super().__init__('speaker')
self.speech_service_ = self.create_service(SpeachText,'speech_text',
self.speech_text_callback)
self.speaker_ = espeakng.Speaker()
self.speaker_.voice ='zh'
def speech_text_callback(self,request,response):
self.get_logger().info(f'正在朗读 {request.text}')
self.speaker_.say(request.text)
self.speaker_.wait()
response.result = True
return response
def main():
rclpy.init()
node = Speaker()
rclpy.spin(node)
rclpy.shutdown()
修改patrol_node.py,增加语音合成调用
from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
from autopatrol_interfaces.srv import SpeachText
class PatrolNode(BasicNavigator):
def __init__(self,node_name='patrol_node'):
super().__init__(node_name)
self.buffer_ = Buffer()
self.listner_ = TransformListener(self.buffer_,self)
#声明相关参数
self.declare_parameter('initial_point',[0.0, 0.0, 0.0])
self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
self.initial_point_ = self.get_parameter('initial_point').value
self.initial_points_= self.get_parameter('target_points').value
self.speech_client_ = self.create_client(SpeachText,'speech_text')
def get_pose_by_xyyaw(self, x, y, yaw):
# 通过x,y,yaw return PoseStamped对象
pose = PoseStamped()
pose.header.frame_id = 'map'
pose.pose.position.x = x
pose.pose.position.y = y
quat = quaternion_from_euler(0,0,yaw)
pose.pose.orientation.x =quat[0]
pose.pose.orientation.y =quat[1]
pose.pose.orientation.z =quat[2]
pose.pose.orientation.w =quat[3]
return pose
def init_robot_pose(self):
#初始化机器人位姿
self.initial_point_ = self.get_parameter('initial_point').value
init_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2])
self.setInitialPose(init_pose)
self.waitUntilNav2Active()#等待导航可用
def get_target_points(self):
#通过参数值获取目标点集合
points = []
self.target_points_ = self.get_parameter('target_points').value
for index in range(int(len(self.target_points_)/3)):
x = self.target_points_[index*3]
y = self.target_points_[index*3+1]
yaw = self.target_points_[index*3+2]
points.append([x,y,yaw])
self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')
return points
def nav_to_pose(self,target_pose):
#导航到指定位姿
self.waitUntilNav2Active()
self.goToPose(target_pose)
while not self.isTaskComplete():
feedback = self.getFeedback()
self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')
reslut = self.getResult()
self.get_logger().info(f'导航结果:{reslut}')
def get_cuurent_pose(self):
#通过TF获取当前位姿
while rclpy.ok():
try:
result = self.buffer_.lookup_transform('map','base_footprint',
rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))
transform = result.transform
self.get_logger().info(f'平移:{transform.translation}')
# self.get_logger().info(f'旋转:{transform.rotation}')
# ratation_euler = euler_from_quaternion([
# transform.rotation.x,
# transform.rotation.y,
# transform.rotation.z,
# transform.rotation.w]
# )
# self.get_logger().info(f'平移rpy:{ratation_euler}')
return transform
except Exception as e:
self.get_logger().warn(f'获取坐标转换异常{str(e)}')
def speech_text(self,text):
#调用服务合成语音
while not self.speech_client_.wait_for_service(timeout_sec=1):
self.get_logger().info("wait for speech service")
request = SpeachText.Request()
request.text = text
future = self.speech_client_.call_async(request)
rclpy.spin_until_future_complete(self,future)
if future.result() is not None:
result = future.result().result
if result:
self.get_logger().info(f'语音合成成功:{text}')
else:
self.get_logger().warn(f'语音合成失败:{text}')
else:
self.get_logger().warn('语音合成服务请求失败')
def main():
rclpy.init()
patrol = PatrolNode() #节点
patrol.speech_text('正在准备初始化位置')
patrol.init_robot_pose()
patrol.speech_text('初始化位置完成')
while rclpy.ok():
points = patrol.get_target_points()
for point in points:
x,y,yaw = point[0],point[1],point[2]
target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)
patrol.speech_text(text=f'准备前往目标点{x},{y}')
patrol.nav_to_pose(target_pose)
rclpy.shutdown()
效果跟上面类似,日志输出多了语音的
[speaker-2] [INFO] [1737017187.818829250] [speaker]: 正在朗读 准备前往目标点1.0,2.0
[patrol_node-1] [INFO] [1737017191.194245817] [patrol_node]: 语音合成成功:准备前往目标点1.0,2.0
[patrol_node-1] [INFO] [1737017195.311579068] [patrol_node]: Nav2 is ready for use!
[patrol_node-1] [INFO] [1737017195.314096555] [patrol_node]: Navigating to goal: 1.0 2.0...
[patrol_node-1] [INFO] [1737017195.509438991] [patrol_node]: 剩余距离:0.2445448786020279
[patrol_node-1] [INFO] [1737017195.612344544] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.716231929] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.819218225] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.923079415] [patrol_node]: 剩余距离:2.230710744857788
7.5.4订阅图像并记录
from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
from autopatrol_interfaces.srv import SpeachText
from sensor_msgs.msg import Image #消息接口
from cv_bridge import CvBridge #图像转换
import cv2 #保存图像
class PatrolNode(BasicNavigator):
def __init__(self,node_name='patrol_node'):
super().__init__(node_name)
self.buffer_ = Buffer()
self.listner_ = TransformListener(self.buffer_,self)
#声明相关参数
self.declare_parameter('initial_point',[0.0, 0.0, 0.0])
self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])
self.initial_point_ = self.get_parameter('initial_point').value
self.initial_points_= self.get_parameter('target_points').value
self.speech_client_ = self.create_client(SpeachText,'speech_text')
# 订阅与保存图像相关定义
self.declare_parameter('image_save_path', '')
self.image_save_path = self.get_parameter('image_save_path').value
self.bridge = CvBridge()
self.latest_img_ = None
self.image_sub_ = self.create_subscription(Image,'/camera_sensor/image_raw',self.img_callback,1)
def img_callback(self,msg):
self.latest_img_ = msg
def record_img(self):
if self.latest_img_ is not Node:
pose = self.get_cuurent_pose()
cv_image = self.bridge.imgmsg_to_cv2(self.latest_img_)
cv2.imwrite(f'{self.image_save_path}img_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png',
cv_image)
def get_pose_by_xyyaw(self, x, y, yaw):
# 通过x,y,yaw return PoseStamped对象
pose = PoseStamped()
pose.header.frame_id = 'map'
pose.pose.position.x = x
pose.pose.position.y = y
quat = quaternion_from_euler(0,0,yaw)
pose.pose.orientation.x =quat[0]
pose.pose.orientation.y =quat[1]
pose.pose.orientation.z =quat[2]
pose.pose.orientation.w =quat[3]
return pose
def init_robot_pose(self):
#初始化机器人位姿
self.initial_point_ = self.get_parameter('initial_point').value
init_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2])
self.setInitialPose(init_pose)
self.waitUntilNav2Active()#等待导航可用
def get_target_points(self):
#通过参数值获取目标点集合
points = []
self.target_points_ = self.get_parameter('target_points').value
for index in range(int(len(self.target_points_)/3)):
x = self.target_points_[index*3]
y = self.target_points_[index*3+1]
yaw = self.target_points_[index*3+2]
points.append([x,y,yaw])
self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')
return points
def nav_to_pose(self,target_pose):
#导航到指定位姿
self.waitUntilNav2Active()
self.goToPose(target_pose)
while not self.isTaskComplete():
feedback = self.getFeedback()
self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')
reslut = self.getResult()
self.get_logger().info(f'导航结果:{reslut}')
def get_cuurent_pose(self):
#通过TF获取当前位姿
while rclpy.ok():
try:
result = self.buffer_.lookup_transform('map','base_footprint',
rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))
transform = result.transform
self.get_logger().info(f'平移:{transform.translation}')
# self.get_logger().info(f'旋转:{transform.rotation}')
# ratation_euler = euler_from_quaternion([
# transform.rotation.x,
# transform.rotation.y,
# transform.rotation.z,
# transform.rotation.w]
# )
# self.get_logger().info(f'平移rpy:{ratation_euler}')
return transform
except Exception as e:
self.get_logger().warn(f'获取坐标转换异常{str(e)}')
def speech_text(self,text):
#调用服务合成语音
while not self.speech_client_.wait_for_service(timeout_sec=1):
self.get_logger().info("wait for speech service")
request = SpeachText.Request()
request.text = text
future = self.speech_client_.call_async(request)
rclpy.spin_until_future_complete(self,future)
if future.result() is not None:
result = future.result().result
if result:
self.get_logger().info(f'语音合成成功:{text}')
else:
self.get_logger().warn(f'语音合成失败:{text}')
else:
self.get_logger().warn('语音合成服务请求失败')
def main():
rclpy.init()
patrol = PatrolNode() #节点
patrol.speech_text('正在准备初始化位置')
patrol.init_robot_pose()
patrol.speech_text('初始化位置完成')
while rclpy.ok():
points = patrol.get_target_points()
for point in points:
x,y,yaw = point[0],point[1],point[2]
target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)
patrol.speech_text(text=f'准备前往目标点{x},{y}')
patrol.nav_to_pose(target_pose)
patrol.speech_text(text=f'已到达目标点{x},{y},准备记录图像')
patrol.record_img()
patrol.speech_text(text=f'图像记录完成')
rclpy.shutdown()
重新构建后运行,拍照效果如下