Kafka的安装及接入SpringBoot

环境:windows、jdk1.8、springboot2

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

1.概述

        Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

        详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

2.1 配置kafka

        解压下载的文件,修改 config 文件夹下的 zookeeper.properties

        修改 config 文件夹下的 server.properties

        当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

 

2.2 启动 zookeeper

        Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

        可以本地访问看一下:http://localhost:2181/ 

2.3 启动Kafka 

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

        访问路径: http://localhost:9092/ 

2.4 便捷启动脚本

        两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.8.RELEASE</version>
</parent>
<properties>
    <fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

(2)配置类application.yml

        生产者:

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092
    producer:
      retries: 0
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

        消费者:

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class KafkaApp {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }
}

3.2 消息生产者

        junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​
    @Test
    public void sendMsg(){
        String topic = "spring_test";
        kafkaTemplate.send(topic,"hello spring boot kafka!");
        System.out.println("发送成功.");
        while (true){ //保存加载ioc容器
​
        }
    }
}

3.3 消息消费者

        新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​
 //    以下两种方法都行
    
 // 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​
 
    @KafkaListener(topics = "spring_test")
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/616587.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Datax数据采集

一、Datax介绍 官网&#xff1a; DataX/introduction.md at master alibaba/DataX GitHub DataX 是阿里云 DataWorks数据集成 的开源版本&#xff0c;在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。 DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、…

# ERROR: node with name “rabbit“ already running on “MS-ITALIJUXHAMJ“ 解决方案

ERROR: node with name “rabbit” already running on “MS-ITALIJUXHAMJ” 解决方案 一、问题描述&#xff1a; 1、启动 rabbitmq-server.bat 服务时&#xff0c;出错 Error 2、查询 rabbitmqctl status 状态时&#xff0c;出错 Error 3、停止 rabbitmqctl stop 服务时&a…

探索人类意识的多样性:从安全感到语感、节奏感的差异

在我们的日常生活中&#xff0c;人类意识表现出多种多样的特点&#xff0c;这些特点往往与个体的天生禀赋和生活经历密切相关。从安全感到语感、节奏感&#xff0c;每个人的表现都有所不同。今天&#xff0c;让我们一起来探索这些差异&#xff0c;感受人类意识的多样性。 首先&…

指针(5)加油吧

指针&#xff08;5&#xff09; 拿冒泡排序来举例&#xff1a; 1 .qsort void qsort (void* base,//base指向待排序数组的首元素的指针size_t num,//base指向数组中元素的个数size_t size,//base指向的数组中的一个元素的大小&#xff0c;单位是字节int(*cmp)(const void*,co…

通过mvn archetype 创建一个spring boot start 工程

mvn archetype https://maven.apache.org/archetype/index.html 遇到的问题 对于想自定义一个spring-boot-start的同学,比如 Springboot自定义Starter启动器 整个过程很繁琐。 定义属性开关增加 spring boot test start插件定义自动装载 spring.factories or org.springfra…

类加载机制(双亲委派机制)

文章目录 JVM的作用是什么双亲委派机制加载流程 JVM的作用是什么 我们运行Java程序时&#xff0c;要安装JDK&#xff0c;JDK包含JVM&#xff0c;不同环境的JDK都是不同的。 Java 代码在编译后会形成 class 的字节码文件&#xff0c;该字节码文件通过 JVM 解释器&#xff0c;生…

【牛客】SQL206 获取每个部门中当前员工薪水最高的相关信息

1、描述 有一个员工表dept_emp简况如下&#xff1a; 有一个薪水表salaries简况如下&#xff1a; 获取每个部门中当前员工薪水最高的相关信息&#xff0c;给出dept_no, emp_no以及其对应的salary&#xff0c;按照部门编号dept_no升序排列&#xff0c;以上例子输出如下: 2、题目…

7.STL_string(详细)

1. 什么是STL STL(standard template libaray-标准模板库)&#xff1a;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组件库&#xff0c;而且 是一个包罗数据结构与算法的软件框架。 2. STL的版本 原始版本 Alexander Stepanov、Meng Lee 在惠普实验室完成的原始版…

[单机]成吉思汗3_GM工具_VM虚拟机

稀有端游成吉思汗1,2,3单机版虚拟机一键端完整版 本教程仅限学习使用&#xff0c;禁止商用&#xff0c;一切后果与本人无关&#xff0c;此声明具有法律效应&#xff01;&#xff01;&#xff01;&#xff01; 教程是本人亲自搭建成功的&#xff0c;绝对是完整可运行的&#x…

校园管理系统,基于 SpringBoot+Vue+MySQL 开发的前后端分离的校园管理系统设计实现

目录 一. 前言 二. 功能模块 2.1. 管理员功能模块 2.2. 用户功能模块 2.3. 院校管理员功能模块 三. 部分代码实现 四. 源码下载 一. 前言 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身…

[240512] x-cmd 发布 v0.3.6: (se,wkp,ddgo...)x( kimi,gemini,gpt...)

目录 x-cmd 发布 v0.3.6新增了 jina 模块新增了 ddgo 模块新增了 se 模块wkp 模块新增了 writer 模块cosmo 模块 x-cmd 发布 v0.3.6 本次版本的最新引入的功能都是目的为了进一步探索 LLM 的使用。 本版本的改进分为两类&#xff1a;资讯类模块&#xff08;Wikipedia&#xf…

十、Redis内存回收策略和机制

1、Redis的内存回收 在Redis中可以设置key的过期时间&#xff0c;以期可以让Redis回收内存&#xff0c;循环使用。在Redis中有4个命令可以设置Key的过期时间。分别为 expire、pexpire、expireat、pexpireat。 1.1、expire expire key ttl&#xff1a;将key的过期时间设置为tt…

web前端之sass中的颜色函数、active按钮激活、hover鼠标悬浮、disabled禁用、scss循环、css

MENU 效果图htmlsassscss编译后的css页面css 效果图 注意查看蓝色按钮。 html <div class"box"><button class"btn type_1">按钮</button><button class"btn type_2">按钮</button><button class"btn ty…

机器学习(1)

目录 1-1.西瓜书 1-2.课程定位 1-3.机器学习 1-4.典型的机器学习过程 1-5.机器学习理论 1-6.基本术语 1-7.归纳偏好 1-8.NFL定理 1-1.西瓜书 建议使用方式 1.初学机器学习的第一本书:通读、速读;细节不懂处略过&#xff0c;了解机器学习的疆域和基本思想&#xff0c;…

龙迅LT8911EXB MIPIDSI/CSI桥接到EDP点屏,支持EDP1.4

龙迅LT8911EXB描述&#xff1a; Lontium LT8911EXB是MIPIDSI/CSI到eDP转换器&#xff0c;单端口MIPI接收器有1个时钟通道和4个数据通道&#xff0c;每个数据通道最大运行2.0Gbps&#xff0c;最大输入带宽为8.0Gbps。转换器解码输入MIPI RGB16/18/24/30/36bpp、YUV422 16/20/24…

显卡、显卡驱动、CUDA、cuDNN、CUDA Toolkit、NVCC、nvidia-smi等概念的区别与联系

在科技日新月异的今天&#xff0c;显卡、显卡驱动、CUDA、cuDNN、CUDA Toolkit、NVCC、nvidia-smi等术语已经成为了科技领域的重要组成部分。本文旨在阐述这些术语之间的区别与联系&#xff0c;帮助您更好地理解它们在技术生态系统中的作用。 一、显卡 显卡&#xff0c;也称为…

旅游推荐管理系统(小组项目)

文章目录 前言 一、项目介绍 1. 项目目的 2. 项目意义 2.1 提升旅游体验 2.2 促进旅游业发展 2.3 数据积累与分析 2.4 提升服务品质 2.5 优化资源配置 二、项目结构 1. 主要使用的技术 1.1 若依&#xff08;Ruoyi&#xff09;框架 1.2 Vue.js框架 1.3 Ajax 1.4 …

vivado仿真readmemb函数相对路径

目前常用的vivado工程的结构如下所示 prj-name|-xxx|-prj.sim|-sim_1|-behav|-modelsim|-tb_prj.do|-xsim|-prj.srcs|-sim_1|-new|-tb_prj.v|-tb_prj_mem.txt一般来说我们创建的testbench文件和新建的txt文件都会放在srcs->sim_1->new这个路径下面&#xff0c;但是我们在…

谷歌最强AI——Gemini免费使用2个月教程,性能抗衡GPT4

谷歌最强AI——Gemini采用的是Ultra 1.0大模型&#xff0c;功能非常强大&#xff0c;媲美GPT-4&#xff01;谷歌用户只需要绑定虚拟卡&#xff0c;就可以免费使用2个月&#xff01; 谷歌昨夜官宣四项AI新进展&#xff01; 1、最大、功能最强的大模型版本Gemini Ultra 1.0全面…

vs2019 - LoadLibrary失败时的排查方法

文章目录 vs2019 - LoadLibrary失败时的排查方法概述笔记用GetLastError()看错误原因隐式调用DLL接口看错误原因总结END vs2019 - LoadLibrary失败时的排查方法 概述 在做从内存载入DLL的实验&#xff0c;发现从内存载入DLL失败。 昨天还是成功的。昨天将工程归档了&#xff…