kafka-顺序消息实现

kafka-顺序消息实现

场景

在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证

解决方案

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

在这里插入图片描述

代码实现

引入依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>

使用到的DTO
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{

    /**
     * 属于哪个分区
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}


@Data
public class InterOrderDto extends OrderDto implements OrderMessage{

    /**
     * 属于哪个分区
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}

public interface OrderMessage {

    /**
     * 线程池路由key
     * @return
     */
    String getUniqueNo();

}
定义topic

这里是 3个分区,2个副本

@Configuration
public class KafkaConfiguration {

    @Bean
    public NewTopic topic(){
        return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);
    }
}

public interface Constants {

     String TOPIC_ORDER = "order";
}
消费者

消费者:OrderListener

@Component
@Slf4j
public class OrderListener {

    @Autowired
    private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;

    @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
    public void logListener(ConsumerRecord<String, String> record) {
        log.debug("> receive log event: {}-{}", record.partition(), record.value());
        try {
            OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
            InterOrderDto interOrderDto = new InterOrderDto();
            BeanUtils.copyProperties(orderDto, interOrderDto);
            interOrderDto.setPartition(record.partition() + "");
            orderThreadPool.dispatch(interOrderDto);
        } catch (Exception e) {
            log.error("# kafka log listener error: {}", record.value(), e);
        }
    }

}

线程池: OrderThreadPool

/**
 * @Date: 2024/1/24 10:23
 * 线程池实现
 *
 * @param W: worker
 * @param D: message
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {
    private List<W> workers;
    private int size;

    public OrderThreadPool(int size, Supplier<W> provider) {
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        if (CollectionUtils.isEmpty(workers)) {
            throw new RuntimeException("worker size is 0");
        }
        start();
    }

    /**
     * route message to single thread
     *
     * @param data
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    private W getUniqueQueue(String uniqueNo) {
        int queueNo = uniqueNo.hashCode() % size;
        for (W worker : workers) {
            if (queueNo == worker.getQueueNo()) {
                return worker;
            }
        }
        throw new RuntimeException("worker 路由失败");
    }

    /**
     * start worker, only start once
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * 关闭所有 workder, 等待所有任务执行完
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }

}

工作线程:SingleThreadWorker, 内部使用阻塞队列使其串行化

/**
 * @Date: 2024/1/24 10:58
 * single thread with a blocking-queue
 */
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {

    private static AtomicInteger cnt = new AtomicInteger(0);
    private BlockingQueue<T> queue;
    private boolean started = true;

    /**
     * worker 唯一id
     */
    @Getter
    private int queueNo;

    public SingleThreadWorker(int size) {
        this.queue = new LinkedBlockingQueue<>(size);
        this.queueNo = cnt.getAndIncrement();
        log.info("init worker {}", this.queueNo);
    }

    /**
     * 提交消息
     *
     * @param data
     */
    public void offer(T data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
        }
    }

    @Override
    public void run() {
        log.info("{} worker start take ", Thread.currentThread().getName());
        while (started) {
            try {
                T data = queue.take();
                doConsumer(data);
            } catch (InterruptedException e) {
                log.error("queue take error", e);
            }
        }
    }

    /**
     * do real consume message
     *
     * @param data
     */
    protected abstract void doConsumer(T data);

    /**
     * consume rest of message in the queue when thread-pool shutdown
     */
    public void shutdown() {
        this.started = false;
        ArrayList<T> rest = new ArrayList<>();
        int i = queue.drainTo(rest);
        if (i > 0) {
            log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
            for (T t : rest) {
                doConsumer(t);
            }
        }
    }


}

工作线程实现:OrderWorker, 这里就单独处理订单事件

/**
 * @Date: 2024/1/24 13:42
 * 具体消费者
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto>{
    public OrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));

    }
}

生产者

生产者:OrderController, 模拟发送不同的事件类型的订单

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send() throws InterruptedException {
        int size = 1000;
        for (int i = 0; i < size; i++) {
            OrderDto orderDto = new InterOrderDto();
            orderDto.setOrderNo(i + "");
            orderDto.setPayStatus(getStatus(0));
            orderDto.setTimestamp(System.currentTimeMillis());
            //相同的key发送到相同的分区
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);
            orderDto.setPayStatus(getStatus(1));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);
            orderDto.setPayStatus(getStatus(2));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
        }
        return "success";
    }

    private String getStatus(int status){
        return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    }
}

application.properties 配置

# kafka地址
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
启动类
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootKafkaApplication.class, args);
    }

    /**
     * 配置线程池
     * @return
     */
    @Bean
    public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){
        OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
            new OrderThreadPool<>(3, () -> new OrderWorker(100));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown orderThreadPool");
            //容器关闭时让工作线程中的任务都被消费完
            threadPool.shutdown();
        }));
        return threadPool;
    }

}

测试

访问: http://localhost:8080/send, 结果:

OWorder-0 worker start take 
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

可以发现,在我们工作线程中,事件消费是有序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/345883.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据的存储结构

1.类别 顺序存储、链式存储、散列存储、索引存储 2.顺序存储与链式存储的区别 顺序存储链式存储优点 可以实现随机存取每个元素占用最少的空间 充分利用所有存储单元&#xff0c;不会出现碎片现象。缺点 只能使用整块的存储单元&#xff0c;会产出较多的碎片。 需要额外的存…

12.for 条件循环语句 (3)

for 循环语句 允许脚本一次性读取多个信息&#xff0c;然后逐一对信息进行操作处理。当要处理的数据有范围时&#xff0c;使用for循环语句。 使用 for 循环语句从列表文件中读取多个用户名&#xff0c;然后为其逐一创建用户账户并设 置密码。首先创建用户名称的列表文件users.…

Linux浅学笔记02

目录 grep-wc-管道符 echo-tail-重定向符 vi编辑器 grep-wc-管道符 grep命令(过滤文件内容) //更准确的来说&#xff0c;是筛选包括“所需字符”的一句内容或多句内容。 语法&#xff1a;grep [-n] 关键字 文件路径 //-n&#xff1a;可选&#xff0c;表示在结果中匹配的行…

如何防止联通光猫后台运营商远程自动改超管密码

环境: 联通光猫 ZXHN F677V9 硬件版本号 V9.0 软件版本号 V9.0.0P1T3 U盘16G 笔记本电脑 问题描述: 如何防止联通光猫后台运营商远程自动改超管密码 家里用的是ZXHN F677V9 光猫,自己改了超级密码之后用了几次,最近就无法登录了,问了装维师傅说是最近统一改了,这还…

模型之大脑和计算机的模型化

大脑和计算机的模型化 “素数分解&#xff1a;悬而未决的问题与计算机科学的挑战” 计算机同样也可以看作由相互作用的各部分集合而成&#xff1b;很大程度上由于这个原因&#xff0c;理论计算机科学中同样有很多悬而未决的重要问题。其中有如下这样一个例子&#xff0c;我们…

2023 工业 AR 关键词:纵深和开拓

2023 年&#xff0c;以虚实融合、工业元宇宙为代表的“新数字化”升级在工业制造领域达成共识。 ▲五部委联合印发元宇宙行动计划 通过发展元宇宙赋能新型工业化 而相对过去几年的行业渗透广、落地场景多样的 AR 业务拓展与合作&#xff0c;#纵深和#开拓&#xff0c;成为 2023…

网工内推 | 国企、合资公司IT专员,13薪,NA以上即可

01 上海新徐汇&#xff08;集团&#xff09;有限公司 招聘岗位&#xff1a;IT运维 职责描述&#xff1a; 1.负责制定网络体系搭建、IP地址分配、网络拓扑图、无线网络等&#xff1b; 2.负责桌面运维技术支持&#xff0c;确保各类系统和终端设备正常工作&#xff1b; 3.负责弱电…

构建中国人自己的私人GPT—与文档对话

先看效果 他可以从上传的文件中提取内容作为答案。上传文件摄取速度 摄取速度取决于您正在摄取的文档数量以及每个文档的大小。为了加快摄取速度&#xff0c;您可以在配置中更改摄取模式。 存在以下摄取模式&#xff1a; simple&#xff1a;历史行为&#xff0c;一次按顺序摄…

Bread:一款功能强大的BIOS逆向工程和高级调试工具

关于Bread Bread是一款功能强大的BIOS逆向工程和高级调试工具&#xff0c;该工具也是一个“可注入”的实模式&#xff08;Real-Mode&#xff09;x86调试器&#xff0c;可以帮助广大研究人员通过串行线缆从另一台电脑调试任意实模式代码。 考虑到目前社区中很多BIOS逆向工程工…

学编曲怎么入门 学会编曲能赚钱吗 编曲软件哪个好用 学编曲要先学什么 编曲和作曲什么区别

一、学编曲有什么用 1、工作时间较为自由 编曲是一个技能专业&#xff0c;换句话说&#xff0c;编曲是一项技能&#xff0c;如果能够熟练掌握编曲这项技能&#xff0c;那么就可以尝试从事一些和编曲相关的职业&#xff0c;例如编曲师等等&#xff0c;这类和编曲有关的职业大多…

零基础学Python(5)— 基本数据类型

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。在内存中存储的数据可以有多种类型。例如&#xff1a;一个人的姓名可以用字符型存储&#xff0c;年龄可以使用数值型存储&#xff0c;婚姻状况可以使用布尔型存储。这里的字符型、数值型、布尔型都是Python语言中提供的基本…

gin介绍及helloworld

1. 介绍 Gin是一个golang的微框架&#xff0c;封装比较优雅&#xff0c;API友好&#xff0c;源码注释比较明确&#xff0c;具有快速灵活&#xff0c;容错方便等特点 对于golang而言&#xff0c;web框架的依赖要远比Python&#xff0c;Java之类的要小。自身的net/http足够简单&…

RK3399平台开发系列讲解(USB篇)BusHound 工具使用介绍

🚀返回专栏总目录 文章目录 一、BusHound简介二、BusHound的下载三、BusHound设备窗口四、BUSHound发送命令窗口沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 BusHound软件是由美国perisoft公司研制的一种专用于PC机各种总线数据包监视和控制的开发工具软件,其名…

帝国cms无限级分销的逻辑思路效果展示以及表结构的初步规划

#小李子9479# #帝国cms无限级分销# #帝国cms三级分销系统# 关于分销系统 &#xff0c;我们要解决以下几个重要的逻辑关系&#xff0c; 1&#xff0c;用户上下级关系&#xff0c;即A通过分享期邀请链接&#xff0c;B点击或扫码注册后&#xff0c;成为A的下线。 2。下级级别的…

10.常用统计分析方法——主成分分析和因子分析

基础知识&#xff1a; 主成分分析概念 主成分分析PCA&#xff1a;是一种数据降维的技巧&#xff0c;将大量相关变量转化为一组很少的不相关变量&#xff0c;这些无关变量称为主成分。 在特征选择方法中有一种方法是方差过滤&#xff0c;即如果一个特征的方差很小&#xff0c…

ckman:非常好用的ClickHouse可视化集群运维工具

概述 什么是ckman ckman&#xff0c;全称是ClickHouse Management Console&#xff0c; 即ClickHouse管理平台。它是由擎创科技数据库团队主导研发的一款用来管理和监控ClickHouse集群的可视化运维工具。目前该工具已在github上开源&#xff0c;开源地址为&#xff1a;github…

免费SSL申请和自动更新

当前是在mac下操作 安装certbot # mac下brew安装即可 brew install certbotcentos 安装 centos安装文档 申请泛解析证书 sudo certbot certonly --manual --preferred-challengesdns -d *.yourdomain.com## 输出 Saving debug log to /var/log/letsencrypt/letsencrypt.lo…

民用激光雷达行业简析

01. 激光雷达是“机器之眼” • 激光雷达是一个通过发射激光并接受发射激光同时对其进行信号处理&#xff0c;从而获得周边物体距离等信息的主动测量装置。 • 激光雷达主要由光发射、光扫描、光接收三大模块组成。光发射模块集成了驱动、开关和光源等芯片。光接收模块集成了…

构筑双品牌矩阵背后,广汽埃安讲出能源生态闭环的“新故事”

“一路繁花”用来形容广汽埃安的2023年并不为过。 2023年12月28日&#xff0c;埃安达成累计产销百万辆的目标&#xff0c;成为全球最快破百万的纯电品牌、新能源品牌以及汽车品牌&#xff1b;全年累计销量超48万辆&#xff0c;同比增长77%。 值得一提的是&#xff0c;2023年以…

APUE学习之信号(Signal)

目录 一、信号 1、基本概念 2、用户处理信号的方式 3、查看信号 4、可靠信号和不可靠信号 5、信号种类 6、终止进程信号的区别 二、进程对信号的处理 1、signal&#xff08;&#xff09;函数 2、sigaction&#xff08;&#xff09;函数 3、代码演示 4、运行结果…