强烈推荐教程:
《ROS 2机器人开发从入门到实践》3.2.2订阅小说并合成语音_哔哩哔哩_bilibili
构建功能包
# create package demo_python_pkg
ros2 pkg create --build-type ament_python --license Apache-2.0 demo_python_pkg
自己写的代码放在./demo_python_pkg/demo_python_pkg目录下
发布者
import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue
class NovelPubNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.novels_queue = Queue()
self.get_logger().info("NovelPubNode has been started")
self.novel_pub = self.create_publisher(String, "novel_topic", 10)
self.timer = self.create_timer(5,self.timer_callback)
def timer_callback(self):
if not self.novels_queue.empty():
line = self.novels_queue.get()
msg = String()
msg.data = line
self.novel_pub.publish(msg)
self.get_logger().info(f"Published novel: {line}")
def download_novel(self, url):
response = requests.get(url)
response.encoding = 'utf-8'
text = response.text
for text_line in text.splitlines():
self.novels_queue.put(text_line)
self.get_logger().info(f"Downloaded novel: {text}")
def main():
rclpy.init()
node = NovelPubNode("novel_pub_node")
node.download_novel("http://0.0.0.0:8000/novel_1.txt")
rclpy.spin(node)
node.shutdown()
订阅者
import time
import espeakng
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading
class NovelSubNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.get_logger().info("NovelSubNode has been created!")
self.novels_queue = Queue()
self.create_subscription(String, "novel_topic", self.novel_callback, 10)
self.say_thread = threading.Thread(target=self.say)
self.say_thread.start()
def novel_callback(self, msg):
self.novels_queue.put(msg.data)
def say(self):
engine = espeakng.Speaker()
engine.voice = "zh"
while rclpy.ok():
if not self.novels_queue.empty():
novel = self.novels_queue.get()
engine.say(novel)
print("Said: " + novel)
engine.wait()
else:
time.sleep(1)
def main():
rclpy.init()
node = NovelSubNode("novel_sub_node")
rclpy.spin(node)
rclpy.shutdown()
代码完成后,配置setup.py文件
格式为:"名字 = 包名.文件名:函数名"
在根目录运行以下终端命令
# build package demo_python_pkg
colcon build
# source setup.bash
source install/setup.bash
运行以上命令后,得到build、install、log文件夹
可执行的节点文件在以下文件夹
./install/demo_python_pkg/lib/demo_python_pkg
运行节点命令
ros2 run demo_python_pkg python_pub_node
ros2 run demo_python_pkg python_sub_node
其他常用命令
# check if node is running
ros2 node list
# check if topic is published
ros2 topic list
# check topic content
ros2 topic echo /novel_topic
# check topic speed
ros2 topic hz /novel_topic
# check if service is available
ros2 service list