11_Pulsar Adaptors适配器、kafka适配器、Spark适配器

2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
2.3.2.Spark适配器

2.3. Pulsar Adaptors适配器

2.3.1.kafka适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic

  • 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包
<dependency> 
     <groupId>org.apache.pulsar</groupId> 
     <artifactId>pulsar-client-kafka</artifactId> 
     <version>2.8.0</version> 
</dependency>

注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0

  • 2-编写生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdaptorProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 创建kafka生产者的核心类对象: KafkaProducer
        // 1.1: 创建生产者配置对象: 设置相关配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        // 消息的确认方案
        props.put("acks", "all");
        // key序列化类型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value 序列化类型
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props); 
        //2. 发送数据 
        for (int i = 0; i < 10; i++) { 
            //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); 
            producer.send(producerRecord).get(); 
        }
        
        //3. 释放资源 
        producer.close();
    }

}
  • 3-编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaAdaptorConsumer {
    public static void main(String[] args) {
        //1. 创建kafka的消费者的核心对象: KafkaConsumer
        //1.1: 创建消费者配置对象, 并设置相关的参数:
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");
        //消费者组的 id
        props.setProperty("group.id", "test");
        //是否启动消费者自动提交消费偏移量
        props.setProperty("enable.auto.commit", "true");
        //每间隔多长时间提交一次偏移量:单位 毫秒
        props.setProperty("auto.commit.interval.ms","1000");
        //key 反序列化
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //val 发序列化
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //2. 给消费者设置订阅topic:
        consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));
        //3. 循环获取相关的消息数据
        while (true) {
            //3.1: 从kafka中获取消息数据: 参数表示等待超时时间
            //注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息
            for (ConsumerRecord<String, String> record : records) {
                String massage = record.value();
                System.out.println("消息数据为:"+massage);
            }
        } 
    } 
}
  • 4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据
    在这里插入图片描述

2.3.2.Spark适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接
收原始数据。

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可
以通过多种方式对其进行处理。

  • 1-导入相关的依赖包
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-spark</artifactId>
    <version>2.8.0</version>
</dependency>
  • 2-编写spark的流式代码
String serviceUrl = "pulsar://localhost:6650/"; 
String topic = "persistent://public/default/test_src"; 
String subs = "test_sub"; 
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example"); 
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60)); 
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData(); 
Set<String> set = new HashSet<>(); 
set.add(topic); 
pulsarConf.setTopicNames(set); 
pulsarConf.setSubscriptionName(subs); 
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( 
serviceUrl, 
pulsarConf, 
new AuthenticationDisabled()); 
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/69355.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

sentinel核心流程源码解析

sentinel的处理槽(ProcessorSlot) 可以说&#xff0c;sentinel实现的各种功能就是由各处理槽完成的 ,ProcessorSlot定义了四个方法&#xff1a; 当进入该处理槽时触发该方法 处理完 entry方法之后触发该方法 退出该处理槽时触发该方法 exit方法处理完成时触发该方法 sentinel的…

【MySQL安装】卸载与安装MySQL 5.7.X版本

最近由于各种原因&#xff0c;需要重新安装MySQL。之前我的版本是8.0版本&#xff0c;现在装的5.7版本。记录一下自己的安装过程。 目录 1、卸载MySQL8.0 2、安装MySQL5.7 1、卸载MySQL8.0 如何彻底卸载MySQL_mysql 完全卸载_m0小麦麦的博客-CSDN博客相信不少小伙伴们在安装…

ddia(3)----Chapter3. Storage and Retrieval

However, first we’ll start this chapter by talking about storage engines that are used in the kinds of databases that you’re probably familiar with: traditional relational databases, and also most so-called NoSQL databases. We will examine two families o…

捕捉时刻:将PDF文件中的图像提取为个性化的瑰宝(从pdf提取图像)

应用场景&#xff1a; 该功能的用途是从PDF文件中提取图像。这在以下情况下可能会很有用&#xff1a; 图片提取和转换&#xff1a;可能需要将PDF文件中的图像提取出来&#xff0c;并保存为单独的图像文件&#xff0c;以便在其他应用程序中使用或进行进一步处理。例如&#xff…

pdf怎么压缩到1m?这样做压缩率高!

PDF是目前使用率比较高的一种文档格式&#xff0c;因为它具有很高的安全性&#xff0c;还易于传输等&#xff0c;但有时候当文件体积过大时&#xff0c;会给我们带来不便&#xff0c;这时候简单的解决方法就是将其压缩变小。 想要将PDF文件压缩到1M&#xff0c;也要根据具体的情…

QGIS开发五:VS使用QT插件创建UI界面

前面我们说了在创建项目时创建的是一个空项目&#xff0c;即不使用 Qt 提供的综合开发套件 Qt Creator&#xff0c;也不使用 Qt Visual Studio Tools 这类工具。 但是后面发现&#xff0c;如果我想要有更加满意的界面布局&#xff0c;还是要自己写一个UI文件&#xff0c;如果不…

世微AP2400 电动车 摩托车灯照明 汽车灯照明 手电筒照明LED灯降压恒流驱动IC

PCB 布板参考 1. 大电流路径走线要粗&#xff0c;铺铜走线比较好。 2. 大电路回路面积以最短、最宽路径完成比较好。 3. 开关切换连接点&#xff1a;电感 L、开关管漏级与续流肖特基二极管&#xff0c;走线要短与粗&#xff0c;铺铜走线比较好&#xff0c;但同时需要适当面积作…

MySQL索引3——Explain关键字和索引使用规则(SQL提示、索引失效、最左前缀法则)

目录 Explain关键字 索引性能分析 Id ——select的查询序列号 Select_type——select查询的类型 Table——表名称 Type——select的连接类型 Possible_key ——显示可能应用在这张表的索引 Key——实际用到的索引 Key_len——实际索引使用到的字节数 Ref ——索引命…

机器学习深度学习——注意力提示、注意力池化(核回归)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——常见循环神经网络结构&#xff08;RNN、LSTM、GRU&#xff09; &#x1f4da;订阅专栏&#xff1a;机器…

SqlServer基础之(触发器)

概念&#xff1a; 触发器&#xff08;trigger&#xff09;是SQL server 提供给程序员和数据分析员来保证数据完整性的一种方法&#xff0c;它是与表事件相关的特殊的存储过程&#xff0c;它的执行不是由程序调用&#xff0c;也不是手工启动&#xff0c;而是由事件来触发&#x…

JVM G1垃圾回收机制介绍

G1(Garbage First)收集器 (标记-整理算法)&#xff1a; Java堆并行收集器&#xff0c;G1收集器是JDK1.7提供的一个新收集器&#xff0c;G1收集器基于“标记-整理”算法实现&#xff0c;也就是说不会产生内存碎片。此外&#xff0c;G1收集器不同于之前的收集器的一个重要特点是&…

钓鱼攻击:相似域名识别及如何有效预防攻击

网络犯罪分子很乐意劫持目标公司或其供应商或业务合作伙伴的官方域名&#xff0c;但在攻击的早期阶段&#xff0c;他们通常没有这种选择。相反&#xff0c;在有针对性的攻击之前&#xff0c;他们会注册一个与受害组织的域名相似的域名 - 他们希望您不会发现其中的差异。此类技术…

SpringBoot 的自动装配特性

1. Spring Boot 的自动装配特性 Spring Boot 的自动装配&#xff08;Auto-Configuration&#xff09;是一种特性&#xff0c;它允许您在应用程序中使用默认配置来自动配置 Spring Framework 的各种功能和组件&#xff0c;从而减少了繁琐的配置工作。通过自动装配&#xff0c;您…

TepeScript 问题记录

问题 对object的所有属性赋值或清空&#xff0c;提示类型错误不能赋值 type VoiceParams {_id?: string | undefined;name: string;sex: string;vc_id: string;model_url: string;preview_url: string;isPrivate: boolean;visible: boolean; }const formData reactive<V…

【Minecraft】Fabric Mod开发完整流程2 - 创造模式物品栏与第一个方块

创造模式物品栏 添加到当前已有物品栏 再添加自定义的创造模式物品栏之前&#xff0c;请确保你的确有这个需求&#xff01;否则建议直接添加到当前已有的物品栏内部 创建新文件&#xff1a;com/example/item/ModItemGroup.java package com.example.item;import net.fabricmc.…

出于网络安全考虑,印度启用本土操作系统”玛雅“取代Windows

据《印度教徒报》报道&#xff0c;印度将放弃微软系统&#xff0c;选择新的操作系统和端点检测与保护系统。 备受期待的 "玛雅操作系统 "将很快用于印度国防部的数字领域&#xff0c;而新的端点检测和保护系统 "Chakravyuh "也将一起面世。 不过&#xf…

2024考研408-计算机网络 第五章-传输层学习笔记

文章目录 前言一、传输层提供的服务1.1、传输层的功能1.2、传输层的两个协议&#xff08;TCP、UDP&#xff09;1.3、传输层的寻址与端口&#xff08;常见端口介绍&#xff09; 二、UDP协议2.1、认识UDP功能和特点2.2、UDP首部格式2.3、UDP伪首部字段分析2.4、伪首部校验UDP用户…

【24择校指南】南京大学计算机考研考情分析

南京大学(A) 考研难度&#xff08;☆☆☆☆☆&#xff09; 内容&#xff1a;23考情概况&#xff08;拟录取和复试分数人数统计&#xff09;、院校概况、23初试科目、23复试详情、参考书目、各科目考情分析、各专业考情分析。 正文2178字&#xff0c;预计阅读&#xff1a;6分…

网络原理(JavaEE初阶系列11)

目录 前言&#xff1a; 1.网络原理的理解 2.应用层 2.1自定义协议的约定 2.1.1确定要传输的信息 2.1.2确定数据的格式 3.传输层 3.1UDP 3.1.1UDP报文格式 3.2TCP 3.2.1确认应答 3.2.2超时重传 3.2.3连接管理 3.2.3.1三次握手 3.2.3.2四次挥手 3.2.4滑动窗口 3.…

【JavaEE】Spring Boot - 配置文件

【JavaEE】Spring Boot 开发要点总结&#xff08;2&#xff09; 文章目录 【JavaEE】Spring Boot 开发要点总结&#xff08;2&#xff09;1. 配置文件的两种格式2. .properties 文件2.1 基本语法2.2 注释2.3 配置项2.4 主动读取配置文件的键值2.5 数据库的连接时的需要的信息配…