问题描述
在项目中使用消息中间件,主要为实现两个目的:
- 任务排队:当请求过多时,消费端无法同时处理,需要排队等待。这一点kafka采用的是"拉取消息"的模式,自然支持。
- 负载分担: 这里的负载负担不是指Kafka本身的横向扩容,而是指在任务量过大时可以通过增加消费者实例,提升效率。
下面重点分析一下,增加消费者实例以提升系统处理效率的问题。
简要分析
Kafka消费者订阅消息可以通过主题订阅,也可以指定分区订阅。如果是通过主题订阅,消费者将获取该主题下所有分区的消息。如果是指定分区订阅,消费者只能获取该分区下的消息。
- 按主题订阅的模式。同一分组的消费者实例中只会有一个实例收到消息,其它实例处于空转状态,会浪费资源,无法提升效率。当处理任务的实例挂了后,服务再平衡,另外一个消费者实例才会接手继续处理任务。
- 按分区定义的模式。消费者只能接收到所订阅分区的消息。如果有多个消费者实例订阅同一个分区,它们将收到相同的消息,这相当于广播, 如果不做特殊处理会出现消息多次消费的情况(这种订阅方式,官方文档要求的是配置不同的分组ID,否则会导致提交冲突)。
按分区订阅的方式会增加生产者发送消息的处理复杂度,发送消息时需要知道哪些消息放在哪个分区能被正确消费。后面如果增加了分区和主题,生产者和消费者都需要做较大的改动。
所以,初步考虑还是走按主题订阅消息这条路。生产者发送消失时只需要指定topic, 消费者订阅也简单。在这种模式下,有两点需要处理:
- 多个消费者实例为实现负载分担,需要配置不同的组ID,这样可以收到相同的消息(相当于广播)。
- 为避免同一个消息在多个消费者实例中重复处理,需要做一些互斥。这个可以考虑用redis来做。
生产者代码样例
import com.alibaba.fastjson.JSON;
import com.elon.base.constant.KafkaTopicConst;
import com.elon.base.model.BIReportTask;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerService {
private KafkaProducer<String, String> producer = null;
public KafkaProducerService() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.5.128:9092");
props.put("acks", "0");
props.put("group.id", "1111");
props.put("retries", "2");
props.put("partitioner.class", NeoPartitioner.class);
//设置key和value序列化方式
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
//生产者实例
producer = new KafkaProducer<>(props);
}
/**
* 外部调用的发消息接口
*/
public void sendMessage() {
for (int i = 0; i < 20; ++i) {
BIReportTask task = new BIReportTask();
task.setPath("d:/temp");
task.getParamMap().put("value", String.valueOf(i));
ProducerRecord<String, String> record = new ProducerRecord(KafkaTopicConst.ELON_TOPIC, JSON.toJSONString(task));
producer.send(record);
}
}
生产者发送消息处理比较简单,创建ProducerRecord只需要指定Topic和Value。如果对消息处理有顺序要求,可以指定一个消息键。
消费者公共处理代码
下面的公共处理逻辑代码可以写到一个公共的common库中,编译成jar包。谁需要订阅处理kafka消息,引入依赖即可。
1. 定义一个公共的Task模型
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
/**
* 异步任务模型基类. 定义模型公共属性
*
* @author neo
* @since 2024-05-14
*/
@Getter
@Setter
public class TaskBase {
// 任务唯一ID标识, 用UUID
private String taskId = "";
// 任务编码(任务类别)
private String taskCode = "";
// 创建人
private String createUser = "";
// 创建时
private Date createTime = null;
// 修改人
private String updateUser = "";
// 修改时间
private Date updateTime = null;
public TaskBase() {
}
public TaskBase(String taskId, String taskCode) {
this.taskId = taskId;
this.taskCode = taskCode;
}
}
这里面的taskId是任务唯一的UUID标识。 taskCode是任务的类别,可以定义为常量,用于分区不同的任务。
2. 定义消息处理抽象类
import lombok.Getter;
/**
* 任务处理类。派生出各子类实现具体的处理逻辑
*
* @author neo
* @since 2024-05-14
*/
public abstract class TaskHandler {
// 任务编码
@Getter
private final String taskCode;
/**
* 任务处理接口
*
* @param taskJson 任务对象JSON串
*/
public abstract void handle(String taskJson);
protected TaskHandler(String taskCode) {
this.taskCode = taskCode;
}
}
所有消费者都需要从该类继承,并显示handle接口,实际的业务处理逻辑在子类的handle方法中去实现。
3. 消费者服务处理类
3.1核心逻辑代码
import com.alibaba.fastjson.JSON;
import com.elon.base.model.TaskBase;
import com.elon.base.util.StringUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Kafka消费者服务类。订阅接收消息, 再转给具体的业务处理类处理
*
* @author neo
* @since 2024-5-14
*/
@Component
public class KafkaConsumerService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
@Value("${neo.application_name:}")
private String applicationName;
// Kafka分区器连接
@Value("${neo.kafka.bootstrap.servers:}")
private String kafkaServer;
// Kafka组ID
@Value("${neo.kafka.group.id:}")
private String kafkaGroupId;
// 最大一次拉取的消息数量
@Value("${neo.kafka.max.poll.records:1}")
private int maxPollRecords;
@Value("${neo.kafka.topics}")
private List<String> topics;
@Value("${neo.redis.ip:}")
private String redisIp;
@Value("${neo.redis.port:}")
private int redisPort;
// 消费者
private KafkaConsumer consumer = null;
// 任务处理器. Map<任务编码, 任务处理器>
private Map<String, TaskHandler> handlerMap = new HashMap<>();
/**
* 注册任务处理器
*
* @param handler 任务处理器
*/
public void registerHandler(TaskHandler handler) {
handlerMap.put(handler.getTaskCode(), handler);
}
/**
* 初始化消费者实例. 订阅主题消息
*/
public void initKafkaConsumer() {
LOGGER.info("Subscribe message. kafkaServer:{}|kafkaGroupId:{}|maxPollRecords:{}|topics:{}",
kafkaServer, kafkaGroupId, maxPollRecords, topics);
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaServer); // 指定 Broker
properties.put("group.id", kafkaGroupId); // 指定消费组群 ID
properties.put("max.poll.records", maxPollRecords);
properties.put("enable.auto.commit", "false");
properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象
consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(topics); // 订阅主题
new Thread(this::handleMessage).start();
}
/**
* 从Kafka获取消息,传给相应的处理器处理.
*/
public void handleMessage() {
Jedis jedisClient = getJedisClient();
while (true) {
synchronized (this) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
LOGGER.info("Fetch record num:{}", records.count());
for (ConsumerRecord<String, String> record : records) {
try {
handleSingleMessage(jedisClient, record);
} catch (Exception e) {
LOGGER.error("Handle message fail. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
// 提交任务,更新offset
consumer.commitSync();
}
}
}
/**
* 处理单个消息.
*
* @param jedisClient redis客户端接口
* @param record kafka消息记录
* @author neo
*/
private void handleSingleMessage(Jedis jedisClient, ConsumerRecord<String, String> record) {
TaskBase taskBase = JSON.parseObject(record.value(), TaskBase.class) ;
if (!handlerMap.containsKey(taskBase.getTaskCode())) {
return;
}
// 判断同一个任务是否已经有其它实例在处理
String taskKey = "Task_" + taskBase.getTaskId();
String handleAppName = jedisClient.getSet(taskKey, applicationName);
// 设置过期时间只是为了方便自动清除redis中的数据。在实际项目中,任务数据是非常重要的,往往需要持久化到数据库
jedisClient.expire(taskKey, 60 * 60);
if (!StringUtil.isEmpty(handleAppName)) {
jedisClient.set(taskKey, handleAppName, new SetParams().px(1000 * 60 * 60));
LOGGER.info("Task:{} completed. Handle app name:{}", taskBase.getTaskId(), handleAppName);
return;
}
// 将消息分发给具体的handler类处理
LOGGER.info("Handle message. Topic:{}|Partition:{}|Offset:{}|Key:{}|Message:{}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
TaskHandler handler = handlerMap.get(taskBase.getTaskCode());
handler.handle(record.value());
}
public Jedis getJedisClient() {
Jedis jedis = new Jedis(redisIp, redisPort);
return jedis;
}
这里面有几个重要的方法:
- registerHandler: 消息处理类注册接口。消费者程序在继承实现了的该接口后,可以将子类实例注册过来。
- initKafkaConsumer: 这里面主要是按topic订阅消息的代码。
- handleMessage: 从Kafka服务器拉去消息并处理。
- handleSingleMessage: 处理单条消息。核心代码。
在handleSingleMessage方法中,首先会去查询handlerMap中是否有Task Code对应的处理器。因消息是按Topic广播过来的,肯定会有当前消费者不需要处理的消息(其它消费者订阅的)。然后, 在处理任务前会先将taskId通过jedis的getSet方法存储到redis,同时检测是否有其它消费者已经处理了该任务(已处理,则不再重复处理)。
注:这个地方因需要保证操作的原子性 用了jedis的getSet方法,实际上还是有瑕疵,第二次调用时会修改旧的数据。应该还可以通过lua脚本做一个好一些的处理(后续再处理)。
3.2 yml配置样例
消费者定制样例代码
定制代码是开发过程中每个消费者根据业务需要,结合需要消费的具体消息增加的业务处理代码。下面以虚构的一个生成BI业务报表为例,简单说明一下。
1. 定义任务模型
/**
* 生成BI报告的任务模型。不同的任务可以根据需要定义不同的模型
*
* @author neo
* @since 2024-05-14
*/
@Getter
@Setter
public class BIReportTask extends TaskBase {
// 存放报告的路径
private String path;
// 参数
private Map<String, String> paramMap = new HashMap<>();
public BIReportTask() {
super(UUID.randomUUID().toString(), TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);
}
}
根据业务需要传递的参数而定义,和Task Code关联。生产者和消费者约定使用同一模型。
2. 具体消息处理类
import com.alibaba.fastjson.JSON;
import com.elon.base.constant.TaskCodeConst;
import com.elon.base.model.BIReportTask;
import com.elon.base.service.kafka.KafkaConsumerService;
import com.elon.base.service.kafka.TaskHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* BI报表生成处理类
*
* @author neo
* @since 2024-05-14
*/
@Component
public class KafkaBIReportHandler extends TaskHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBIReportHandler.class);
@Resource
private KafkaConsumerService kafkaConsumerService;
public KafkaBIReportHandler() {
super(TaskCodeConst.KAFKA_REPORT_EXPORT_BI_REPORT);
}
@PostConstruct
public void init() {
kafkaConsumerService.registerHandler(this);
}
@Override
public void handle(String taskJson) {
BIReportTask task = JSON.parseObject(taskJson, BIReportTask.class);
LOGGER.info("Create BI report. taskCode:{}|taskId:{}", task.getTaskCode(), task.getTaskId());
// 生成BI报表. 具体的处理逻辑略. 这里等待5秒表示业务处理耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注:类中的init方法完成了注册功能。
3. 初始化消费者
/**
* 消费者应用初始化.
*
* @author neo
* @since 2024-05-15
*/
@Component
public class ConsumerApplicationInit implements ApplicationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApplicationInit.class);
@Resource
private KafkaConsumerService kafkaConsumerService;
@Override
public void run(ApplicationArguments args) throws Exception {
kafkaConsumerService.initKafkaConsumer();
LOGGER.info("Init kafka consumer success.");
}
}
在SpringBoot启动完成后做初始化操作。
测试验证及说明
修改yaml文件的组ID,springboot服务端口号,打两个包启动。生产者发送一批消息测试。可以看到两个消费者实例同时在处理这一批消息,并不重复。当前消费者会跳过另一消费者已处理过的任务。
上面描述的代码及方案基本可实现以Kafka作为消息中间件,多消费者实例负载分担和可靠性要求。Demo代码仅作为个人研究用,不一定适用于实际的实际项目开发。仅作参考。
详细代码可参考github:
公共代码包:https://github.com/ylforever/elon-base
消费者定制代码:https://github.com/ylforever/neo-kafka-consumer
生产者代码:https://github.com/ylforever/neo-kafka-producer