Kafka的配置和使用

目录

1.服务器用docker安装kafka

2.springboot集成kafka实现生产者和消费者


1.服务器用docker安装kafka

        ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)

                a、自动安装 

curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

                b、启动docker

sudo systemctl start docker

                c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。

// 拉取镜像

sudo docker pull hello-world

// 执行

hello-world sudo docker run hello-world

                 d、安装成功

         ②、zookeeper

                a、docker search zookeeper

                b、docker pull zookeeper

        ③、安装kafka

                a、docker search kafka

                b、docker pull wurstmeister/kafka

        ④、运行zookeeper

                a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

        ⑤、运行kafka

                a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

                b、参数说明

参数说明:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

        ⑥、检验kafka是否可以使用

docker exec -it kafka bash

cd /opt/kafka_2.13-2.8.1/

cd bin

                a、运行kafka生产者并发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

                b、在开一个页面,运行kafka消费者发送消息

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

        ⑦、结果是这个样子的

 

         ⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

        ⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息

2.springboot集成kafka实现生产者和消费者

        ①、在pom中创建依赖

<dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

         <version>2.7.8</version>

</dependency>

        ②、配置kafka

                a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)

spring:

         kafka:

                #自己的kafka所在的ip地址和端口号

                 bootstrap-servers: localhost:9092

                 consumer:

                 #一个group-id代表一个消费组,一个消息可以被几个消费组消费

                    group-id: my-group

                    auto-offset-reset: earliest

                producer: #序列化

                    value-serializer: org.apache.kafka.common.serialization.StringSerializer

                    key-serializer: org.apache.kafka.common.serialization.StringSerializer

        b、创建一个生产者

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

sendMessage 方法,用于发送消息到 Kafka。

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }

}

        c、 创建一个消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

@KafkaListener 注解声明了一个消费者方法,用于接收从 

my-topic 主题中读取的消息

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/59076.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

有哪些好用的AI绘画网站?

随着人工智能技术的发展&#xff0c;人工智能绘画工具逐渐成为数字艺术领域的热门话题。人工智能绘画工具是利用深度学习和其他技术来模拟绘画过程和效果的工具&#xff0c;可以帮助用户快速创作高质量的艺术作品。除了Midjourney、除了openai等流行的AI绘画工具外&#xff0c;…

使用WiFi测量仪进行机器人定位的粒子过滤器研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

K3s vs K8s:轻量级对决 - 探索替代方案

在当今云原生应用的领域中&#xff0c;Kubernetes&#xff08;简称K8s&#xff09;已经成为了无可争议的领导者。然而&#xff0c;随着应用规模的不断增长&#xff0c;一些开发者和运维人员开始感受到了K8s的重量级特性所带来的挑战。为了解决这一问题&#xff0c;一个名为K3s的…

python 常见数据类型和方法

不可变数据类型 不支持直接增删改 只能查 str 字符串 int 整型 bool 布尔值 None None型特殊常量 tuple 元组(,,,)回到顶部 可变数据类型&#xff0c;支持增删改查 list 列表[,,,] dic 字典{"":"","": ,} set 集合("",""…

「Qt」常用事件介绍

&#x1f514; 在开始本文的学习之前&#xff0c;笔者希望读者已经阅读过《「Qt」事件概念》这篇文章了。本文会在上篇文章的基础上&#xff0c;进一步介绍 Qt 中一些比较常用的事件。 0、引言 当我们想要让控件收到某个事件时做一些操作&#xff0c;通常都需要重写相应的事件处…

Python---Numpy

文章目录 1.Numpy是什么&#xff1f;2.ndarray2.1 什么是ndarray?2.2 ndarray的属性2.3 ndarray的类型 3.Numpy基本操作3.1 生成0或1的数组3.2 从现有数组生成数组拓展&#xff1a;浅拷贝和深拷贝 3.3 生成固定范围的数组3.4 生成随机数组3.4.1 正态分布3.4.2 均匀分布 3.5 形…

5款无广告的超实用软件,建议收藏!

​ 大家好,我又来了,今天向大家推荐几款软件,它们有个共同的特点,就是无广告、超级实用,大家看完之后,可以自己去搜索下载试用。 1.重复文件清理——Duplicate Cleaner ​ Duplicate Cleaner是一款用于找出硬盘中重复文件并删除的工具。它可以通过内容或文件名查找重复文档、…

多语言gRPC开发入门与避坑指南

目录 gRPC相关介绍 什么是gPRC gPRC的优点 gPRC的缺点 gPRC定位 协议缓冲区&#xff08;Protocol Buffers&#xff09; 四种调用方式 gRPC开发三大步骤 第一步&#xff1a;定义和编写proto服务文件 第二步&#xff1a;proto文件转化为gRPC代码 第三步&#xff1a;调…

IDEA中maven项目失效,pom.xml文件橙色/橘色

IDEA中maven项目失效&#xff0c;pom.xml文件橙色/橘色 IDEA中Maven项目失效 IDEA中创建的maven项目中的文件夹都变成普通格式&#xff0c;pom.xml变成橙色 右键点击橙色的pom.xml文件&#xff0c;选择add as maven project maven项目开始重新导入相应依赖&#xff0c;恢复…

QT图形视图系统 - 使用一个项目来学习QT的图形视图框架 - 终篇

QT图形视图系统 - 终篇 接上一篇&#xff0c;我们需要继续完成以下的效果&#xff1b; 先上个效果图&#xff1a; 修改背景&#xff0c;使之整体适配 上一篇我们绘制了标尺&#xff0c;并且我们修改了放大缩小和对应的背景&#xff0c;整体看来&#xff0c;我们的滚动条会和…

单篇笔记曝光248万+,素颜、寸头…小红书女性种草新趋势分析!

最近&#xff0c;小红书上刮起一阵素颜、寸头&#xff0c;拒绝美丽绑架的风潮&#xff0c;他们称之为“脱美役”&#xff0c;即脱离美丽枷锁&#xff0c;做自己&#xff0c;接纳原本的自己。这是女性觉醒的又一阵风&#xff0c;品牌要如何跟上这波种草新趋势呢&#xff1f; 单篇…

村田授权代理:共模扼流线圈针对汽车专用设备高频噪声的降噪对策

车载市场正不断扩充ADAS、自动驾驶、V2X、车载信息系统等的应用。由于此类应用要处理庞大的信息&#xff0c;因此为了执行处理&#xff0c;内部处理信号的处理速度亦不断高速化。另一方面&#xff0c;由于部件数量增多&#xff0c;安装密度增大&#xff0c;因此要求部件小型化。…

JVM之垃圾回收器

1.如何判断对象可以回收 1.1 引用计数法 什么是引用计数器法 在对象中添加一个引用计数器&#xff0c;每当有一个地方引用它时&#xff0c;计数器值就加一&#xff1b;当引用失效时&#xff0c;计数器值就减一&#xff1b;任何时刻计数器为零的对象就是不可能再被使用的。 …

芯旺微冲刺IPO,车规级MCU竞争白热化下的“隐忧”凸显

在汽车智能化和电动化发展带来的巨大蓝海市场下&#xff0c;产业链企业迎来了一波IPO小高潮。 日前&#xff0c;上海芯旺微电子技术股份有限公司&#xff08;以下简称“芯旺微”&#xff09;在科创板的上市申请已经被上交所受理&#xff0c;拟募资17亿元&#xff0c;用于投建车…

【如何提高在浏览器时的专注力!去广告和新闻!】如何使用浏览器时看不到任何新闻以及广告【浏览器去除广告和新闻】

如何使用浏览器时看不到任何新闻以及广告 1. 使用chrome浏览器或者其他浏览器都可以2. 使用bing搜索3. 去广告 1. 使用chrome浏览器或者其他浏览器都可以 2. 使用bing搜索 bing 3. 去广告

【bug】记录一次使用Swiper插件时loop属性和slidersPerView属性冲突问题

简言 最近在vue3使用swiper时&#xff0c;突然发现loop属性和slides-per-view属性同时存在启用时&#xff0c;loop生效&#xff0c;下一步只能生效一次的bug&#xff0c;上一步却是好的。非常滴奇怪。 解决过程 分析属性是否使用错误。 loop是循环模式&#xff0c;布尔型。 …

Qt展示动态波形

Qt展示动态波形 需求描述成品展示实现难点Qt多线程 需求描述 接入串口&#xff0c;配置串口顺序进行接收数据&#xff1b;数据分成两个串口分别传入&#xff0c;使用多线程并发接入&#xff1b;时域数据有两个通道&#xff08;I&#xff0c;Q&#xff09;&#xff0c;分别以实…

【汇总】解决Ajax请求后端接口,返回ModelAndView页面不跳转

【汇总】解决Ajax请求后端接口&#xff0c;返回ModelAndView不跳转 问题发现问题解决方法一&#xff1a;直接跳转到指定URL&#xff08;推荐&#xff09;方法二&#xff1a;将返回的html内容&#xff0c;插入到页面某个元素中方法三&#xff1a;操作文档流方法四&#xff1a;使…

Windows terminal 添加 git bash 解决git中文乱码显示问题

Windows terminal 添加 git bash 解决git中文乱码显示问题 在 windows terminal 中配置git 说明&#xff1a; 点击箭头选择设置 说明&#xff1a; 点击"添加新配置文件"配置名称命令行&#xff0c;可执行文件的具体语句 C:\Program Files\Git\bin\bash.exe启动目录…

什么是注意力机制?注意力机制的计算规则

我们观察事物时&#xff0c;之所以能够快速判断一种事物(当然允许判断是错误的)&#xff0c;是因为我们大脑能够很快把注意力放在事物最具有辨识度的部分从而作出判断&#xff0c;而并非是从头到尾的观察一遍事物后&#xff0c;才能有判断结果&#xff0c;正是基于这样的理论&a…