【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

通过Spring websocket 用户校验和业务会话绑定我们学会了如何将业务会话绑定到spring websocket会话上。通过这一节,我们来分析一下会话和订阅的实现

用户会话的数据结构

SessionInfo 用户会话

用户会话定义如下:

private static final class SessionInfo {

		// subscriptionId -> Subscription
		private final Map<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();

		public Collection<Subscription> getSubscriptions() {
			return this.subscriptionMap.values();
		}

		@Nullable
		public Subscription getSubscription(String subscriptionId) {
			return this.subscriptionMap.get(subscriptionId);
		}

		public void addSubscription(Subscription subscription) {
			this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);
		}

		@Nullable
		public Subscription removeSubscription(String subscriptionId) {
			return this.subscriptionMap.remove(subscriptionId);
		}
	}
  • 用户会话中有subscriptionMap。这个表示一个会话中,可以有多个订阅,可以根据subscriptionId找到订阅。

SessionRegistry 用户会话注册

private static final class SessionRegistry {

		private final ConcurrentMap<String, SessionInfo> sessions = new ConcurrentHashMap<>();

		@Nullable
		public SessionInfo getSession(String sessionId) {
			return this.sessions.get(sessionId);
		}

		public void forEachSubscription(BiConsumer<String, Subscription> consumer) {
			this.sessions.forEach((sessionId, info) ->
				info.getSubscriptions().forEach(subscription -> consumer.accept(sessionId, subscription)));
		}

		public void addSubscription(String sessionId, Subscription subscription) {
			SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
			info.addSubscription(subscription);
		}

		@Nullable
		public SessionInfo removeSubscriptions(String sessionId) {
			return this.sessions.remove(sessionId);
		}
	}
  • SessionRegistry 中sessions 表示多个会话。根据sessionId可以找到唯一一个会话SessionInfo

Subscription 用户订阅

	private static final class Subscription {

		private final String id;

		private final String destination;

		private final boolean isPattern;

		@Nullable
		private final Expression selector;

		public Subscription(String id, String destination, boolean isPattern, @Nullable Expression selector) {
			Assert.notNull(id, "Subscription id must not be null");
			Assert.notNull(destination, "Subscription destination must not be null");
			this.id = id;
			this.selector = selector;
			this.destination = destination;
			this.isPattern = isPattern;
		}

		public String getId() {
			return this.id;
		}

		public String getDestination() {
			return this.destination;
		}

		public boolean isPattern() {
			return this.isPattern;
		}

		@Nullable
		public Expression getSelector() {
			return this.selector;
		}

		@Override
		public boolean equals(@Nullable Object other) {
			return (this == other ||
					(other instanceof Subscription && this.id.equals(((Subscription) other).id)));
		}

		@Override
		public int hashCode() {
			return this.id.hashCode();
		}

		@Override
		public String toString() {
			return "subscription(id=" + this.id + ")";
		}
	}

SimpUserRegistry 用户注册接口

用户注册的接口如下:

public interface SimpUserRegistry {

	/**
	根据用户名,获取到用户信息
	 * Get the user for the given name.
	 * @param userName the name of the user to look up
	 * @return the user, or {@code null} if not connected
	 */
	@Nullable
	SimpUser getUser(String userName);

	/**
	获取现在所有的注册的用户
	 * Return a snapshot of all connected users.
	 * <p>The returned set is a copy and will not reflect further changes.
	 * @return the connected users, or an empty set if none
	 */
	Set<SimpUser> getUsers();

	/**
	获取在线用户数量
	 * Return the count of all connected users.
	 * @return the number of connected users
	 * @since 4.3.5
	 */
	int getUserCount();

	/**
	 * Find subscriptions with the given matcher.
	 * @param matcher the matcher to use
	 * @return a set of matching subscriptions, or an empty set if none
	 */
	Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher);

}

SimpUser实际上就是代表着一个用户,我们来看其实现:LocalSimpUser的定义

	private static class LocalSimpUser implements SimpUser {
		private final String name;
		private final Principal user;
		private final Map<String, SimpSession> userSessions = new ConcurrentHashMap<>(1);
		public LocalSimpUser(String userName, Principal user) {
			Assert.notNull(userName, "User name must not be null");
			this.name = userName;
			this.user = user;
		}
	}

userSessions 表示当前一个用户可以对应多个会话。
这个Principal 是啥,还记得我们上一节通过Spring websocket 用户校验和业务会话绑定中,我们是怎么注册用户的吗

    private void connect(Message<?> message, StompHeaderAccessor accessor) {
        //1通过请求头获取到token
        String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);
        //2如果token为空或者用户id没有解析出来,抛出异常,spring会将此websocket连接关闭
        if (StringUtils.isEmpty(token)) {
            throw new MessageDeliveryException("token missing!");
        }
        String userId = TokenUtil.parseToken(token);
        if (StringUtils.isEmpty(userId)) {
            throw new MessageDeliveryException("userId missing!");
        }
        //这个是每个会话都会有的一个sessionId
        String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);

        //3创建自己的业务会话session对象
        UserSession userSession = new UserSession();
        userSession.setSimpleSessionId(simpleSessionId);
        userSession.setUserId(userId);
        userSession.setCreateTime(LocalDateTime.now());
        //4关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息
        accessor.setUser(new UserSessionPrincipal(userSession));
    }

从token中解析出用户的userId,并通过下面的代码,把当前用户和会话绑定起来。一个用户实际上是可以绑定多个会话的。

 accessor.setUser(new UserSessionPrincipal(userSession));

总结一下用户和会话之间的关系,如下图
在这里插入图片描述

订阅过程的源码分析

前端订阅的代码如下

  stompClient.subscribe("/user/topic/answer", function (response) {
      createElement("answer", response.body);
  });

当后端收到订阅消息后,会由SimpleBrokerMessageHandler来处理

	@Override
	protected void handleMessageInternal(Message<?> message) {
		MessageHeaders headers = message.getHeaders();
		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);

		updateSessionReadTime(sessionId);

		if (!checkDestinationPrefix(destination)) {
			return;
		}

		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (SimpMessageType.MESSAGE.equals(messageType)) {
			logMessage(message);
			sendMessageToSubscribers(destination, message);
		}
		else if (SimpMessageType.CONNECT.equals(messageType)) {
			logMessage(message);
			if (sessionId != null) {
				if (this.sessions.get(sessionId) != null) {
					if (logger.isWarnEnabled()) {
						logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
					}
					return;
				}
				long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
				long[] heartbeatOut = getHeartbeatValue();
				Principal user = SimpMessageHeaderAccessor.getUser(headers);
				MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
				this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
				SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
				initHeaders(connectAck);
				connectAck.setSessionId(sessionId);
				if (user != null) {
					connectAck.setUser(user);
				}
				connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
				connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
				Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
				getClientOutboundChannel().send(messageOut);
			}
		}
		else if (SimpMessageType.DISCONNECT.equals(messageType)) {
			logMessage(message);
			if (sessionId != null) {
				Principal user = SimpMessageHeaderAccessor.getUser(headers);
				handleDisconnect(sessionId, user, message);
			}
		}
		else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
			logMessage(message);
			this.subscriptionRegistry.registerSubscription(message);
		}
		else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
			logMessage(message);
			this.subscriptionRegistry.unregisterSubscription(message);
		}
	}

当消息类型为SUBSCRIBE时,会调用subscriptionRegistry.registerSubscription(message)
接着来看下subscriptionRegistry.registerSubscription(message)

//AbstractSubscriptionRegistry
	@Override
	public final void registerSubscription(Message<?> message) {
		MessageHeaders headers = message.getHeaders();

		SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
		if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
			throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
		}

		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		if (sessionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No sessionId in  " + message);
			}
			return;
		}

		String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
		if (subscriptionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No subscriptionId in " + message);
			}
			return;
		}

		String destination = SimpMessageHeaderAccessor.getDestination(headers);
		if (destination == null) {
			if (logger.isErrorEnabled()) {
				logger.error("No destination in " + message);
			}
			return;
		}

		addSubscriptionInternal(sessionId, subscriptionId, destination, message);
	}

这个代码很简单,就是从消息中取出三个东西,sessionId, subscriptionId, destination,进行注册。

//DefaultSubscriptionRegistry
	@Override
	protected void addSubscriptionInternal(
			String sessionId, String subscriptionId, String destination, Message<?> message) {

		boolean isPattern = this.pathMatcher.isPattern(destination);
		Expression expression = getSelectorExpression(message.getHeaders());
		Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);

		this.sessionRegistry.addSubscription(sessionId, subscription);
		this.destinationCache.updateAfterNewSubscription(sessionId, subscription);
	}
	//其实就是添加到sessions map中。会话里把订阅添加到订阅map中
		public void addSubscription(String sessionId, Subscription subscription) {
			SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
			info.addSubscription(subscription);
		}

其实就是添加到sessions map中。会话里把订阅添加到订阅map中

那用户和会话是如何关联起来的?
在这里插入图片描述

  • 当订阅事件发生时,取出当前的Principal( accessor.setUser(xxx)设置的),然后生成LocalSimpleUser,即用户
  • 把当前会话,添加到当前用户会话中。这样就给用户绑定好了会话了。

用户会话事件

通过Spring事件机制,管理注册用户信息和会话,包括订阅、取消订阅,会话断连。代码如下

//DefaultSimpUserRegistry
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;
		Message<?> message = subProtocolEvent.getMessage();
		MessageHeaders headers = message.getHeaders();

		String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
		Assert.state(sessionId != null, "No session id");

		if (event instanceof SessionSubscribeEvent) {
			LocalSimpSession session = this.sessions.get(sessionId);
			if (session != null) {
				String id = SimpMessageHeaderAccessor.getSubscriptionId(headers);
				String destination = SimpMessageHeaderAccessor.getDestination(headers);
				if (id != null && destination != null) {
					session.addSubscription(id, destination);
				}
			}
		}
		else if (event instanceof SessionConnectedEvent) {
			Principal user = subProtocolEvent.getUser();
			if (user == null) {
				return;
			}
			String name = user.getName();
			if (user instanceof DestinationUserNameProvider) {
				name = ((DestinationUserNameProvider) user).getDestinationUserName();
			}
			synchronized (this.sessionLock) {
				LocalSimpUser simpUser = this.users.get(name);
				if (simpUser == null) {
					simpUser = new LocalSimpUser(name, user);
					this.users.put(name, simpUser);
				}
				LocalSimpSession session = new LocalSimpSession(sessionId, simpUser);
				simpUser.addSession(session);
				this.sessions.put(sessionId, session);
			}
		}
		else if (event instanceof SessionDisconnectEvent) {
			synchronized (this.sessionLock) {
				LocalSimpSession session = this.sessions.remove(sessionId);
				if (session != null) {
					LocalSimpUser user = session.getUser();
					user.removeSession(sessionId);
					if (!user.hasSessions()) {
						this.users.remove(user.getName());
					}
				}
			}
		}
		else if (event instanceof SessionUnsubscribeEvent) {
			LocalSimpSession session = this.sessions.get(sessionId);
			if (session != null) {
				String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
				if (subscriptionId != null) {
					session.removeSubscription(subscriptionId);
				}
			}
		}
	}

优雅停机

当服务器停机时,最好给客户端发送断连消息,而不是让客户端过了一段时间发现连接断开。
Spring websocket是如何来实现优雅停机的?

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
	@Override
	public final void stop() {
		synchronized (this.lifecycleMonitor) {
			this.running = false;
			this.clientOutboundChannel.unsubscribe(this);
		}

		// Proactively notify all active WebSocket sessions
		for (WebSocketSessionHolder holder : this.sessions.values()) {
			try {
				holder.getSession().close(CloseStatus.GOING_AWAY);
			}
			catch (Throwable ex) {
				if (logger.isWarnEnabled()) {
					logger.warn("Failed to close '" + holder.getSession() + "': " + ex);
				}
			}
		}
	}

	@Override
	public final void stop(Runnable callback) {
		synchronized (this.lifecycleMonitor) {
			stop();
			callback.run();
		}
	}
}

其奥秘就是其实现了SmartLifecycle。这个是Spring的生命周期接口。我们可以通过实现此接口,在相应的生命周期阶段注册回调事件!
上面的代码,通过调用stop接口,给客户端发送了一个断连的消息。即实现了关机时的主动通知断连。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/586269.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

利用Argo数据分别计算温度、盐度和温盐所造成的比容海平面变化

本文所用到的温盐数据集&#xff1a;IPRC&#xff08;美国夏威夷大学国际太平洋研究中心&#xff09; Argo data products | Argo (ucsd.edu)https://argo.ucsd.edu/data/argo-data-products/ 理论知识&#xff08;相关计算公式&#xff09;&#xff1a; 代码和工具包准备&…

python 中的数据结构

python 中的数据结构 1.1 序列 序列时有索引的数组 举例实现&#xff1a; a["北京","上海","广州","深圳","重庆","成都"] print(a[2]) print(a[-1] " " a[-2]) print(a[1:3]) # 运行结果 "&…

Vulnhub-DIGITALWORLD.LOCAL: VENGEANCE渗透

文章目录 前言1、靶机ip配置2、渗透目标3、渗透概括 开始实战一、信息获取二、smb下载线索三、制作字典四、爆破压缩包密码五、线索分析六、提权&#xff01;&#xff01;&#xff01; Vulnhub靶机&#xff1a;DIGITALWORLD.LOCAL: VENGEANCE ( digitalworld.local: VENGEANCE …

chrome和drive安装包路径

Chrome for Testing availability (googlechromelabs.github.io) 下载Stable下面的包哈

【Leetcode每日一题】 分治 - 排序数组(难度⭐⭐)(69)

1. 题目解析 题目链接&#xff1a;912. 排序数组 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 归并排序&#xff08;Merge Sort&#xff09;是一种采用“分而治之”&#xff08;Divide and Conquer&#xff09;策略…

LLM之RAG实战(三十八)| RAG分块策略之语义分块

在RAG应用中&#xff0c;分块是非常重要的一个环节&#xff0c;常见的分块方法有如下几种&#xff1a; Fixed size chunkingRecursive ChunkingDocument Specific ChunkingSemantic Chunking a&#xff09;Fixed size chunking&#xff1a;这是最常见、最直接的分块方法。我们…

C/C++基础语法练习 - 计算阶乘(新手推荐阅读✨)

题目链接&#xff1a;https://www.starrycoding.com/problem/160 题目描述 给定一个整数 n n n&#xff0c;输出阶乘 n ! n! n!。 输入格式 一个整数 n ( 1 ≤ n ≤ 20 ) n(1 \le n \le 20) n(1≤n≤20)。 输出格式 一个整数 n ! n! n!。 输入样例1 16输出样例1 20922…

树的中心 树形dp

#include<bits/stdc.h> using namespace std; int n; const int N 100005; // 无向边 int ne[N * 2], e[N * 2], idx; int h[N]; int vis[N];int ans 0x7fffffff;void add(int a, int b) {e[idx] b, ne[idx] h[a], h[a] idx; }int dfs(int u) { // 作为根节点vis[u]…

机器学习:基于Sklearn,使用随机森林分类器RandomForestClassifier检测信用卡欺诈

前言 系列专栏&#xff1a;机器学习&#xff1a;高级应用与实践【项目实战100】【2024】✨︎ 在本专栏中不仅包含一些适合初学者的最新机器学习项目&#xff0c;每个项目都处理一组不同的问题&#xff0c;包括监督和无监督学习、分类、回归和聚类&#xff0c;而且涉及创建深度学…

分享一份物联网 SAAS 平台架构设计

一、架构图**** 二、Nginx**** 用于做服务的反向代理。 三、网关**** PaaS平台所有服务统一入口&#xff0c;包含token鉴权功能。 四、开放平台**** 对第三方平台开放的服务入口。 五、MQTT**** MQTT用于设备消息通信、内部服务消息通信。 六、Netty**** Socket通信设…

IoTDB 入门教程①——时序数据库为什么选IoTDB ?

文章目录 一、前文二、性能排行第一三、完全开源四、数据文件TsFile五、乱序数据高写入六、其他七、参考 一、前文 IoTDB入门教程——导读 关注博主的同学都知道&#xff0c;博主在物联网领域深耕多年。 时序数据库&#xff0c;博主已经用过很多&#xff0c;从最早的InfluxDB&a…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-9.1-LED灯(模仿STM32驱动开发实验)

前言&#xff1a; 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸机篇”视频的学习笔记&#xff0c;在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

IDEA:Server‘s certificate is not trusted(服务器的证书不受信任)

IDEA&#xff1a;Server‘s certificate is not trusted&#xff08;服务器的证书不受信任&#xff09; 打开idea&#xff0c;发现一个莫名其妙的证书弹出来&#xff0c;还关不掉发现组织名是 Doctorcom LTD.百度了下 不知道是什么东西 这也不是下面这种破解了idea的情况 30069…

Ajax.

目录 1. 服务器相关的基础概念 1.1 服务器 1.2 客户端 1.3 服务器对外提供的资源 1.4 数据也是资源 1.5 资源与 URL 地址 1.6 什么是 Ajax 2. Ajax 的基础用法 2.1 POST 请求 2.2 GET 请求 2.3 DELETE 请求 2.4 PUT 请求 2.5 PATCH 请求 3. axios 3.1 axios 的基…

IoTDB 入门教程 问题篇①——内存不足导致datanode服务无法启动

文章目录 一、前文二、问题三、分析四、继续分析五、解决问题 一、前文 IoTDB入门教程——导读 二、问题 执行启动命令&#xff0c;但是datanode服务却无法启动&#xff0c;查询不到6667端口 bash sbin/start-standalone.sh 进而导致数据库连接也同样失败 [rootiZ2ze30dygwd6…

Go 语言(三)【面向对象编程】

1、OOP 首先&#xff0c;Go 语言并不是面向对象的语言&#xff0c;只是可以通过一些方法来模拟面向对象。 1.1、封装 Go 语言是通过结构体&#xff08;struct&#xff09;来实现封装的。 1.2、继承 继承主要由下面这三种方式实现&#xff1a; 1.2.1、嵌套匿名字段 //Add…

实操——使用uploadify插件(php版和Java版) 与 Dropzone.js插件分别实现附件上传

实操——使用uploadify插件&#xff08;php版和Java版&#xff09;与 Dropzone.js插件分别实现附件上传 1. 使用uploadify插件上传1.1 简介1.1.1 简介1.1.2 参考GitHub 1.2 后端PHP版本的uploadify1.2.1 下载项目的目录结构1.2.2 测试看界面效果1.2.3 附页面代码 和 PHP代码 1.…

ctfshow——SQL注入

文章目录 SQL注入基本流程普通SQL注入布尔盲注时间盲注报错注入——extractvalue()报错注入——updataxml()Sqlmap的用法 web 171——正常联合查询web 172——查看源代码、联合查询web 173——查看源代码、联合查询web 174——布尔盲注web 176web 177——过滤空格web 178——过…

LLM 构建Data Multi-Agents 赋能数据分析平台的实践之③:数据分析之二(大小模型协同)

一、概述 随着新一代信息技术在产业数字化中的应用&#xff0c;产生了大量多源多模态信息以及响应的信息处理模式&#xff0c;数据孤岛、模型林立的问题也随之产生&#xff0c;使得业务系统臃肿、信息处理和决策效率低下&#xff0c;面对复杂任务及应用场景问题求解效率低。针…

【iOS】消息流程分析

文章目录 前言动态类型动态绑定动态语言消息发送objc_msgSendSEL&#xff08;selector&#xff09;IMP&#xff08;implementation&#xff09;IMP高级用法 MethodSEL、IMP、Method总结流程概述 快速查找消息发送快速查找的总结buckets 慢速查找动态方法解析resolveInstanceMet…