kafka-顺序消息实现
场景
在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证
解决方案
- 生产者将相同的key的订单状态事件推送到kafka的同一分区
- kafka 消费者接收消息
- 消费者将消息提交给线程池
- 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
- 单个线程不停的从阻塞队列获取订单状态消息消费
代码实现
引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.39</version>
</dependency>
</dependencies>
使用到的DTO
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{
/**
* 属于哪个分区
*/
private String partition;
@Override
public String getUniqueNo() {
return getOrderNo();
}
}
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{
/**
* 属于哪个分区
*/
private String partition;
@Override
public String getUniqueNo() {
return getOrderNo();
}
}
public interface OrderMessage {
/**
* 线程池路由key
* @return
*/
String getUniqueNo();
}
定义topic
这里是 3个分区,2个副本
@Configuration
public class KafkaConfiguration {
@Bean
public NewTopic topic(){
return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);
}
}
public interface Constants {
String TOPIC_ORDER = "order";
}
消费者
消费者:OrderListener
@Component
@Slf4j
public class OrderListener {
@Autowired
private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;
@KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
public void logListener(ConsumerRecord<String, String> record) {
log.debug("> receive log event: {}-{}", record.partition(), record.value());
try {
OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
InterOrderDto interOrderDto = new InterOrderDto();
BeanUtils.copyProperties(orderDto, interOrderDto);
interOrderDto.setPartition(record.partition() + "");
orderThreadPool.dispatch(interOrderDto);
} catch (Exception e) {
log.error("# kafka log listener error: {}", record.value(), e);
}
}
}
线程池: OrderThreadPool
/**
* @Date: 2024/1/24 10:23
* 线程池实现
*
* @param W: worker
* @param D: message
*/
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {
private List<W> workers;
private int size;
public OrderThreadPool(int size, Supplier<W> provider) {
this.size = size;
workers = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
workers.add(provider.get());
}
if (CollectionUtils.isEmpty(workers)) {
throw new RuntimeException("worker size is 0");
}
start();
}
/**
* route message to single thread
*
* @param data
*/
public void dispatch(D data) {
W w = getUniqueQueue(data.getUniqueNo());
w.offer(data);
}
private W getUniqueQueue(String uniqueNo) {
int queueNo = uniqueNo.hashCode() % size;
for (W worker : workers) {
if (queueNo == worker.getQueueNo()) {
return worker;
}
}
throw new RuntimeException("worker 路由失败");
}
/**
* start worker, only start once
*/
private void start() {
for (W worker : workers) {
new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
}
}
/**
* 关闭所有 workder, 等待所有任务执行完
*/
public void shutdown() {
for (W worker : workers) {
worker.shutdown();
}
}
}
工作线程:SingleThreadWorker
, 内部使用阻塞队列使其串行化
/**
* @Date: 2024/1/24 10:58
* single thread with a blocking-queue
*/
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {
private static AtomicInteger cnt = new AtomicInteger(0);
private BlockingQueue<T> queue;
private boolean started = true;
/**
* worker 唯一id
*/
@Getter
private int queueNo;
public SingleThreadWorker(int size) {
this.queue = new LinkedBlockingQueue<>(size);
this.queueNo = cnt.getAndIncrement();
log.info("init worker {}", this.queueNo);
}
/**
* 提交消息
*
* @param data
*/
public void offer(T data) {
try {
queue.put(data);
} catch (InterruptedException e) {
log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
}
}
@Override
public void run() {
log.info("{} worker start take ", Thread.currentThread().getName());
while (started) {
try {
T data = queue.take();
doConsumer(data);
} catch (InterruptedException e) {
log.error("queue take error", e);
}
}
}
/**
* do real consume message
*
* @param data
*/
protected abstract void doConsumer(T data);
/**
* consume rest of message in the queue when thread-pool shutdown
*/
public void shutdown() {
this.started = false;
ArrayList<T> rest = new ArrayList<>();
int i = queue.drainTo(rest);
if (i > 0) {
log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
for (T t : rest) {
doConsumer(t);
}
}
}
}
工作线程实现:OrderWorker
, 这里就单独处理订单事件
/**
* @Date: 2024/1/24 13:42
* 具体消费者
*/
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto>{
public OrderWorker(int size) {
super(size);
}
@Override
protected void doConsumer(InterOrderDto data) {
log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));
}
}
生产者
生产者:OrderController
, 模拟发送不同的事件类型的订单
@RestController
public class OrderController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/send")
public String send() throws InterruptedException {
int size = 1000;
for (int i = 0; i < size; i++) {
OrderDto orderDto = new InterOrderDto();
orderDto.setOrderNo(i + "");
orderDto.setPayStatus(getStatus(0));
orderDto.setTimestamp(System.currentTimeMillis());
//相同的key发送到相同的分区
kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
TimeUnit.MILLISECONDS.sleep(10);
orderDto.setPayStatus(getStatus(1));
orderDto.setTimestamp(System.currentTimeMillis());
kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
TimeUnit.MILLISECONDS.sleep(10);
orderDto.setPayStatus(getStatus(2));
orderDto.setTimestamp(System.currentTimeMillis());
kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
}
return "success";
}
private String getStatus(int status){
return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
}
}
application.properties 配置
# kafka地址
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
启动类
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(BootKafkaApplication.class, args);
}
/**
* 配置线程池
* @return
*/
@Bean
public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){
OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
new OrderThreadPool<>(3, () -> new OrderWorker(100));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("shutdown orderThreadPool");
//容器关闭时让工作线程中的任务都被消费完
threadPool.shutdown();
}));
return threadPool;
}
}
测试
访问: http://localhost:8080/send
, 结果:
OWorder-0 worker start take
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}
可以发现,在我们工作线程中,事件消费是有序的
good luck!