Golang 搭建 WebSocket 应用(八) - 完整代码

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

在这里插入图片描述

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程,
用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go      // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod    // 项目依赖
├── go.sum    // 项目依赖
├── hub.go    // 消息中心
├── main.go   // 程序入口
├── message   // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

go mod tidy
  1. 启动 WebSocket 服务端:
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128

// Handler 错误处理函数
type Handler func(log message.Log, err error)

// Hub 维护了所有的客户端连接
type Hub struct {
	// 注册请求
	register chan *Client
	// 取消注册请求
	unregister chan *Client
	// 记录 uid 跟 client 的对应关系
	userClients map[string]*Client
	// 互斥锁,保护 userClients 以及 clients 的读写
	sync.RWMutex
	// 消息记录器
	messageLogger message.Logger
	// 错误处理器
	errorHandler Handler
	// 验证器
	authenticator Authenticator
	// 等待发送的消息数量
	pending atomic.Int64
}

// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {
	res, _ := json.Marshal(log)
	fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}

func newHub() *Hub {
	return &Hub{
		register:      make(chan *Client),
		unregister:    make(chan *Client),
		userClients:   make(map[string]*Client, bufferSize),
		RWMutex:       sync.RWMutex{},
		messageLogger: &message.StdoutMessageLogger{},
		errorHandler:  defaultErrorHandler,
		authenticator: &JWTAuthenticator{},
	}
}

// 注册、取消注册请求处理
func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.Lock()
			h.userClients[client.uid] = client
			h.Unlock()
		case client := <-h.unregister:
			h.Lock()
			close(client.send)
			delete(h.userClients, client.uid)
			h.Unlock()
		}
	}
}

// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {
	pending := hub.pending.Load()
	connections := len(hub.userClients)
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。
推送失败会调用 errorHandler 处理错误。
推送成功会将 pending 减一。

// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {
	defer func() {
		_ = c.conn.Close()
	}()

	// 从 send 通道中获取消息,然后推送给客户端
	for {
		messageLog, ok := <-c.send

		// 设置写超时时间
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		// c.send 这个通道已经被关闭了
		if !ok {
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {
			c.hub.errorHandler(messageLog, err)
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		c.hub.pending.Add(int64(-1))
	}
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。
在连接的时候会对客户端进行认证,认证失败会断开连接。
最后会启动读写协程。

// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	// 升级为 WebSocket 连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))
		return
	}

	// 认证失败的时候,返回错误信息,并断开连接
	uid, err := hub.authenticator.Authenticate(r)
	if err != nil {
		_ = conn.SetWriteDeadline(time.Now().Add(time.Second))
		_ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))
		_ = conn.Close()
		return
	}

	// 注册 Client
	client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}
	client.conn.SetCloseHandler(closeHandler)
	// register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行
	// 这样可以保证 register 成功之后才会启动读写协程
	client.hub.register <- client

	// 启动读写协程
	go client.writePump()
	go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。
它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
	uid := r.FormValue("uid")
	if uid == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("uid is required"))
		return
	}

	// 从 hub 中获取 uid 关联的 client
	hub.RLock()
	client, ok := hub.userClients[uid]
	hub.RUnlock()
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))
		return
	}

	// 记录消息
	messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}
	_ = hub.messageLogger.Log(messageLog)

	// 发送消息
	client.send <- messageLog

	// 增加等待发送的消息数量
	hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/337053.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java枚举详细解释

枚举的基本认识 我们一般直接定义一个单独的枚举类 public enum 枚举类名{枚举项1,枚举项2,枚举项3 } 可以通过 枚举类名.枚举项 来访问该枚举项的 - 可以理解为 枚举项就是我们自己定义的一个数据类型,是独一无二的 接下来我们直接用一个例子来完全理解 加深理解 这里…

Linux-nginx(安装配置nginx、配置反向代理、Nginx配置负载均衡、动静分离)

关于代理 正向代理: 客户明确知道自己访问的网站是什么 隐藏客户端的信息 目录 关于代理 一、Nginx的安装与配置 1、安装依赖 2、安装nginx &#xff08;1&#xff09;上传压缩包到目录 /usr/nginx里面 &#xff08;2&#xff09;解压文件 &#xff08;3&#xff09…

String在VS与Linux下的区别

目录 一、string的成员 1.VS 2.Linux 二、string的扩容机制 1. VS 2.Linux 一、string的成员 string是C标准库中的一个类模板&#xff0c;用于表示和操作字符串 string在 Windows 与 Linux 中的成员不是相同的 1.VS 4个成员&#xff1a;_str , _size , _capacity 和…

【一文详解】Java多线程和并发知识点详细总结【万字总结】

Java并发编程 并发编程的三个特性 原子性 一次操作或者多次操作&#xff0c;要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断&#xff0c;要么都不执行。 在 Java 中&#xff0c;可以借助synchronized、各种 Lock 以及各种原子类实现原子性。 synchronized…

MySQL(五)——多表查询

上期文章 MySQL&#xff08;四&#xff09;——约束 文章目录 上期文章多表关系一对多&#xff08;多对一&#xff09;多对多多表外键关系可视化一对一 多表查询概述笛卡尔积多表查询分类连接查询 内连接隐式内连接显式内连接 外连接左外连接右外连接 自连接联合查询 union&am…

python-基础篇-变量

文章目录 变量的基本使用目标01. 变量定义1) 变量演练1 —— iPython2) 变量演练 2 —— PyCharm3) 变量演练 3 —— 超市买苹果思考题 02. 变量的类型2.1 变量类型的演练 —— 个人信息2.2 变量的类型2.3 不同类型变量之间的计算1) **数字型变量** 之间可以直接计算2) **字符串…

Python基础第四篇(Python函数)

文章目录 一、函数介绍二、函数的定义三、函数的参数与返回值四、函数说明文档五、函数的嵌套六、变量域七、函数案例1.源代码2.读出结果 在程序设计领域&#xff0c;函数成为一个不可或缺的角色&#xff0c;它们为我们提供了精练、高效和易于管理的编程方式。本篇博客将带您深…

CentOS 7安装Java并配置环境

一、安装Java环境 1、检查系统是否安装Java [rootlocalhost ~]# java -version 2、更新系统软件包 [rootlocalhost ~]# yum update #遇到[y/n],选择y并回车&#xff0c;耐心等待下载完毕&#xff0c;之后系统会自动检验更新的软件包遇到 /var/run/yum.pid 已被锁定 /var/…

【动态规划】【数学】【C++算法】805 数组的均值分割

作者推荐 【动态规划】【数学】【C算法】18赛车 本文涉及知识点 动态规划 数学 805 数组的均值分割 给定你一个整数数组 nums 我们要将 nums 数组中的每个元素移动到 A 数组 或者 B 数组中&#xff0c;使得 A 数组和 B 数组不为空&#xff0c;并且 average(A) average(B)…

nuclei安装;linux上 以及使用教程

kali安装go环境_go1.17 kali安装-CSDN博客Ubuntu完美解决Github网站打不开问题 - 一抹烟霞 - 博客园 (cnblogs.com) All releases - The Go Programming Language 然但是上面两个我似乎都没用到网上的教程 也不适用 一个网不好 一个apt没找到包 然后我先试试了版本 结果 我的…

BGP Origin 属性控制选路试验

一、拓朴图&#xff1a; 二、配置步骤&#xff1a; 1、配置 IP 2、配置 IGP&#xff0c;我们这里用了静态&#xff0c;互相宣告了对端接口和 Loopback 0 3、配置 BGP 4、在 R1 上通过 BGP 宣告 1.1.1.1&#xff0c;查看 R2 的路由&#xff0c;发现两条 1.1.1.1 的路由&#x…

Vue中的组件

在应用程序的开发中&#xff0c;组件是不可缺少的。在Vue的使用中&#xff0c;同样也会用到组件。   vue组件的一般知识点&#xff1a;   1、组件的名字唯一&#xff1b;   2、组件以Html形式书写&#xff1b;   3、组件可以复用&#xff1b;   4、组件可以嵌套&…

postgresql(Windows)初始化数据库教程

省流&#xff1a;本文章内容讲的是如何初始化postgresql数据库环境&#xff0c;前提是已经安装好postgresql数据库&#xff0c;安装步骤参考postgresql&#xff08;Windows&#xff09;安装教程 # 开始&#xff1a;安装postgresql-12.14-2-windows-x64.exe完成后进行初始化数据…

gin中间件篇

1. 全局中间件 所有请求都经过此中间件 package mainimport ("fmt""time""github.com/gin-gonic/gin" )// 定义中间 func MiddleWare() gin.HandlerFunc {return func(c *gin.Context) {t : time.Now()fmt.Println("中间件开始执行了&quo…

《Linux高性能服务器编程》笔记04

Linux高性能服务器编程 本文是读书笔记&#xff0c;如有侵权&#xff0c;请联系删除。 参考 Linux高性能服务器编程源码: https://github.com/raichen/LinuxServerCodes 豆瓣: Linux高性能服务器编程 文章目录 Linux高性能服务器编程第09章I/O复用9.1 select系统调用9.2 po…

JVM之java内存区域[1](程序计数器、栈)

文章目录 版权声明零 运行时数据区一 程序计数器1.1 加载阶段1.2 执行阶段1.3 多线程情况 二 栈2.1 java虚拟机栈2.2 java虚拟机栈帧的组成2.2.1 局部变量表2.2.2 操作数栈2.2.3 帧数据 2.3 栈内存溢出2.4 设置帧大小2.5 本地方法栈 版权声明 本博客的内容基于我个人学习黑马程…

如何快速打开github

作为一个资深码农&#xff0c;怎么能不熟悉全球最大的同性交友社区——github呢&#xff0c;但头疼的是github有时能打开&#xff0c;有时打不开&#xff0c;这是怎么回事&#xff1f; 其实问题出在github.com解析DNS上&#xff0c;并不是需要FQ。下面提供一个方法&#xff0c;…

C++:基于C的语法优化

C&#xff1a;基于C的语法优化 命名空间命名空间域域作用限定符展开命名空间域 输入输出缺省参数全缺省参数半缺省参数 函数重载参数类型不同参数个数不同参数类型的顺序不同 引用基本语法按引用传递返回引用引用与指针的区别 内联函数autoauto与指针和引用结合 范围for循环nul…

官方版2345加速浏览器(好用的浏览器分享)

官方版2345加速浏览器&#xff08;好用的浏览器分享&#xff09; 2345加速浏览器拥有智能拦截骚扰广告&#xff0c;识别欺诈网站&#xff0c;云收藏夹等功能&#xff0c;高速上网、不假死、不卡机&#xff0c;是一款强大的多功能网页浏览器。 使用2345加速浏览器,您可以轻松应对…

DHCP配置(路由器,交换机)

DHCP接口地址池配置 拓扑 PC配置DHCP点击应用。 路由器配置命令 <Huawei>sy Enter system view, return user view with CtrlZ. [Huawei]int g0/0/1[Huawei-GigabitEthernet0/0/1]ip address 10.1.1.1 24[Huawei-GigabitEthernet0/0/1]q[Huawei]dhcp enable Info: T…