Kafka应用Demo: 抽取消费者公共处理代码并利用redis实现多消费者实例负载分担

问题描述

在项目中使用消息中间件,主要为实现两个目的:

  1. 任务排队:当请求过多时,消费端无法同时处理,需要排队等待。这一点kafka采用的是"拉取消息"的模式,自然支持。
  2. 负载分担: 这里的负载负担不是指Kafka本身的横向扩容,而是指在任务量过大时可以通过增加消费者实例,提升效率。

下面重点分析一下,增加消费者实例以提升系统处理效率的问题。

简要分析

 Kafka消费者订阅消息可以通过主题订阅,也可以指定分区订阅。如果是通过主题订阅,消费者将获取该主题下所有分区的消息。如果是指定分区订阅,消费者只能获取该分区下的消息。

  1. 按主题订阅的模式。同一分组的消费者实例中只会有一个实例收到消息,其它实例处于空转状态,会浪费资源,无法提升效率。当处理任务的实例挂了后,服务再平衡,另外一个消费者实例才会接手继续处理任务。
  2. 按分区定义的模式。消费者只能接收到所订阅分区的消息。如果有多个消费者实例订阅同一个分区,它们将收到相同的消息,这相当于广播, 如果不做特殊处理会出现消息多次消费的情况(这种订阅方式,官方文档要求的是配置不同的分组ID,否则会导致提交冲突)。

 按分区订阅的方式会增加生产者发送消息的处理复杂度,发送消息时需要知道哪些消息放在哪个分区能被正确消费。后面如果增加了分区和主题,生产者和消费者都需要做较大的改动。

 所以,初步考虑还是走按主题订阅消息这条路。生产者发送消失时只需要指定topic, 消费者订阅也简单。在这种模式下,有两点需要处理:

  1. 多个消费者实例为实现负载分担,需要配置不同的组ID,这样可以收到相同的消息(相当于广播)。
  2. 为避免同一个消息在多个消费者实例中重复处理,需要做一些互斥。这个可以考虑用redis来做。

生产者代码样例

import com.alibaba.fastjson.JSON;
import com.elon.base.constant.KafkaTopicConst;
import com.elon.base.model.BIReportTask;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class KafkaProducerService {
    private KafkaProducer<String, String> producer = null;

    public KafkaProducerService() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.128:9092");
        props.put("acks", "0");
        props.put("group.id", "1111");
        props.put("retries", "2");
        props.put("partitioner.class", NeoPartitioner.class);
        //设置key和value序列化方式
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        //生产者实例
        producer = new KafkaProducer<>(props);
    }

    /**
     * 外部调用的发消息接口
     */
    public void sendMessage() {
        for (int i = 0; i < 20; ++i) {
            BIReportTask task = new BIReportTask();
            task.setPath("d:/temp");
            task.getParamMap().put("value", String.valueOf(i));
            ProducerRecord<String, String> record = new ProducerRecord(KafkaTopicConst.ELON_TOPIC, JSON.toJSONString(task));
            producer.send(record);
        }
    }

 生产者发送消息处理比较简单,创建ProducerRecord只需要指定Topic和Value。如果对消息处理有顺序要求,可以指定一个消息键。

消费者公共处理代码

 下面的公共处理逻辑代码可以写到一个公共的common库中,编译成jar包。谁需要订阅处理kafka消息,引入依赖即可。

1. 定义一个公共的Task模型

import lombok.Getter;
import lombok.Setter;

import java.util.Date;

/**
 * 异步任务模型基类. 定义模型公共属性
 *
 * @author neo
 * @since 2024-05-14
 */
@Getter
@Setter
public class TaskBase {
    // 任务唯一ID标识, 用UUID
    private String taskId = "";

    // 任务编码(任务类别)
    private String taskCode = "";

    // 创建人
    private String createUser = "";

    // 创建时
    private Date createTime = null;

    // 修改人
    private String updateUser = "";

    // 修改时间
    private Date updateTime = null;

    public TaskBase() {

    }

    public TaskBase(String taskId, String taskCode) {
        this.taskId = taskId;
        this.taskCode = taskCode;
    }
}

 这里面的taskId是任务唯一的UUID标识。 taskCode是任务的类别,可以定义为常量,用于分区不同的任务。

2. 定义消息处理抽象类

import lombok.Getter;

/**
 * 任务处理类。派生出各子类实现具体的处理逻辑
 *
 * @author neo
 * @since 2024-05-14
 */
public abstract class TaskHandler {
    // 任务编码
    @Getter
    private final String taskCode;

    /**
     * 任务处理接口
     *
     * @param taskJson 任务对象JSON串
     */
    public abstract void handle(String taskJson);

    protected TaskHandler(String taskCode) {
        this.taskCode = taskCode;
    }
}

 所有消费者都需要从该类继承,并显示handle接口,实际的业务处理逻辑在子类的handle方法中去实现。

3. 消费者服务处理类

3.1核心逻辑代码

import com.alibaba.fastjson.JSON;
import com.elon.base.model.TaskBase;
import com.elon.base.util.StringUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka消费者服务类。订阅接收消息, 再转给具体的业务处理类处理
 *
 * @author neo
 * @since 2024-5-14
 */
@Component
public class KafkaConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

    @Value("${neo.application_name:}")
    private String applicationName;

    // Kafka分区器连接
    @Value("${neo.kafka.bootstrap.servers:}")
    private String kafkaServer;

    // Kafka组ID
    @Value("${neo.kafka.group.id:}")
    private String kafkaGroupId;

    // 最大一次拉取的消息数量
    @Value("${neo.kafka.max.poll.records:1}")
    private int maxPollRecords;

    @Value("${neo.kafka.topics}")
    private List<String> topics;

    @Value("${neo.redis.ip:}")
    private String redisIp;

    @Value("${neo.redis.port:}")
    private int redisPort;

    // 消费者
    private KafkaConsumer consumer = null;

    // 任务处理器. Map<任务编码, 任务处理器>
    private Map<String, TaskHandler> handlerMap = new HashMap<>();

    /**
     * 注册任务处理器
     *
     * @param handler 任务处理器
     */
    public void registerHandler(TaskHandler handler) {
        handlerMap.put(handler.getTaskCode(), handler);
    }

    /**
     * 初始化消费者实例. 订阅主题消息
     */
    public void initKafkaConsumer() {
        LOGGER.info("Subscribe message. kafkaServer:{}|kafkaGroupId:{}|maxPollRecords:{}|topics:{}",
                kafkaServer, kafkaGroupId, maxPollRecords, topics);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaServer);  // 指定 Broker
        properties.put("group.id", kafkaGroupId);              // 指定消费组群 ID
        properties.put("max.poll.records", maxPollRecords);
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
        properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象

        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(topics);  // 订阅主题

        new Thread(this::handleMessage).start();
    }

    /**
     * 从Kafka获取消息,传给相应的处理器处理.
     */
    public void handleMessage() {
        Jedis jedisClient = getJedisClient();
        while (true) {
            synchronized (this) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                LOGGER.info("Fetch record num:{}", records.count());
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        handleSingleMessage(jedisClient, record);
                    } catch (Exception e) {
                        LOGGER.error("Handle message fail. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    }
                }

                // 提交任务,更新offset
                consumer.commitSync();
            }
        }
    }

    /**
     * 处理单个消息.
     *
     * @param jedisClient redis客户端接口
     * @param record kafka消息记录
     * @author neo
     */
    private void handleSingleMessage(Jedis jedisClient, ConsumerRecord<String, String> record) {
        TaskBase taskBase = JSON.parseObject(record.value(), TaskBase.class) ;
        if (!handlerMap.containsKey(taskBase.getTaskCode())) {
            return;
        }

        // 判断同一个任务是否已经有其它实例在处理
        String taskKey = "Task_" + taskBase.getTaskId();
        String handleAppName = jedisClient.getSet(taskKey, applicationName);

        // 设置过期时间只是为了方便自动清除redis中的数据。在实际项目中,任务数据是非常重要的,往往需要持久化到数据库
        jedisClient.expire(taskKey, 60 * 60);
        if (!StringUtil.isEmpty(handleAppName)) {
            jedisClient.set(taskKey, handleAppName, new SetParams().px(1000 * 60 * 60));
            LOGGER.info("Task:{} completed. Handle app name:{}", taskBase.getTaskId(), handleAppName);
            return;
        }

        // 将消息分发给具体的handler类处理
        LOGGER.info("Handle message. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        TaskHandler handler = handlerMap.get(taskBase.getTaskCode());
        handler.handle(record.value());
    }

    public Jedis getJedisClient() {
        Jedis jedis = new Jedis(redisIp, redisPort);
        return jedis;
    }

这里面有几个重要的方法:

  1. registerHandler: 消息处理类注册接口。消费者程序在继承实现了的该接口后,可以将子类实例注册过来。
  2. initKafkaConsumer: 这里面主要是按topic订阅消息的代码。
  3. handleMessage: 从Kafka服务器拉去消息并处理。
  4. handleSingleMessage: 处理单条消息。核心代码。

 在handleSingleMessage方法中,首先会去查询handlerMap中是否有Task Code对应的处理器。因消息是按Topic广播过来的,肯定会有当前消费者不需要处理的消息(其它消费者订阅的)。然后, 在处理任务前会先将taskId通过jedis的getSet方法存储到redis,同时检测是否有其它消费者已经处理了该任务(已处理,则不再重复处理)。

 注:这个地方因需要保证操作的原子性 用了jedis的getSet方法,实际上还是有瑕疵,第二次调用时会修改旧的数据。应该还可以通过lua脚本做一个好一些的处理(后续再处理)。

3.2 yml配置样例

在这里插入图片描述

消费者定制样例代码

 定制代码是开发过程中每个消费者根据业务需要,结合需要消费的具体消息增加的业务处理代码。下面以虚构的一个生成BI业务报表为例,简单说明一下。

1. 定义任务模型

/**
 * 生成BI报告的任务模型。不同的任务可以根据需要定义不同的模型
 *
 * @author neo
 * @since 2024-05-14
 */
@Getter
@Setter
public class BIReportTask extends TaskBase {
    // 存放报告的路径
    private String path;

    // 参数
    private Map<String, String> paramMap = new HashMap<>();

    public BIReportTask() {
        super(UUID.randomUUID().toString(), TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);
    }
}

 根据业务需要传递的参数而定义,和Task Code关联。生产者和消费者约定使用同一模型。

2. 具体消息处理类

import com.alibaba.fastjson.JSON;
import com.elon.base.constant.TaskCodeConst;
import com.elon.base.model.BIReportTask;
import com.elon.base.service.kafka.KafkaConsumerService;
import com.elon.base.service.kafka.TaskHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * BI报表生成处理类
 *
 * @author neo
 * @since 2024-05-14
 */
@Component
public class KafkaBIReportHandler extends TaskHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBIReportHandler.class);

    @Resource
    private KafkaConsumerService kafkaConsumerService;

    public KafkaBIReportHandler() {
        super(TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);
    }

    @PostConstruct
    public void init() {
        kafkaConsumerService.registerHandler(this);
    }

    @Override
    public void handle(String taskJson) {
        BIReportTask task = JSON.parseObject(taskJson, BIReportTask.class);
        LOGGER.info("Create BI report. taskCode:{}|taskId:{}", task.getTaskCode(), task.getTaskId());

        // 生成BI报表. 具体的处理逻辑略. 这里等待5秒表示业务处理耗时
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

注:类中的init方法完成了注册功能。

3. 初始化消费者

/**
 * 消费者应用初始化.
 *
 * @author neo
 * @since 2024-05-15
 */
@Component
public class ConsumerApplicationInit implements ApplicationRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApplicationInit.class);

    @Resource
    private KafkaConsumerService kafkaConsumerService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        kafkaConsumerService.initKafkaConsumer();
        LOGGER.info("Init kafka consumer success.");
    }
}

在SpringBoot启动完成后做初始化操作。

测试验证及说明

 修改yaml文件的组ID,springboot服务端口号,打两个包启动。生产者发送一批消息测试。可以看到两个消费者实例同时在处理这一批消息,并不重复。当前消费者会跳过另一消费者已处理过的任务。

在这里插入图片描述

 上面描述的代码及方案基本可实现以Kafka作为消息中间件,多消费者实例负载分担和可靠性要求。Demo代码仅作为个人研究用,不一定适用于实际的实际项目开发。仅作参考。

详细代码可参考github:

公共代码包:https://github.com/ylforever/elon-base

消费者定制代码:https://github.com/ylforever/neo-kafka-consumer

生产者代码:https://github.com/ylforever/neo-kafka-producer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/629607.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络安全|隐藏IP地址的5种不同方法

隐藏计算机的IP地址在互联网在线活动种可以保护个人隐私&#xff0c;这是在线活动的一种常见做法&#xff0c;包括隐私问题、安全性和访问限制内容等场景。那么如何做到呢?有很5种方法分享。每种方法都有自己的优点和缺点。 1. 虚拟网络 当您连接到虚拟服务器时&#xff0c;您…

Spring MVC(四) 数据校验

在开发过程中有一环必不可少的部分就是数据校验&#xff0c;用户在页面中填写的数据通过表单提交时&#xff0c;前端的JS可以做一些是否合法性的验证&#xff0c;比如是否为空、两次密码是否一致、格式是否正确等等验证。当数据到了后台控制器&#xff0c;为了确保程序的健壮性…

STM32--HC-SR501 热释电人体红外感应模块

实物引脚图&#xff1a; 模块工作特性&#xff1a; 当人进入感应范围之后输出引脚输出高电平&#xff0c;人离开感应范围自动延时输出低电平 热释电效应&#xff1a; 热释电传感器&#xff0c;也称为人体红外传感器&#xff0c;其工作原理基于热释电效应。这种传感器由几个关…

IDC:2023年中国IT安全软件市场同比增长4.7%

IDC最新发布的《中国IT安全软件市场跟踪报告&#xff0c;2023H2》显示&#xff0c;2023年下半年中国IT安全软件市场厂商整体收入约为169.8亿人民币&#xff08;约合23.5亿元美元&#xff09;&#xff0c;同比上升2.7%。结合全年数据&#xff0c;2023全年中国IT安全软件市场规模…

三路输出小功率开关电源【MATLAB/simulink】

拟选用一种DC-DC变换器拓扑使用1700 V SiC MOSFET或IGBT设计三相功率系 统的高频开关直流辅助电源&#xff0c;它可用于太阳能逆变器、工业开关电源、电动汽车充电器、 电机驱动装置等领域。&#xff08;建议采用单端反激式电路拓扑&#xff0c;开关频率为80kHz) 电路基本参数&…

项目管理—需求管理规程(软件研发过程标准,管理标准,标书技术编写,资质评审,安全管理体系,项目交付,实施运维,各类建设方案)

软件资料清单列表部分文档清单&#xff1a;工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明书&#xff0c;需求调研计划&#xff0c;用户需求调查单&#xff0c;用户需求说明书&#xff0c;概要设计说明书&#xff0c;技术解…

提升用户体验:Xinstall免邀请码功能详解

在移动互联网时代&#xff0c;App的推广和运营显得尤为重要。然而&#xff0c;传统的App推广方式往往需要用户填写繁琐的邀请码&#xff0c;这不仅降低了用户体验&#xff0c;还影响了推广效果。幸运的是&#xff0c;Xinstall作为国内专业的App全渠道统计服务商&#xff0c;推出…

qmake、CMake、make和Makefile

为了跟踪C工程的全部部分&#xff0c;要求有一种机制来精确地指定&#xff1a; 涉及的输入文件&#xff0c;如源代码文件&#xff1a;.cpp&#xff0c;头文件&#xff1a;.h建立程序时所需的工具&#xff0c;如编译器&#xff1a; g.exe&#xff0c;链接器&#xff1a;ld.exe&a…

邦注科技 电解式超声波清洗机的原理介绍

电解式超声波去除模具表面油污锈迹的原理结合了电解和超声波技术的优势。 首先&#xff0c;电解作用是通过在特定的电解槽中&#xff0c;将模具作为阴极&#xff08;放入清洗框即可&#xff09;&#xff0c;并将有制式电极棒作为阳极。在电解过程中&#xff0c;电流如同魔法师…

如何管理测试计划?测试计划管理都使用哪些在线工具?YesDev

3.2 测试计划 测试计划Testing plan&#xff0c;描述了要进行的测试活动的范围、方法、资源和进度的文档&#xff1b;是对整个信息系统应用软件组装测试和确认测试。 3.2.1 管理测试计划 在测试计划&#xff0c;可以查看、管理和维护全部测试计划。 测试计划列表 点击【测…

easyx快速入门1

1.基本说明 EasyX 是针对 C 的图形库&#xff0c;可以帮助 C/C 初学者快速上手图形和游戏编程。 比如&#xff0c;可以基于 EasyX 图形库很快的用几何图形画一个房子&#xff0c;或者一辆移动的小车&#xff0c;可以编写俄罗斯方块、贪吃蛇、黑白棋等小游戏&#xff0c;可以练…

必应bing广告开户费用介绍,必应搜索广告推广开户服务!

微软必应Bing搜索引擎广告成为了企业提升品牌知名度与市场份额的有效途径之一&#xff0c;作为全球第二大搜索引擎&#xff0c;在中国市场正逐步展现出其独特的广告价值与潜力。对于希望拓展在线市场的中国企业而言&#xff0c;通过云衔科技开启必应Bing国内广告推广之旅&#…

openstack部署nova中出现的问题:

[rootcontroller nova]# su -s /bin/sh -c “nova-manage db sync” nova /usr/lib/python2.7/site-packages/pymysql/cursors.py:170: Warning: (1831, u’Duplicate index block_device_mapping_instance_uuid_virtual_name_device_name_idx. This is deprecated and will be…

战网国际服怎么下载 暴雪战网一键下载安装图文教程

战网国际版&#xff0c;或称为Battle.net全球版&#xff0c;是暴雪娱乐构建的一项跨越国界的综合游戏交流平台&#xff0c;它无视地理限制&#xff0c;旨在服务全球每一个角落的游戏爱好者。不同于地区专属版本&#xff0c;国际版为玩家开启了一扇无门槛的大门&#xff0c;让每…

【Win】如何在Windows隐藏安装的程序

由于维护人员或用户可能无意中通过“程序和功能”选项删除对业务至关重要的软件&#xff0c;这导致服务中断或安全风险。为了防止此类情况发生&#xff0c;确保只有授权的用户才能访问和管理系统中的程序。为了实现这一目标&#xff0c;我们将探讨如何在Windows操作系统中隐藏特…

基于SpringBoot的竹宣非遗宣传网站

摘要 随着互联网的普及和数字化时代的到来&#xff0c;竹编等非物质文化遗产的保护与传承面临新的机遇和挑战。该研究旨在使用SpringBoot后端框架与Vue前端框架&#xff0c;构建一个竹编非遗宣传网站&#xff0c;通过丰富的展示形式和交互体验&#xff0c;提升公众对竹编这一非…

详解JS的URL()和URLSearchParams() API接口

两个 API 接口定义 URL() 构造函数返回一个新创建的 URL 对象&#xff0c;表示由一组参数定义的 URL。 URLSearchParams 接口定义了一些实用的方法来处理 URL 的查询字符串。 快速了解两个 API 在哪里用 以前我们要对地址栏中的 URL 地址进行分析处理&#xff0c;需要自己进…

GPT-4o来了,超拟人语音合成系统的关键都在这里

在众多科技企业竞相提升大模型的多模态能力&#xff0c;致力于将文本总结、图像编辑等功能集成到移动设备中的时候&#xff0c;OpenAI 又双叒叕上新了&#xff01;CEO奥特曼用了3个字母表达他的状态&#xff1a;her&#xff08;就像电影《Her》一样&#xff09;。 5月14日凌晨&…

Android ashmem 原理分析

源码基于&#xff1a;Andoird U Kernel-5.10 0. 简介 ashmem 称为匿名共享内存(Anonymous Shared Memory)&#xff0c;它以驱动程序的形式实现在内核空间中。它有两个特点&#xff1a; 能否辅助内存管理系统来有效地管理不再使用的内存块(pin / unpin)&#xff1b; 通过Bind…

嵌入式学习-PWM输出比较

简介 PWM技术 输出比较框图介绍 定时器部分 比较器控制部分 输出控制部分 相关寄存器