说明
Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。
Pulsar 的主要特性如下:
对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。
极低的发布和端到端延迟。
无缝可扩展至超过一百万个主题。
一个简单的客户端 API,具有Java、Go、Python和C++的绑定。
主题的多种订阅类型(独占、共享和故障转移)。
通过Apache BookKeeper提供的持久消息存储来保证消息传递。无服务器轻量级计算框架Pulsar Functions提供流原生数据处理功能。
基于 Pulsar Functions 构建的无服务器连接器框架Pulsar IO可以更轻松地将数据移入和移出 Apache Pulsar。
当数据老化时,分层存储将数据从热/温存储卸载到冷/长期存储(例如S3和GCS)。
安装包下载
本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本
csdn下载 也可以自行去官网下载
解压目录
tar -zxvf apache-pulsar-3.2.2-bin.tar.gz
目录说明
目录 | 描述 |
---|---|
bin | 入口pulsar点脚本和许多其他命令行工具 |
conf | 配置文件,包括broker.conf |
lib | Pulsar 使用的 JAR |
examples | Pulsar 函数示例 |
instances | Pulsar 函数的工件 |
启动Pulsar
bin/pulsar standalone
注意:需要保证jdk在17+
创建Topic
创建一个名为my-topic的topic
bin/pulsar-admin topics create persistent://public/default/my-topic
生产者发送消息
bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'
消费者消费消息
测试批量发送消息
bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"
重新消费
bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0
java生产消息
pom.xml
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.14.2</version> </dependency>
代码
package com.pulsar.demo; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class PulsarProducer { private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class); private static final String SERVER_URL = "pulsar://192.168.xxx:6650"; public static void main(String[] args) throws Exception { // 构造Pulsar Client PulsarClient client = PulsarClient.builder() .serviceUrl(SERVER_URL) .enableTcpNoDelay(true) .build(); // 构造生产者 Producer<String> producer = client.newProducer(Schema.STRING) .producerName("my-producer") .topic("my-topic") .batchingMaxMessages(1024) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .enableBatching(true) .blockIfQueueFull(true) .maxPendingMessages(512) .sendTimeout(10, TimeUnit.SECONDS) .blockIfQueueFull(true) .create(); // 同步发送消息 MessageId messageId = producer.send("Hello World"); log.info("message id is {}",messageId); System.out.println(messageId.toString()); // 异步发送消息 CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message"); // 阻塞线程,直到返回结果 log.info("async message id is {}",asyncMessageId.get()); producer.close(); // 关闭licent的方式有两种,同步和异步 // client.close(); client.closeAsync(); } }
java消费消息
package com.pulsar.demo; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.TimeUnit; public class PulsarConsumer { private static final String SERVER_URL = "pulsar://192.168.xxx:6650"; private static final String topic = "persistent://public/default/my-topic"; // 要订阅的topic public static void main(String[] args) throws Exception { // 构造Pulsar Client PulsarClient client = PulsarClient.builder() .serviceUrl(SERVER_URL) .enableTcpNoDelay(true) .build(); Consumer consumer = client.newConsumer() .consumerName("my-consumer") .topic("my-topic") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) .maxTotalReceiverQueueSizeAcrossPartitions(10) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); while (true) { Message msg = consumer.receive(); try { System.out.printf("Message received: %s\n", new String(msg.getData())); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } } }
停止Pulsar
完成后,您可以关闭 Pulsar 集群。在启动集群的终端窗口中按Ctrl-C 。