Kafka消息服务之Java工具类

注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;

Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

Java工具类

此基于kafka客户端的工具类,提供基础的消息发送与监听功能。

pom.xml

       <!-- 集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.2</version>
        </dependency>

KafkaUtils.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @Description kafka工具类,提供消息发送与监听
 */
public class KafkaUtils {

    /**
     * 获取实始化KafkaStreamServer对象
     * @return
     */
    public static KafkaStreamServer bulidServer(){
        return new KafkaStreamServer();
    }

    /**
     * 获取实始化KafkaStreamClient对象
     * @return
     */
    public static KafkaStreamClient bulidClient(){
        return new KafkaStreamClient();
    }

    public static class KafkaStreamServer{
        KafkaProducer<String, String> kafkaProducer = null;

        private KafkaStreamServer(){}

        /**
         * 创建配置属性
         * @param host
         * @param port
         * @return
         */
        public KafkaStreamServer createKafkaStreamServer(String host, int port){
            String bootstrapServers = String.format("%s:%d", host, port);
            if (kafkaProducer != null){
                return this;
            }
            Properties properties = new Properties();
            //kafka地址,多个地址用逗号分割
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            kafkaProducer = new KafkaProducer<>(properties);
            return this;
        }

        /**
         * 向kafka服务发送生产者消息
         * @param topic
         * @param msg
         * @return
         */
        public Future<RecordMetadata> sendMsg(String topic, String msg){
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
            Future<RecordMetadata> future = kafkaProducer.send(record);
            System.out.println("消息发送成功:" + msg);
            return future;
        }

        /**
         * 关闭kafka连接
         */
        public void close(){
            if (kafkaProducer != null){
                kafkaProducer.flush();
                kafkaProducer.close();
                kafkaProducer = null;
            }
        }
    }

    public static class KafkaStreamClient {
        KafkaConsumer<String, String> kafkaConsumer = null;
        private KafkaStreamClient(){}

        /**
         * 配置属性,创建消费者
         * @param host
         * @param port
         * @return
         */
        public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){
            String bootstrapServers = String.format("%s:%d", host, port);
            if (kafkaConsumer != null){
                return this;
            }
            Properties properties = new Properties();
            //kafka地址,多个地址用逗号分割
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  bootstrapServers);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            kafkaConsumer = new KafkaConsumer<String, String>(properties);
            return this;
        }

        /**
         * 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息
         * @param topic
         * @param headerInterface
         */
        public void pollMsg(String topic, HeaderInterface headerInterface) {
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    try{
                        headerInterface.execute(record);
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * 关闭kafka连接
         */
        public void close(){
            if (kafkaConsumer != null){
                kafkaConsumer.close();
                kafkaConsumer = null;
            }
        }
    }


    @FunctionalInterface
    interface HeaderInterface{
        void execute(ConsumerRecord<String, String> record);
    }

    /**
     * 测试示例
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        //生产者发送消息
//        KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
//        int i=0;
//        while (i<10) {
//            String msg = "Hello," + new Random().nextInt(100);
//            kafkaStreamServer.sendMsg("test", msg);
//            i++;
//            Thread.sleep(100);
//        }
//        kafkaStreamServer.close();
//        System.out.println("发送结束");

        System.out.println("接收消息");
        KafkaStreamClient kafkaStreamClient =  KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");
        kafkaStreamClient.pollMsg("test", new HeaderInterface() {
            @Override
            public void execute(ConsumerRecord<String, String> record) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
            }
        });

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972872.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

小怿学习日记(七) | Unreal引擎灯光架构

灯光的布局对于HMI场景中车模的展示效果有着举足轻重的地位。本篇内容将简单介绍ES3.1的相关知识&#xff0c;再深入了解Unreal引擎中车模的灯光以及灯光架构。 一、关于ES3.1 1.1 什么是ES3.1 ES3.1这个概念对于美术的同学可能比较陌生&#xff0c;ES3.1指的是OpenGL ES3.1&…

DeepSeek 接入PyCharm实现AI编程!(支持本地部署DeepSeek及官方DeepSeek接入)

前言 在当今数字化时代&#xff0c;AI编程助手已成为提升开发效率的利器。DeepSeek作为一款强大的AI模型&#xff0c;凭借其出色的性能和开源免费的优势&#xff0c;成为许多开发者的首选。今天&#xff0c;就让我们一起探索如何将DeepSeek接入PyCharm&#xff0c;实现高效、智…

广度优先搜索详解--BFS--蒟蒻的学习之路

1.什么是广度优先搜索? 广度优先搜索&#xff08;Breadth-First Search&#xff0c;简称BFS&#xff09;是一种遍历或搜索树和图的算法&#xff0c;也称为宽度优先搜索&#xff0c;BFS算法从图的某个节点开始&#xff0c;依次对其所有相邻节点进行探索和遍历&#xff0c;然后再…

. Unable to find a @SpringBootConfiguration(默认软件包中的 Spring Boot 应用程序)

解决&#xff1a; 新建一个包即可 问题&#xff1a; 默认软件包中的 Spring Boot 应用程序。 原因&#xff1a; 默认包的定义 &#xff1a; 如果一个 Java 类没有使用 package 声明包名&#xff0c;则该类会被放置在默认包中。Spring Boot 遵循 Java 的包管理约定&#xff…

DeepSeek企业级部署实战指南:从服务器选型到Dify私有化落地

对于个人开发者或尝鲜者而言&#xff0c;本地想要部署 DeepSeek 有很多种方案&#xff0c;但是一旦涉及到企业级部署&#xff0c;则步骤将会繁琐很多。 比如我们的第一步就需要先根据实际业务场景评估出我们到底需要部署什么规格的模型&#xff0c;以及我们所要部署的模型&…

“三次握手”与“四次挥手”:TCP传输控制协议连接过程

目录 什么是TCP协议 “三次握手”建立连接 “四次挥手”断开连接 “三次握手”和“四次挥手”的反思 总结 什么是TCP协议 想象一下&#xff0c;你和远方的朋友要进行一场电话交流&#xff0c;但这通电话不仅仅是随便聊聊&#xff0c;而是要传递一封重要的信件。为了确保这…

网络运维学习笔记 012网工初级(HCIA-Datacom与CCNA-EI)某机构新增:GRE隧道与EBGP实施

文章目录 GRE隧道&#xff08;通用路由封装&#xff0c;Generic Routing Encapsulation&#xff09;协议号47实验&#xff1a;思科&#xff1a;开始实施&#xff1a; 华为&#xff1a;开始实施&#xff1a; eBGP实施思科&#xff1a;华为&#xff1a; GRE隧道&#xff08;通用路…

Android 动态加入Activity 时 manifest 注册报错解决。使用manifestPlaceholders 占位

需求如下&#xff1a; 项目 测试demo 有多个渠道&#xff0c;部分渠道包含支付功能&#xff0c;在主测试代码外&#xff0c;需要一个单独 Activity 调用测试代码。 MainActivityPayActivity渠道A包含不包含渠道B包含包含 因为支付功能需要引入对应的 moudule&#xff0c;因此…

【koa】05-koa+mysql实现数据库集成:连接和增删改查

前言 前面我们已经介绍了第二阶段的第1-4点内容&#xff0c;本篇介绍第5点内容&#xff1a;数据库集成&#xff08;koamysql&#xff09; 也是第二阶段内容的完结。 一、学习目标 在koa项目中正常连接数据库&#xff0c;对数据表进行增删改查的操作。 二、操作步骤 本篇文章…

linux--关于makefile

makefile文件 可以指定编译顺序&#xff0c;这样方便一个项目的多个文件要编译的挨个操作的麻烦。 makefile文件的命名&#xff1a;makefile 或者 Makefile 必须是这俩&#xff0c;系统才能识别 规则的书写语法如下&#xff1a; 一个makefile内可以有多个规则 目标:依赖a 依…

俄罗斯方块游戏完整代码示例

以下是一个基于Cocos Creator引擎开发的俄罗斯方块游戏的完整代码示例。该游戏实现了俄罗斯方块的基本功能&#xff0c;并且代码整合在单个文件中&#xff0c;无需任何外部依赖&#xff0c;可以直接在浏览器中运行。 1. 创建Cocos Creator项目 首先&#xff0c;确保你已经安装了…

学习kafka和flink

kafka kafka安装一套流程 方法一&#xff1a;启动需安装zookeeper和kafka 【Kafka】Windows下安装Kafka&#xff08;图文记录详细步骤&#xff09; 安装Tzq2018写的上面链接安装的&#xff0c;一切很顺利&#xff0c;除了zookeeper的环境变量不管如何配置都不管用&#xff0…

SLT-加载表添加字段重新刷数

1、LTRC数据提供->输入表名->停止加载/复制 2、LTRS添加表字段&#xff08;只有在加载部分字段的情况下&#xff09;&#xff1b; 在查看修改概览页将需要的字段选中并删除&#xff0c;删除的字段自动归集到已修改概览里。 3、数据提供-》输入表名-》创建/数据库视图&am…

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

【黑马点评优化】2-Canel实现多级缓存&#xff08;RedisCaffeine&#xff09;同步 0 背景1 配置MySQL1.1 开启MySQL的binlog功能1.1.1 找到mysql配置文件my.ini的位置1.1.2 开启binlog 1.2 创建canal用户 2 下载配置canal2.1 canal 1.1.5下载2.2 配置canal2.3 启动canal2.4 测试…

【iOS】SwiftUI状态管理

State ObservedObject StateObject 的使用 import SwiftUIclass CountModel: ObservableObject {Published var count: Int 0 // 通过 Published 标记的变量会触发视图更新init() {print("TimerModel initialized at \(count)")} }struct ContentView: View {State…

组合总和力扣--39

目录 题目 思路 剪枝优化 代码 题目 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返回这些组合。 candidates 中的…

基于知识图谱的问答系统:后端Python+Flask,数据库Neo4j,前端Vue3(提供源码)

基于知识图谱的问答系统&#xff1a;后端PythonFlask&#xff0c;数据库Neo4j&#xff0c;前端Vue3 引言 随着人工智能技术的不断发展&#xff0c;知识图谱作为一种结构化的知识表示方式&#xff0c;逐渐成为问答系统的重要组成部分。本文将介绍如何构建一个基于知识图谱的问答…

【第四届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2025】网络安全,人工智能,数字经济的研究

重要信息 会议官网&#xff1a;www.csaide.net 会议时间&#xff1a;2025年3月7-9日 会议地点&#xff1a;马来西亚-马来西亚理工大学新山校区&#xff08;线上线下混合&#xff09; 简介 过去几年&#xff0c;数字经济蓬勃发展&#xff0c;已成为全球经济增长的驱动力。…

VSCode AI提效工具,通义灵码前端开发体验

安装 安装依旧很简单&#xff0c;vs code拓展插件中搜索就出来了&#xff0c;记住下边这个图标。 亮点 新接入了deepseek-v3\deepseek-r1模型&#xff0c;不仅支持智能问答&#xff0c;而且增加了AI程序员&#xff0c;可以直接按照完成编码任务&#xff0c;修改优化代码&am…

Ansys Zemax | 使用衍射光学器件模拟增强现实 (AR) 系统的出瞳扩展器 (EPE):第 2 部分

附件下载 联系工作人员获取附件 在 OpticStudio 中使用 RCWA 工具为增强现实&#xff08;AR&#xff09;系统设置出瞳扩展器&#xff08;EPE&#xff09;的示例中&#xff0c;首先解释了 k空间中光栅的规划&#xff0c;并详细讨论了设置每个光栅的步骤。 介绍 本文提供了多…