云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL

文章目录

  • Pulsar IO (Connector连接器)
    • 基础定义
    • 安装Pulsar和内置连接器
    • 连接Pulsar到Cassandra
      • 安装cassandra集群
      • 配置Cassandra接收器
      • 创建Cassandra Sink
      • 验证Cassandra Sink结果
      • 删除Cassandra Sink
    • 连接Pulsar到PostgreSQL
      • 安装PostgreSQL集群
      • 配置JDBC接收器
      • 创建JDBC Sink
      • 验证JDBC Sink结果
  • Pulsar SQL
    • 定义
    • 简单使用

Pulsar IO (Connector连接器)

基础定义

Pulsar IO连接器能够轻松地创建、部署和管理与外部系统(如Apache Cassandra、Aerospike等)交互的连接器。IO连接器有两种类型:源连接器和接收器连接器。

image-20230212113058962

可以通过Connector Admin CLI使用源和接收器子命令管理Pulsar连接器(例如,在连接器上创建、更新、启动、停止、重新启动、重新加载、删除和执行其他操作)。有关最新和完整的信息,请参阅Pulsar管理文档。

安装Pulsar和内置连接器

在将Pulsar连接到数据库之前,需要先安装Pulsar和所需的内置连接器。要启用Pulsar连接器,您需要在下载页面上下载连接器的tarball版本。

# 下载最新版本2.11.0的pulsar-io-cassandra和pulsar-io-jdbc-postgres,需要什么连接器可以从官方查看是否支持并下载,这里举例就下载两个
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-cassandra-2.11.0.nar
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-jdbc-postgres-2.11.0.nar
# 在pulsar根目录下创建目录
mkdir connectors
# 将压缩文件移动connectors目录
mv pulsar-io-jdbc-postgres-2.11.0.nar pulsar-io-jdbc-postgres-2.11.0.nar connectors
# 重启pulsar
# 查看可用连接器列表
curl -w '\n' -s http://localhost:8080/admin/v2/functions/connectors

image-20230308101658342

连接Pulsar到Cassandra

安装cassandra集群

# 下载镜像并启动cassandra测试容器
docker run -d --rm --name=cassandra -p 9042:9042 cassandra
# 查看进程
docker ps
# 查看运行日志
docker logs cassandra
# 等待一小段时间后查看Cassandra集群状态
docker exec cassandra nodetool status
# 使用cqlsh连接到Cassandra集群

image-20230308091133556

# 使用cqlsh连接到Cassandra集群
docker exec -ti cassandra cqlsh localhost
# 创建一个密钥空间pulsar_itxs_keyspace
CREATE KEYSPACE pulsar_itxs_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
# 创建一个表pulsar_itxs_table
USE pulsar_itxs_keyspace;
CREATE TABLE pulsar_itxs_table (key text PRIMARY KEY, col text);

image-20230308092441377

配置Cassandra接收器

现在已经有一个Cassandra集群在本地运行;要运行Cassandra接收器连接器,需要准备一个配置文件,其中包括Pulsar连接器运行时需要知道的信息,例如Pulsar连接器如何找到Cassandra集群,Pulsar连接器用于写入Pulsar消息的键空间和表是什么等等;可以使用Json或者Yaml这两种格式创建配置文件。

vim examples/cassandra-sink.json

{
    "roots": "192.168.3.100:9042",
    "keyspace": "pulsar_itxs_keyspace",
    "columnFamily": "pulsar_itxs_table",
    "keyname": "key",
    "columnName": "col"
}

vim examples/cassandra-sink.yml

configs:
    roots: "192.168.3.100:9042"
    keyspace: "pulsar_itxs_keyspace"
    columnFamily: "pulsar_itxs_table"
    keyname: "key"
    columnName: "col"

创建Cassandra Sink

可以使用Connector Admin CLI创建sink连接器和操作。运行下面命令来创建一个Cassandra接收器连接器,接收器类型为Cassandra,配置文件为上一步创建的examples/cassandra-sink.yml。

bin/pulsar-admin sinks create \
    --tenant my-test \
    --namespace my-namespace \
    --name cassandra-itxs-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs persistent://my-test/my-namespace/itxs_cassandra    

命令执行后,Pulsar创建接收器连接器cassandra-itxs-sink。这个接收器连接器作为Pulsar函数运行,并将主题itxs_cassandra中产生的消息写入Cassandra表pulsar_itxs_table;

image-20230308103049990

可以使用Connector Admin CLI对连接器进行监控和其他操作。

  • 获取连接器的信息
bin/pulsar-admin sinks get \
  --tenant my-test \
  --namespace my-namespace \
  --name cassandra-itxs-sink
  • 检查连接器的状态
bin/pulsar-admin sinks status \
  --tenant my-test \
  --namespace my-namespace \
  --name cassandra-itxs-sink

验证Cassandra Sink结果

生成一些消息到Cassandra接收器itxs_cassandra的输入主题

for i in {0..9}; do bin/pulsar-client produce -m "itxskey-$i" -n 1 persistent://my-test/my-namespace/itxs_cassandra; done

再次查看连接器的状态,可以有10条记录处理统计信息

image-20230308103247012

查看Cassandra的pulsar_itxs_table

USE pulsar_itxs_keyspace;
select * from pulsar_itxs_table;

image-20230308105728900

删除Cassandra Sink

bin/pulsar-admin sinks delete \
    --tenant my-test \
    --namespace my-namespace \
    --name cassandra-itxs-sink

连接Pulsar到PostgreSQL

安装PostgreSQL集群

这里使用PostgreSQL 12 docker镜像在docker中启动一个单节点PostgreSQL集群。

# 从Docker中拉取PostgreSQL 12映像
docker pull postgres:12
# 启动postgres容器
docker run -d -it --rm \
    --name pulsar-postgres \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=password \
    -e POSTGRES_USER=postgres \
    postgres:12
# 查看运行日志
docker logs -f pulsar-postgres
# 进入容器
docker exec -it pulsar-postgres /bin/bash
# 使用默认用户名和密码登录PostgreSQL
psql -U postgres postgres
# 使用以下命令创建pulsar_postgres_jdbc_sink表:
create table if not exists pulsar_postgres_jdbc_sink
(
id serial PRIMARY KEY,
name VARCHAR(255) NOT NULL
);

配置JDBC接收器

现在有一个本地运行的PostgreSQ,接下来需要配置JDBC接收器连接器。

  • 创建配置文件vim connectors/pulsar-postgres-jdbc-sink.yaml
configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://192.169.3.100:5432/postgres"
  tableName: "pulsar_postgres_jdbc_sink"

创建JDBC Sink

执行下面命令后,Pulsar将创建接收器连接器pulse -postgres-jdbc-sink。这个sink连接器作为Pulsar函数运行,并将Topic为pulsar-postgres-jdbc-sink-topic中产生的消息写入PostgreSQL表pulsar_postgres_jdbc_sink。

bin/pulsar-admin sinks create \
    --tenant my-test \
    --namespace my-namespace \
    --archive ./connectors/pulsar-io-jdbc-postgres-2.11.0.nar \
    --inputs persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic \
    --name pulsar-postgres-my-jdbc-sink \
    --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
    --parallelism 1

列出所有的sink

bin/pulsar-admin sinks list \
    --tenant my-test \
    --namespace my-namespace

image-20230308140145820

验证JDBC Sink结果

通过JavaAPI生成一些消息到Cassandra接收器pulsar-postgres-jdbc-sink-topic这个主题,在Java项目添加maven依赖

    <properties>
        <pulsar.version>2.11.0</pulsar.version>
    </properties>        
        

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

这里演示实体类成员变量简单就直接使用public声明了

package sn.itxs.pulsar.io;

public class User{
    public int id;
    public String name;
}

新增ClientDemo.java

package sn.itxs.pulsar.io;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = null;
        Producer<User> producer = null;
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://192.168.5.52:6650")
                    .build();

            producer = client.newProducer(AvroSchema.of(User.class))
                    .topic("persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic")
                    .create();
            User user = new User();
            int index = 10;
            while (index++ < 20) {
                try {
                    user.id = index;
                    user.name = "this is a test " + index;
                    producer.newMessage().value(user).send();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("send finish");
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (producer!=null){
                producer.close();
            }
            if (client!=null){
                client.close();
            }
        }
    }
}

运行程序后查看PostgreSQL表pulsar_postgres_jdbc_sink,已经有刚才

image-20230308163802240

上面由于在Java中创建了Schema,因此不需要手工创建,可以查看当前persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic主体已生成Schema信息如下:

image-20230308165053977

如果要从pulsar-admin命令行创建schema可以这样操作

  • 创建schema,创建一个avro-schema文件,将以下内容复制到该文件中,并将该文件放在pulsar/connectors文件夹中。vim connectors/avro-schema
{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}
  • 上传schema到topic,将avro-schema模式上传到pulsar-postgres-jdbc-sink-topic主题
bin/pulsar-admin schemas upload persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
  • 检查模式是否上传成功。
bin/pulsar-admin schemas get persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic1

image-20230308145650209

如需stop停止、restart重启指定的sinks可以如下操作,当然也可以更新指定sinks,详细命令可以查阅官网

bin/pulsar-admin sinks stop \
    --tenant my-test \
    --namespace my-namespace \
    --name pulsar-postgres-my-jdbc-sink \

Pulsar SQL

定义

Apache Pulsar用于存储事件数据流,事件数据由预定义的字段构成。通过模式注册表的实现,可以在Pulsar中存储结构化数据,并使用Trino(以前是Presto SQL)查询数据。作为Pulsar SQL的核心,Pulsar Trino插件使Trino集群中的Trino worker能够查询来自Pulsar的数据.

image-20230308170103068

由于Pulsar采用了基于两级段的架构,因此查询性能高效且可扩展性强。Pulsar中的主题在Apache BookKeeper中存储为段。每个主题段被复制到一些BookKeeper节点上,从而支持并发读和高读吞吐量。在Pulsar Trino连接器中,数据直接从BookKeeper中读取,因此Trino worker可以同时从水平可扩展数量的BookKeeper节点中读取

image-20230308170332105

简单使用

在Pulsar中查询数据前,需要安装Pulsar和内置连接器。

# 这里演示就直接启动独立集群
PULSAR_STANDALONE_USE_ZOOKEEPER=1 ./bin/pulsar standalone
# 启动一个Pulsar SQL worker
./bin/pulsar sql-worker run
# 初始化Pulsar独立集群和SQL worker后,执行SQL CLI:
./bin/pulsar sql
show catalogs;
show schemas in pulsar;
show tables in pulsar."public/default";

image-20230308172341425

通过前面的Java示例,我们改为Json格式写入Pulsar的user-topic

package sn.itxs.pulsar.io;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ClientSqlDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = null;
        Producer<User> producer = null;
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://192.168.5.52:6650")
                    .build();

            producer = client.newProducer(Schema.JSON(User.class))
                    .topic("user-topic")
                    .create();
            User user = new User();
            int index = 10;
            while (index++ < 20) {
                try {
                    user.id = index;
                    user.name = "this is a test " + index;
                    producer.newMessage().value(user).send();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("send finish");
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (producer!=null){
                producer.close();
            }
            if (client!=null){
                client.close();
            }
        }
    }
}

运行程序后再来查询就有刚才发送的消息数据,_开头的字段为Pulsar 自带的。

select * from pulsar."public/default"."user-topic";

image-20230308175830023

  • 本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/2012.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【网络】网络层协议——IP

目录网络层IP协议IP基础知识IP地址IP报头格式网段划分CIDR特殊的IP地址IP地址的数量限制私有IP地址和公有IP地址路由IP总结网络层 在复杂的网络环境中确定一个合法的路径。 IP协议 IP协议作为整个TCP/IP中至关重要的协议&#xff0c;主要负责将数据包发送给最终的目标计算机…

多线程 (六) 单例模式

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点!人生格言&#xff1a;当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友一起加油喔&#x1f9be;&am…

蓝桥杯刷题冲刺 | 倒计时19天

作者&#xff1a;指针不指南吗 专栏&#xff1a;蓝桥杯倒计时冲刺 &#x1f43e;马上就要蓝桥杯了&#xff0c;最后的这几天尤为重要&#xff0c;不可懈怠哦&#x1f43e; 文章目录1.抓住那头牛2.排列序数1.抓住那头牛 题目 链接&#xff1a; 抓住那头牛 - C语言网 (dotcpp.com…

网络安全之防火墙

目录 网络安全之防火墙 路由交换终归结底是联通新设备 防御对象&#xff1a; 定义&#xff1a; 防火墙的区域划分&#xff1a; 包过滤防火墙 --- 访问控制列表技术 --- 三层技术 代理防火墙 --- 中间人技术 --- 应用层 状态防火墙 --- 会话追踪技术 --- 三层、四层 UTM …

CrossOver零知识学习1 —— 初识

本文部分内容参考CrossOver22全新版功能简介 免费mac虚拟机工具_CoCo玛奇朵的博客-CSDN博客 特此致谢&#xff01; 一、CrossOver简介 CrossOver是由CODE WEAVERS公司开发的类虚拟机软件&#xff0c;目的是使Linux和Mac OS X操作系统和Window系统兼容。CrossOver英文原意为“…

强烈推荐:0基础入门网安必备《网络安全知识图谱》

蚁景网安学院一直专注于网安实战技能培养&#xff0c;提供全方位的网安安全学习解决方案。我们集聚专业网安技术大佬资源&#xff0c;倾力打造了这本更全面更系统的“网络安全知识图谱”&#xff0c;让大家在网络安全学习路上不迷茫。 在这份网安技能地图册里&#xff0c;我们对…

01 | Msyql系统架构

目录MySQL系统架构连接器查询缓存分析器优化器执行器MySQL系统架构 大体来说&#xff0c;MySQL分为Server层和引擎层两部分。 Server层包含链接器、查询缓存、分析器、优化器和执行器&#xff0c;而引擎层负责的是数据的存储和读取&#xff0c;支持InnoDB、Myisam、Memory等多…

CSS实现文字凹凸效果

使用两个div分别用来实现凹凸效果&#xff1b;text-shadow语法 text-shadow: h-shadow v-shadow blur color; h-shadow&#xff1a;必需。水平阴影的位置。允许负值。 v-shadow &#xff1a;必需。垂直阴影的位置。允许负值。 blur&#xff1a;可选&#xff0c;模糊的距离。 co…

【C语言】你真的了解结构体吗

引言✨我们知道C语言中存在着整形(int、short...)&#xff0c;字符型(char)&#xff0c;浮点型(float、double)等等内置类型&#xff0c;但是有时候&#xff0c;这些内置类型并不能解决我们的需求&#xff0c;因为我们无法用这些单一的内置类型来描述一些复杂的对象&#xff0c…

k8s部署prometheus

k8s部署prometheus 版本说明&#xff1a; k8s&#xff1a;1.24.4 prometheus&#xff1a;release-0.12&#xff08;https://github.com/prometheus-operator/kube-prometheus.git&#xff09; 本次部署采用operator的方式将prometheus部署到k8s中&#xff0c;需对k8s和prom…

springboot+vue驾校管理系统 idea科目一四预约考试,练车

加大了对从事道路运输经营活动驾驶员的培训管理力度&#xff0c;但在实际的管理过程中&#xff0c;仍然存在以下问题&#xff1a;(1)管理部门内部人员在实际管理过程中存在人情管理&#xff0c;不进行培训、考试直接进行发证。(2)从业驾驶员培训机构不能严格执行管理部门的大纲…

SpringBoot解析指定Yaml配置文件

再来个文章目录 文章目录前言1、自定义配置文件2、配置对象类3、YamlPropertiesSourceFactory下面还有投票&#xff0c;帮忙投个票&#x1f44d; 前言 最近在看某个开源项目代码并准备参与其中&#xff0c;代码过了一遍后发现多个自定义的配置文件用来装载业务配置代替数据库…

使用 Python 从点云生成 3D 网格

从点云生成 3D 网格的最快方法 已经用 Python 编写了几个实现来从点云中获取网格。它们中的大多数的问题在于它们意味着设置许多难以调整的参数&#xff0c;尤其是在不是 3D 数据处理专家的情况下。在这个简短的指南中&#xff0c;我想展示从点云生成网格的最快和最简单的过程。…

继承和派生

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C &#x1f525;座右铭&#xff1a;“不要等到什么都没有了&#xff0c;才下…

手撕数据结构—队列

队列队列的话只允许在一端插入&#xff0c;在另外一端删除。插入数据的那一段叫做队尾&#xff0c;出数据的那一段叫做队头&#xff08;从尾巴插入&#xff09;。因此的话队列是先进先出的。入的顺序与出的顺序的话是一样的。这个与栈是不一样的&#xff0c;因为栈的话就是说如…

问题【Java 基础】

基础1、成员变量与局部变量的区别2、静态变量有什么作用3、字符型常量和字符串常量的区别4、静态方法为什么不能调用非静态成员5、静态方法和实例方法有何不同6、重载和重写有什么区别7、什么是可变长参数8、Java 中的几种基本数据类型了解么9、基本类型和包装类型的区别10、包…

【数据结构】树和二叉树的概念及结构

目录 1.树概念及结构 1.1 树的概念 1.2 树的相关概念 1.3树的表示 1.4 树在实际中的应用 2.二叉树概念及结构 2.1 概念 2.2 特殊的二叉树 2.2.1 满二叉树 2.2.2 完全二叉树 1.树概念及结构 1.1 树的概念 树是一种非线性的数据结构&#xff0c;它是由n(n>0) 个有…

一款专门为自动化测试打造的集成开发工具【Aqua】,“能快速构建自动化测试项目”,就问你爽不爽吧,,,

你好&#xff0c;我是不二。 随着行业内卷越来越严重&#xff0c;自动化测试已成为测试工程师的必备技能&#xff0c;谈及自动化测试肯定少不了编程&#xff0c;说到编程肯定离不开集成开发工具&#xff0c;比如&#xff1a;IntelliJ IDEA可以帮助我们快速构建Maven项目、sprin…

前端已死?后端已亡?弯弯绕绕,几分真几分假

前段时间&#xff0c;我在掘金分享了一篇GPT-4 性能文章&#xff0c;也许是过于强大带来的威胁性&#xff0c;引来评论区的排队哀嚎&#xff08;如下图&#xff09;&#xff0c;所以“前端已死&#xff0c;后端已亡”这个概念真的成立吗&#xff1f;本文着重探讨前端。 前端和后…

警惕,3月20日WOS目录更新,50本SCI/SSCI被剔除,这个出版社多达18本

2023年3月SCI、SSCI期刊目录更新 2023年3月20日&#xff0c;Web of Science核心期刊目录再次更新&#xff01;此次2023年3月SCIE & SSCI期刊目录更新&#xff0c;与上次更新&#xff08;2023年2月&#xff09;相比&#xff0c;共有50本期刊被剔除出SCIE & SSCI期刊目录…