深入探究node搭建socket服务器

自从上篇中sokect实现了视频通话,但是是使用ws依赖库实现的服务端,所以最近再看ws源码,不看不知道,一看很惊讶。

接下来一点点记录一下,如何搭建一个简易的服务端socket,来实现上次的视频通讯。

搭建一个http服务

首先看一下ws依赖的调用

所以首选我们要创建一个服务器,然后监听端口号

这个不难,直接使用node自带的http依赖

const http = require('http');

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super(options);

    const server = http.createServer();
    server.listen(options.port || 8080);
    
  }
}

module.exports = MyWebsocket;

这样就启动了一个端口号为8080的http服务了,然后这个端口可以自定义,所以初始化的时候,就传参进来就行。

然后我们继续发现,需要用on来监听事件,这要如何在node中实现呢?

on方法在这里遵循了Node.js EventEmitter模式,它允许我们绑定函数到特定的事件上,当该事件发生时,对应的函数会被执行。

什么意思呢?通熟易懂就是继承这个node自带的类EventEmitter

然后你要监听一个connection函数,在MyWebsocket中要怎么触发呢?

使用 emit 方法来触发你定义的事件,并传递任何你想要传递给监听器的数据。

const http = require('http');

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super(options);

    const server = http.createServer();
    server.listen(options.port || 8080);
    this.emit('connection', 参数);
  }
}

module.exports = MyWebsocket;

如何监听客户端socket?

然后到了最重要的一步,我们最主要的功能就是监听socket,那怎么监听客户端来的socket连接?

看一下ws的websocket-server.js源码

我们刚刚不是建立了一个http服务吗?

监听 upgrade 事件

在 Node.js 中,HTTP 服务器可以监听 upgrade 事件来处理 WebSocket 连接或其他需要升级传输层协议的请求。upgrade 事件在客户端发起一个 HTTP 请求并要求升级到其他协议(如 WebSocket)时触发。


class MyWebsocket extends EventEmitter {
  constructor(options) {
    super();
    options = {
      ...options,
    }

    const server = http.createServer();
    server.listen(options.port || 8080);
    this.clients = new Set()
  

    server.on('upgrade', (req, socket) => {
      this.socket = socket; // 存储当前的socket,方便后端调用

      ...

    });
  }

}

socket升级协议

然后需要有socket升级协议,为什么要有升级协议呢?

WebSocket 升级协议(WebSocket Upgrade Protocol)在 Node.js 中是必要的,因为它允许现有的 HTTP 或 HTTPS 服务器与客户端建立持久的、双向的通信连接,而这种连接在技术上被称为 WebSocket 连接。

那什么是socket升级协议呢?

  1. 客户端请求:客户端发起一个 HTTP 请求,请求头部包含 Upgrade: websocket 和 Connection: Upgrade,以及可能的 Sec-WebSocket-Key 和其他 WebSocket 相关的头部信息。
  2. 服务器响应:服务器接收到请求后,如果同意升级,会在响应中包含 Upgrade: websocket 和 Connection: Upgrade 头部,以及一个 Sec-WebSocket-Accept 头部,这个头部是服务器对客户端 Sec-WebSocket-Key 的回应。
  3. 连接升级:一旦客户端和服务器都确认了升级,它们就会关闭 HTTP 连接,同时建立一个新的 WebSocket 连接。这个连接允许双方进行二进制或文本数据的双向通信。

其实就是根据客户端socket连接发过来的请求头,返回一个请求头给客户端来建立连接

看一下ws源码的处理

其实就说读取请求头中的sec-websocket-key字段,然后加上一个固定的字符串,经过 sha1 加密之后,转成 base64 的结果,就是digest

加密使用node中自带的crypto依赖

const crypto = require('crypto');

// 也就是用客户端传过来的 key,加上一个固定的字符串,经过 sha1 加密之后,转成 base64 的结果
function hashKey(key) {
  const sha1 = crypto.createHash('sha1');
  sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
  return sha1.digest('base64');
}

这个固定的字符串直接拿ws源码中的

然后就是升级协议的写入

const {
  EventEmitter
} = require('events');
const http = require('http');
const crypto = require('crypto');

const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
// 也就是用客户端传过来的 key,加上一个固定的字符串,经过 sha1 加密之后,转成 base64 的结果
function hashKey(key) {
  const sha1 = crypto.createHash('sha1');
  sha1.update(key + GUID);
  return sha1.digest('base64');
}

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super(options);

    const server = http.createServer();
    server.listen(options.port || 8080);

    server.on('upgrade', (req, socket) => {
      this.socket = socket;
      // socket.setKeepAlive(true);

      // websocket 升级协议
      const resHeaders = [
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
        '',
        ''
      ].join('\r\n');
      socket.write(resHeaders);

    });
  }

}

module.exports = MyWebsocket;

socket监听传输数据

接下来就说socket监听传输数据和socket关闭

socket.on('data', (data) => {
  console.log(data);
});
socket.on('close', (error) => {
  console.error('close', error)
});

然后我们一起看看效果吧

客户端发送的socket数据是

然后看请求头Sec-WebSocket-Accept也对应的上

处理socket传输数据

可以在node中拿到的数据是Buffer的二进制数据,首先需要处理的是WebSocket 协议中的数据帧。这里逻辑就有点复杂了。

协议中的数据帧结构是什么样子的?

数据帧的结构包括头部(Header)和负载(Payload)两部分。以下是数据帧的基本结构:

  1. 控制位(Control Bits):
    • FIN(1 bit):表示这是消息的最后一个片段。如果为1,表示这是消息的结束;如果为0,表示还有后续片段。
    • RSV1、RSV2、RSV3(各1 bit):保留位,用于未来的扩展,目前必须设置为0。
    • Opcode(4 bits):操作码,定义了帧的类型。例如,0x1 表示文本帧,0x2 表示二进制帧,0x8 表示关闭连接,0x9 表示 Ping 帧,0xA 表示 Pong 帧等。
    • Mask(1 bit):掩码位,指示负载数据是否被掩码。客户端发送给服务器的帧必须设置为1,表示数据被掩码;服务器发送给客户端的帧通常设置为0,表示数据未被掩码。
  1. Payload Length(7、7+16、7+64 bits):
    • 7位长度:如果值为0-125,表示负载数据的长度(以字节为单位)。
    • 7+16位长度:如果值为126,接下来的2个字节(16位)表示负载数据的长度。
    • 7+64位长度:如果值为127,接下来的8个字节(64位)表示负载数据的长度。
  1. Masking-Key(0或4 bytes):
    • 当掩码位(Mask)为1时,存在4字节的掩码密钥(Masking-Key)。这个密钥用于对负载数据进行掩码处理,以防止中间代理服务器缓存污染。
  1. Payload Data(负载数据):
    • 包含实际要传输的数据。对于文本帧,这是UTF-8编码的字符串;对于二进制帧,这是任意二进制数据。

从上面我们知道,需要的数据是负载数据,但是数据如果带有掩码,是需要解密的

  1. 解析帧头:
    • 从 bufferData 的第一个字节(byte1)中读取操作码(opcode),这是一个4位的值,用于指示帧的类型(如文本、二进制等)。
    • 从第二个字节(byte2)中读取掩码位(MASK),这是一个1位的值,指示是否使用了掩码。
  1. 计算有效载荷长度:
    • 如果 byte2 的最高位(第7位)是1,表示有效载荷长度为126,需要从 bufferData 的第3个字节和第4个字节(bufferData.readUInt16BE(2))读取有效载荷的实际长度。
    • 如果 byte2 的最高位是0,但有效载荷长度为127,表示有效载荷长度为64位,需要从 bufferData 的第3个字节到第10个字节(bufferData.readBigUInt64BE(2))读取有效载荷的实际长度。
  1. 处理掩码:
    • 如果使用了掩码(MASK 为真),则从 bufferData 中提取掩码密钥(mask key),这是一个4字节的值。
    • 使用掩码密钥对有效载荷数据进行解密(通过 handleMask 函数),以获取实际的数据(realData)。
  1. 处理有效载荷:
    • 最后,函数调用 handleRealData 方法,传入操作码和解密后的实际数据,进行进一步的处理。

function handleMask(maskBytes, data) {
  const payload = Buffer.alloc(data.length);
  for (let i = 0; i < data.length; i++) {
    payload[i] = maskBytes[i % 4] ^ data[i];
  }
  return payload;
}
const OPCODES = {
  CONTINUE: 0,
  TEXT: 1,
  BINARY: 2,
  CLOSE: 8,
  PING: 9,
  PONG: 10,
};


class MyWebsocket extends EventEmitter {
  constructor(options) {
    ...
  }
// 处理 WebSocket 协议中的数据帧
  processData(bufferData) {
    const byte1 = bufferData.readUInt8(0); // 第一个字节(byte1)中读取操作码(opcode),这是一个4位的值,用于指示帧的类型(如文本、二进制等)。
    let opcode = byte1 & 0x0f; 
    
    const byte2 = bufferData.readUInt8(1); // 从第二个字节(byte2)中读取掩码位(MASK),这是一个1位的值,指示是否使用了掩码。
    const str2 = byte2.toString(2);
    const MASK = str2[0];
    console.log(opcode, 'opcode')
    console.log(MASK, 'mask')

    let curByteIndex = 2;
    
    let payloadLength = parseInt(str2.substring(1), 2);
    if (payloadLength === 126) {
      payloadLength = bufferData.readUInt16BE(2);
      curByteIndex += 2;
    } else if (payloadLength === 127) {
      payloadLength = bufferData.readBigUInt64BE(2);
      curByteIndex += 8;
    }
    console.log(payloadLength, 'payloadLength')
    let realData = null;
    
    if (MASK) {
      const maskKey = bufferData.slice(curByteIndex, curByteIndex + 4); // 掩码密钥
      curByteIndex += 4;
      const payloadData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);
      realData = handleMask(maskKey, payloadData); // 使用掩码密钥对有效载荷数据进行解密,以获取实际的数据(realData)。
    } 
    console.log(realData, 'realData')
    this.handleRealData(opcode, realData); // 处理有效载荷
  }
handleRealData(opcode, realDataBuffer) {
    switch (opcode) {
      case OPCODES.TEXT: // 文本
        this.emit('data', realDataBuffer);
        break;
      case OPCODES.BINARY: // 二进制
        this.emit('data', realDataBuffer);
        break;
      default:
        this.emit('close');
        break;
    }
  }

  handleRealData(opcode, realDataBuffer) {
    switch (opcode) {
      case OPCODES.TEXT: // 文本
        this.emit('data', realDataBuffer);
        break;
      case OPCODES.BINARY: // 二进制
        this.emit('data', realDataBuffer);
        break;
      default:
        this.emit('close');
        break;
    }
  }
}

然后调用main.js

const MyWebSocket = require('./ws.js');

const ws = new MyWebSocket({ port: 8000 });
// websocket需要一个服务器,如果两个客户端需要通讯,需要用服务器转发\


ws.on('data', (data) => {
  console.log('receive data:' + data); // 接受消息
});

可以看到,存在掩码,解密之前数据是bufferData,解密之后的数据是realData

这样就成功拿到了客户端传过来的数据了,可以看到客户端传过来的是文本,使用了掩码,效载荷长度为9位,这里的9其实就说字符串{“A”:111}的长度。

服务端发消息给客户端

服务端能接收到消息了,然后就是将消息再给客户端了,所以需要定义一个函数,来发送数据


class MyWebsocket extends EventEmitter {
  constructor(options) {
    ...
  }
  ...
  send(data) {
    let opcode;
    let buffer;
    if (Buffer.isBuffer(data)) {
      opcode = OPCODES.BINARY;
      buffer = data;
    } else if (typeof data === 'string') {
      opcode = OPCODES.TEXT;
      buffer = Buffer.from(data, 'utf8');
    } else {
      console.error('暂不支持发送的数据类型')
    }
    this.doSend(opcode, buffer);
  }

  doSend(opcode, bufferDatafer) {
    this.socket.write(encodeMessage(opcode, bufferDatafer));
  }
}

由于我们上面获取传输数据的时候,知道socket数据需要支持WebSocket 协议中的数据帧的帧结构

因为根据 WebSocket 协议,只有客户端发送给服务器的帧需要掩码。服务器发送给客户端的帧通常不需要掩码。

function encodeMessage(opcode, payload) {
  let bufferData = Buffer.alloc(payload.length + 2 + 0);

  let byte1 = parseInt('10000000', 2) | opcode; // parseInt(130, 2)=1 ; 设置 FIN 为 1
  let byte2 = payload.length;

  bufferData.writeUInt8(byte1, 0); // 
  bufferData.writeUInt8(byte2, 1); // 负载的长度

  payload.copy(bufferData, 2);

  return bufferData;
}
  1. 创建缓冲区:
    • 使用 Buffer.alloc 方法创建一个足够大的 Buffer 对象,以容纳操作码、有效载荷长度和实际的有效载荷数据。这里假设 payload.length < 126,所以有效载荷长度只需要1个字节来表示。
  1. 设置操作码:
    • byte1 是第一个字节,它包含了操作码和 FIN(Finish)标志。这里假设 FIN 标志为 1(即消息结束),操作码通过 opcode 参数传入。操作码的值决定了消息的类型,例如文本(0x1)或二进制(0x2)。
  1. 设置有效载荷长度:
    • byte2 是第二个字节,它包含了有效载荷的长度。由于有效载荷长度小于126,所以只需要1个字节来表示。
  1. 写入操作码和有效载荷长度:
    • 使用 writeUInt8 方法将 byte1 和 byte2 分别写入 bufferData 的第0位和第1位。
  1. 复制有效载荷数据:
    • 使用 copy 方法将 payload 数据复制到 bufferData 的第2位及之后的位置。

const MyWebSocket = require('./ws.js');

const ws = new MyWebSocket({ port: 8000 });
// websocket需要一个服务器,如果两个客户端需要通讯,需要用服务器转发\


ws.on('data', (data) => {
  console.log('receive data:' + data); // 接受消息
  ws.send(data); // 自己给自己发送消息
});

客户端接收到的数据

// 创建WebSocket连接
const socketA = new WebSocket('ws://localhost:8000');

const handleBlobToText = (blob) => {
  let reader = new FileReader()
  reader.readAsText(blob, 'utf-8') // 接收到的是blob数据,先转成文本
  reader.onload = async function () {
    console.log(reader.result)
  }
}
// A接收B的消息
socketA.onmessage = function (event) {
  console.log('A received:', event.data);
  handleBlobToText(event.data)
};

socket传输大量数据

然后直接将视频的数据,传输给服务端,然后服务端就挂了😂

可以看到,node端是收到了客户端的数据

报错的原因是超出了范围,原因就是我们发送消息给客户端的处理这里出现了问题,也就是encodeMessage函数。

我们往前看看,到处理socket传输数据中,再仔细看看数据帧结构,这里有关于负载长度的问题

我们知道,我们需要处理的负载,就是我们需要传输的数据,然后数据量太大,是要区分来处理数据的。

很明显,上面的encodeMessage只适用于处理0-125的负载长度,而发送视频的数据,我们可以看看长度为多少

找到问题了,需要解决一下,接下来改写一下encodeMessage函数

你需要考虑 WebSocket 的最大帧大小限制。WebSocket 协议定义了三种帧类型来表示数据的长度:

  1. 单字节帧:用于长度小于 126 的数据。
  2. 双字节帧:用于长度在 126 到 65535 之间的数据。
  3. 八字节帧:用于长度大于 65535 的数据。
function encodeMessagePerf(options, data) {
  let offset = 2;
  let dataLength = data.length;

  let payloadLength = dataLength;
  // WebSocket 的最大帧大小限制
  // 1. 单字节帧:用于长度小于 126 的数据。
  // 2. 双字节帧:用于长度在 126 到 65535 之间的数据。
  // 3. 八字节帧:用于长度大于 65535 的数据。

  if (dataLength >= 65536) {
    offset += 8;
    payloadLength = 127;
  } else if (dataLength > 125) {
    offset += 2;
    payloadLength = 126;
  }

  const target = Buffer.allocUnsafe(offset);
  // 操作码 0x1表示文本帧;0x2表示二进制帧;0x8表示关闭连接;0x9表示ping帧;0xA表示pong帧。
  target[0] = options | 0x80; // 设置FIN

  target[1] = payloadLength; // 负载长度

  if (payloadLength === 126) {
    target.writeUInt16BE(dataLength, 2);
  } else if (payloadLength === 127) {
    target[2] = target[3] = 0;
    target.writeUIntBE(dataLength, 4, 6);
  }
  // 根据 WebSocket 协议,只有客户端发送给服务器的帧需要掩码。服务器发送给客户端的帧通常不需要掩码
  return [target, data];
}

writeUInt16BE、writeUIntBE 和 writeUInt8 是 Node.js 中 Buffer 类的三个方法,它们的主要区别在于它们写入的值的大小和字节序。

  1. writeUInt16BE:
    • 这个方法用于将一个无符号的16位整数(即0到65535之间的整数)以大端字节序(Big Endian)写入到 Buffer 对象中。大端字节序意味着高位字节在前,低位字节在后。
    • 例如,如果你要写入的值是 0xABCD,使用 writeUInt16BE 方法后,Buffer 中的数据将是 0xAB 后跟 0xCD。
    • 使用这个方法时,你需要指定一个 offset 参数,表示从 Buffer 的哪个位置开始写入。如果 offset 超出了 Buffer 的长度,或者提供的值不是有效的无符号16位整数,行为是未定义的。
  1. writeUInt8:
    • 这个方法用于将一个无符号的8位整数(即0到255之间的整数)写入到 Buffer 对象中。这个方法不涉及字节序,因为它只处理一个字节。
    • 使用 writeUInt8 方法时,你同样需要指定一个 offset 参数。如果 offset 超出了 Buffer 的长度,或者提供的值不是有效的无符号8位整数,行为同样是未定义的。
  1. writeUIntBE:

用于将无符号整数以大端序(Big Endian)格式写入到 Buffer 对象中,它可以处理的整数大小可以达到 48 位(6 个字节)。这个方法允许你指定要写入的字节长度(byteLength),这可以是 1、2、3、4、5 或 6 字节。如果 byteLength 大于 2,它将写入一个大于 16 位的整数。

总结来说,writeUInt16BE 用于写入16位的整数,并且遵循大端字节序,而 writeUInt8 用于写入8位的整数,不涉及字节序。两者都需要一个 offset 参数来指定写入位置。在实际应用中,选择哪个方法取决于你需要写入的数据类型和字节序要求。如果你需要写入更大或更小的整数,或者需要处理可变长度的整数,那么 writeUIntBE 是更合适的选择。

Buffer.allocUnsafe 和 Buffer.alloc 是 Node.js 中用于创建 Buffer 实例的两种方法,它们的主要区别在于内存的初始化方式和安全性。

  1. Buffer.allocUnsafe(size):
    • Buffer.allocUnsafe 创建一个指定大小的 Buffer 实例,但它不会初始化分配的内存。这意味着,新创建的 Buffer 实例可能包含之前内存中的数据,这些数据可能是敏感的。因此,这个方法被称为“不安全”(unsafe)。
    • 使用 Buffer.allocUnsafe 创建的 Buffer 实例通常比 Buffer.alloc 创建的要快,因为它避免了初始化内存的步骤。
    • 如果你需要确保 Buffer 中的数据是干净的,你应该在使用 Buffer.allocUnsafe 创建实例后,使用 Buffer.fill() 方法来填充整个 Buffer,或者在写入数据之前完全覆盖它。
  1. Buffer.alloc(size[, fill[, encoding]]):
    • Buffer.alloc 创建一个指定大小的 Buffer 实例,并且会用指定的值(默认为0)初始化整个 Buffer。这确保了新创建的 Buffer 实例不会包含任何旧数据。
    • Buffer.alloc 方法比 Buffer.allocUnsafe 慢,因为它需要额外的时间来初始化内存。
    • 如果你不需要处理可能包含敏感数据的旧内存,或者你打算立即用新数据覆盖整个 Buffer,那么使用 Buffer.alloc 是一个更安全的选择。

在实际应用中,如果你需要处理敏感数据或者需要确保 Buffer 的内容是可预测的,建议使用 Buffer.alloc。如果你对性能有更高的要求,并且能够确保在读取或使用 Buffer 之前清除或覆盖其内容,那么可以考虑使用Buffer.allocUnsafe。

在ws源码中,使用的是Buffer.allocUnsafe,可能是为了性能优化和内存管理。在 Node.js 中,Buffer.allocUnsafe 可能会使用一个内部的内存池来分配 Buffer 实例。这意味着,如果创建的 Buffer 大小小于或等于 Buffer.poolSize 的一半,那么这些 Buffer 实例可能会共享内存池中的内存。这有助于减少内存分配和垃圾回收的开销。

当然在ws源码中,处理数据帧的函数是frame

能看到,除了负载数据之外,ws中还有很多其他参数,比如说处理掩码,比如是否设置FIN,指定是否可以修改’ data '等等。但是正常使用就是上面的encodeMessagePerf流程了。

接下来就完美解决了数据大量的问题。

如何区分不同客户端?

能看到目前只是一个客户端,那客户端一多,想要控制A客户端,发消息给B客户端,B客户端发消息给A客户端,那这样就要区分不同的客户端了。

那如何区分不同的客户端呢?

JavaScript中有一个数据结构,能够将不同的数据集区分开来,就是Set啦,看一下ws的源码

客户端存储在一个Set对象中,连接的时候,就添加到clients中,断开连接了,就delete掉,这样就能清楚有几个客户端了。

看完之后,就动手撸一下试试

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super();
    options = {
      ...options,
    }

    const server = http.createServer();
    server.listen(options.port || 8080);
    this.clients = new Set()
  
    server.on('upgrade', (req, socket) => {
      this.socket = socket;
      
      socket.setKeepAlive(true);
      // websocket 升级协议
      const resHeaders = [
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
        '',
        '',
      ].join('\r\n');
      socket.write(resHeaders);
      socket.on('data', (data) => {
        this.processData(data);
      });
      socket.on('close', () => {
        this.clients.delete(socket)
        console.log('close')
      })
      socket.on('end', () => {
        this.clients.delete(socket)
        console.log('end')
      })
      socket.on('error', (err) => {
        console.log('error', err)
      })
      

      if (this.clients) {
        this.clients.add(socket)
      }
      // debugger
      console.log(this.clients.size, 'clients.size')
    });
  }
}

这样子就搭建好了,看一下客户端的数量是否正确

能看到数量是正常的,然后就是将A客户端的数据发送给B客户端了

ws.on('data', function connection({realDataBuffer, clients}) {
  // 需要在data这个回调中拿到clients数据
  console.log('Client connected', realDataBuffer.toString('utf8'));
  // ws.send(data + ' ' + Date.now()); // 发送消息
  // 将消息发送给所有客户端
  if (clients) {
    clients.forEach(function each(client) {
      if (client !== ws.socket) {
        ws.send(realDataBuffer); // 客户端接受的是blob格式数据
      }
    });
  }
});

这里,需要在data的回调中拿到clients数据,所以需要传递数据出来。由于clients在constructor中定义的,所以可以直接在emit回调的时候,将clients传递出来。

这样,外面就能访问到clients了,然后我们直接这样传递数据,但是你以为这就完了吗?那你就太天真了🤣😂

在这里插入图片描述

找一下问题,我做了一个简单的demo,就是A页面发消息给B页面,B页面收到了,然后B页面发消息给A页面,A页面并没有收到,然后node端是都能看到AB各自发送的消息的。

其实就是node端发送消息出现了问题,仔细看一下下面的代码

ws.on('data', function connection({realDataBuffer, clients}) {
  console.log('Client connected', realDataBuffer.toString('utf8'));
  // ws.send(data + ' ' + Date.now()); // 发送消息
  // 将消息发送给所有客户端
  if (clients) {
    clients.forEach(function each(client) {
      if (client !== ws.socket) { // ws.socket是最后连接的客户端,不是当前要发消息的客户端
        ws.send(realDataBuffer); // 客户端接受的是blob格式数据
      }
    });
  }
});

原因就是ws.socket是最后连接的客户端,不是当前发生消息的客户端。

所以导致,我首先刷新的A页面,再刷新了B页面,ws.socket出现了覆盖,此时ws.socket就是B客户端了。

所以后面B页面发送消息给A页面,A页面没有收到,而是又再次将消息发送给了B页面。

如何将A的数据,广发给其他客户端?

如何解决呢?我们仔细看一下ws是如何将一个客户端的数据,发送给除了当前以外的其他客户端。

可以看到这里是先有connection,然后才能触发message回调

而且在connection回调中,拿到了ws对象,这个对象的message回调才会监听到客户端发送过来的消息

也就是说我们上面的data回调需要再包一层,然后判断是通过ws这个对象来判断是不是当前需要发送消息的客户端的。

这样就觉得ws其实就是一个客户端对象,是clients中的一个对象,然后ws的message回调,就是接收到当前客户端的消息,然后只要将不是当前ws的客户端发送消息,就可以了。

所以这里区分成服务端的socket管理和单个客户端的socket管理,需要在再新增一个ws对象,也需要继承自EventEmitter,因为它也有on事件

class SingleData extends EventEmitter  {
  constructor(socket) {
    super();
    this.socket = socket

    this.socket.on('data', (data) => { // 传输数据
      this.processData(data);
    });
    this.socket.on('close', () => {
      this.emit('close')
    })
    this.socket.on('end', () => {
      this.emit('end')
      console.log('end')
    })
    this.socket.on('error', (err) => {
      this.emit('error', err)
    })
    
  }
  handleRealData(opcode, realDataBuffer) {
    switch (opcode) {
      case OPCODES.TEXT: // 文本
        this.emit('message', realDataBuffer);
        break;
      case OPCODES.BINARY: // 二进制
        this.emit('message', realDataBuffer);
        break;
      default:
        this.emit('close');
        break;
    }
  }

  processData(bufferData) {
    const byte1 = bufferData.readUInt8(0);
    let opcode = byte1 & 0x0f; 
    
    const byte2 = bufferData.readUInt8(1);
    const str2 = byte2.toString(2);
    const MASK = str2[0];

    let curByteIndex = 2;
    
    let payloadLength = parseInt(str2.substring(1), 2);
    if (payloadLength === 126) {
      payloadLength = bufferData.readUInt16BE(2);
      curByteIndex += 2;
    } else if (payloadLength === 127) {
      payloadLength = bufferData.readBigUInt64BE(2);
      curByteIndex += 8;
    }

    let realData = null;
    
    if (MASK) {
      const maskKey = bufferData.slice(curByteIndex, curByteIndex + 4);  
      curByteIndex += 4;
      const payloadData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);
      realData = handleMask(maskKey, payloadData);
    } 
    
    this.handleRealData(opcode, realData);
  }

  send(data) {
    let opcode;
    let buffer;
    if (Buffer.isBuffer(data)) {
      opcode = OPCODES.BINARY;
      buffer = data;
    } else if (typeof data === 'string') {
      opcode = OPCODES.TEXT;
      buffer = Buffer.from(data, 'utf8');
    } else {
      console.log(data)
      console.error('暂不支持发送的数据类型')
    }
    this.doSend(opcode, buffer);
  }

  doSend(opcode, bufferDatafer) {
    // 大量数据
    let list = frame(bufferDatafer)

    if (list.length === 2) {
      this.socket.cork();
      this.socket.write(list[0]);
      this.socket.write(list[1]);
      this.socket.uncork();
    } else {
      this.socket.write(list[0]);
    }
  }
}

module.exports = SingleData;

由于这是单个的客户端socket数据,所以将接受和发送socket数据帧的处理,放到这个对象来。

然后就可以将服务端的socket对象,简化成下面这样

const { EventEmitter } = require('events');
const http = require('http');
const crypto = require('crypto');
const SingleData = require('./clientSocket.js')

class MyWebsocket extends EventEmitter {
  constructor(options) {
    super();
    options = {
      ...options,
    }

    const server = http.createServer();
    server.listen(options.port || 8080);
    this.clients = new Set()
  

    server.on('upgrade', (req, socket) => {
      this.socket = socket;
      
      socket.setKeepAlive(true);
      // websocket 升级协议
      const resHeaders = [
        'HTTP/1.1 101 Switching Protocols',
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
        '',
        '',
      ].join('\r\n');
      socket.write(resHeaders);
      const ws = new SingleData(socket);
      
      ws.on('close', () => {
        this.clients.delete(ws)
        console.log('close')
      })
      ws.on('end', () => {
        this.clients.delete(ws)
        console.log('end')
      })
      ws.on('error', (err) => {
        console.log('error', err)
      })
      

      if (this.clients) {
        this.clients.add(ws)
      }
      this.emit('connection', ws);

    });
  }

}

module.exports = MyWebsocket;

再看看效果,这样就实现了一个简易版的ws源码啦~

在这里插入图片描述

当然ws源码中的处理,比上面这个简易版复杂很多,发送消息有Sender类,接受消息有Receiver类,实现双向通讯,而且数据处理上,也更加谨慎。

如果你想深入研究源码,可以看看这篇文章:https://juejin.cn/post/6844903850667671560

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/406419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

检索增强生成(RAG)——提示工程方法

在检索增强生成(RAG)过程中,提示工程也是一个不可忽略的部分。提示工程可以降低 RAG 应用出现的幻觉,提高 RAG 应用回答的质量。 下面简单介绍一些关于提示工程的论文。 欢迎关注公众号(NLP Research),及时查看最新内容 1. Thread of Thought(ThoT) 论文:Thread of …

LLM (Large language model)的指标参数

1. 背景介绍 我们训练大模型的时候&#xff0c;或者我们用RAG的时候&#xff0c;不知道我们的算法&#xff0c;或者我们的提示&#xff0c;或者我们的本地知识库是否已经整理得符合要求了。又或我们需要一个指标去评估我们目前的所有围绕大模型&#xff0c;向量数据库或外挂知…

【EI会议征稿通知】2024年软件自动化与程序分析国际学术会议(SAPA 2024)

2024年软件自动化与程序分析国际学术会议&#xff08;SAPA 2024) 2024 International Conference on Software Automation and Program Analysis 在当今科技社会中&#xff0c;软件产业呈快速发展趋势&#xff0c;软件自动化与程序分析技术在提高软件质量、降低开发成本、提升…

QT问题 打开Qt Creator发现没有菜单栏

之前不知道按了什么快捷键,当我再次打开Qt Creator时发现菜单栏消失啦 找了许多原因发现:安装有道词典的快捷键Ctrl Alt m 与Qt Creator里的快捷键冲突导致菜单栏被莫名其妙的隐藏 解决方法: 1找到有道词典快捷键 2再次按快捷键 Ctrl Alt m就可以重新显示菜单栏

Servlet使用Cookie和Session

一、会话技术 当用户访问web应用时&#xff0c;在许多情况下&#xff0c;web服务器必须能够跟踪用户的状态。比如许多用户在购物网站上购物&#xff0c;Web服务器为每个用户配置了虚拟的购物车。当某个用户请求将一件商品放入购物车时&#xff0c;web服务器必须根据发出请求的…

特保罗环保节能邀您参观2024生物发酵产品与技术装备展

参观企业介绍 山东特保罗环保节能科技有限公司位处山东章丘区&#xff0c;是一家致力于研发 “MVR&多效蒸发器”和“高难工业废水零排放”技术等的科技制造型高新技术企业。特保罗公司隶属山东明天机械集团股份有限公司&#xff0c;集团公司旗下拥有山东明天机械有限公司、…

计算机网络:思科实验【1-访问WEB服务器】

&#x1f308;个人主页&#xff1a;godspeed_lucip &#x1f525; 系列专栏&#xff1a;Cisco Packet Tracer实验 本文对应的实验报告源文件请关注微信公众号程序员刘同学&#xff0c;回复思科获取下载链接。 实验目的实验环境实验内容熟悉仿真软件访问WEB服务器 实验体会总结…

CLion 2023:专注于C和C++编程的智能IDE mac/win版

JetBrains CLion 2023是一款专为C和C开发者设计的集成开发环境&#xff08;IDE&#xff09;&#xff0c;它集成了许多先进的功能&#xff0c;旨在提高开发效率和生产力。 CLion 2023软件获取 CLion 2023的智能代码编辑器提供了丰富的代码补全和提示功能&#xff0c;使您能够更…

32单片机基础:EXTI外部中断

本节是STM32的外部中断系统和外部中断。 中断系统是管理和执行中断的逻辑结构&#xff0c;外部中断是总多能产生中断的外设之一&#xff0c; 所以本节借助外部中断学习一下中断系统。 下图灰色的&#xff0c;是内核的中断&#xff0c;比如第一个&#xff0c;当产生复位事件时…

旷视low-level系列(三):(NAFNet)Simple Baselines for Image Restoration

题目&#xff1a;Simple Baselines for Image Restoration 单位&#xff1a;旷视 收录&#xff1a;ECCV2022 论文&#xff1a;https://arxiv.org/abs/2204.04676 代码&#xff1a;https://github.com/megvii-research/NAFNet 文章目录 1. Motivation2. Contributions3. Methods…

NFT Insider #120:福布斯在 The Sandbox 推出永久建筑,哈佛教授表示Web3 和 NFT 将会继续存在

引言&#xff1a;NFT Insider由NFT收藏组织WHALE Members &#xff08;https://twitter.com/WHALEMembers&#xff09;、BeepCrypto &#xff08;https://twitter.com/beep_crypto&#xff09;联合出品&#xff0c;浓缩每周NFT新闻&#xff0c;为大家带来关于NFT最全面、最新鲜…

《初阶数据结构》尾声

目录 前言&#xff1a; 《快速排序&#xff08;非递归&#xff09;》: 《归并排序》&#xff1a; 《归并排序&#xff08;非递归&#xff09;》&#xff1a; 《计数排序》&#xff1a; 对于快速排序的优化&#xff1a; 分析&#xff1a; 总结&#xff1a; 前言&#xff1a…

《UE5_C++多人TPS完整教程》学习笔记24 ——《P25 完善菜单子系统(Polishing The Menu Subsystem)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P25 完善菜单子系统&#xff08;Polishing The Menu Subsystem&#xff09;》 的学习笔记&#xff0c;该系列教学视频为 Udemy 课程 《Unreal Engine 5 C Multiplayer Shooter》 的中文字幕翻译版&#xff0c;UP主&…

200万上下文窗口创飞Gemini 1.5!微软来砸谷歌场子了

谷歌刚刷新大模型上下文窗口长度记录&#xff0c;发布支持100万token的Gemini 1.5&#xff0c;微软就来砸场子了。 推出大模型上下文窗口拉长新方法——LongRoPE&#xff0c;一口气将上下文拉至2048k token&#xff0c;也就是200多万&#xff01; 并且1000步微调内&#xff0c…

深入浅出:探究过完备字典矩阵

在数学和信号处理的世界里&#xff0c;我们总是在寻找表达数据的最佳方式。在这篇博文中&#xff0c;我们将探讨一种特殊的矩阵——过完备字典矩阵&#xff0c;这是线性代数和信号处理中一个非常有趣且实用的概念。 什么是过完备字典矩阵&#xff1f; 首先&#xff0c;我们先…

<网络安全>《49 网络攻防专业课<第十四课 - 华为防火墙的使用(2)>

6 防火墙的防范技术 6.1 ARP攻击防范 攻击介绍 攻击者通过发送大量伪造的ARP请求、应答报文攻击网络设备&#xff0c;主要有ARP缓冲区溢出攻击和ARP拒绝服务攻击两种。 ARP Flood攻击&#xff08;ARP扫描攻击&#xff09;&#xff1a;攻击者利用工具扫描本网段或者跨网段主机时…

MT8791迅鲲900T联发科5G安卓核心板规格参数_MTK平台方案定制

MT8791安卓核心板是一款搭载了旗舰级配置的中端手机芯片。该核心板采用了八核CPU架构设计&#xff0c;但是升级了旗舰级的Arm Cortex-A78核心&#xff0c;两个大核主频最高可达2.4GHz。配备了Arm Mali-G68 GPU&#xff0c;通过Mali-G88的先进技术&#xff0c;图形处理性能大幅提…

网络原理-UDP/TCP协议

协议 在网络通信中,协议是非常重要的一个概念,在下面,我将从不同层次对协议进行分析. 应用层 IT职业者与程序打交道最多的一层,调用系统提供的API写出的代码都是属于应用层的. 应用层中有很多现成的协议,但是更多的,我们需要根据实际情况来进行制作自定义协议. 自定义协议…

Vue3 (unplugin-auto-import自动导入的使用)

安装 参考链接 npm i -D unplugin-auto-importvite.config.ts里面配置 import AutoImport from unplugin-auto-import/viteAutoImport({imports:[ vue,vue-router]})重新运行项目会生成一个auto-imports.d.ts的文件 /* eslint-disable */ /* prettier-ignore */ // ts-nochec…

C++之类作用域

目录 1、全局作用域 2、类作用域 2.1、设计模式之Pimpl 2.2、单例模式的自动释放 2.2.0、检测内存泄漏的工具valgrind 2.2.1、可以使用友元形式进行设计 2.2.2、内部类加静态数据成员形式 2.2.3、atexit方式进行 2.2.4、pthread_once形式 作用域可以分为类作用域、类名…