Kafka:安装和配置

 producer:发布消息的对象,称为消息产生者 (Kafka topic producer)

topic:Kafka将消息分门别类,每一个消息称为一个主题(topic)

consumer:订阅消息并处理发布消息的对象称为消费者(consumer)

broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker),消费者(consumer)可以订阅一个或者多个主题(topic),并从broker中拉取数据,从而消费这些已发布的信息。

1、Kafka对zookeeper是一个强依赖,保存Kafka相关的节点数据,所以安装kafka之前要先安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

2、入门案例

①创建kafka-demo工程并引入依赖

        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

②创建ProducerQuickStart生产者类并实现

package com.heima.kafkademo.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/*生产者*/
public class ProducerQuickStart {
    public static void main(String[] args) {
        /*1、kafka配置信息*/
        Properties properties = new Properties();
        //kafka连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);
        //key和value的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        /*2、生产对象*/
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        //封装发送消息的对象
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        /*3、发送消息*/
        producer.send(record);

        /*4、关闭通道,负责消息发送不成功*/
        producer.close();
    }
}

③创建ConsumerQuickStart消费者类并实现

package com.heima.kafkademo.sample;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/*消费者*/
public class ConsumerQuickStart {
    public static void main(String[] args) {
        /*1、kafka配置信息*/
        Properties properties = new Properties();
        //kafka连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);
        //key和value的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        /*2、消费者对象*/
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        /*3、订阅主题*/
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程处于一直监听状态
        while (true){
            //4、获取消息
            ConsumerRecords<String,String> consumerRecords =
                    consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }
    }
}

④运行测试

        成功接收到消息

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)

  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

下一篇: springboot集成kafka收发消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/71507.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿鹄工程项目管理系统em Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统 em

​ Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目…

ShardingSphere简单介绍

此文章为笔记&#xff0c;为阅读其他文章的感受、补充、记录、练习、汇总&#xff0c;非原创&#xff0c;感谢每个知识分享者。 文章目录 第01章 高性能架构模式1、读写分离架构2、数据库分片架构2.1、垂直分片2.2、水平分片 3、读写分离和数据分片架构4、实现方式4.1、程序代…

【果树农药喷洒机器人】Part6:基于深度相机与分割掩膜的果树冠层体积探测方法

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

计算机竞赛 GRU的 电影评论情感分析 - python 深度学习 情感分类

1 前言 &#x1f525;学长分享优质竞赛项目&#xff0c;今天要分享的是 &#x1f6a9; GRU的 电影评论情感分析 - python 深度学习 情感分类 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 这…

C++ STL vector 模拟实现

✅<1>主页&#xff1a;我的代码爱吃辣 &#x1f4c3;<2>知识讲解&#xff1a;C之STL &#x1f525;<3>创作者&#xff1a;我的代码爱吃辣 ☂️<4>开发环境&#xff1a;Visual Studio 2022 &#x1f4ac;<5>前言&#xff1a;上次我们已经数字会用…

Leetcode34 在排序数组中查找元素的第一个和最后一个位置

给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返回 [-1, -1]。 你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。 代码&#xff1a; c…

若依-plus-vue启动显示Redis连接错误

用的Redis是windows版本&#xff0c;6.2.6 报错的主要信息如下&#xff1a; Failed to instantiate [org.redisson.api.RedissonClient]: Factory method redisson threw exception; nested exception is org.redisson.client.RedisConnectionException: Unable to connect t…

Vue中使用Tailwind css

1.什么是Tailwind 就是一个CSS框架&#xff0c;和你知道的bootstrap&#xff0c;element ui&#xff0c;Antd&#xff0c;bulma。一样。将一些css样式封装好&#xff0c;用来加速我们开发的一个工具。 Tailwind解释 tailwind css 中文文档 2.Vue使用Tailwind配置 1. 新建vu…

GB/T28181设备接入端如何应用到数字城管场景?

什么是数字城管&#xff1f; 数字城管&#xff0c;又称“数字化城市管理”或“智慧城管”&#xff0c;是一种采用信息化手段和移动通信技术来处理、分析和管理整个城市的所有城管部件和城管事件信息&#xff0c;促进城市管理现代化的信息化措施。 数字城管通过建立城市管理信息…

为什么String要设计成不可变的

文章目录 一、前言二、缓存hashcode缓存 三、性能四、安全性五、线程安全 一、前言 为什么要将String设计为不可变的呢&#xff1f;这个问题一直困扰着许多人&#xff0c;甚至有人直接向Java的创始人James Gosling提问过。在一次采访中&#xff0c;当被问及何时应该使用不可变…

腾讯云COS的快速接入

背景 最近在研究一个剪贴板粘贴工具&#xff0c;实现粘贴图片&#xff0c;返回可访问的地址&#xff0c;这个在我的哔哩哔哩上有出一期视频&#x1f92d;。但是&#xff0c;我发现部分博客平台不能正常的转载我的图片链接&#xff0c;于是研究了一下腾讯云的COS&#xff08;阿…

CSDN博客批量查询质量分https://yma16.inscode.cc/请求超时问题(设置postman超时时间)(接口提供者设置了nginx超时时间)

文章目录 查询链接问题请求超时原因解决谷歌浏览器超时问题办法&#xff08;失败了&#xff09;谷歌浏览器不支持设置请求超时时间&#xff08;谷歌浏览器到底有没限制请求超时&#xff1f;貌似没有限制&#xff1f;&#xff09;看能否脱离浏览器请求&#xff0c;我们查看关键代…

Java负载均衡算法实现与原理分析(轮询、随机、哈希、加权、最小连接)

文章目录 一、负载均衡算法概述二、轮询&#xff08;RoundRobin&#xff09;算法1、概述2、Java实现轮询算法3、优缺点 三、随机&#xff08;Random&#xff09;算法1、概述2、Java实现随机算法 四、源地址哈希&#xff08;Hash&#xff09;算法1、概述2、Java实现地址哈希算法…

面试笔记:Android 架构岗,一次4小时4面的体验

作者&#xff1a;橘子树 此次面试一共4面4小时&#xff0c;中间只有几分钟间隔。对持续的面试状态考验还是蛮大的。 关于面试的心态&#xff0c;保持悲观的乐观主义心态比较好。面前做面试准备时保持悲观&#xff0c;尽可能的做足准备。面后积极做复盘&#xff0c;乐观的接受最…

【MongoDB】数据库、集合、文档常用CRUD命令

目录 一、数据库操作 1、创建数据库操作 2、查看当前有哪些数据库 3、查看当前在使用哪个数据库 4、删除数据库 二、集合操作 1、查看有哪些集合 2、删除集合 3、创建集合 三、文档基本操作 1、插入数据 2、查询数据 3、删除数据 4、修改数据 四、文档分页查询 …

Selenium之css怎么实现元素定位?

世界上最远的距离大概就是明明看到一个页面元素站在那里&#xff0c;但是我却定位不到&#xff01;&#xff01; Selenium定位元素的方法有很多种&#xff0c;像是通过id、name、class_name、tag_name、link_text等等&#xff0c;但是这些方法局限性太大&#xff0c; 随着自动…

PS透明屏,在科技展示中,有哪些优点展示?

PS透明屏是一种新型的显示技术&#xff0c;它将传统的显示屏幕与透明材料相结合&#xff0c;使得屏幕能够同时显示图像和透过屏幕看到背后的物体。 这种技术在商业展示、广告宣传、产品展示等领域有着广泛的应用前景。 PS透明屏的工作原理是利用透明材料的特性&#xff0c;通…

旷视科技AIoT软硬一体化走向深处,生态和大模型成为“两翼”?

齐奏AI交响曲的当下&#xff0c;赛道玩家各自精彩。其中&#xff0c;被称作AI四小龙的商汤科技、云从科技、依图科技、旷视科技已成长为业内标杆&#xff0c;并积极追赶新浪潮。无论是涌向二级市场还是布局最新风口大模型&#xff0c;AI四小龙谁都不甘其后。 以深耕AIoT软硬一…

机器学习-自定义Loss函数

1、简介 机器学习框架中使用自定义的Loss函数&#xff0c; 2、应用 &#xff08;1&#xff09;sklearn from sklearn.metrics import max_error from sklearn.metrics import make_scorer from sklearn.model_selection import cross_val_score from sklearn.linear_model …

腾讯云标准型CVM云服务器详细介绍

腾讯云CVM服务器标准型实例的各项性能参数平衡&#xff0c;标准型云服务器适用于大多数常规业务&#xff0c;例如&#xff1a;web网站及中间件等&#xff0c;常见的标准型云服务器有CVM标准型S5、S6、SA3、SR1、S5se等规格&#xff0c;腾讯云服务器网来详细说下云服务器CVM标准…