kafka(三)springboot集成kafka(1)介绍

基于kafka新版本

<dependencies>
     <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.0.0</version>
     </dependency>
</dependencies>

一、kafkaProducer

1、介绍

设计上比consumer要简单一些,因为不涉及组管理,即每个producer都是独立工作的。

(1)目前producer的主要功能是向某个topic的某个分区发送一条消息,这就涉及分区选择策略,在ProducerRecord中介绍。

(2)因为有ISR,因此在发送消息时,producer有多种选择来实现消息发送,如不等待任何副本的影响便返回、只等待leader副本响应返回等等。

2、发送流程

produce的发送主要流程概述如下:

  1. 拦截器对发送的消息拦截处理;

  2. 获取元数据信息;

  3. 序列化处理;

  4. 分区处理;

  5. 批次添加处理;

  6. 发送消息。

 

3、主要参数
3.1、acks:

有三个参数:0、1、all,数据可靠性的重要参数,可以保证消息不丢失。

Properties properties = new Properties();
properties.put(ProducerConfig.ACKS_CONFIG,"1");
3.2、 buffer.memory

指定了producer端用于缓冲消息的缓冲区大小,单位是字节。

3.3、compression.type

producer是否压缩消息。

3.4、batch.size

二、ProducerRecord

1、介绍

发送给Kafka Broker的key/value 值对,producer将待发送的消息封装进ProducerRecord实例类。

2、发送消息分区策略
 (1)指定了分区:

当发送时指定了partition就使用该partition。即kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)指定了发送到哪个具体的分区。

(2)轮询

如果kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)没有指定发送到哪个具体的分区,即partition=null(并且key也为空时,如果此时key不为空的话就会采用另一种分区策略key哈希分区策略),并且使用了默认的分区器,那么消息将被随机的发送到主题的各个可用分区上,分区器使用轮询的算法将消息均衡的分布到各个分区。

(3)key哈希分区策略

根据消息的key进行哈希计算,并将消息发送到对应的分区。保证相同key的消息始终被发送到同一个分区,确保消息的顺序性。

(4)自定义分区策略(即自定义Partitioner)

用户可以根据自己的需求实现自定义的分区策略,通过实现org.apache.kafka.clients.producer.Partitioner接口来自定义分区选择逻辑。

二、 KafkaConsumer

三、生产者发送消息应用

1、同步发送消息

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 默认为异步发送
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));
            // 末尾加get为同步发送
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();
        }
 
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
2、异步发送消息
2.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class CustomProducer {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
2.2、带回调函数的异步发送

 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
 
import java.util.Properties;
 
public class CustomProducerCallBack {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        // 序列化器的serialization是一个接口,找到他的实现类
        // 我们一般都是使用String
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
                    new Callback() {
                       @Override
                       public void onCompletion(RecordMetadata metadata, Exception exception) {
                           //(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法
                           //(2)消息发送失败  exception != null  也会调用该方法
                           if (exception == null) {
                               System.out.println(metadata);//使用打印演示
                           }else{
                               exception.printStackTrace();//打印异常信息
                           }
                       }
                    });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

四、消费者接收消息应用

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
 
public class CustomConsumer {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
        // 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)
        //  broker的ip地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 配置  反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //配置消费者组(组名必须)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // 3. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 注册消费主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
        // 4.调用方法消费数据
        // 如果kafka集群没有新数据会造成空转
        // 填写参数为时间,如果没有拉取数据,线程睡眠一会
        while (true) {
            // 设置1s中消费的一批数据
            // Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1s
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 打印消费数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
            }
        }
        //5.关闭资源
//        consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/443580.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HTML 学习笔记(九)颜色值和长度单位

一、颜色 1.通过RGB值来设置颜色 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>table</title&…

【Linux】线程同步与生产消费者问题

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;折纸花满衣 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;【LeetCode】winter vacation training 目录 &#x1f449;&#x1f3fb;CP问题&#x1f449;&#x1f3fb;互斥…

c语言,大宗撮合交易中心系统核心模块代码

撮合交易系统&#xff08;Matching System&#xff09;常用于大宗交易&#xff0c;如股票、期货等市场&#xff0c;它负责根据买卖双方的报价和数量&#xff0c;自动撮合成交。撮合系统的核心模块通常包括订单管理、价格计算和撮合逻辑等部分。 由于撮合系统的实现复杂且依赖于…

【C++ 学习】拷贝构造你了解多少?

文章目录 1. 拷贝构造的引入2. 拷贝构造的引用场景 1. 拷贝构造的引入 拷贝构造函数&#xff1a;只有单个形参&#xff0c;该形参是对本类类型对象的引用(一般常用const修饰)&#xff0c;在用已存在的类类型对象创建新对象时由编译器自动调用&#xff1b; 特征&#xff1a; ① …

集合实现类研究底层(部分):手撕ArrayList底层源码、手撕LinkedList底层源码、手写单向链表和双向链表

day26上 集合框架图 标绿已经学习底层&#xff0c;深入底层主要是研究实现类底层 继承关系图 手撕ArrayList底层源码 ps:研究添加元素的过程 思路&#xff1a; 1.研究继承关系 2.研究属性 3.理解创建集合的过程 – 构造方法的底层原理 4.研究添加元素的过程 提升&#xff1a…

变换,动画

面试题——需求&#xff1a;在不知道父元素与子元素的宽高时 如何让子元素在父元素内居中&#xff1f; 1.定位 父相子绝 2.子元素 top&#xff1a;50% left:50% 3.子元素 transform: translate(-50%,-50%) .parent{height: 500px;background-color: red;position: relative;}.c…

算法第二十五天-寻找排序数组中的最小值

寻找排序数组中的最小值 题目要求 解题思路 二分法 代码 class Solution:def findMin(self, nums: List[int]) -> int:low, high 0, len(nums) - 1while low < high:pivot low (high - low) // 2if nums[pivot] < nums[high]:high pivot else:low pivot 1re…

微信小程序-可以用区域

简介 movable-view和movable-area是可移动的视图容器&#xff0c;在页面中可以拖拽滑动。 本篇文章将会通过该容器实现一个常用的拖拽按钮功能。 使用效果 代码实现 side-view.wtml 布局见下面代码&#xff0c;left view为内容区域&#xff0c;right view为操作按钮&a…

进腾讯工作一个月,我想辞职了......

前几天&#xff0c;我在网上看到一个微博。 一个应届的校招生&#xff0c;目前入职腾讯&#xff0c;工作了一个月。这一个月给他的感受是大量的写测试用例&#xff0c;自己写测试用例的能力熟练了不少&#xff0c;测试技能倒是没有多大的提高&#xff0c;真正需要技术的工作却…

算法学习01:排序二分

算法学习01&#xff1a;排序&&二分 文章目录 算法学习01&#xff1a;排序&&二分前言需要记忆的模版&#xff1a;快速排序归并排序&#xff1a;整数二分&#xff1a;浮点数二分 一、排序1.快速排序2.归并排序&#xff1a; 二、二分1.整数2.浮点数 总结 前言 需要…

汽车协议学习

ⅠOBD 1.OBD接口 OBD有16个引脚&#xff0c;每个引脚的电压不同&#xff08;可以对应不同的协议&#xff09; 车端&#xff1a; 16- 9 (短一点点的) 8-1 &#xff08;长一点的&#xff09; 2.基于OBDⅡ的通信协议 CAN &#xff08;ISO-15765&am…

grafana table合并查询

注&#xff1a;本文基于Grafana v9.2.8编写 1 问题 默认情况下table展示的是一个查询返回的多个field&#xff0c;但是我想要的数据在不同的metric上&#xff0c;比如我需要显示某个pod的读写IO&#xff0c;但是读和写这两个指标存在于两个不同的metirc&#xff0c;需要分别查…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Marquee)

跑马灯组件&#xff0c;用于滚动展示一段单行文本。仅当文本内容宽度超过跑马灯组件宽度时滚动&#xff0c;不超过时不滚动。 说明&#xff1a; 该组件从API Version 8开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 无 接口 Ma…

一元函数微分学——刷题(26

目录 1.题目&#xff1a;2.解题思路和步骤&#xff1a;3.总结&#xff1a;小结&#xff1a; 1.题目&#xff1a; 2.解题思路和步骤&#xff1a; 归纳求解&#xff0c;把指数写成负数就比较容易看出来规律 3.总结&#xff1a; 归纳求解&#xff0c;把指数写成负数就比较容易…

【蓝桥杯】k倍区间

一.题目描述 二.问题分析 对于该问题&#xff0c;标签上写的是暴力&#xff0c;但是如果使用暴力的话&#xff0c;会超时。 首先&#xff0c;对于两个数a&#xff0c;b&#xff08;假设a小于b&#xff09;&#xff0c;若a与b对k取余后结果相同&#xff0c;则b-a可以整除k。 …

axios的详细使用

目录 axios&#xff1a;现代前端开发的HTTP客户端王者 一、axios简介 二、axios的基本用法 1. 安装axios 2. 发起GET请求 3. 发起POST请求 三、axios的高级特性 1. 拦截器 2. 取消请求 3. 自动转换JSON数据 四、axios在前端开发中的应用 五、总结 axios&#xff1a…

山泉还可以申请商标不,现阶段通过率如何!

在32类类别啤酒饮料是许多生产水企业主要申请注册的类别&#xff0c;那现在山泉在这个类别还可以申请注册商标不&#xff0c;山泉在这个类别基本上是通用词&#xff0c;首先是需要前面词具有显著性&#xff0c;没的相同或近似才可以。 经普推知产老杨检索发现&#xff0c;在32…

作业 字符数组-统计和加密

字串中数字个数 描述 输入一行字符&#xff0c;统计出其中数字字符的个数。 输入 一行字符串&#xff0c;总长度不超过255。 输出 输出为1行&#xff0c;输出字符串里面数字字符的个数。 样例 #include <iostream> #include<string.h> using namespace std; int m…

瑞_JVM虚拟机_类的生命周期

文章目录 1 JVM虚拟机概述2 类的生命周期2.1 加载阶段2.1.1 加载过程2.1.2 查看内存中的对象&#xff08;hsdb工具&#xff09; 2.2 连接阶段2.2.1 验证2.2.2 准备&#xff08;final特殊&#xff09;2.2.3 解析 2.3 初始化阶段\<client> ★★★2.4 使用阶段2.5 卸载阶段 …

不愧是华为出来的,太厉害了...

实习去了博彦科技&#xff08;外包&#xff09;&#xff0c;做的就是螺丝钉的活&#xff0c;后面还因为人效不佳&#xff0c;被开了。 正式毕业后去了另外一个做电子发票的公司&#xff0c;但是都是功能测试和一点点APP测试&#xff0c;然后经常被开发怼&#xff0c;测试毫无地…