Kafka:Kafka详解

Kafka

消息中间件

区别于rabbitmq,kafka更适用于量级较大的数据(100w级),主要在大数据领域使用

Kafka介绍

一个分布式流媒体平台,类似于消息队列或企业消息传递系统

Kafak的结构如下

在这里插入图片描述

producer:发布消息的对象
topic:Kafak将消息分门别类,每类的消息称为一个主题(Topic)
consumer:订阅主题并处理发布的消息的对象称为主题消费者
broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker)

消费者可以订阅一个或者多个的主题,并从broker中拉取数据,从而消费这些已发布的消息.

Kafka对zookeeper强依赖,需要使用zk进行分区的负载均衡以及节点的注册
docker安装zk和kafka
docker pull zookeeper:3.4.14
docker run -d --name zookeeper --restart=always -p 2181:2181 zookeeper:3.4.14
docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--restart=always \
--net=host wurstmeister/kafka:2.12-2.3.1

Kafka入门

kafka的生产者消费者模型分为单播和多播

单播:同组内存在多个消费者,一个主题只能发送消息到同一个组内的一个消费者
多播:不同组的多个消费者,一个主题可以发送消息到不同组的多个不同消费者
<dependency>    
	<groupId>org.apache.kafka</groupId>    
	<artifactId>kafka-clients</artifactId>
</dependency>

Kafka的高可用设计(集群模式)

多个broker组成一个cluster集群

集群中的某一条机器宕机,其他机器上的broker依然能对外提供服务

Kafka的备份机制(republication)

消息的备份称为副本(replica)

分为两种副本

领导者副本(leader replica)
追随者副本(follower replica) 分为ISR 和 普通

同步方式:

领导者副本直接接收发布者的消息

随后领导者副本会同步将消息复制保存到ISR follower副本,异步将消息复制保存到普通follower副本

如果leader失效,需要选举出新的leader

依据以下原则选举:

选举优先从ISR中选举,因为ISR副本是同步的

如果ISR中没有生效的副本,就从普通中选举一个
在这里插入图片描述

如果所有副本都失效

可以等待ISR副本活过来保证数据完整性,也可以选举第一个活过来的保证数据可用性

Kafka的生产者详解

消息发送:

分为同步发送和异步发送
同步发送:
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
RecordMetadata recordMetadata = producer.send(ProducerRecord).get();//获取发送的结果
异步发送:

传入一个callback对象处理回调结果

//异步消息发送
producer.send(ProducerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

生产者配置

消息确认配置acks

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
acks = 0 //生产者不会等待任何来自服务器的响应
acks = 1 //集群leader收到消息后,生产者会接受一个来自服务器的响应
acks = -1/all(默认) //参与复制的所有生产者全部收到消息后,生产者才会收到响应

重试次数retries

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

消息压缩方式

消息默认不会被压缩

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
| snappy | 占用较少的  CPU,  却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
| lz4    | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观        |
| gzip   | 占用较多的  CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |

Kafka消费者详解

消费者组:一个或者多个消费者组成的群体

topic会分发消息给每个消费者组中的一个消费者

消息有序性

跨分区的消息无法决定先后顺序

如果需要保证消息的有序性需要让多个消费者去监听同一个分区

提交和偏移量

消费者会在消费前或者消费后向__consumer_offset的特殊topic中发送偏移量,记录每个消息的处理进度

如果消费者崩溃或者新的消费者加入就会触发负载均衡

如果使用默认方式自动提交偏移量,就会在消费前直接自动提交偏移量,可能会出现重复消费或者漏消费的情况

所以我们需要使用手动提交的方式提交偏移量

手动提交(同步,会自动重试):

consumer.commitSync()

异步提交:

consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
        }
    });

Spring继承Kafka

引入依赖
<!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
Topic的创建和删改(可省略)
public static void createTopic(String topicName, int partitions, short replicas) throws Exception {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicas);
        CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic));
        topics.all().get();
        log.info("【{}】topic创建成功", topicName);
    }

    /**
     * @Title deleteTopic
     * @Description 删除topic
     * @param topicName  topic名称
     * @return void
     */
    public static void deleteTopic(String topicName) throws Exception {
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
        deleteTopicsResult.all().get();
        log.info("【{}】topic删除成功", topicName);

    }

    /**
     * @Title updateTopicRetention
     * @Description 修改topic的过期时间
     * @param topicName  topic名称
     * @param ms  过期时间(毫秒值)
     * @return void
     */
    public static void updateTopicRetention(String topicName, String ms) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms);
        Config config = new Config(Collections.singleton(configEntry));
        // 创建AlterConfigsOptions
        AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000);
        // 执行修改操作
        adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get();
        log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms);
    }
生产者发送消息
@Resource
private KafkaTemplate<String,String> kafkaTemplate;

kafkaTemplate.send("topic","test");
消费者消费消息
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class Listener {

    @KafkaListener(topics = "topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/799137.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mupdf 编译说明

进入官网下载源码&#xff1a;https://www.mupdf.com/releases 挑选需要的版本&#xff0c;下载解压&#xff0c;然后打开解决方案&#xff0c;进行编译

MySQL 数据库 - 事务

MySQL 数据库&#xff08;基础&#xff09;- 事务 事务简介 事务 是一组操作集合&#xff0c;他是一个不可分割的工作单位&#xff0c;事务会把所有的操作看作是一个整体一起向系统发送请求&#xff0c;即这些操作要么同时成功&#xff0c;要么同时失败。 比如&#xff1a;张…

tomcat的优化、动静分离

tomcat的优化 tomcat自身的优化 tomcat的并发处理能力不强&#xff0c;大项目不适应tomcat做为转发动态的中间件&#xff08;k8s集群&#xff0c;pytnon rubby&#xff09;&#xff0c;小项目会使用&#xff08;内部使用的&#xff09;动静分离 默认配置不适合生产环境&…

在 PostgreSQL 里如何处理数据的存储优化和查询优化的冲突?

&#x1f345;关注博主&#x1f397;️ 带你畅游技术世界&#xff0c;不错过每一次成长机会&#xff01;&#x1f4da;领书&#xff1a;PostgreSQL 入门到精通.pdf 文章目录 在 PostgreSQL 里如何处理数据的存储优化和查询优化的冲突一、存储优化与查询优化的概述&#xff08;一…

前端实现一键复制功能

1、下载插件 npm i vue-clipboard32.0.0 2、在需要复制的文件中引入插件并使用&#xff1a; JS: import useClipboard from "vue-clipboard3"; const { toClipboard } useClipboard(); HTML: <el-tooltip content"复制内容" placement"top&…

Java 客户端操作 Redis 命令(端口号映射方法,命令演示,注意事项)

文章目录 开放端口号问题引入依赖验证连接通用命令使用set 和 get 命令的使用exists 和 del 命令的使用keys 命令的使用expire 和 ttl 命令type 命令的使用 String 类型命令使用mset 和 mget 命令getrange 和 setrange 命令append 命令incr 和 decr 命令 list 类型命令使用lpus…

05 以物品与用户为基础个性化推荐算法的四大策略

《易经》&#xff1a;“九二&#xff1a;见龙在田&#xff0c;利见大人”。九二是指阳爻在卦中处于第二位&#xff0c;见龙指龙出现在地面上&#xff0c;开始崭露头角&#xff0c;但是仍须努力&#xff0c;应处于安于偏下的位置。 本节是模块二第一节&#xff0c;模块二讲解传…

【学习笔记】无人机(UAV)在3GPP系统中的增强支持(六)-人工智能控制的自主无人机用例

引言 本文是3GPP TR 22.829 V17.1.0技术报告&#xff0c;专注于无人机&#xff08;UAV&#xff09;在3GPP系统中的增强支持。文章提出了多个无人机应用场景&#xff0c;分析了相应的能力要求&#xff0c;并建议了新的服务级别要求和关键性能指标&#xff08;KPIs&#xff09;。…

开源浏览器引擎对比与适用场景:WebKit、Chrome、Gecko

WebKit与Chrome的Blink引擎对比 起源与关系&#xff1a; WebKit最初由苹果公司开发&#xff0c;用于Safari浏览器。后来&#xff0c;WebKit逐渐成为一个独立的开源项目&#xff0c;被多个浏览器厂商采用。Blink是Google基于WebKit项目分支出来的一个浏览器引擎&#xff0c;用于…

自主升级,平稳过渡!麒麟信安保障长沙市智慧交通发展中心CentOS迁移无忧

长沙市智慧交通发展中心围绕综合交通运输协调体系的构建&#xff0c;实施交通运行的监测、预测和预警&#xff0c;面向公众提供交通信息服务&#xff0c;开展多种运输方式的调度协调&#xff0c;提供交通行政管理和应急处置的信息保障。 该中心目前数据日交换量超2亿条&#x…

替换:show-overflow-tooltip=“true“ ,使用插槽tooltip,达到内容可复制

原生的show-overflow-tooltip“true” 不能满足条件&#xff0c;使用插槽自定义编辑&#xff1b; 旧code <el-table-column prop"reason" label"原因" align"center" :show-overflow-tooltip"true" /> <el-table-column pro…

Adminer-CVE-2021-21311

在其4.0.0到4.7.9版本之间&#xff0c;连接 ElasticSearch 和 ClickHouse 数据库时存在一处服务端请求伪造漏洞&#xff08;SSRF&#xff09;。 VPS开启HTTP服务 VPS 开启HTTP 再同时跑POC 确保能访问poc里的链接文件 第一是目标地址 第二个是跳转地址 第三个是监听地址 如果…

【C++】 List 基本使用

C List 基本使用 基本概念 list 是一个序列容器&#xff0c;它内部维护了一个双向链表结构。与 vector 或 deque 等基于数组的容器不同&#xff0c;list 在插入和删除元素时不需要移动大量数据&#xff0c;因此在这些操作上具有较高的效率。然而&#xff0c;访问列表中的特定…

无人机航电系统技术详解

一、系统概述 无人机航电系统&#xff08;Avionics System&#xff09;是无人机飞行与任务执行的核心部分&#xff0c;它集成了飞控系统、传感器、导航设备、通信设备等&#xff0c;为无人机提供了必要的飞行控制和任务执行能力。航电系统的设计和性能直接影响到无人机的安全性…

AIGC产品经理学习路径

基础篇&#xff08;课时 2 &#xff09; AIGC 行业视角 AIGC 的行业发展演进&#xff1a;传统模型/深度学习/大模型 AIGC 的产品设计演进&#xff1a;AI Embedded / AI Copilot / AI Agen AIGC 的行业产业全景图 AIGC 的产品应用全景图 AIGC 职业视角 AI 产品经理/ AIGC…

Linux:信号的概念与产生

信号概念 信号是进程之间事件异步通知的一种方式 在Linux命令行中&#xff0c;我们可以通过ctrl c来终止一个前台运行的进程&#xff0c;其实这就是一个发送信号的行为。我们按下ctrl c是在shell进程中&#xff0c;而被终止的进程&#xff0c;是在前台运行的另外一个进程。因…

Android Viewpager2 remove fragmen不生效解决方案

一、介绍 在如今的开发过程只&#xff0c;内容变化已多单一的fragment&#xff0c;变成连续的&#xff0c;特别是以短视频或者直播为主的场景很多。从早起的Viewpage只能横向滑动&#xff0c;到如今的viewpage2可以支持横向或者竖向滑动。由于viewpage2的adapter在设计时支持缓…

预告 | 博睿数据将亮相第四届中国新能源汽车产业数智峰会

随着数字化、智能化浪潮的汹涌而至&#xff0c;全球汽车产业正站在一个崭新的历史起点上。新能源汽车&#xff0c;作为这场科技革命和产业变革的领跑者&#xff0c;其数智化发展正呈现出前所未有的蓬勃态势。正是在这样的背景下&#xff0c;第四届中国新能源汽车产业数智峰会将…

Windows 虚拟机服务器项目部署

目录 一、部署JDK下载JDK安装JDK1.双击 jdk.exe 安装程序2.点击【下一步】3.默认安装位置&#xff0c;点击【下一步】4.等待提取安装程序5.默认安装位置&#xff0c;点击【下一步】6.等待安装7.安装成功&#xff0c;点击【关闭】 二、部署TomcatTomcat主要特点包括&#xff1a;…

【线程安全】关于死锁问题

文章目录 死锁的基本概念死锁的四个必要条件避免死锁避免死锁的算法死锁检测算法 死锁的基本概念 死锁是指在一组进程中的各个进程均占有不会释放的资源&#xff0c;但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。当然&#xff0c;线程之间同样也有死…