【stomp 实战】spring websocket用户消息发送源码分析

这一节,我们学习用户消息是如何发送的。

消息的分类

spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。

用户消息的前缀

  • 不配置的情况下,默认用户消息的前缀是/user
  • 也可以通过下面的方式来配置用户消息
@Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        /**
         * stompClient.subscribe("/user/topic/subNewMsg",...)
         * 这个时候,后端推送消息应该这么写
         * msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
         * 即去掉了/user前缀
         */
        registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);
    }
  • 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
 //订阅用户消息topic1
 stompClient.subscribe("/user/topic/answer", function (response) {
 //do something
 });
  • 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
    private final SimpMessageSendingOperations msgOperations;
    public void echo(Principal principal, Msg msg) {
        msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
    }

广播消息的前缀

  • 广播消息没有默认值,必须显示地指定
  • 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue")
                //配置stomp协议里, server返回的心跳
                .setHeartbeatValue(new long[]{10000L, 10000L})
                //配置发送心跳的scheduler
                .setTaskScheduler(new DefaultManagedTaskScheduler());
    }
  • 前端代码可以这么写
//订阅广播消息topic
  stompClient.subscribe("/topic/boardCast/hello", function (response) {
      // do something
  });
  • 后端代码可以这么写
  private final SimpMessageSendingOperations msgOperations;
  public void echo2(Msg msg) {
        log.info("收到的消息为:{}", msg.getContent());
        msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");
    }

发送用户消息源码分析

用户订阅过程

发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。

总结一下用户和会话之间的关系,如下图
在这里插入图片描述
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001

  stompClient.subscribe("/user/topic/answer", function (response) {
  //do something
   });

该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
在这里插入图片描述
可以看到,当前的sessionId,destination

在这里插入图片描述
将订阅放到一个subscriptions的map里面。缓存在内存中。

用户消息的发送

后端代码是这么写的,我们来调试一下

    private final SimpMessageSendingOperations msgOperations;
    public void echo(Principal principal, Msg msg) {
        msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
    }

经过层层调用,发现调到了下面的方法
在这里插入图片描述
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码

//AbstractMessageSendingTemplate
	@Override
	public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
			@Nullable MessagePostProcessor postProcessor) throws MessagingException {
		//对消息进行转换,对象转字符串,或者字节数组之类的
		Message<?> message = doConvert(payload, headers, postProcessor);
		//调用Send发送
		send(destination, message);
	}

做了两个事:

  • 对消息进行转换,对象转字符串,或者字节数组之类的
  • 调用Send发送

再来看下send方法

	@Override
	public void send(D destination, Message<?> message) {
		doSend(destination, message);
	}

再调用doSend,由子类SimpMessagingTemplate实现。

//SimpMessagingTemplate
	@Override
	protected void doSend(String destination, Message<?> message) {
		Assert.notNull(destination, "Destination must not be null");

		SimpMessageHeaderAccessor simpAccessor =
				MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);

		if (simpAccessor != null) {
			if (simpAccessor.isMutable()) {
				simpAccessor.setDestination(destination);
				simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
				simpAccessor.setImmutable();
				sendInternal(message);
				return;
			}
			else {
				// Try and keep the original accessor type
				simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);
				initHeaders(simpAccessor);
			}
		}
		else {
			simpAccessor = SimpMessageHeaderAccessor.wrap(message);
			initHeaders(simpAccessor);
		}

		simpAccessor.setDestination(destination);
		simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
		message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());
		sendInternal(message);
	}

其中最关键的是sendInternal

private void sendInternal(Message<?> message) {
		String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
		Assert.notNull(destination, "Destination header required");

		long timeout = this.sendTimeout;
		boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));

		if (!sent) {
			throw new MessageDeliveryException(message,
					"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
		}
	}

然后再通过messageChannel来发送此条消息。

//AbstractMessageChannel
	@Override
	public final boolean send(Message<?> message, long timeout) {
		Assert.notNull(message, "Message must not be null");
		Message<?> messageToUse = message;
		ChannelInterceptorChain chain = new ChannelInterceptorChain();
		boolean sent = false;
		try {
			messageToUse = chain.applyPreSend(messageToUse, this);
			if (messageToUse == null) {
				return false;
			}
			sent = sendInternal(messageToUse, timeout);
			chain.applyPostSend(messageToUse, this, sent);
			chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
			return sent;
		}
		catch (Exception ex) {
			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
			if (ex instanceof MessagingException) {
				throw (MessagingException) ex;
			}
			throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
		}
		catch (Throwable err) {
			MessageDeliveryException ex2 =
					new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
			throw ex2;
		}
	}
  • 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
  • 通过sendInternal将消息发送出去

再来看下sendInternal方法,进入子类ExecutorSubscribableChannel

//ExecutorSubscribableChannel
	@Override
	public boolean sendInternal(Message<?> message, long timeout) {
		for (MessageHandler handler : getSubscribers()) {
			SendTask sendTask = new SendTask(message, handler);
			if (this.executor == null) {
				sendTask.run();
			}
			else {
				this.executor.execute(sendTask);
			}
		}
		return true;
	}

可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
在这里插入图片描述
这里取到的有两个messageHandler

  • SimpleBrokerMessageHandler
  • UserDestinationMessageHandler

进入SendTask,看一下run方法

//
public void run() {
	Message<?> message = this.inputMessage;
	try {
		message = applyBeforeHandle(message);
		if (message == null) {
			return;
		}
		this.messageHandler.handleMessage(message);
		triggerAfterMessageHandled(message, null);
	}
	catch (Exception ex) {
		triggerAfterMessageHandled(message, ex);
		if (ex instanceof MessagingException) {
			throw (MessagingException) ex;
		}
		String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
		throw new MessageDeliveryException(message, description, ex);
	}
	catch (Throwable err) {
		String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
		MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
		triggerAfterMessageHandled(message, ex2);
		throw ex2;
	}
}

这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
在这里插入图片描述
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。

在这里插入图片描述
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg

  • 这个的构成实际上就是把/user前缀去掉
  • 然后加上-user,后面加上sessionId,就是当前会话的id
  • 最后再以这个新生成的targetDestination,将消息发送出去!
    在这里插入图片描述

这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。

  • SimpMessagingTemplate调用messageChannel来发送消息
  • messageChannel中会取得两个messageHandler来处理。
    像不像递归调用?
    不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了

在这里插入图片描述
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去

可以看到,我们找到了一个用户订阅。在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了

总结

发送用户消息的整个过程如下:

  • SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
  • 接着SimpMessagingTemplate进行消息的发送
  • SimpMessagingTemplate会交由MessageChannel
  • MessageChannel将会调用MessageHandler来处理消息,有以下两个MessageHandler
    • SimpleBrokerMessageHandler
    • UserDestinationMessageHandler
  • 经过MessageHandler的处理,destination由/user/topic/answer,变成了/topic/answer-usero2zuy4zg。
  • 改写后的destination可以找到用户会话,将此消息发送出去

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/605620.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

通过 Java 操作 redis -- list 列表基本命令

目录 使用命令 lpush&#xff0c;lrange&#xff0c;rpush 使用命令 lpop 和 rpop 使用命令 blpop&#xff0c;brpop 使用命令 llen 关于 redis list 列表类型的相关命令推荐看Redis - list 列表 要想通过 Java 操作 redis&#xff0c;首先要连接上 redis 服务器&#xff…

抽象类基本概念

抽象类及抽象方法 概念&#xff1a;一个类中没有包含足够的信息来描绘一个具体的对象&#xff0c;这种类被定义为抽象类&#xff0c;含有抽象方法的类也被称为抽象类。 用通俗的话来说就是当一个类的某个功能&#xff08;方法&#xff09;实现不确定时&#xff0c;我们就将该…

【Hadoop】MapReduce (七)

MapReduce 执行流程 MapTask执行流程 Read&#xff1a;读取阶段 MapTask会调用InputFormat中的getSplits方法来对文件进行切片切片之后&#xff0c;针对每一个Split&#xff0c;产生一个RecordReader流用于读取数据数据是以Key-Value形式来产生&#xff0c;交给map方法来处理…

RAFT:引领 Llama 在 RAG 中发展

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Java面试八股文(MySQL篇)

数据库三范式 数据库事务介绍 索引介绍 SQL优化手段 mysql union 与 union all 语法及用法 并发事务带来的问题 大表如何优化 索引类型介绍 MYSQL数据库锁介绍

代码审计-PHP模型开发篇MVC层动态调试未授权脆弱鉴权未引用错误逻辑

知识点 1、PHP审计-动态调试-未授权安全 2、PHP审计-文件对比-未授权安全 3、PHP审计-未授权访问-三种形态动态调试优点 1、实时跟踪代码执行流程 2、实时获取变量接受数据 3、实时分析指定文件动态 环境配置&#xff1a;https://blog.csdn.net/m0_60571842/article/details/…

您可以使用WordPress创建的19种网站类型

当人们决定为什么他们应该使用WordPress时&#xff0c;我们经常会被问到“WordPress可以做[空白]吗&#xff1f;答案大多是肯定的。在本文中&#xff0c;我们将向您展示您可以使用WordPress创建的19种不同类型的网站&#xff0c;而无需学习任何编程技巧。 目录 隐藏 1 开始使用…

JS-拖拽元素放大缩小

效果左右布局&#xff0c;拖拽后&#xff0c;宽度放大缩小 其实自己写也可以&#xff0c;不过还是发现了两个好用的js库&#xff0c;既然不需要自己写&#xff0c;当然是能偷懒就偷懒 1、resizerjs 官网地址&#xff1a;https://github.com/eknowles/resizerjs <!doctype …

Milvus Cloud 的RAG 的广泛应用及其独特优势

一个典型的 RAG 框架可以分为检索器(Retriever)和生成器(Generator)两块,检索过程包括为数据(如 Documents)做切分、嵌入向量(Embedding)、并构建索引(Chunks Vectors),再通过向量检索以召回相关结果,而生成过程则是利用基于检索结果(Context)增强的 Prompt 来激…

C语言22行代码,让你的朋友以为中了病毒

1 **C语言介绍 ** C语言是一种计算机编程语言&#xff0c;由丹尼斯里奇&#xff08;Dennis Ritchie&#xff09;在1972年左右为UNIX操作系统设计并开发。它具有高效、可移植、灵活和强大的特点&#xff0c;在计算机科学领域中具有广泛的应用。C语言是一种结构化语言&#xff0…

【原创】不同RTOS中POSIX接口的实现差异

文章目录 前言POSIX简介RTOS对POSIX的实现情况ZephyrFreeRTOS RTOS提供的POSIX接口实时吗&#xff1f;nanosleepTimer-不同linux版本和xenomai的实现差异PREEMPT-RT Timer实现原理Xenomai Timer实现原理 总结参考 前言 在开发实时应用时&#xff0c;我们希望软件具备良好的跨平…

用于密集预测任务的通道知识蒸馏——关键字:蒸馏

摘要 https://arxiv.org/pdf/2011.13256 知识蒸馏(KD)已被证明是训练紧凑密集预测模型的简单有效工具。通过从大型教师网络转移而来的额外监督来训练轻量级学生网络。大多数先前的针对密集预测任务的KD变体都在空间域中对学生网络和教师网络的激活图进行对齐,通常是通过在每…

实验9:WLAN配置管理(课内实验)

实验9&#xff1a;WLAN配置管理 实验目的及要求&#xff1a; 掌握无线局域网络无线路由器和无线网络控制器的配置与应用。能够完成配置SSID隐藏、密码认证&#xff0c;远程站点WLAN配置和WLC配置等无线网络配置&#xff0c;完成网络连通性测试。 实验设备&#xff1a; 无线…

吴恩达2022机器学习专项课程C2(高级学习算法)W1(神经网络):2.5 更复杂的神经网络

目录 示例填写第三层的层数1.问题2.答案 公式&#xff1a;计算任意层的激活值激活函数 示例 层数有4层&#xff0c;不包括输入层。 填写第三层的层数 1.问题 你能把第二个神经元的上标和下标填写出来吗&#xff1f; 2.答案 根据公式g(wxb)&#xff0c;这里的x对应的是上…

C# IO下的文件和目录详解

文章目录 1、文件和目录的基本概念2、文件的操作方法打开文件&#xff1a;读取文件&#xff1a;写入文件&#xff1a;删除文件&#xff1a;文件权限 3、目录的操作方法创建目录&#xff1a;遍历目录&#xff1a;删除目录目录权限 4、文件和目录的属性与信息5、文件和目录的相对…

钟表——蓝桥杯十三届2022国赛大学B组真题

问题分析 这个问题的关键有两点&#xff1a;1.怎么计算时针&#xff0c;分针&#xff0c;秒针之间的夹角&#xff0c;2.时针&#xff0c;分针&#xff0c;秒针都是匀速运动的&#xff0c;并非跳跃性的。问题1很好解决看下面的代码就能明白&#xff0c;我们先考虑问题2&#xf…

数字图像处理知识点

数字图像处理知识点 一、绪论1、数字图像处理相关概念2、数字图像处理流程1.3 数字图像处理主要研究内容二、视觉与色度基础1、图像传感器与二维成像原理2、三基色2.1 三基色原理2.2 亮度方程3、HSI模型3.1 HSI模型优点3.2 RGB到HSI转换三、数字图像处理基础1、图像的数字化及表…

unreal engine5.3.2 Quixel bridge无法登陆

UE5系列文章目录 文章目录 UE5系列文章目录前言一、问题定位二、解决方法 前言 这几天unreal engine5.3.2 Quixel bridge无法登陆&#xff0c;输入epic 账号和密码&#xff0c;然后在输入epic发送的验证码&#xff0c;总是提示登录失败。就算是使用科学上网依然无法登录。而且…

linux Shell编程之条件语句

条件测试操作 test命令 条件测试操作 Shell环境根据命令执行后的返回状态值&#xff08;$?&#xff09;来判断是否执行成功&#xff0c;当返回值为0&#xff08;真true&#xff09;时表示成功&#xff0c;返回值为非0值&#xff08;假false&#xff09;时表示失败或异常。 t…

Python深度学习基于Tensorflow(5)机器学习基础

文章目录 监督学习线性回归逻辑回归决策树支持向量机朴素贝叶斯 集成学习BaggingBoosting 无监督学习主成分分析KMeans聚类 缺失值和分类数据处理处理缺失数据分类数据转化为OneHot编码 葡萄酒数据集示例 机器学习的流程如下所示&#xff1a; 具体又可以分为以下五个步骤&#…