kafka快速入门+应用

Kafka, 构建TB级异步消息系统

1.快速入门

1.1 阻塞队列

        在生产线程  和  消费线程  之间起到了 , 缓冲作用,即避免CPU  资源被浪费掉

  • BlockingQueue
    • 解决  线程通信  的问题
    • 阻塞方法  put 、 take
  • 生产者、消费者 模式
    • 生产者:产生数据的线程
    • 消费者:使用数据的线程
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等

1.2 Kafaka 入门

Kafaka 简介

  • Kafaka 是一个分布式的(消息队列    发展到---》 功能更加丰富) 流媒体平台
  • 应用:消息系统、日志收集、用户行为追踪、流式处理

Kafaka 特点

  • 高吞吐量(对于硬盘的顺序读写,性能很高,高于对于内存的随机读写)
  • 消息持久化(将数据存储在硬盘中)
  • 高可靠性 (分布式集群部署,一台服务器挂了,切换到其他服务器)
  • 高扩展性(服务器不够用,则加一台服务器)

Kafaka 术语

  • Broker(集群中的每一台服务器称为一个Broker)、Zookeeper(管理集群的软件)
  • Topic(发布订阅模式,生产者把消息发送的位置-》topic)、Partition(表示对于主题位置的分区,每一个分区按照顺序往其中追加数据)、Offset(消息在分区内存在的索引)
  • Leader Replica(主副本可以提供想要读取某个分区的数据,如果主副本挂了,则从集群中选用一个从副本作为主副本) 、Follower Replica(从副本只是 备份,不负责做响应)
    • 每一个分区有多个副本,如果一个副本坏了,还有备份,提高容错率

1.3 Spring 整合 Kafka

  1. 引入依赖
    1. spring-kafka
  2. 配置Kafka
    1. 配置server
    2. 配置consumer
  3. 访问Kafka
  • 生产者
    • kafkaTemplate.send(topic, data);
  • 消费者
    • @KafkaListener(topics = {"test"})  public void handleMessage(ConsumerRecord record) {}

2.应用

2.1 安装

(1)安装包位置

链接:https://pan.baidu.com/s/1QjZdOUYdmSCSf0zjprJQIQ
提取码:9630

下载后解压缩

(2)相关配置

1

 2

2.2 启动+使用

(1)启动zookeeper
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

(2)启动kafka
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\kafka-server-start.bat config\server.properties

 

(3)创建主题

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 -replication-factor 1 --partitions 1 --topic test1

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test
test1

(4)创建生产者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test1

(5)创建消费者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

再次生产消息,会自动消费消息

 

2.3 spring中整合kafka

(1)配置Properties

#9. kafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
# 是否自动提交(记录)   消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

其中spring.kafka.consumer.group-id=community-consumer-group与。。。保持一致

(2)导入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

(3)test

1.定义producer

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic,content);
    }
}

2.定义consumer

@Component
class KafkaConsumer {
    @KafkaListener(topics ={"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
        //PHDVB干嘛呢
        //呕吼
    }
}

3.编写测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test","PHDVB干嘛呢");
        kafkaProducer.sendMessage("test","呕吼");

        // 生产者发消息是  主动的 ,消费者收消息是  被动地
        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/539146.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ffmpeg命令与批处理编程

(一) CMD脚本查找所有文件 powershell与cmd转换 powershell与cmd虽然同为windows命令&#xff0c;但许多命令并不通用。 CMD换行符 a 在CMD下&#xff0c;可以用^作为换行符&#xff0c;类似于Linux下的\。举例如下&#xff1a; start pemu.exe ^ -net nic,vlan1,macaddr…

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第三套

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第三套 (共9套&#xff0c;有答案和解析&#xff0c;答案非官方&#xff0c;未仔细校正&#xff0c;仅供参考&#xff09; 部分题目分享&#xff0c;完整版获取&#xff08;WX:didadidadidida313&#xff0c;加我备注&#x…

docker 上达梦导入dump文件报错:本地编码:PG GBK,导入女件编码:PGGB18030

解决方案&#xff1a; 第一步进入达梦数据容器内部 docker exec -it fc316f88caff /bin/bash 第二步&#xff1a;在容器中 /opt/dmdbms/bin目录下 执行命令 cd /opt/dmdbms/bin./dimp USERIDSYSDBA/SYSDBA001 FILE/opt/dmdbms/ZFJG_LJ20240407.dmp SCHEMASZFJG_LJUSERIDSYSD…

顶顶通呼叫中心中间件-回铃音补偿(mod_cti基于FreeSWITCH)

顶顶通呼叫中心中间件-回铃音补偿(mod_cti基于FreeSWITCH) 回铃音的用处 回铃音&#xff1a; 当别人打电话给你时&#xff0c;你的电话响铃了&#xff0c;而他听到的声音叫做回铃音。回铃音是被叫方向主叫方传送&#xff0c;也是彩铃功能的基础。我们平时打电话听到的“嘟 嘟…

html 引入vue Element ui 的方式

第一种&#xff1a;使用CDN的方式引入 <!--引入 element-ui 的样式&#xff0c;--> <link rel"stylesheet" href"https://unpkg.com/element-ui/lib/theme-chalk/index.css"> <!-- 必须先引入vue&#xff0c; 后使用element-ui --> <…

MyBatis 中的动态 SQL 的相关使用方法

为什么会有动态SQL&#xff0c;把SQL写死不是比较方便吗&#xff1f;其实有很多的举例&#xff0c;这里我那一个常见的来说&#xff0c;像我们用户注册&#xff0c;会有必填字段和非必填字段&#xff0c;有些传来的参数不一样&#xff0c;那对应的SQL也不一样&#xff0c;因此&…

微信小程序 uniapp+vue动漫交流系统 java(springboot+ssm)/python(flask+django)/

小程序Android端运行软件 微信开发者工具/hbuiderx uni-app框架&#xff1a;使用Vue.js开发跨平台应用的前端框架&#xff0c;编写一套代码&#xff0c;可编译到Android、小程序等平台。 前端&#xff1a;HTML5,CSS3 VUE 后端&#xff1a;java(springbootssm)/python(flaskdja…

数字化社交的引擎:解析Facebook的影响力

随着数字技术的飞速发展&#xff0c;社交网络已成为人们日常生活中不可或缺的一部分。而在这个数字化社交的世界中&#xff0c;Facebook作为最具影响力和知名度的平台之一&#xff0c;其所扮演的角色越发重要。本文将深入解析Facebook在数字化社交领域的影响力&#xff0c;并探…

独一无二:探索单例模式在现代编程中的奥秘与实践

设计模式在软件开发中扮演着至关重要的角色&#xff0c;它们是解决特定问题的经典方法。在众多设计模式中&#xff0c;单例模式因其独特的应用场景和简洁的实现而广受欢迎。本文将从多个角度详细介绍单例模式&#xff0c;帮助你理解它的定义、实现、应用以及潜在的限制。 1. 什…

VRRP(虚拟路由冗余协议)详解

VRRP-------虚拟路由冗余协议 在一个网络中&#xff0c;要做为一个合格的网络首先就要具备几种冗余&#xff0c;增加网络的可靠性。 这几种冗余分别为&#xff1a;线路冗余&#xff0c;设备冗余&#xff0c;网关冗余&#xff0c;UPS冗余 VRRP该协议就是解决网关冗余的。在二层…

【opencv】示例-imgcodecs_jpeg.cpp使用OpenCV库来创建和处理图像,并保存为不同JPEG采样因子的版本...

上层-原始图像 下层&#xff1a;编码解码后的lossy_img #include <opencv2/core.hpp> // 包含OpenCV核心功能的头文件 #include <opencv2/imgproc.hpp> // 包含OpenCV图像处理功能的头文件 #include <opencv2/imgcodecs.hpp> // 包含OpenCV图像编码解码功能…

滑动门Tab中使用Swiper造成动画不再循环了

路走的多了&#xff0c;坑也多。百度用的多了&#xff0c;就懒得用脑了。 这次案例是swiper效果&#xff0c;swiper官网或者通常的做法是&#xff0c;页面一加载就开始渲染swiper了&#xff0c;当然这个只需要傻傻的复制就行。 但是在滑动门Tab中的内容&#xff0c;还是按照之…

Linux下mysql的彻底卸载

Linux下mysql的彻底卸载 1、查看mysql的安装情况2、删除上图安装的软件3、都删除成功之后&#xff0c;查找相关的mysql的文件4、删除全部文件5、再次执行命令 1、查看mysql的安装情况 rpm -qa | grep -i mysql2、删除上图安装的软件 rpm -ev mysql-community-libs-5.7.27-1.e…

4.Labview簇、变体与类(上)

在Labview中&#xff0c;何为簇与变体&#xff0c;何为类&#xff1f;应该如何理解&#xff1f;具体有什么应用场景&#xff1f; 本文基于Labview软件&#xff0c;独到的讲解了簇与变体与类函数的使用方法和场景&#xff0c;从理论上讲解其数据流的底层概念&#xff0c;从实践上…

服务器数据恢复—不同型号服务器RAID5数据恢复策略有何不同?

RAID5作为应用最广泛的raid阵列级别之一&#xff0c;在不同型号服务器中的RAID5出现故障后&#xff0c;处理方法也不同。 RAID5阵列级别是无独立校验磁盘的奇偶校验磁盘阵列&#xff0c;采用数据分块和独立存取技术&#xff0c;能在同一磁盘上并行处理多个访问请求&#xff0c;…

取出/var/log/secure中一小时内登录失败超过三次的IP

取出/var/log/secure中一小时内登录失败超过三次的IP 前两个字段是日期&#xff0c;第三个字段是小时&#xff0c;第四个字段是IP cat /var/log/secure | sort -i | awk -F [ :] /Failed/{a[$1" "$2" "$3" "$4" "$(NF-3)]}END{for(i …

华为海思数字芯片设计笔试第五套

声明 下面的题目作答都是自己认为正确的答案&#xff0c;并非官方答案&#xff0c;如果有不同的意见&#xff0c;可以评论区交流。 这些题目也是笔者从各个地方收集的&#xff0c;感觉有些题目答案并不正确&#xff0c;所以在个别题目会给出自己的见解&#xff0c;欢迎大家讨论…

手持气象站功能介绍

TH-SQ5手持气象站是一种便携式设备&#xff0c;用于手动测量和记录气象参数&#xff0c;如温度、湿度、风速和气压。这些设备通常用于户外活动、教育和业余气象观测。以下是对机械式手持气象站的一些续写内容&#xff1a; 数据记录功能&#xff1a;虽然基本型号的机械式手持气象…

Java常用数据结构与集合

数据结构 数组&#xff1a; 内存地址连续检索效率高(可以通过下标访问成员)增删操作效率低(保证数据越界的问题,需动态扩容)长度固定&#xff0c;扩容的需要新的数组复制或者Arrays类的copyOf方法 链表 内存地址不连续查询快删除慢&#xff0c;因为需要移动指针又分双向链表…

【MoS2】应变增强的单层MoS2光电探测器

这篇文章的标题是《Strain-Enhanced Large-Area Monolayer MoS2 Photodetectors》&#xff0c;作者是Borna Radatovic等人&#xff0c;发表在《ACS Applied Materials & Interfaces》期刊的2024年第16卷。文章主要研究了应变增强的大面积单层MoS2光电探测器的性能和应用潜力…