RocketMQ - Spring Cloud Alibaba RocketMQ

Spring Cloud Stream是Spring Cloud体系内的一个框架,用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,其目的是简化消息业务在Spring Cloud应用中的开发。

Spring Cloud Stream的架构图如下所示,应用程序通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定的中间件绑定器Binder实现连接到外部代理。
Spring Cloud Stream的架构图
Spring Cloud Stream的实现基于发布/订阅机制,核心由四部分构成:Spring Framework中的Spring Messaging和Spring Integrataion,以及Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的统一消息编程模型,其核心对象如下:

  • Message: 消息对象,包含消息头Header和消息体Payload。
  • MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送致消息通道。
  • MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration:Spring Framework中用于支持企业集成的一种扩展机制,作用是提供一个简单的模型来构建企业集成解决方案,对Spring Messaging进行了扩展。

  • MessageDispatcher: 消息分发接口,用于分发消息和添加删除消息处理器。
  • MessageRouter:消息路由接口,定义默认的输出消息通道。
  • Filter:消息的过滤注解,用于配置消息过滤表达式。
  • Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
  • Splitter:消息的分割,用于将一条消息拆分成多条。

Binders:目标绑定器,负责与外部消息中间件系统集成的组件。

  • doBindProducer:绑定消息中间件客户端发送消息模块。
  • doBindConsumer:绑定消息中间件客户端接收消息模块。

Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。

Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka和RabbitMQ,Spring Cloud Alibaba中加入了RocketMQ Binder,用于将RocketMQ集成到Spring Cloud Stream。

Spring Cloud Alibaba RocketMQ架构图

Spring Cloud Alibaba RocketMQ的架构图如下所示:
在这里插入图片描述

  • MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口。
  • MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口。
  • Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
  • Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。

Spring Cloud Stream消息发送流程

Spring Cloud Stream消息发送流程如下图所示,包括发送、订阅、分发、委派、消息处理等,具体实现如下:
在这里插入图片描述
在业务代码中调用MessageChannel接口的Send()方法,例如source.output().send(message)。

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1L;

    default boolean send(Message<?> message) {
        return this.send(message, -1L);
    }

    boolean send(Message<?> var1, long var2);
}

AbstractMessageChannel是消息通道的基本实现类,提供发送消息和接收消息的公用方法。

@IntegrationManagedResource
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent, InterceptableChannel, MessageChannelMetrics, ConfigurableMetricsAware<AbstractMessageChannelMetrics>, IntegrationPattern {
	
	public boolean send(Message<?> messageArg, long timeout) {
	    // 省略部分代码
        sent = this.doSend(message, timeout);
        // 省略部分代码
        return sent;
     }
	protected abstract boolean doSend(Message<?> var1, long var2);
}

消息发送到AbstractSubscribableChannel类实现的doSend()方法。

protected boolean doSend(Message<?> message, long timeout) {
        try {
            return this.getRequiredDispatcher().dispatch(message);
        } catch (MessageDispatchingException var6) {
            String description = var6.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, var6);
        }
    }

通过消息分发类MessageDispatcher把消息分发给MessageHandler。

private MessageDispatcher getRequiredDispatcher() {
        MessageDispatcher dispatcher = this.getDispatcher();
        Assert.state(dispatcher != null, "'dispatcher' must not be null");
        return dispatcher;
}

protected abstract MessageDispatcher getDispatcher();

从AbstractSubscribableChannel的实现类DirectChannel得到MessageDispatcher的实现类UnicastingDispatcher。

public class DirectChannel extends AbstractSubscribableChannel {

	protected UnicastingDispatcher getDispatcher() {
        return this.dispatcher;
    }
}

调用dispatch()方法把消息分发给各个MessageHandler。

public class UnicastingDispatcher extends AbstractDispatcher {
	public final boolean dispatch(Message<?> message) {
        if (this.executor != null) {
            Runnable task = this.createMessageHandlingTask(message);
            this.executor.execute(task);
            return true;
        } else {
            return this.doDispatch(message);
        }
    }

	private boolean doDispatch(Message<?> message) {
        if (this.tryOptimizedDispatch(message)) {
            return true;
        } else {
            boolean success = false;
            Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
            if (!handlerIterator.hasNext()) {
                throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
            } else {
                ArrayList exceptions = null;

                while(!success && handlerIterator.hasNext()) {
                    MessageHandler handler = (MessageHandler)handlerIterator.next();

                    try {
                        handler.handleMessage(message);
                        success = true;
                    } catch (Exception var9) {
                        RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> {
                            return "Dispatcher failed to deliver Message";
                        }, var9);
                        if (exceptions == null) {
                            exceptions = new ArrayList();
                        }

                        exceptions.add(runtimeException);
                        boolean isLast = !handlerIterator.hasNext();
                        if (!isLast && this.failover) {
                            this.logExceptionBeforeFailOver(var9, handler, message);
                        }

                        this.handleExceptions(exceptions, message, isLast);
                    }
                }

                return success;
            }
        }
    }
}

遍历所有MessageHandler,调用handlerMessage()处理消息。

private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
        Set<MessageHandler> handlers = this.getHandlers();
        return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();
    }

查看MessageHandler是从哪里来的,也就是handlers列表中的MessageHandler是如何添加的。

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel, SubscribableChannelManagement {

	public boolean subscribe(MessageHandler handler) {
        MessageDispatcher dispatcher = this.getRequiredDispatcher();
        boolean added = dispatcher.addHandler(handler);
        this.adjustCounterIfNecessary(dispatcher, added ? 1 : 0);
        return added;
    }
}

AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()添加到handlers列表。

public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {

	public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {
        // 创建Producer的messageHandler
        final MessageHandler producerMessageHandler;
        final ProducerDestination producerDestination;
        try {
            // 省略部分代码
            producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties, outputChannel, errorChannel);
            // 省略部分代码
            // 创建SendingHandler并调用subscribe
        ((SubscribableChannel)outputChannel).subscribe(new AbstractMessageChannelBinder.SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()), this.headersToEmbed, this.useNativeEncoding(producerProperties)));
        // 省略部分代码
    }

}

Producer的MessageHandler是由消息中间件Binder来完成的,Spring Cloud Stream提供了创建MessageHandler的规范。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring 容器加载所有Bean。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/236961.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redis基础系列-主从复制

Redis基础系列-主从复制 文章目录 Redis基础系列-主从复制1. 什么是 Redis 主从复制&#xff1f;2. 主从复制有什么好处&#xff1f;3. 如何配置 Redis 主从复制&#xff1f;4. 主从复制的验证4.1 如何查看主从搭建成功4.2 主从常见疑问4.3 主从常见命令 5. 主从复制的原理和工…

2024美赛备战2--模型建立(*****必看****)

建模 美赛涉及的建模知识范围非常广且深&#xff0c;纵观美赛真题不难发现&#xff0c;很多的模型 都是读研或者读博的时候才会真正深入开始研究&#xff0c;因此&#xff0c;对于做建模的同学来说&#xff0c; 是无法在赛前吃透大量模型的。推荐本科生分两个步骤去有效准备比赛…

设计模式——建造者模式(创建型)

引言 生成器模式是一种创建型设计模式&#xff0c; 使你能够分步骤创建复杂对象。 该模式允许你使用相同的创建代码生成不同类型和形式的对象。 问题 假设有这样一个复杂对象&#xff0c; 在对其进行构造时需要对诸多成员变量和嵌套对象进行繁复的初始化工作。 这些初始化代码…

探索Scrapy-spider:构建高效网络爬虫

Spider简介 Scrapy中的Spider是用于定义和执行数据抓取逻辑的核心组件。Spider负责从指定的网站抓取数据&#xff0c;并定义了如何跟踪链接、解析内容以及提取数据的规则。它允许您定制化地指定要抓取的网站、页面和所需的信息。Spider的作用是按照预定的规则爬取网页&#xf…

关于个人职业选择

职业选择&#xff0c;一直是个老生常谈的话题。这并不是一个容易做的决定。 让我们来看看AI怎么说。 首先是方向性的回答&#xff1a; 然后是一些具体的回答 我个人比较倾向于深耕网络安全。这是一个很有趣也是一个持续发展着的领域。 不知道关于这个事情你怎么看&#xff0…

【C++】POCO学习总结(十二):流(文本编解码、数据压缩、文件读写流等)

【C】郭老二博文之&#xff1a;C目录 1、说明 POCO提供了多种流类&#xff0c;与标准c IOStreams兼容。 大多数POCO流类被实现为过滤器&#xff0c;这意味着它们不写入或读取设备&#xff0c;而是从它们连接的另一个流。 2、文本编解码 2.1 说明 POCO提供了用于编码和解码…

【每日一题】最小体力消耗路径

文章目录 Tag题目来源解题思路方法一&#xff1a;二分枚举答案 写在最后 Tag 【二分枚举答案】【图】【2023-12-11】 题目来源 1631. 最小体力消耗路径 解题思路 拿到这个题目&#xff0c;计算从左上角到右下角的最小体力消耗值&#xff0c;有点像 64. 最小路径和。在 64 题…

散点图直方图折线图的替代

散点图直方图折线图的替代 seaborn官网 数据科学数据可视化&#xff0c;散点图 直方图 折线图的新方法 1.hexbinplot https://seaborn.pydata.org/examples/hexbin marginals.html相当于散点图做了聚合/分箱&#xff0c;使数据的分布展示更明显。Library: seaborn 2.瀑布图展示…

Linux 驱动开发需要掌握哪些编程语言和技术?

Linux 驱动开发需要掌握哪些编程语言和技术&#xff1f; 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「Linux从专业入门到高级教程工具包」&#xff0c;点个关注&#xff0c;全部无偿共享给大家&#xf…

图论专栏一《图的基础知识》

图论&#xff08;Graph Theory&#xff09;是数学的一个分支。它以图为研究对象。图论中的图是由若干给定的点及连接两点的线所构成的图形&#xff0c;这种图形通常用来描述某些实体之间的某种特定关系&#xff0c;用点代表实体&#xff0c;用连接两点的线表示两个实体间具有的…

OpenVINS学习2——VIRAL数据集eee01.bag运行

前言 周末休息了两天&#xff0c;接着做上周五那个VIRAL数据集没有运行成功的工作。现在的最新OpenVINS需要重新写配置文件&#xff0c;不像之前那样都写在launch里&#xff0c;因此需要根据数据集情况配置好estimator_config.yaml还有两个标定参数文件。 VIRAL数据集 VIRAL…

从零开始实现神经网络(三)_RNN循环神经网络

参考文章&#xff1a;rnn循环神经网络介绍 循环神经网络 &#xff08;RNN&#xff09; 是一种专门处理序列的神经网络。它们通常用于自然语言处理 &#xff08;NLP&#xff09; 任务&#xff0c;因为它们在处理文本方面很有效。在这篇文章中&#xff0c;我们将探讨什么是 RNN&a…

【简易版】Linux下Protobuf 实现网络版通讯录--C++

一、介绍 该项目的主要目的是用于熟悉protobuf的使用&#xff0c;体验数据在网络中序列化反序列化的形式&#xff0c;并非一个完整的项目。 该通讯录只实现了增加联系人的功能。服务器端接收到请求后会将联系人的信息打印。 二、环境搭建 使用Httplib库&#xff0c;可以快速…

【ClickHouse】ClickHouse与MySQL之间实时同步数据(MySQL引擎),将MySQL数据实时同步到clickhouse

参考1:MySQL(通过该配置实现了实时同步) 参考2:experimental MaterializedMySQL 参考3:[experimental] MaterializedMySQL(包含设置 allow_experimental_database_materialized_mysql) MySQL引擎用于将远程的MySQL服务器中的表映射到ClickHouse中&#xff0c;并允许您对表进行I…

亚信科技AntDB携手蓝凌软件,助推企业数字化办公转型升级

随着企业数字化转型的深入&#xff0c;企业对于协同办公、移动门户、数字运营、智能客服等方面的需求越来越高&#xff0c;数智化正成为催生新动能和新优势的关键力量。数字化的办公平台可以帮助企业实现各类信息、流程的集中化、数字化和智能化管理&#xff0c;为企业管理者提…

2-7、转义字符

语雀原文链接 文章目录 1、转义字符2、\r\n的遗留问题3、System 1、转义字符 \r 回车&#xff0c;将光标定位在当前行的开头&#xff0c;不会跳到下一行。return\n 换行符&#xff0c;将光标定位在下一行的开头。newline 2、\r\n的遗留问题 我们在平时使用电脑时&#xff0c…

Mybatis是如何进行分页的?

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一份大厂面试资料《史上最全大厂面试题》&#xff0c;Springboot、微服务、算法、数据结构、Zookeeper、Mybatis、Dubbo、linux、Kafka、Elasticsearch、数据库等等 …

WRF--修改geo_em.d01.nc中的变量,保持其他信息不变

WRF–修改geo_em.d01.nc中的变量&#xff0c;保持其他信息不变 首先呢&#xff0c;找到编译WRF过程中自带的读取nc的一个fortran函数&#xff1a;read_wrf_nc.f90 可以使用Linux命令&#xff1a; find / -name read_wrf_nc.f90 找到之后&#xff0c;修改这个文件&#xff0c…

镜头驱动芯片选型 GC6236,GC6208,GC6209的型号分析,多应用于摄像机镜头,家庭监控云台驱动等产品中

国产芯片GC6236&#xff0c;GC6208&#xff0c;GC6209 为5V摄像机镜头驱动芯片&#xff0c;电压范围在3~5.5(V)&#xff0c;最大持续电流可达0.8(A)最高工作温度在-40~100之间。其特点都具有5V多通道&#xff0c;低噪步进电机驱动和霍尔自动光圈驱动等。可应用在摄像机镜头,家庭…