Apache-Pulsar安装操作说明

说明

Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。

Pulsar 的主要特性如下:

对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。
极低的发布和端到端延迟。
无缝可扩展至超过一百万个主题。
一个简单的客户端 API,具有Java、Go、Python和C++的绑定。
主题的多种订阅类型(独占、共享和故障转移)。
通过Apache BookKeeper提供的持久消息存储来保证消息传递。无服务器轻量级计算框架Pulsar Functions提供流原生数据处理功能。
基于 Pulsar Functions 构建的无服务器连接器框架Pulsar IO可以更轻松地将数据移入和移出 Apache Pulsar。
当数据老化时,分层存储将数据从热/温存储卸载到冷/长期存储(例如S3和GCS)。

安装包下载

本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本

csdn下载 也可以自行去官网下载

解压目录

tar -zxvf apache-pulsar-3.2.2-bin.tar.gz

目录说明

目录描述
bin入口pulsar点脚本和许多其他命令行工具
conf配置文件,包括broker.conf
libPulsar 使用的 JAR
examplesPulsar 函数示例
instancesPulsar 函数的工件

启动Pulsar

bin/pulsar standalone

注意:需要保证jdk在17+

创建Topic

创建一个名为my-topic的topic

bin/pulsar-admin topics create persistent://public/default/my-topic

生产者发送消息

bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'

消费者消费消息

测试批量发送消息

bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"


重新消费

bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0

java生产消息

pom.xml

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>3.2.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.14.2</version>
</dependency>

代码

package com.pulsar.demo;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
 
public class PulsarProducer {
    private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class);
    private static final String SERVER_URL = "pulsar://192.168.xxx:6650";
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        // 构造生产者
        Producer<String> producer = client.newProducer(Schema.STRING)
                .producerName("my-producer")
                .topic("my-topic")
                .batchingMaxMessages(1024)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .enableBatching(true)
                .blockIfQueueFull(true)
                .maxPendingMessages(512)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
        // 同步发送消息
        MessageId messageId = producer.send("Hello World");
        log.info("message id is {}",messageId);
        System.out.println(messageId.toString());
        // 异步发送消息
        CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
        // 阻塞线程,直到返回结果
        log.info("async message id is {}",asyncMessageId.get());
 
        
        producer.close();
 
        // 关闭licent的方式有两种,同步和异步
        // client.close();
        client.closeAsync();
 
    }
}

java消费消息

package com.pulsar.demo;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;
 

public class PulsarConsumer {
    private static final String SERVER_URL = "pulsar://192.168.xxx:6650";
    private static final String topic = "persistent://public/default/my-topic"; // 要订阅的topic
 
    public static void main(String[] args) throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        Consumer consumer = client.newConsumer()
                .consumerName("my-consumer")
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .maxTotalReceiverQueueSizeAcrossPartitions(10)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        while (true) {
            Message msg = consumer.receive();
 
            try {
                System.out.printf("Message received: %s\n", new String(msg.getData()));
 
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}

停止Pulsar

完成后,您可以关闭 Pulsar 集群。在启动集群的终端窗口中按Ctrl-C 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/524719.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言--#运算符和##运算符

#运算符和##运算符 这里提前补充一点printf C语言里面会天然的吧printf里面俩字符串合并为一个 #的使用 在C语言中&#xff0c;#符号主要用于预处理器指令。预处理器是在代码编译之前执行的&#xff0c;它处理所有以#开始的指令。以下是一些常见的使用情况&#xff1a;1. **…

自定义校验器

1.前端校验 <template><el-dialog:title"!dataForm.brandId ? 新增 : 修改":close-on-click-modal"false":visible.sync"visible"><el-form:model"dataForm":rules"dataRule"ref"dataForm"keyu…

MySQL-主从复制:概述、原理、同步数据一致性问题、搭建流程

主从复制 1. 主从复制概述 1.1 如何提升数据库并发能力 一般应用对数据库而言都是“读多写少”&#xff0c;也就说对数据库读取数据的压力比较大&#xff0c;有一个思路就是采用数据库集群的方案&#xff0c;做主从架构、进行读写分离&#xff0c;这样同样可以提升数据库的并…

基于R语言、MaxEnt模型融合技术的物种分布模拟、参数优化方法、结果分析制图与论文写作

第一章、理论篇&#xff1a;以问题导入的方式&#xff0c;深入掌握原理基础 什么是MaxEnt模型&#xff1f; MaxEnt模型的原理是什么&#xff1f;有哪些用途&#xff1f; MaxEnt运行需要哪些输入文件&#xff1f;注意那些事项&#xff1f; 融合R语言的MaxEnt模型的优势&…

从0到1搭建文档库——sphinx + git + read the docs

sphinx git read the docs 目录 一、sphinx 1 sphinx的安装 2 本地构建文件框架 1&#xff09;创建基本框架&#xff08;生成index.rst &#xff1b;conf.py&#xff09; conf.py默认内容 index.rst默认内容 2&#xff09;生成页面&#xff08;Windows系统下&#xf…

大厂高频面试题复习JAVA学习笔记-JUC多线程及高并发(上)

目录 0 JUC基础概念 wait/sleep的区别 并发与并行的区别 线程的六个状态 JUC结构 ​编辑 1 请谈谈你对volatile的理解 JMM&#xff08;java内存模型&#xff09; 可见性 不保证原子性 有序性​编辑 指令重排 哪些地方用到volatile&#xff1a; 双端检查机制DLC …

PaddleVideo:PP-TSM视频分类

本文记录&#xff1a;使用Paddle框架训练TSM&#xff08;Temporal Shift Module&#xff09; 前提条件&#xff1a;已经安装Paddle和PadleVideo&#xff0c;具体可参考前一篇文章。 1-数据准备&#xff1a; 以UCF101为例&#xff1a;内含13320 个短视频&#xff0c;视频类别&…

数据库相关知识总结

一、数据库三级模式 三个抽象层次&#xff1a; 1. 视图层&#xff1a;最高层次的抽象&#xff0c;描述整个数据库的某个部分的数据 2. 逻辑层&#xff1a;描述数据库中存储的数据以及这些数据存在的关联 3. 物理层&#xff1a;最低层次的抽象&#xff0c;描述数据在存储器中时如…

Prometheus-Grafana基础篇安装绘图

首先Prometheus安装 1、下载 https://prometheus.io/download/ 官网路径可以去这儿下载 2、如图&#xff1a; 3.解压&#xff1a; tar -xf prometheus-2.6.1.linux-amd64 cd prometheus-2.6.1.linux-amd64 4.配置文件说明&#xff1a; vim prometheus.yml 5.启动Promethe…

基于 Vue3 + Webpack5 + Element Plus Table 二次构建表格组件

基于 Vue3 Webpack5 Element Plus Table 二次构建表格组件 文章目录 基于 Vue3 Webpack5 Element Plus Table 二次构建表格组件一、组件特点二、安装三、快速启动四、单元格渲染配置说明五、源码下载地址 基于 Vue3 Webpack5 Element Plus Table 二次构建表格组件&#x…

c++的学习之路:15、list(2)

本章主要是讲模拟实现list&#xff0c;文章末附上代码。 目录 一、创建思路 二、构造函数 三、迭代器 四、增删 五、代码 一、创建思路 如下方代码&#xff0c;链表是由一块一块不连续的空间组成的&#xff0c;所以这里写了三个模板&#xff0c;一个是节点&#xff0c;一…

书生浦语训练营二期第三次作业

文章目录 基础作业1. 在茴香豆 Web 版中创建自己领域的知识问答助手第一轮对话第二轮对话第三轮对话第四轮对话第五轮对话 2.在 InternLM Studio 上部署茴香豆技术助手修改配置文件创建知识库运行茴香豆知识助手 基础作业 1. 在茴香豆 Web 版中创建自己领域的知识问答助手 我…

2-django、http、web框架、django及django请求生命周期、路由控制、视图层

1 http 2 web框架 3 django 3.1 django请求生命周期 4 路由控制 5 视图层 1 http #1 http 是什么 #2 http特点 #3 请求协议详情-请求首行---》请求方式&#xff0c;请求地址&#xff0c;请求协议版本-请求头---》key:value形式-referer&#xff1a;上一次访问的地址-user-agen…

Vue3与TypeScript中动态加载图片资源的解决之道

在前端开发中&#xff0c;Vue.js已成为一个备受欢迎的框架&#xff0c;尤其是在构建单页面应用时。Vue3的发布更是带来了许多性能优化和新特性&#xff0c;而TypeScript的加入则进一步提升了代码的可维护性和健壮性。然而&#xff0c;在实际的项目开发中&#xff0c;我们有时会…

校园圈子小程序,大学校园圈子,三段交付,源码交付,支持二开

介绍 在当今的数字化时代&#xff0c;校园社交媒体和在线论坛成为了学生交流思想、讨论问题以及分享信息的常用平台。特别是微信小程序&#xff0c;因其便捷性、用户基数庞大等特点&#xff0c;已逐渐成为构建校园社区不可或缺的一部分。以下是基于现有资料的校园小程序帖子发…

Redis中的Sentinel(六)

Sentinel 选举领头Sentinel. 当一个主服务器被判断为客观下线时&#xff0c;监视这个下线主服务器的各个Sentinel会进行协商&#xff0c;选举出一个领头Sentinel,并由领头 Sentinel对下线主服务器执行故障转移操作。以下是Redis选举领头Sentinel的规则和方法: 1.所有在线的S…

Python | Leetcode Python题解之第16题最接近的三数之和

题目&#xff1a; 题解&#xff1a; class Solution:def threeSumClosest(self, nums: List[int], target: int) -> int:nums.sort()n len(nums)best 10**7# 根据差值的绝对值来更新答案def update(cur):nonlocal bestif abs(cur - target) < abs(best - target):best…

基于Arduino nano配置银燕电调

1 目的 配置电调&#xff0c;设置电机转动方向&#xff0c;使得CW电机朝顺时针方向转动&#xff0c;CCW电机朝逆时针转动。 2 步骤 硬件 Arduino nano板子及USB线变阻器银燕电调EMAX Bullet 20A朗宇电机 2205 2300KV格氏电池3S杜邦线若干接线端子 软件 BLHeliSuite 注意…

【论文速读】| 大语言模型平台安全:将系统评估框架应用于OpenAI的ChatGPT插件

本次分享论文为&#xff1a;LLM Platform Security: Applying a Systematic Evaluation Framework to OpenAI’s ChatGPT Plugins 基本信息 原文作者&#xff1a;Umar Iqbal, Tadayoshi Kohno, Franziska Roesner 作者单位&#xff1a;华盛顿大学圣路易斯分校&#xff0c;华盛…

服务器主机安全受到危害的严重性

为了让小伙伴们了解到服务器主机安全受到危害的严重性&#xff0c;以下详细说明一下&#xff1a;1. 数据泄露&#xff1a;如果服务器主机遭受攻击&#xff0c;攻击者可能会窃取敏感数据&#xff0c;如用户数据、商业秘密、机密文件等&#xff0c;导致数据泄露和商业机密的泄漏。…