Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了,
今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket

需求背景

在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。

对于这种场景,我们可以使用 WebSocket 来实现。其他可以使用 WebSocket 进行通知的场景还有像管理后台一些通知(比如新订单通知)等。

在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:

  1. 可以给特定的用户推送:建立连接的时候,就建立起 WebSocket 连接与用户 ID 之间的关联
  2. 断开连接的时候,移除 WebSocket 连接与用户的关联,并且关闭这个 WebSocket 连接
  3. 业务系统可以通过 HTTP 接口来给特定的用户推送 WebSocket 消息:只要传递用户 ID 以及需要推送的消息即可

基础框架

下面是一个最简单版本的框架图:

在这里插入图片描述

它包含如下几个角色:

  1. Client 客户端,也就是实际中接收消息通知的浏览器
  2. Server 服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持 WebSocket 连接,推送消息到特定的 WebSocket 连接
  3. 业务逻辑:这个实际上不属于 demo 的一部分,但是 Server 推送的数据是来自业务逻辑处理的结果

设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket 推送中心的技术上的变动也不会影响到实际的业务。

开始开发

一些结构体变动

  1. Client 结构体的变化
type Client struct {
	hub *Hub
	conn *websocket.Conn
	send chan []byte
    // 新增字段
    uid int
}

因为我们需要建立起 WebSocket 连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid 字段。

这个字段会在客户端建立连接后写入。

  1. Hub 结构体的变化
type Hub struct {
	clients map[*Client]bool
	register chan *Client
	unregister chan *Client

	// 记录 uid 跟 client 的对应关系
	userClients map[int]*Client

    // 读写锁,保护 userClients 以及 clients 的读写
	sync.RWMutex
}
  1. 因为我们不再需要做广播,所以会移除 Hub 中的 broadcast 字段。

取而代之的是,我们会直接在消息推送接口中写入到 uid 对应的 Clientsend 通道。
当然我们也可以在 Hub 中另外加一个字段来记录要推送给不同 uid 的消息,但是我们的 Hubrun 方法是一个协程处理的,当需要推送的数据较多或者其中有
网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run 方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump 中处理。
(也就是建立 WebSocket 连接时的那个写操作协程)

  1. 同时为了更加快速地通过 uid 来获取对应的 WebSocket 连接,新增了一个 userClients 字段。

这是一个 map 类型的字段,keyuid,值是对应的 Client 指针。

  1. 最后新增了一个 Mutex 互斥锁

因为,在用户实际进行登录的时候需要写入 userClients 字段,而这是一个 map 类型字段,并不支持并发读写。
如果我们在接受并发连接的时候同时修改 userClients 的时候会导致 panic,因此我们使用了一个互斥锁来保证 userClients 的读写安全。

同时,clients 也是一个 map,但上一篇文章中没有使用 sync.Mutex 来保护它的读写,在并发操作的时候也是会有问题的,
所以 Mutex 同时也需要保护 clients 的读写。

func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.Lock()
			h.clients[client] = true
			h.Unlock()
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				h.Lock()
				delete(h.userClients, client.uid)
				delete(h.clients, client)
				h.Unlock()
				close(client.send)
			}
		}
	}
}

最后,我们会在 Hubrun 方法中写 userClients 或者 clients 字段的时候,先获取锁,写成功的时候释放锁。

建立连接

在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:

// 将 HTTP 转换为 WebSocket 连接的 Upgrader
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

// 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    // 升级为 WebSocket 连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
    // 新建一个 Client
	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
    // 注册到 Hub
	client.hub.register <- client

    // 推送消息的协程
	go client.writePump()
    // 结束消息的协程
	go client.readPump()
}

接收消息

由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:

func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		_ = c.conn.Close()
	}()
	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Time{}) // 永不超时
	for {
		// 从客户端接收消息
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			log.Println("readPump error: ", err)
			break
		}

		// 只处理登录消息
		var data = make(map[string]string)
		err = json.Unmarshal(message, &data)
		if err != nil {
			break
		}

		// 写入 uid 以及 Hub 的 userClients
		if uid, ok := data["uid"]; ok {
			c.uid = uid
			c.hub.Lock()
			c.hub.userClients[uid] = c
			c.hub.Unlock()
		}
	}
}

在本文中,假设客户端的登录消息格式为 {"uid": "123456"} 这种 json 格式。

在这里也操作了 userClients 字段,同样需要使用互斥锁来保证操作的安全性。

发送消息

  1. 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
// 发送消息的接口
// 参数:
// 1. uid:接收消息的用户 ID
// 2. message:需要发送给这个用户的消息
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
    send(hub, w, r)
})

// 发送消息的方法
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
	uid := r.FormValue("uid")
	// 参数错误
	if uid == "" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// 从 hub 中获取 client
	hub.Lock()
	client, ok := hub.userClients[uid]
	hub.Unlock()
	// 尚未建立连接
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// 发送消息
	message := r.FormValue("message")
	client.send <- []byte(message)
}
  1. 实际发送消息的操作

writePump 方法中,我们会将从 /send 接收到的数据发送给对应的用户:

// 发送消息的协程
func (c *Client) writePump() {
	defer func() {
		_ = c.conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.send:
			// 设置写超时时间
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			// 连接已经被关闭了
			if !ok {
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			// 获取一个发送消息的 Writer
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			// 写入消息到 Writer
			w.Write(message)

            // 关闭 Writer
			if err := w.Close(); err != nil {
				return
			}
		}
	}
}

在这个方法中,我们会从 c.send 这个 chan 中获取需要发送给客户端的消息,然后进行发送操作。

测试

  1. 启动 main 程序
go run main.go
  1. 打开一个浏览器的控制台,执行以下代码
ws = new WebSocket('ws://127.0.0.1:8181/ws')
ws.send('{"uid": "123"}')

这两行代码的作用是与 WebSocket 服务器建立连接,然后发送一个登录信息。

然后我们打开控制台的 Network -> WS -> Message 就可以看到浏览器发给服务端的消息:

在这里插入图片描述

  1. 使用 HTTP 客户端发送消息给 uid 为 123 的用户

假设我们的 WebSocket 服务器绑定的端口为 8181

打开终端,执行以下命令:

curl "http://localhost:8181/send?uid=123&message=Hello%20World"

然后我们可以在 Network -> WS -> Message 看到接收到了消息 Hello World

在这里插入图片描述

结束了

到此为止,我们已经实现了一个初步可工作的 WebSocket 应用,当然还有很多可以优化的地方,
比如:

  1. 错误处理
  2. Hub 状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下 Hub 的当前状态,比如当前连接数
  3. 日志:出错的时候,日志可以帮助我们快速定位问题

这些功能会在后续继续完善,今天就到此为止了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/330097.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【创作活动】ChatGPT 和文心一言哪个更好用?

文章目录 文心一言优点缺点 ChatGPT优点缺点 Java编码能力比较对人工智能的看法 ChatGPT是由OpenAI开发的交互式AI大模型&#xff0c; 文心一言是由百度研发的知识增强大语言模型&#xff0c;本文从Java开发的角度对比一下哪个更好用&#xff08;本文仅用于投稿CSDN创造活动&am…

2024年阿里云服务器4核8G配置活动价格多少钱?

阿里云服务器4核8g配置云服务器u1价格是955.58元一年&#xff0c;4核8G配置还可以选择ECS计算型c7实例、计算型c8i实例、计算平衡增强型c6e、ECS经济型e实例、AMD计算型c8a等机型等ECS实例规格&#xff0c;规格不同性能不同&#xff0c;价格也不同&#xff0c;阿里云服务器网al…

缺失msvcr120.dll怎么办,msvcr120.dll一键修复教程

在计算机系统运行过程中&#xff0c;遇到“msvcr120.dll丢失”的情况时常困扰着众多用户。这一现象不仅会导致部分软件无法正常启动或运行&#xff0c;还可能引发一系列连锁反应&#xff0c;影响到用户的日常操作体验。msvcr120.dll作为Microsoft Visual C Redistributable Pac…

使用easyexcel 导出多级表头demo

先看效果&#xff1a; 1、引入maven依赖 <!--EasyExcel --> <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.2.1</version> </dependency> 2、实体类 package com.…

全球光伏知名企业-晶科能源联合泛微采知连,建立文控管理平台

晶科能源股份有限公司&#xff08;简称“晶科能源”&#xff09;是一家全球知名、极具创新力的太阳能科技企业。 &#xff08;图片素材来自晶科能源官网&#xff09; 公司战略性布局光伏产业链核心环节&#xff0c;聚焦光伏产品一体化研发制造和清洁能源整体解决方案提供&…

安全牧场,保障优质奶源 追溯羊奶品质

安全牧场&#xff0c;保障优质奶源 追溯羊奶品质 近年来&#xff0c;人们对食品安全和健康越来越关注&#xff0c;而安全牧场的兴起正能够满足人们对优质奶源的需求。安全牧场以严格的品质监控和科学的管理&#xff0c;为消费者提供可追溯的高品质羊奶产品。本文小编羊大师将为…

白山云基于StarRocks数据库构建湖仓一体数仓的实践

背景 随着每天万亿级别的业务数据流向数据湖&#xff0c;数据湖的弊端也逐渐凸显出来&#xff0c;例如&#xff1a; 数据入湖时效性差&#xff1a;数据湖主要依赖于离线批量计算&#xff0c;通常不支持实时数据更新&#xff0c;因此无法保证数据的强一致性&#xff0c;造成数…

openssl3.2 - 官方demo学习 - test - certs

文章目录 openssl3.2 - 官方demo学习 - test - certs概述笔记.sh的执行语句打印的方法要修改的实际函数END openssl3.2 - 官方demo学习 - test - certs 概述 官方demos目录有证书操作的例子 已经做了笔记 openssl3.2 - 官方demo学习 - certs 但是这个demos/certs目录的脚本,…

龙腾荆楚 | 软件供应链安全检测中心落地襄阳

1月16日&#xff0c;襄阳市东津新区“园区提质、企业满园”行动暨2024年东津云谷首月重大项目集中签约活动圆满完成&#xff0c;开源网安城市级项目再下一城&#xff0c;分别与襄阳市政府、高校、国投签订战略合作协议&#xff0c;推动荆楚地区数字政府、数字经济、数字社会、数…

DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记

文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自&#xff1a;JoyRL 、 EasyRL DQN (Deep Q-Networ…

H5小游戏如何提升APP变现收益?

在当前用户规模稳定但变现压力增加的背景下&#xff0c;开发者需要挖掘用户价值&#xff0c;提高营收&#xff0c;这成为开发者关注的重点话题。对于那些“用户用完即走”的APP产品来说&#xff0c;接入H5游戏能够吸引停留&#xff0c;为其带来收入上的增长。 一、什么是H5游戏…

异步Merkle Tree

1. 引言 前序博客&#xff1a; 利用多核的Rust快速Merkle tree Anoushk Kharangate 2023年论文《Asynchronous Merkle Trees》&#xff0c;其对Merkle tree数据结构进行修改&#xff0c;使得可跨多线程异步计算。 开源代码实现见&#xff1a; https://github.com/anoushk1…

Kylin 安装novnc 远程访问

noVNC可以使用浏览器直接访问服务器&#xff0c;而不需要使用VNC客户端。 1.初始环境 关闭防火墙或允许IP访问本机 2.安装依赖 dnf install -y tigervnc-server git 3.git下载novnc git clone https://github.com/novnc/noVNC.git 4.配置信任证书 openssl req -new -x509 …

上海亚商投顾:沪指探底回升 大金融板块午后走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 指昨日探底回升&#xff0c;深成指、创业板指午后跌超1%&#xff0c;尾盘集体拉升翻红&#xff0c;北证50指数涨…

回归预测 | Matlab实现MSADBO-CNN-LSTM基于改进蜣螂算法优化卷积神经网络-长短期记忆神经网络多特征回归预测

回归预测 | Matlab实现MSADBO-CNN-LSTM基于改进蜣螂算法优化卷积神经网络-长短期记忆神经网络多特征回归预测 目录 回归预测 | Matlab实现MSADBO-CNN-LSTM基于改进蜣螂算法优化卷积神经网络-长短期记忆神经网络多特征回归预测预测效果基本描述程序设计参考资料 预测效果 基本描…

顶顶通呼叫中心中间件自动外呼来电转人工显示被叫号码而不是显示路由条件 :一步步配置(mod_cti基于FreeSWITCH)

介绍 顶顶通呼叫中心中间件自动外呼来电转人工显示被叫号码而不是显示自动外呼的路由条件&#xff0c;可以是默认的被叫号码也可以改为显示指定的号码 一、显示默认被叫 1、配置拨号方案 打开ccadmin-》点击拨号方案-》找到进入排队-》配置跟图中一样的通道变量。修改了拨号…

electron桌面应用开发——快速入门教程

文章目录 前言一、electron是什么&#xff1f;二、electron 进程模型1.主进程2.渲染进程3.预加载脚本4.进程通信4.1 sendon&#xff08;单向&#xff09;4.2 invokehandle (双向)4.3 主进程向渲染进程发送事件 三、窗口创建与应用事件四、技术栈和构建工具五、electron-vite安装…

【数据分析实战】冰雪大世界携程景区游客客源分布pyecharts地图

文章目录 引言数据集展示Python代码可视化展示本人浅薄分析 写在最后 今年冬天&#xff0c;哈尔滨冰雪旅游"杀疯了"&#xff0c;在元旦假期更是被南方游客"包场"。据哈尔滨市文化广电和旅游局提供大数据测算&#xff0c;截至元旦假日第3天&#xff0c;哈尔…

低代码配置-属性配置面板设计

模块设计 tab项切换 组件基础属性组件数据属性组件事件属性表单属性 模块输出函数设计 tab切换函数 列表表单属性 数据来源&#xff1a; 调用接口时一次赋予&#xff0c;无需使用selectItem&#xff0c;如需使用&#xff0c;归入基础属性列表标题是否展示筛选区域 示例&am…