rocketmq Listener 消费消息的优雅方式(基于SPEL)

DefaultMQPushConsumer 配置

package repayment.config;

import cn.itcast.wanxinp2p.repayment.message.diy.DefaultMessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConsumerConfig {

    // 如果定义了多个 DefaultMQPushConsumer, 请注意 形参 的名字
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer defaultMQPushConsumer(DefaultMessageListenerConcurrently messageListener) throws MQClientException {
        // 初始化consumer,并设置consumer group name
        DefaultMQPushConsumer consumer = new     DefaultMQPushConsumer("DEFAULT_CONSUMER_GROUP");

        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
        consumer.subscribe("TEST_TOPIC", "*");

        //注册回调接口来处理从Broker中收到的消息
        consumer.registerMessageListener(messageListener);
        return consumer;
    }


}

自定义 MessageListener

需要特别注意 MessageListener 使用的是 @Autowired 注入的是 MessageHandler 类型的接口

并且执行了 MessageHandler  的getELFilter(),[通过SPEL计算得出]和 test()

计算是该MessageExt否符合.

对于符合的MessageHandler , 先对其 MessageExt 提取Body. 再 执行 具体处理消息的逻辑onMessage()

package repayment.message;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

@Slf4j
@Component
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    @Autowired
    private List<MessageHandler> rocketMQListenerList;

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                // rocketMQListener 选择
                ExpressionParser parser = new SpelExpressionParser();
                EvaluationContext cont = SimpleEvaluationContext.forReadWriteDataBinding().build();
                cont.setVariable("messageExt", messageExt);
                Optional<MessageHandler> first = rocketMQListenerList.stream()
                        .filter(rocketMQListener -> {
                            String elFilter = rocketMQListener.getELFilter();
                            if (StringUtils.isBlank(elFilter))
                                return true;
                            return parser.parseExpression(elFilter).getValue(cont, Boolean.class);
                        })
                        .filter(rocketMQListener -> rocketMQListener.test(messageExt))
                        .findFirst();
                // 注意,如果筛选完成没有获取到 rocketMQListener 则自此会抛出异常
                MessageHandler rocketMQListener = first.get();

                // 转换消息并执行
                rocketMQListener.onMessage(rocketMQListener.convertMessage(messageExt));

                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

MessageHandler 接口定义

package repayment.message;

import org.apache.rocketmq.common.message.MessageExt;

/**
 * @param <T> 消息 body 的数据类型,如果没有重写 convertMessage 方法, 则建议<T> 为 String
 *            消费 RocketMQ 消息的帮助类
 */
public interface MessageHandler<T> {

    /**
     * 通过 SPEL 筛选 MessageHandler 的方式
     * SPEL 上下文设置的了 #messageExt
     *
     * @return 稍后用于计算的 SPEL 表达式(默认返回空字符串,代表不过滤)
     */
    default String getELFilter() {
        return "";
    }

    /**
     * 通过 messageExt  筛选 MessageHandler 的普通方式
     * 默认返回 空字符串,代表不过滤。
     *
     * @param messageExt MessageExt
     * @return true:保留,false:丢弃
     */
    default boolean test(MessageExt messageExt) {
        return true;
    }


    /**
     * @param messageExt MessageExt
     * @return 默认为字符串类型的数据
     */
    default T convertMessage(MessageExt messageExt) {
        return (T) new String(messageExt.getBody());
    }

    /**
     * 具体处理消息的逻辑
     *
     * @param message messageExt.body
     */
    void onMessage(T message);


}

自定义的 HelloMessageHandler

用于解析 topic = TEST_TOPIC, Tags.contains("tag0") 的消息

package repayment.message.handler;

import cn.itcast.wanxinp2p.repayment.ann.MQSelect;
import org.apache.rocketmq.common.message.MessageExt;

// topic = TEST_TOPIC, Tags.contains("tag0")
@Component
public class HelloMessageHandler implements MessageHandler<String> {

    @Override
    public String getELFilter() {
        return "#messageExt.topic == 'TEST_TOPIC'";
    }

    @Override
    public boolean test(MessageExt messageExt) {
        return messageExt.getTags().contains("tag0");
    }

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }

}

Debug 调试效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/432982.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】类和对象(中)一篇文章带你学会六大默认成员函数

目录 一、类的6个默认成员函数二、 构造函数2.1 概念2.2 特征 三、析构函数3.1 概念3.2 特性 四、拷贝构造函数4.1 概念4.2 特征 五、赋值运算符重载5.1 运算符重载5.2 赋值运算符重载5.3 前置和后置重载 六、日期类的实现七、const成员八、取地址及const取地址操作符重载结尾 …

css网格布局简单介绍

前端网格布局是一种用于在网页上创建复杂网格系统的布局技术。它允许开发者通过简单的语法来定义和控制元素的排列方式&#xff0c;使得页面布局更加灵活和可预测。在CSS中&#xff0c;网格布局可以通过display: grid属性来实现。 特点 1. **灵活性**&#xff1a;网格布…

Docker 部署Harbor 443端口冲突

如果Harbor的443端口和主机服务器的443端口存在冲突,那么需要修改Harbor的443 修改docker-compose中443端口,那么需要docker-compose.yml和harbor.yml保持一致配置 当修改harbor.yml重启之后不生效的,则需要进入harbor安装路径 执行 ./install.sh 命令 harbor.yml docker-…

Xcode 15 适配 MonkeyDev

升级到Xcode15后,使用Xcode创建MonkeyApp后,运行会报错,本篇文章主要讲述此过程遇到的错误和解决办法。 问题1:找不到libc++.dylib文件 问题描述: Build input files cannot be found: /usr/lib/libstdc++.dylib, /usr/lib/libc++.dylib. Did you forget to declare th…

26.基于springboot + vue实现的前后端分离-就业管理系统

项目介绍 系统分为管理员、企业、求职者三个角色 管理员&#xff1a; 登录、个人中心、学生信息管理、企业信息管理、岗位分类管理、学历信息管理、友情链接管理、新闻资讯管理、收藏管理、招聘信息管理、应聘信息管理、求职者信息管理 企业&#xff1a; 注册、登录、个人…

iperf 测试网卡带宽

需求&#xff1a; 自己在ubuntu上写了一个udp程序&#xff0c;但是延时很大。用iperf测试下实际带宽能达到多少。 步骤&#xff1a; 1&#xff0c;windows 安装&#xff08;iPerf - Download iPerf3 and original iPerf pre-compiled binaries&#xff09; 2&#xff0c;lin…

大数据技术学习笔记(五)—— MapReduce(2)

目录 1 MapReduce 的数据流1.1 数据流走向1.2 InputFormat 数据输入1.2.1 FileInputFormat 切片源码、机制1.2.2 TextInputFormat 读数据源码、机制1.2.3 CombineTextInputFormat 切片机制 1.3 OutputFormat 数据输出1.3.1 OutputFormat 实现类1.3.2 自定义 OutputFormat 2 Map…

JavaScript的for循环与双重for循环,前端开发工程师面试题

问&#xff1a;BFC 与 IFC 区别 BFC 是块级格式上下文&#xff0c;IFC 是行内格式上下文&#xff1a; 内部的 Box 会水平放置水平的间距由 margin&#xff0c;padding&#xff0c;border 决定 问&#xff1a;BFC会与float元素相互覆盖吗&#xff1f;为什么&#xff1f;举例说…

opencv环境配置

opencv环境配置 第一步&#xff1a; 官网下载opencv官网下载地址&#xff0c; 下载完成后解压到相应的目录 第二步&#xff1a; Visudal Studio配置相应的opencv环境变量 先创建一个空的项目打开属性管理器 配置包含目录、库目录和链接器 在包含目录中选择opencv的includ…

[最佳实践]FRPC公网链接:在家也可以炼实验室的丹了

key word: 内网穿透 公网链接 远程ssh链接 远程frpc 远程桌面 网络隧道 应用场景 我们使用ssh链接实验室的机器的时候&#xff0c;一般在实验室内部使用的同一个局域网连接&#xff0c;一般使用的是192.168.xx.xx的网络&#xff0c;但是如果我们在家里&#xff0c;使用的家里…

seata服务器集群搭建

搭建seata-server-1.3服务器对应SpringBoot2.3.12&#xff0c;springcloud2.2.3 <spring-cloud-alibaba.version>2.2.3.RELEASE</spring-cloud-alibaba.version> 首先你安装了nacos 1解压文件 2修改cong/file.conf 让seata集群信息可以共享&#xff0c;我们应该…

MySQL 表锁问题

MySQL 表锁解决 查看哪些表被锁&#xff0c;字段 In_use 表示有多少线程在使用这张表&#xff0c;字段 name_locked 表示表格是否被锁&#xff0c;0 代表锁定状态 mysql> show OPEN TABLES where In_use > 0; -------------------------------------------------------…

day52(vueJS)json-server模拟数据

json-server介绍&#xff1a;&#xff1a;&#xff1a;JSON Server 是一个用于快速搭建 REST API 的工具&#xff0c;它可以帮助我们在开发过程中快速模拟 一个后端 API 服务器&#xff0c;方便前端开发人员进行接口调试和开发。使用 JSON Server&#xff0c;你可以通过创建一个…

基于springboot的网上商城系统设计与实现(程序+数据库+文档)

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目&#xff0c;希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 目录 一、研…

【QT】重载的信号槽/槽函数做lambda表达式

重载的信号槽 函数指针&#xff1a; int fun(int a,long b) int (*funp)(int, long) fun; 实现回调函数就需要函数指针 信号重载 派生类槽函数发送两个信号 派生类给父类发两个信号 void (SubWidget::*mysigsub)() &SubWidget::sigSub;connect(&subw,mysigsub,t…

农产品采购平台技术解析:Java+SpringBoot+Vue+MySQL

✍✍计算机毕业编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java、…

如何在2.2.1版Aduino IDE中开发ESP32

ESP32芯片集成了WIFI和蓝牙&#xff0c;而且关于生态也很不错&#xff0c;越来越多的学习者和开发者选择此类芯片&#xff0c;而不像用keil开发STM32或者51一样&#xff0c;ESP32虽然也有官方的ESP32-IDF开发软甲&#xff0c;但是经过我个人的实操体验&#xff0c;不适合小白或…

2025汤家凤考研数学,基础视频课程+百度网盘+PDF真题讲解

平时大家都半开玩笑地讲&#xff1a;我数学想要考150分&#xff01;那索性今天这一期&#xff0c;今天认真和大家聊一下&#xff1a; 想考到考研数学150分&#xff0c;应该如何准备&#xff1f; 如果还有小伙伴不知道在哪看汤神的ke&#xff0c;可以看一下以下 2025汤神全程…

力扣543. 二叉树的直径

Problem: 543. 二叉树的直径 文章目录 题目描述思路复杂度Code 题目描述 思路 1.最大直径 左子树的最大深度 右子树的最大深度&#xff1b; 2.定义一个变量maxDiameter记录最大直径&#xff0c;并编写一个递归函数maxDepth&#xff0c;利用树的后序遍历每次递归求取leftMax&a…

怎样压缩图片大小到kb?超实用技巧!

怎样压缩图片大小到kb&#xff1f;在互联网时代&#xff0c;图片已成为我们日常生活中不可或缺的一部分。然而&#xff0c;随着图片分辨率和质量的提升&#xff0c;它们的文件大小也在不断增加&#xff0c;这不仅占用了大量的存储空间&#xff0c;还可能导致网页加载速度变慢。…