c++ websocket 协议分析与实现

前言
网上有很多第三方库,nopoll,uwebsockets,libwebsockets,都喜欢回调或太复杂,个人只需要在后端用,所以手动写个;

1:环境
ubuntu18
g++(支持c++11即可)
第三方库:jsoncpp,openssl
2:安装
jsoncpp 读取json 配置文件 用 自动安装 网上一堆教程
openssl 如果系统没带,需要安装下 sudo apt-get install openssl 一般是1.1版本 够用了
3:websocket server
1> 主要就用到 epoll 模式(io_uring 更好点,就是内核版本要高点),3个进程 主进程作为监控进程 2个子进程 一个network进程 一个 logic 进程
2> 子进程间 主要通过共享内存 加socketpair 通知 交换数据
在这里插入图片描述
在这里插入图片描述
3>websocket 握手协议 先看例子
在这里插入图片描述
上前端代码 html

<!DOCTYPE HTML>  
<html>  
<head>  
    <meta http-equiv="content-type" content="text/html" />  
    <meta name="author" content="https://github.com/" />  
    <title>websocket test</title>  
    <script>
		var socket;  
		function Connect(){  
			try{  
				socket=new WebSocket('ws://192.168.1.131:9000'); //'ws://192.168.1.131:9000');  
			}catch(e){  
				alert('error catch'+e);  
				return;  
			}  
			socket.onopen = sOpen;  
			socket.onerror = sError;
			socket.onmessage= sMessage;
			socket.onclose= sClose;
		}  
		function sOpen(){
			alert('connect success!');
		}
		function sError(e){
			alert("[error] " + e);
			//writeObj(e);
		}
		function sMessage(msg){ 
			if(typeof(msg) == 'object'){
				//let json = JSON.stringify(msg);
				//console.log('server says:' +json);
				//writeObj(msg);
				if(msg.data){  //msg.hasOwnProperty('data')
					console.log('server says'+msg.data);
				}else{
					writeObj(msg);
					//console.log('[1]server says'+msg.data);
				}
			}else{
				alert('server says:' + msg);  
			}
			
		}
		function sClose(e){
			alert("connect closed:" + e.code);
		}  
		function Send(){
			socket.send(document.getElementById("msg").value);
		} 
		function Close(){
			socket.close();
		} 
		function writeObj(obj){ 
			var description = ""; 
			for(var i in obj){ 
			var property=obj[i]; 
			description+=i+" = "+property+"\n"; 
			} 
			alert(description); 
		}
	</script>
</head>  
   
<body>  
<input id="msg" type="text">  
<button id="connect" onclick="Connect();">Connect</button>  
<button id="send" onclick="Send();">Send</button>  
<button id="close" onclick="Close();">Close</button>

</body>  
   
</html>  

在Microsoft Edge 运行结果
在这里插入图片描述
golang 前端代码如下

package main

import (
	"fmt"
	"golang.org/x/net/websocket"
	"log"
	"strings"
)


var origin = "http://192.168.1.131:9000"  
//var url = "ws://192.168.1.131:7077/websocket"
var url = "wss://192.168.1.131:9000/websocket"
func main() {
	ws, err := websocket.Dial(url, "", origin)
	if err != nil {
		log.Fatal(err)
	}

	// send text frame
	var message2 = "hello"
	websocket.Message.Send(ws, message2)
	fmt.Printf("Send: %s\n", message2)
	// receive text frame
	var message string
	websocket.Message.Receive(ws, &message)
	fmt.Printf("Receive: %s\n", message)

	for true {
		fmt.Printf("please input string:")
		var inputstr string
		fmt.Scan(&inputstr)
		if(strings.Compare(inputstr,"quit") == 0){
			break
		}else{
			websocket.Message.Send(ws, inputstr)
			fmt.Printf("Send: %s\n", inputstr)

			var output string
			websocket.Message.Receive(ws, &output)
			fmt.Printf("Receive: %s\n", output)
		}

	}
	ws.Close()//关闭连接
	fmt.Printf("client exit\n")
}

测试结果
在这里插入图片描述

server 握手代码
在这里插入图片描述

int  c_WebSocket::recv_handshake() {
    int n, len, ret;
    uint32_t pos = 0;
    uint16_t u16msglen = 0;
    const bool bssl = isSsl();
    if (bssl) {
        n = SSL_read(m_ssl, m_recv_buf + m_recv_pos, m_recv_buf_size - m_recv_pos);
    }
    else {
        n = recv(m_fd, m_recv_buf + m_recv_pos, m_recv_buf_size - m_recv_pos, 0);
    }
   
    if (n > 0) {
        if (m_is_closed) {
            m_recv_pos = 0;
            return true;
        }
        m_recv_pos += n;
        m_recv_buf[m_recv_pos] = 0;
        printf("recv %d handshake %s len=%d recvlen=%d", m_id, m_recv_buf,n,m_recv_pos);
        // goto READ;
      //  int32_t pos = 0;
        for (;;) {
            if (m_recv_pos >= c_u16MinHandShakeSize)  //消息头
            {
                //  \r 0x0D \n  0xA
                const  int nRet = fetch_http_info((char*)m_recv_buf, m_recv_pos);
                if (1 == nRet) {  //ok

                    //  f(strcasecmp(header, "Sec-Websocket-Protocol") == 0)
                    //      conn->accepted_protocol = value;
                    // 
                    std::map<std::string, std::string>::iterator it1 = m_map_header.find("Sec-WebSocket-Key"); //一般固定24个字节
                    std::map<std::string, std::string>::iterator it2 = m_map_header.find("Sec-WebSocket-Protocol");//
                    int map_size = m_map_header.size();
                    if (it1 != m_map_header.end()) {
                        printf("key=%s value=%s %d \n", it1->first.c_str(), it1->second.c_str(), map_size);
                    }
                    else {
                        return -1;
                    }

                    char acceptvalue[1024] = { 0, };
                    uint32_t  value_len = it1->second.length();
                    memcpy(acceptvalue, it1->second.c_str(), value_len);
                    //  memcpy(accept_key, websocket_key, key_length);
#define MAGIC_KEY "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
                    memcpy(acceptvalue + value_len, MAGIC_KEY, 36);
                    acceptvalue[value_len + 36] = 0;

                    unsigned char md[SHA_DIGEST_LENGTH];
                    SHA1((unsigned char*)acceptvalue, strlen(acceptvalue), md);
                    std::string server_key = base64_encode(reinterpret_cast<const unsigned char*>(md), SHA_DIGEST_LENGTH);
                   
                    char rep_handshake[1024] = { 0, };
                    memset(rep_handshake, 0, sizeof(rep_handshake));
                    if (it2 != m_map_header.end()) {
                        //子协议
                        char szsub_protocol[512] = { 0, };
                        std::size_t pos_t = it2->second.find(",");
                        if (pos_t != std::string::npos && pos_t < 512) {

                            memcpy(szsub_protocol, it2->second.c_str(), pos_t);
                            sprintf(rep_handshake, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %s\r\nSec-WebSocket-Protocol: %s\r\n\r\n",
                                server_key.c_str(), szsub_protocol);
                        }
                        else {
                            return -1;
                        }
                    }
                    else {
                        sprintf(rep_handshake, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %s\r\n\r\n",
                            server_key.c_str());
                    }
                    m_recv_pos = 0;
                    set_handshake_ok(); //握手完毕
                    send_pkg((uint8_t*)rep_handshake, strlen(rep_handshake));
                    break;
                }
                else {
                    printf("fetch_http_info nRet=%d \n ", nRet);
                    return -2;
                }
            }
            else {
                break;
            }
        }//end for

        if (pos != 0 && m_recv_pos > 0) {
            memcpy(m_recv_buf, m_recv_buf + pos, m_recv_pos);
        }
    }
    else {

        if (bssl) {
            //EAGAIN或EWOULDBLOCK二者应该是一样的,对应的错误码是11
            //ret = SSL_get_error(m_ssl, n);//int ssl_error = SSL_get_verify_result(ssl);
            //if (SSL_ERROR_WANT_READ == ret || SSL_ERROR_WANT_WRITE == ret) return true;
            SSL_ERROR_NONE == ret  n>0 ok  other error
            //printf("SSL_get_error(%d %d %d)\n", n, ret, errno);//SSL_get_error(-1 1,11)
            //return false;
            int ret = ssl_check_error(m_ssl, n);
            printf("SSL_get_error(%d %d %d %d)\n", n, ret, errno, m_recv_pos);//SSL_get_error(-1 -1 11)
            if (ret == -2) {
                return true;
            }
            if (errno == EAGAIN || errno == EINTR) {
                return true;
            }
            return false;
        }
        else {
            if (n == 0)
                return false;

            if (errno == EAGAIN || errno == EINTR) {
                return true;
            }
            else {
                return false;
            }
        }
        
    }
    return  true;
}

int c_WebSocket::fetch_http_info(char* recv_buf, const uint32_t buf_len) {

    //  \r 0x0D \n  0xA
    const uint32_t max_len = 1024;
    char bufline[max_len] = { 0, };
    uint32_t  bufpos = 0;
    uint8_t   ustate = 0;
    //std::map<std::string, std::string> map_header;
    char szsubhead[max_len] = { 0, };
    for (uint32_t i = 0; i < buf_len; i++) {
        bufline[bufpos++] = recv_buf[i];
        if (bufpos >= max_len) return  -1;
        if (recv_buf[i] == 0x0A) {
            bufline[bufpos] = 0;
            if (0 == ustate) { //GET /websocket HTTP/1.1

                if (ws_strncmp(bufline, "GET ", 4)) {
                    if (bufpos < 15) {
                        return -1;
                    }
                    //the get url must have a minimum size: GET / HTTP/1.1\r\n 16 (15 if only \n)
                    //return nopoll_cmp (buffer + iterator, "HTTP/1.1\r\n") || nopoll_cmp (buffer + iterator, "HTTP/1.1\n");
                  //  char* pos1 = strstr(bufline, "HTTP/1.1\r\n");
                  ///  char* pos2 = strstr(bufline, "HTTP/1.1\n");
                    int32_t  nhttp1_1_pos = (int32_t)(bufpos - 2 - 8); //HTTP/1.1  8BYTE   HTTP/1.1\r\n  //H的位置
                    if (bufline[bufpos - 2] != '\r') {
                        nhttp1_1_pos += 1;//HTTP/1.1\n
                    }

                    const  int32_t ucopylen = nhttp1_1_pos - 1 - 4; // -1 http前的空格  -4 是GET空格 的长度
                    if (ucopylen > 0 && ucopylen < 128) { //   /websocket 长度
                        memcpy(szsubhead, bufline + 4, ucopylen);
                        szsubhead[ucopylen] = 0;
                    }
                    else {
                        return -3;
                    }

                }
                else {
                    return -1;
                }
                ustate = 1;
                bufpos = 0;
            }
            else {
                //if (buffer_size == 2 && nopoll_ncmp (buffer, "\r\n", 2))
                if (2 == bufpos && ws_strncmp(bufline, "\r\n", 2)) {//握手协议结尾
                    ustate == 2;
                    //检查最基本的握手协议
                    // Connection: Upgrade
               //     Host: 192.168.1.2 : 8080
              //      Sec - WebSocket - Key : 821VqJT7EjnceB8m7mbwWA ==
              //      Sec - WebSocket - Version : 13
              //      Upgrade : websocket
                    // ensure we have all minumum data 
                    std::map<std::string, std::string>::iterator it1 = m_map_header.find("Upgrade");//固定  websocket
                    std::map<std::string, std::string>::iterator it2 = m_map_header.find("Connection");  //固定 Upgrade
                    std::map<std::string, std::string>::iterator it3 = m_map_header.find("Sec-WebSocket-Version");
                    std::map<std::string, std::string>::iterator it4 = m_map_header.find("Sec-WebSocket-Key"); //一般固定24个字节
                    const bool  bcheckOrigin = false; //浏览器必须有,其他可能没有
                    std::map<std::string, std::string>::iterator it5 = m_map_header.find("Origin"); //

                    if (it1 != m_map_header.end() && ws_strncmp(it1->second.c_str(), "websocket", 9) &&
                        it2 != m_map_header.end() && ws_strncmp(it2->second.c_str(), "Upgrade", 7) &&
                        it3 != m_map_header.end() && ws_strncmp(it3->second.c_str(), "13", 2) &&
                        it4 != m_map_header.end() && it4->second.length() > 12 &&
                        (bcheckOrigin == (bcheckOrigin && it5 != m_map_header.end()))) { //其他字段忽略了
                        return 1;
                    }
                    return -6;

                }
                else {
                    char* pos1 = strstr(bufline, ":");
                    if (pos1 != nullptr) {
                        //  std::string key = header.substr(0, end);
                        //  std::string value = header.substr(end + 2);
                        int32_t  key_len = pos1 - bufline;
                        int32_t  value_len = bufpos - key_len - 1 - 1;
                        if (key_len > 1 && value_len > 1) {
                            bufline[key_len] = 0;
                            std::string key = bufline;
                            if (bufline[bufpos - 1] == '\n') {
                                bufline[bufpos - 1] = 0;
                                //   --value_len;
                            }
                            if (bufline[bufpos - 2] == '\r') {
                                bufline[bufpos - 2] = 0;
                                //   --value_len;
                            }
                            std::string value = &bufline[key_len + 2];
                            m_map_header[key] = value;
                        }
                        else {
                            return -4;
                        }



                    }
                    else {
                        return -4;
                    }
                    bufpos = 0;
                }
            }

        }

    }

    return 0;
}

握手请求与回复
Origin: http://192.168.1.131:9000 : 原始的协议和URL
Connection: Upgrade:表示要升级协议了
Upgrade: websocket:表示要升级到 WebSocket 协议;
Sec-WebSocket-Version: 13:表示 WebSocket 的版本。如果服务端不支持该版本,需要返回一个 Sec-WebSocket-Versionheader ,里面包含服务端支持的版本号
Sec-WebSocket-Key:与后面服务端响应首部的 Sec-WebSocket-Accept 是配套的,提供基本的防护,比如恶意的连接,或者无意的连接
服务端响应协议升级
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=
HTTP/1.1 101 Switching Protocols: 状态码 101 表示协议切换
Sec-WebSocket-Accept:根据客户端请求首部的 Sec-WebSocket-Key 计算出来
将 Sec-WebSocket-Key 跟 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 拼接。
通过 SHA1 计算出摘要,并转成 base64 字符串。计算公式如下:
Base64(sha1(Sec-WebSocket-Key + 258EAFA5-E914-47DA-95CA-C5AB0DC85B11))
Connection:Upgrade:表示协议升级
Upgrade: websocket:升级到 websocket 协议

4:接受数据帧
在这里插入图片描述
代码如下

int  c_WebSocket::recv_dataframe() {
    int n, len,ret;
    //   uint32_t pos = 0;
    uint16_t u16msglen = 0;
    const bool bssl = isSsl();
    if (isSsl()) {
        n = SSL_read(m_ssl, m_recv_buf + m_recv_pos, m_recv_buf_size - m_recv_pos);
    }
    else {
        n = recv(m_fd, m_recv_buf + m_recv_pos, m_recv_buf_size - m_recv_pos, 0);
    }
  //  n = recv(m_fd, m_recv_buf + m_recv_pos, m_recv_buf_size - m_recv_pos, 0);
    if (n > 0) {
        if (m_is_closed) {
            m_recv_pos = 0;
            return true;
        }
        m_recv_pos += n;

        // goto READ;
        int32_t pos = 0;
        for (;;) {
            if (m_recv_pos >= c_u16MsgHeadSize)  //消息头 2个字节
            {
                int t = parse_dataframe(m_recv_buf + pos, m_recv_pos);
                if (t < 0) return false;
                else if (0 == t) break;

                pos += t;
                m_recv_pos -= t; u16msglen + c_u16MsgHeadSize; //sub one packet len
                //  pos += u16msglen + c_u16MsgHeadSize;
            }
            else {
                break;
            }
        }//end for

        if (pos != 0 && m_recv_pos > 0) {
            memcpy(m_recv_buf, m_recv_buf + pos, m_recv_pos);
        }
        if (pos > 0) { //收到消息的时间
            m_lastrecvmsg = get_reactor().getCurSecond();
        }
    }
    else {

        if (bssl) {
            //ret = SSL_get_error(m_ssl, n);
            //if (SSL_ERROR_WANT_READ == ret || SSL_ERROR_WANT_WRITE == ret) return true;
            SSL_ERROR_NONE == ret  n>0 ok  other error
            //return false;
            int ret = ssl_check_error(m_ssl, n);
            if (ret == -2) {
                return true;
            }
            if (errno == EAGAIN || errno == EINTR) {
                return true;
            }
            return false;
        }
        else {
            if (n == 0)
                return false;

            if (errno == EAGAIN || errno == EINTR) {
                return true;
            }
            else {
                return false;
            }
        }
    }
    return  true;
}

处理数据帧,把payload 转发到 logic进程,由logic去处理
一帧数据长度超过 65k 直接抛弃,这里可以根据实际需求设定长度

int  c_WebSocket::parse_dataframe(uint8_t* recv_buf, const uint32_t buf_len) {
    /* get fin bytes */
#define  FAIL_AND_CLOSE  -1         //接受失败OR 关闭
#define  NEED_CLOSE  -1             //需要关闭

#define  CONTINUE_RECV_MSG  0       //消息不完整,需要继续接受
#define  ONE_MSG_LENGHT(X) X        //接受完一条消息,消息总长度为X

#define  MASK_LEN   4      //掩码长度 
#define  PAYLOAD_LENGTH_126  2   //126 额外2个字节


    uint8_t  t_fin = msg_get_bit(recv_buf[0], 7);
    if (t_fin == 0) return FAIL_AND_CLOSE;
    uint8_t t_code = recv_buf[0] & 0x0F;
    uint8_t t_masked = msg_get_bit(recv_buf[1], 7);
    uint16_t t_payload_size = recv_buf[1] & 0x7F;
    if (t_masked == 0) return  FAIL_AND_CLOSE;


    if (t_code == CLOSE_FRAME) {  //关闭帧
        return NEED_CLOSE;
    }


    uint16_t t_playload_pos = c_u16MsgHeadSize;
    if (t_payload_size == 126) {
        if (buf_len < c_u16MsgHeadSize + PAYLOAD_LENGTH_126)  return CONTINUE_RECV_MSG;
        uint16_t length = 0;
        memcpy(&length, recv_buf + c_u16MsgHeadSize, PAYLOAD_LENGTH_126);
        if (length > MAX_PAYLOAD_REQ) return FAIL_AND_CLOSE;  //消息过长
        if (buf_len < c_u16MsgHeadSize + PAYLOAD_LENGTH_126 + MASK_LEN + length) return CONTINUE_RECV_MSG; //等下此接受  //4 为mask长度,前端发过来必须有
        t_payload_size = length;
        t_playload_pos += PAYLOAD_LENGTH_126; //
    }
    else if (t_payload_size == 127) {
        return FAIL_AND_CLOSE;
    }
    else {
        if (buf_len < c_u16MsgHeadSize + MASK_LEN + t_payload_size) return CONTINUE_RECV_MSG; //等下此接受
    }


    memcpy(masking_key_, &recv_buf[t_playload_pos], MASK_LEN);
    t_playload_pos += MASK_LEN; //

    if (t_code == PONG_FRAME) {
        if (m_lastsendping > 0) {
            printf("time=[%u]recv PONG_FRAME \n",g_reactor.getCurSecond());
            m_lastsendping = 0; //ping消息回复
            m_sendpingcount = 0;
        }
        return ONE_MSG_LENGHT(t_playload_pos + t_payload_size;)
    }

    if (t_payload_size == 0) {
        if (t_code == PING_FRAME) {
            //  nopoll_conn_send_pong(conn, nopoll_msg_get_payload_size(msg), (noPollPtr)nopoll_msg_get_payload(msg));
            //  nopoll_msg_unref(msg);
            send_data((char*)&recv_buf[t_playload_pos], t_payload_size, PONG_FRAME);
            return  t_playload_pos + t_payload_size;
        }
        return FAIL_AND_CLOSE;

    }
    // char* play_load = (char*)&recv_buf[t_playload_pos];
    m_payload_length_ = t_payload_size;

    int j = 0;
    for (uint i = 0; i < m_payload_length_; i++) {
        j = i % 4;
        m_payload_[i] = recv_buf[t_playload_pos + i] ^ masking_key_[j];
    }

    //put to public proc
    shm_block_t sb;
    sb.fd = m_fd;
    sb.id = m_id;
    sb.len = t_payload_size;
    sb.type = PROTO_BLOCK;
    sb.frametype = t_code;
    //把数据发送出去
   // recv_push(m_u32channel, m_u32pipeindex, &sb, m_recv_buf + pos + c_u16MsgHeadSize, false);
    recv_push(m_u32channel, m_u32pipeindex, &sb, (uint8_t*)m_payload_, false);
    //int32_t nRet =	printf("client recv one complete pack len=%d m_u32pipeindex=%d nRet=%d\n", u16msglen, m_u32pipeindex, nRet);
 //   m_recv_pos -= u16msglen + c_u16MsgHeadSize; //sub one packet len
 //   pos += u16msglen + c_u16MsgHeadSize;
    return  t_playload_pos + t_payload_size;

}

再来个logic 进程处理

void c_Logic::dologic(struct shm_block_t* pblock, uint8_t *buf, bool brecv)
{
	//处理收到的逻辑
	switch (pblock->type)
	{
	case CLOSE_BLOCK:
		{
			
		}
		break;
	case PROTO_BLOCK:
		{
			if (strncmp((char*)buf, "hello", 5) == 0) {
				buf[0] = 'H';
				buf[1] = 'E';
				buf[2] = 'L';
				buf[3] = 'L';
				buf[4] = 'O';
				send_data(pblock,buf,pblock->len, (WebSocketFrameType)pblock->frametype);
			}
			else {
				buf[0] = '_';
				send_data(pblock, buf, pblock->len, (WebSocketFrameType)pblock->frametype);
			}
		}
		break;
	case CDUMP_BLOCK:
		{

		}
		break;
	default:
		break;
	}
}

发送数据

int send_data(struct shm_block_t* pblock, uint8_t* msg, const uint32_t msglen, WebSocketFrameType ftype) {
	const uint32_t   MAX_PAYLOAD_SEND = 4 * 1024; //最大发送长度
	if (msglen > MAX_PAYLOAD_SEND) return -1;

	uint32_t length = msglen;
	char               header[14];
	int                header_size;
	memset(header, 0, sizeof(header));
	const bool bhas_fin = true;
	if (bhas_fin) {
		msg_set_bit(header, 7);
	}
	if (ftype >= 0) {
		header[0] |= ftype & 0x0f;
	}
	const bool bhas_mask = false; //服务器发送不需要mask,前端给过来才需要
	if (bhas_mask) {
		msg_set_bit(header + 1, 7);
	}

	header_size = 2;

	if (length < 126) {
		header[1] |= length;
	}
	else if (length <= 0xFFFF) {
		/* set the next header length is at least 65535 */
		header[1] |= 126;
		header_size += 2;
		/* set length into the next bytes */
		msg_set_16bit(length, header + 2);
	}
	else {
		//再大的不让发送 //先写上,用不上也没关系
		header[1] = 127;
#if defined(WS_64BIT_PLATFORM)
		if (length < 0x8000000000000000) {
			header[2] = (length & 0xFF00000000000000) >> 56;
			header[3] = (length & 0x00FF000000000000) >> 48;
			header[4] = (length & 0x0000FF0000000000) >> 40;
			header[5] = (length & 0x000000FF00000000) >> 32;
		}
#else
		// (length < 0x80000000)
		header[2] = header[3] = header[4] = header[5] = 0;
#endif
		header[6] = (length & 0x00000000FF000000) >> 24;
		header[7] = (length & 0x0000000000FF0000) >> 16;
		header[8] = (length & 0x000000000000FF00) >> 8;
		header[9] = (length & 0x00000000000000FF);

		header_size += 8;
	}

	if (bhas_mask) {
		//不写了 //
	  //  msg_set_32bit(mask_value, header + header_size);
	  //  header_size += 4;
	}
	

	uint8_t buf[MAX_PAYLOAD_SEND + 14];
	memcpy(buf, header, header_size);
	memcpy(buf + header_size, msg, msglen);
	//send_pkg(buf, msglen + header_size);
	//return msglen + header_size;

	shm_block_t sb;
	sb.fd = pblock->fd;
	sb.id = pblock->id;
	sb.type = PROTO_BLOCK;
	sb.len = msglen + header_size;
	sb.frametype = (uint8_t)ftype;
	send_push(0, 1, &sb, buf, true);
	return 0;
}

5:支持 SSL
先加载证书

bool c_Accept::loadssl(const char* private_key_file, const char* server_crt_file, const char* ca_crt_file) {

	m_ctx = SSL_CTX_new(SSLv23_server_method());
	if (!m_ctx) { return false; }
	//assert(ctx);
	// 不校验客户端证书
	SSL_CTX_set_verify(m_ctx, SSL_VERIFY_NONE, nullptr);
	// 加载CA的证书  
	if (!SSL_CTX_load_verify_locations(m_ctx, ca_crt_file, nullptr)) {
		printf("SSL_CTX_load_verify_locations error!\n");
		return false;
	}
	// 加载自己的证书  
	if (SSL_CTX_use_certificate_file(m_ctx, server_crt_file, SSL_FILETYPE_PEM) <= 0) {
		printf("SSL_CTX_use_certificate_file error!\n");
		return false;
	}

	// 加载私钥
	if (SSL_CTX_use_PrivateKey_file(m_ctx, private_key_file, SSL_FILETYPE_PEM) <= 0) {
		printf("SSL_CTX_use_PrivateKey_file error!\n");
		return false;
	}

	// 判定私钥是否正确  
	if (!SSL_CTX_check_private_key(m_ctx)) {
		printf("SSL_CTX_check_private_key error!\n");
		return false;
	}
	return true;}

accept 后, ssl = SSL_new(get_ssl_ctx()); 再调用 SSL_accept

bool c_Accept::handle_input()
{
	sockaddr_in ip;
	socklen_t len;

	int cli_fd;
	while (1) {
		len = sizeof(ip);
		cli_fd = accept(m_fd, (sockaddr *)&ip, &len);

		if (cli_fd >= 0) {
			if ((uint32_t)cli_fd >= get_reactor().max_handler()) {
				close(cli_fd);
				continue;
			}
			if (!get_reactor().add_cur_connect(get_max_connect())) {
				printf("client max connect is over \n");
				close(cli_fd);
				return true;
			}
			SSL* ssl = nullptr;
			if (isSsl()) {
				ssl = SSL_new(get_ssl_ctx());
				if (ssl == nullptr) {
					get_reactor().sub_cur_connect();
					close(cli_fd);
					continue;
				}
				printf("accept SSL_new \n");
			}
			c_WebSocket*ts = new (std::nothrow) c_WebSocket();
			if (!ts) {
				get_reactor().sub_cur_connect();
				close(cli_fd);
				continue;
			}
			printf("accept client connect \n");
			ts->start(cli_fd, ip,m_u32channel,m_u32pipeindex,ssl);
		}
		else {
			if (errno == EAGAIN || errno == EINTR || errno == EMFILE || errno == ENFILE) {
				return true;
			}
			else {
				return false;
			}
		}
	}
}
void c_WebSocket::start(int fd, sockaddr_in& ip, uint32_t channel, uint32_t u32pipeindex,SSL* ssl)
{
    m_u32channel = channel;
    m_u32pipeindex = u32pipeindex;
    m_fd = fd;
    m_ip = ip;
    //---------------------------------------------
    m_lastrecvmsg = g_reactor.getCurSecond();
    c_heartbeat::GetInstance().handle_input_modify(fd, m_id, m_lastrecvmsg, m_lastrecvmsg);
    set_noblock(m_fd);

    m_ssl = ssl;
    if (isSsl()) {
        printf("ssl client handshake ready\n");
        SSL_set_fd(ssl, m_fd);
        int code, ret;
        int retryTimes = 0;
     //   uint64_t begin = 0;//Time::SystemTime();
        // 防止客户端连接了但不进行ssl握手, 单纯的增大循环次数无法解决问题,
        while ((code = SSL_accept(ssl)) <= 0 && retryTimes++ < 100) {
            ret = SSL_get_error(ssl, code);
            if (ret != SSL_ERROR_WANT_READ) {
                printf("ssl accept error. sslerror=%d  errno=%d \n", ret,errno); // SSL_get_error(ssl, code));
                break;
            }
            usleep(20 * 1000);//20ms //msleep(1); //这里最多会有2s的等待时间,以后一定要异步
        }

        if (code != 1) {
            handle_fini();
            return;
        }
        printf("ssl client handshake ok (%d)\n", retryTimes);
    }

    m_recv_buf_size = default_recv_buff_len;
    m_recv_buf = (uint8_t*)malloc(m_recv_buf_size);
    if (!m_recv_buf) {
        handle_fini();
        return;
    }
    //----------------------------------
    return;
}

6:心跳检查 10秒(可以自行设定)未收到消息,发送ping,发送2次,没回应 断线

bool c_WebSocket::checcklastmsg(uint32_t t) {
    if (m_lastrecvmsg + 10 <= t) {
        if (!is_handshake_ok()) return true;
        if (m_lastsendping > 0 && m_lastsendping + 10 <= t && m_sendpingcount > 1) {
            //disconnect
            printf("[%u]ready disconnect \n",t);
            return true;
        }
        else if(m_lastsendping == 0  || (m_sendpingcount > 0 && m_lastsendping+10 <=t)){
            //发送ping
            m_lastsendping = t;
            ++m_sendpingcount;
            send_ping_frame();
            printf("time=%u,%d send ping frame\n", t, m_sendpingcount);
        }
    }
    return false;
}

int c_WebSocket::send_ping_frame() {

    uint32_t length = 0;
    char               header[14];
    int                header_size;
    memset(header, 0, sizeof(header));
    const bool bhas_fin = true;
    if (bhas_fin) {
        msg_set_bit(header, 7);
    }
    header[0] |= PING_FRAME & 0x0f;
    const bool bhas_mask = false; //服务器发送不需要mask,前端给过来才需要
    if (bhas_mask) {
        msg_set_bit(header + 1, 7);
    }

    header_size = 2;

    if (length < 126) {
        header[1] |= length;
    }

    uint8_t buf[MAX_PAYLOAD_SEND + 14];
    memcpy(buf, header, header_size);
    // memcpy(buf + header_size, msg, msglen);

    send_pkg(buf, header_size);
    return header_size;
}
void c_WebSocket::send_pkg(uint8_t* buf, uint32_t len){
//--------------------------------------------------------------
//有上次预留的
 uint32_t p = 0;
 int n;
 if (isSsl()) {
     n = SSL_write(m_ssl, buf, len);   // 发送响应主体
 }
 else {
    n = send(m_fd, buf, len, 0);
 }
 if (n > 0) {
     if ((uint32_t)n == len) {
         //printf("send data len = %d need send len=%d \n",n,len);
         return;
     }
     else {
         p = n;
     }
 }
 else {
     if (errno != EAGAIN && errno != EINTR) {
         handle_error();
         return;
     }
 }
 //没发送完,存起来下次再发送,这里自行处理
 //----------------------------------------------------------------
}

7:json配置文件读取 jsoncpp API

bool  c_JsonReader::read_json_file(const char* jsonfile)
{
#define  LISTENIP				"listenip"
#define  LISTENPORT				"listenport"
#define  USESSL					"usessl"
#define  PRIVATEKEYFILE			"privatekeyfile"
#define  SERVERCRTFILE			"servercrtfile"
#define  CACRTFILE				"cacrtfile"
#define  AES128KEYHANDSHAKE		"aes128keyhandshake"
#define  AES128IV				"aes128iv"
#define  MAXCONN				"maxconn"
#define  CHECKHEARTBEAT			"checkheartbeat"
#define  OPENBLACKWHITEIP		"openblackwhiteip"
#define  SINGLEIPMAXCONN		"singleipmaxconn"

#define  MIN(A,B)  A<B?A:B
	FILE* f = fopen(jsonfile, "rb");
	if (f) {
		const int buf_size = 4 * 1024; 
		char buf[buf_size] = { 0, };
		memset(buf, 0, sizeof(buf));
		size_t n = fread(buf, sizeof(char), buf_size, f);
		fclose(f);
		if (n < 10) {
			printf("read_json_file file  length too short \n");
			return false;
		}

		Json::Reader reader;
		Json::Value root;
		if (reader.parse(buf,root)) {

			if (root[LISTENIP].empty() || root[LISTENPORT].empty() || root[MAXCONN].empty()  \
				|| root[USESSL].empty()  || root[AES128KEYHANDSHAKE].empty() || root[AES128IV].empty()) {
				printf("read_json_file base fail\n");
				return  false;

			}
			const bool busessl = root[USESSL].asBool();
			m_chatSerCfg.buseSsl = busessl;
			if (busessl) { 
				const bool bp = root[PRIVATEKEYFILE].empty();
				const bool bs = root[SERVERCRTFILE].empty();
				const bool bc = root[CACRTFILE].empty();
				if (bp || bs || bc) {
					printf("read_json_file ssl fail\n");
					return  false;
				}
				strncpy(m_chatSerCfg.szprivatekeyfile, root[PRIVATEKEYFILE].asString().c_str(), MIN(root[PRIVATEKEYFILE].asString().length(), ssl_file_len));
				strncpy(m_chatSerCfg.szservercrtfile, root[SERVERCRTFILE].asString().c_str(), MIN(root[SERVERCRTFILE].asString().length(), ssl_file_len));
				strncpy(m_chatSerCfg.szcacrtfile, root[CACRTFILE].asString().c_str(), MIN(root[CACRTFILE].asString().length(), ssl_file_len));
			}
			else{

			}
			m_chatSerCfg.u16maxconn = (uint16_t)root[MAXCONN].asUInt();
			strncpy(m_chatSerCfg.szlistenip, root[LISTENIP].asString().c_str(), sizeof(m_chatSerCfg.szlistenip) - 1);
			m_chatSerCfg.nlistenport = (int32_t)root[LISTENPORT].asInt();

			memcpy(m_chatSerCfg.u8AES128keyhandshake, root[AES128KEYHANDSHAKE].asString().c_str(), root[AES128KEYHANDSHAKE].asString().length());
			memcpy(m_chatSerCfg.u8AES128iv, root[AES128IV].asString().c_str(), 16);

			{//safe config
				const bool bcheck = root[CHECKHEARTBEAT].empty();
				const bool bbwip = root[OPENBLACKWHITEIP].empty();
				const bool bmaxconn = root[SINGLEIPMAXCONN].empty();
				if (!bcheck) {
					m_chatSerCfg.bcheckheartbeat = root[CHECKHEARTBEAT].asBool();
				}

				if (bbwip && (bbwip == bmaxconn)) {
					m_chatSerCfg.u8openblackwhiteip =(uint8_t) root[CHECKHEARTBEAT].asUInt();
					m_chatSerCfg.u8singleipmaxconn = (uint8_t)root[SINGLEIPMAXCONN].asUInt();
				}
			}
			


			return true;
		}

	}
	printf("json file no exist or  parse json file fail \n");
	return false;
}

8:只是帮助分析websocket 协议
红框这边 ssl_accept 是需要优化的,可以考虑用coroutine 或 thread callback
在这里插入图片描述
9: 后续继续优化,差不多,再上demo
如果觉得有用,麻烦点个赞,加个收藏

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/252849.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【案例】注册表简介,新建一个右键菜单打开方式选项

这里写目录标题 来源注册表的介绍注册表编辑器VScode的打开方式菜单![image-20231217201730121](https://img-blog.csdnimg.cn/img_convert/56c02643df9e8ec3afb4f3ac5cc0cdd5.png)如何自定义一个右键菜单备份注册表新建一个菜单选项”右键用记事本打开“ DWORDQWORD可扩充字符…

社交网络分析3:社交网络隐私攻击、保护的基本概念和方法 + 去匿名化技术 + 推理攻击技术 + k-匿名 + 基于聚类的隐私保护算法

社交网络分析3&#xff1a;社交网络隐私攻击、保护的基本概念和方法 去匿名化技术 推理攻击技术 k-匿名 基于聚类的隐私保护算法 写在最前面社交网络隐私泄露用户数据暴露的途径复杂行为的隐私风险技术发展带来的隐私挑战经济利益与数据售卖防范措施 社交网络 用户数据隐私…

力扣刷题-二叉树-路径总和

112 路径总和 给定一个二叉树和一个目标和&#xff0c;判断该树中是否存在根节点到叶子节点的路径&#xff0c;这条路径上所有节点值相加等于目标和。 说明: 叶子节点是指没有子节点的节点。 示例: 给定如下二叉树&#xff0c;以及目标和 sum 22&#xff0c; 返回 true, 因为…

【漏洞复现】红帆OA iorepsavexml.aspx文件上传漏洞

漏洞描述 广州红帆科技深耕医疗行业20余年,专注医院行政管控,与企业微信、阿里钉钉全方位结合,推出web移动一体化办公解决方案——iOffice20(医微云)。提供行政办公、专业科室应用、决策辅助等信息化工具,采取平台化管理模式,取代医疗机构过往多系统分散式管理,实现医…

C#实现MQTT over WebSocket

如何在网页端实现MQTT消息的发布和订阅&#xff1f; 实现MQTT功能&#xff0c;可以发布和订阅主题通过WebSocket协议将MQTT消息转发给对应的网页端 带着这个实现思路&#xff0c;采用C#控制台程序实现MQTT服务端功能&#xff0c;web端可以直接使用websocket插件与服务端双向通…

力扣刷题-二叉树-二叉树的所有路径

257 二叉树的所有路径 给定一个二叉树&#xff0c;返回所有从根节点到叶子节点的路径。 说明: 叶子节点是指没有子节点的节点。 示例: 思路 参考&#xff1a; https://www.programmercarl.com/0257.%E4%BA%8C%E5%8F%89%E6%A0%91%E7%9A%84%E6%89%80%E6%9C%89%E8%B7%AF%E5%BE…

MNIST内置手写数字数据集的实现

torchvision库 torchivision库是PyTorch中用来处理图像和视频的一个辅助库&#xff0c;接下来我们就会使用torchvision库加载内置的数据集进行分类模型的演示 为了统一数据加载和处理代码&#xff0c;PyTorch提供了两个类用于处理数据加载&#xff0c;他们分别是torch.utils.…

进程通信知识基础【Linux】——下篇

目录 前文 一&#xff0c;命名管道 创建命名管道 1. getline——c库 2. unlink——系统接口 实践代码 common.hpp client.cpp server.cpp Log.cpp 二&#xff0c;共享内存&#xff08;system V接口&#xff09; 1. 创建共享内存 shmget接口 2. 删除共享内存 常见…

PMP项目管理 - 相关方管理

系列文章目录 PMP项目管理 - 质量管理 PMP项目管理 - 采购管理 PMP项目管理 - 资源管理 PMP项目管理 - 风险管理 PMP项目管理 - 沟通管理 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wing…

【一种用opencv实现高斯曲线拟合的方法】

背景&#xff1a; 项目中需要实现数据的高斯拟合&#xff0c;进而提取数据中标准差&#xff0c;手头只有opencv库&#xff0c;经过资料查找验证&#xff0c;总结该方法。 基础知识&#xff1a; 1、opencv中solve可以实现对矩阵参数的求解&#xff1b; 2、线的拟合就是对多项…

【深度强化学习】确定性策略梯度算法 DDPG

前面讲到如 REINFORCE&#xff0c;Actor-Critic&#xff0c;TRPO&#xff0c;PPO 等算法&#xff0c;它们都是随机性策略梯度算法&#xff08;Stochastic policy&#xff09;&#xff0c;在广泛的任务上表现良好&#xff0c;因为这类方法鼓励了算法探索&#xff0c;给出的策略是…

探索 Vim:一个强大的文本编辑器

引言&#xff1a; Vim&#xff08;Vi IMproved&#xff09;是一款备受推崇的文本编辑器&#xff0c;拥有强大的功能和高度可定制性&#xff0c;提供丰富的编辑和编程体验。本文将探讨 Vim 的基本概念、使用技巧以及为用户带来的独特优势。 简介和发展 1. Vim 的简介和历史 V…

【二分查找】自写二分函数的总结

作者推荐 【动态规划】【广度优先搜索】LeetCode:2617 网格图中最少访问的格子数 本文涉及的基础知识点 二分查找算法合集 自写二分函数 的封装 我暂时只发现两种&#xff1a; 一&#xff0c;在左闭右开的区间寻找最后一个符合条件的元素&#xff0c;我封装成FindEnd函数。…

力扣刷题-二叉树-平衡二叉树

110 平衡二叉树 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a;一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过1。 示例 1: 给定二叉树 [3,9,20,null,null,15,7] 返回 true 。 给定二叉树 [1…

AUTOSAR ComM模块配置以及代码

ComM模块配置以及代码执行流程 1、基本的一个通道的配置列表 ComMNmVariant 概念的个人理解&#xff1a; FULL&#xff1a; 完全按照AUTOSAR NM方式进行调用 LIGHT &#xff1a;设置一个超时时间&#xff0c;在请求停止通信的时候开始计时&#xff0c;超时之后才会进入FULLCOM…

运维实践|采集MySQL数据出现many connection errors

文章目录 问题出现问题分析当前环境问题分析 解决方案1 检查调度事件任务是否开启2 开启调度事件任务3 创建一张日志表4 创建函数存储过程5 创建事件定时器6 开启事件调度任务7 检查核实是否创建 总结 问题出现 最近在做OGG结构化数据采集工作&#xff0c;在数据采集过程中&am…

将博客搬至微信公众号了

一、博客搬家通知 各位码友们好&#xff0c;大家是不是基本很少看 CSDN 了呢&#xff1f;平时开发是不都依靠着 chatGPT 来解决工作中的技术问题了&#xff0c;不过我觉得在工作中的使用场景各式各样的&#xff0c;具体问题还得自己具体去梳理逻辑&#xff0c;再考虑用什么样的…

C语言:求和1+1/2-1/3+1/4-1/5+……-1/99+1/100

#include<stdio.h> int main() {int i 0;double sum 0.0;int flag 1;for (i 1;i < 100;i){sum 1.0 / i * flag;flag -flag;}printf("sum%lf\n", sum);return 0; }

SpringIOC之@Primary

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

力扣刷题-二叉树-找树左下角的值

513 找树左下角的值 给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 示例 1&#xff1a; 示例 2&#xff1a; 思路 层序遍历 直接层序遍历&#xff0c;因为题目说了是最底层&#xff0c;最左边的值&a…