目录
再谈协议
结构化数据
计算器
封装socket
封装TcpServer服务器
协议定制
问题
解决方法
守护进程
setsid
守护进程函数
json库
序列化和反序列化
再谈协议
原来不是说过什么是协议吗,协议不就是通信双方必须遵守的约定吗,那这就完了吗?那肯定是没有。
网络协议就是双方必须遵守的约定,使数据在网络上能够传输,网络通信的双方必须遵循相同的规则,而协议就需要通过计算机语言的方式表示出来,只有通信计算机双方都遵守相同的协议才能互相通信交流。
我们使用的TCP/UDP接口,都是传输层对TCP/UDP的包装,以文件的形式让我们可以进行应用层编程,所以我们原来写过的代码都是应用层开发,但是并没有涉及到协议的内容,只是用套接字进行收发工作。
结构化数据
那如果我想要传输一些结构化的数据,这时就不能将数据一个个发送到网络中。可能光说要传入结构化的数据不好理解,我们接下来就通过一个例子来说一下。
如果我想要实现一个网络版本的计算器,客户端发送数据,服务端处理数据,客户端需要发送两个操作数和一个操作符,而这就是一组结构化的数据,如果客户端一个一个地发送,那么服务端就不知道客户端需要做什么,所以我们通常是这样做的。
有了这个序列化和反序列化的过程,就相当与加了一层软件层,实现业务逻辑和网络通信一定程度上的解耦。
结构体中的字段就是协议的一部分,就可以规定a必须要在左边,b必须要做右边。
计算器
封装socket
我们已经写过TCP服务器了,所以我们再来给他封装一下,主要的代码都没有改,把socket、bind、listen、accept、connect函数封装了一下,也为了以后更方便地使用。
// Sock.hpp #include <iostream> #include <string> #include <cerrno> #include <cstdio> #include <cstring> #include <cassert> #include <unistd.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include "log.hpp" class Sock { public: const static int g_backlog = 20; public: Sock(uint16_t port, std::string ip = "") {} int Socket() { // 创建socket int listensock = socket(AF_INET, SOCK_STREAM, 0); if (listensock < 0) { logMessage(FATAL, "%d:%s", errno, strerror(errno)); exit(2); } logMessage(NORMAL, "sock: %d", listensock); return listensock; } void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0") { // bind struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = ip.empty() ? INADDR_ANY : inet_addr(ip.c_str()); if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0) { logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno)); exit(3); } logMessage(NORMAL, "bind success ..."); } void Listen(int sock) { // tcp面向连接的,通信前要先建立连接,设置为监听状态,listen if (listen(sock, g_backlog) < 0) { logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno)); exit(4); } logMessage(NORMAL, "listen success ..."); } // const std::T& :输入型参数 // std::T* :输出型参数 // std::T& :输入输出型参数 int Accept(int listensock, std::string* ip, uint16_t* port) { // 获取连接accept struct sockaddr_in src; // 客户端addr socklen_t len = sizeof(src); int servicesock = accept(listensock, (struct sockaddr *)&src, &len); if (servicesock < 0) { logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno)); return -1; // 获取失败 } if (port) *port = ntohs(src.sin_port); if (ip) *ip = inet_ntoa(src.sin_addr); return servicesock; } bool Connect(int sock, const std::string& server_ip, const uint16_t& server_port) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(server_port); server.sin_addr.s_addr = inet_addr(server_ip.c_str()); if (connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true; else false; } ~Sock() {} };
封装TcpServer服务器
通过构造函数就可以完成创建套接字、bind、listen的任务,并且拿到了listensock,func_t就是函数对象,以后有任务就放到数组中。
// TcpServer.hpp #include "Sock.hpp" #include <functional> namespace dsh_tcpserver { typedef std::function<void(int)> func_t; class TcpServer { public: TcpServer(const uint16_t &port, const std::string &ip = "0.0.0.0") :sock_(port) { listensock_ = sock_.Socket(); sock_.Bind(listensock_, port, ip); sock_.Listen(listensock_); } ~TcpServer() { if (listensock_ >= 0) close(listensock_); } private: int listensock_; Sock sock_; std::vector<func_t> func_; }; }
服务器的初始化工作已经做好后就是启动了,通过Accept拿到了对端IP和端口号,之后还是创建线程,让线程去做这些任务。
// TcpServer.hpp class ThreadData { public: ThreadData(int sock, TcpServer* server) :sock_(sock) ,server_(server) {} ~ThreadData(){} public: int sock_; TcpServer* server_; }; class TcpServer { private: static void *ThreadRoutine(void *args) { pthread_detach(pthread_self()); // 线程分离 ThreadData* td = static_cast<ThreadData*>(args); // 强制转化 td->server_->Excute(td->sock_); // 线程调用类内成员函数 close(td->sock_); // 调用完后关闭sock并释放td delete td; return nullptr; } public: void BindService(func_t func) // 绑定执行函数 { func_.push_back(func); } // 执行对应的函数 void Excute(int sock) { for (auto& f : func_) { f(sock); } } void Start() { while (true) { std::string clientip; uint16_t clientport; int sock = sock_.Accept(listensock_, &clientip, &clientport); // 接收的sock if (sock == -1) continue; logMessage(NORMAL, "accept new link success, sock: %d", sock); pthread_t tid; ThreadData* td = new ThreadData(sock, this); pthread_create(&tid, nullptr, ThreadRoutine, (void*)td); } } };
当我们运行server.cc的时候,拿到TcpServer就可以把要执行的函数绑定到函数对象数组中,从对端获取连接成功后,创建线程,把需要的数据放在ThreadData中,执行对应的ThreadRoutine函数,不想对线程join,直接设置线程分离,通过传入的this指针指向的TcpServer执行函数数组中的函数,执行完后,关闭此次连接的sock,释放td。
// CalServer.cc static void Usage(const std::string& process) { std::cout << "\nUsage: " << process << " port\n" << std::endl; } int main(int argc, char* argv[]) { if (argc != 2) { Usage(argv[0]); exit(1); } std::unique_ptr<TcpServer> server(new TcpServer(atoi(argv[1]))); server->BindService(/*要放入函数数组中的函数*/); server->Start(); return 0; }
这就把网络功能和服务进行了解耦,在server.cc中就可以这样写。
协议定制
因为我们要实现的是一个网络计算器,一定要有我们自己定制的协议。
想要从客户端发送一个Request,将这个Request序列化为字节流式的,通过网络发送到服务端,服务端把字节流式的数据反序列化为一个Request,处理这个Request。
Request处理完后就需要把结果Response返还给客户端,将Response序列化为字节流式的数据,通过网络返还给客户端,客户端拿到字节流式的数据反序列化为Response。
// Protocol.hpp #define SPACE " " #define SPACE_LEN strlen(SPACE) class Request { public: std::string Serialize() // 序列化 -> "x_ op_ y_" { std::string str; str = std::to_string(x_); str += SPACE; str += op_; str += SPACE; str += std::to_string(y_); return str; } bool Deserialize(const std::string& str) // 反序列化 "x_ op_ y_" -> 结构化数据 { std::size_t left = str.find(SPACE); if (left == std::string::npos) return false; std::size_t right = str.rfind(SPACE); x_ = stoi(str.substr(0, left)); y_ = stoi(str.substr(right + SPACE_LEN)); op_ = str[left + SPACE_LEN]; return true; } public: Request(){} Request(int x, int y, char op) :x_(x) ,y_(y) ,op_(op) {} ~Request(){} public: int x_; int y_; char op_; }; class Response { public: std::string Serialize() // 序列化 -> "code_ result_" { std::string s; s = std::to_string(code_); s += SPACE; s += std::to_string(result_); return s; } bool Deserialize(const std::string& str) // 反序列化 "code_ result_" -> 结构化数据 { std::size_t pos = str.find(SPACE); if (pos == std::string::npos) return false; code_ = stoi(str.substr(0, pos)); result_ = stoi(str.substr(pos + SPACE_LEN)); return true; } public: Response(){} Response(int result, int code) :result_(result) ,code_(code) {} ~Response(){} public: int result_; // 结果 int code_; // 结果状态码 }; // 封装一下recv和send std::string Recv(int sock) { char inbuffer[1024]; ssize_t s = recv(sock, inbuffer, sizeof(inbuffer), 0); if (s > 0) { inbuffer[s] = 0; return inbuffer; } else return ""; } void Send(int sock, const std::string str) { send(sock, str.c_str(), str.size(), 0); }
// CalServer.cc static Response calculatorHelp(const Request &req) // 拿到结构化的请求,返回结构化的数据 { Response resp(0, 0); switch (req.op_) { case '+': resp.result_ = req.x_ + req.y_; break; case '-': resp.result_ = req.x_ - req.y_; break; case '*': resp.result_ = req.x_ * req.y_; break; case '/': if (0 == req.y_) resp.code_ = 1; else resp.result_ = req.x_ / req.y_; break; case '%': if (0 == req.y_) resp.code_ = 2; else resp.result_ = req.x_ % req.y_; break; default: resp.code_ = 3; break; } return resp; } void calculator(int sock) { while (true) { std::string str = Recv(sock); // 读到了一个请求 if (str.empty()) break; Request req; req.Deserialize(str); // 反序列化 Response resp = calculatorHelp(req); // 得到一个结构化的结果 std::string respString = resp.Serialize(); // 结果序列化 // 如果正在写入的时候对端关闭了,那就不能再写入了,此时OS回向进程发送SIGPIPE信号,所以我们设置忽略即可 Send(sock, respString); } } int main(int argc, char *argv[]) { if (argc != 2) { Usage(argv[0]); exit(1); } // 通常我们的服务器会忽略这个信号,防止出现非法写入的问题 signal(SIGPIPE, SIG_IGN); std::unique_ptr<TcpServer> server(new TcpServer(atoi(argv[1]))); server->BindService(calculator); server->Start(); return 0; }
那什么是协议定制呢,这就类似于一个约定,我们规定发过来的数据必须是左操作数 操作符 右操作数。返回的数据必须有code来检测此次返还的数据是否是有效的:
- 0表示成功
- 1表示除0错误
- 2表示模0错误
- 3表示非法操作
最后我们在把客户端写完就可以了。
// CalClient.cc static void Usage(const std::string &process) { std::cout << "\nUsage: " << process << " ip port\n" << std::endl; } int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(1); } std::string server_ip = argv[1]; uint16_t server_port = atoi(argv[2]); Sock sock; int sockfd = sock.Socket(); // 创建套接字 if (!sock.Connect(sockfd, server_ip, server_port)) // 连接 { std::cerr << "Connect error" << std::endl; } // 通信 std::string s = ""; while (true) { std::cout << "请输入(格式: x op y): "; std::getline(std::cin, s); if (s == "quit") { close(sockfd); break; } Request req; req.Deserialize(s); s = req.Serialize(); Send(sockfd, s); s = Recv(sockfd); Response resp; resp.Deserialize(s); std::cout << "code: " << resp.code_ << std::endl; std::cout << "result: " << resp.result_ << std::endl; } return 0; }
问题
虽然我们的代码写完了,但是还是存在一定的问题的,虽然我规定了数据的格式,返还的格式,但是我们要明确的一点就是,TCP是面向字节流的,我如何确定一个发送过来的数据就是完整的,如果此时缺失了几个字符,那我们上面写的代码就有问题了。
TCP不像UDP,UDP是面向数据报的,就是对端发一个我就接收一个,这一定是一个完整的报文,但是如果使用TCP,这一次我可能发送了一长串的请求,格式也是正确的,那我们该怎么做呢?
那我们就要再说一下什么是TCP了,TCP是传输控制协议。
我们只是把我们自己定义的缓冲区中的数据放到了传输层给我们提供的发送缓冲区中,说白了我们曾经说过的IO接口本质上都是拷贝函数,只是你以为调用了send或者write就发送到了对方的主机中,这是不对的。
所以调用send就一直往缓冲区中拷贝,什么时候发不是应用层考虑的事情,当服务端拿到了数据时就从接收缓冲区中向我们定义的缓冲区中拷贝,拷了多少个也不知道,所以根本不能确定读到的是一个完整的报文。所以TCP发送的次数和TCP接收的次数没有关系,这就是面向字节流。
解决方法
要解决这个文件我就要再来看看我们定制的协议。
// 根据我们定制的协议和TCP发送的数据可能是这样的 x_ op_ y_x_ op_ y_x_ op_ y_x_ op_ y_
所以我们要重新定制协议,我们规定:
// 每个完整报文的格式必须是这样的: len\r\nx_ op_ y_\r\n
len就是我们读取到一个完整报文的长度,为了更好的处理,我们给报文中加入\r\n这种特殊字符对内容进行区分。
有了这个约定我们还要注意的是,这一次我们可能读到的数据是半个,这时候我们就得等一等,等到下次再读取到完整的报文时在做处理,所以我们需要一个缓冲区来记录这一次读到的数据。
bool Recv(int sock, std::string* out) // 修改一下Recv { char buffer[1024]; ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0); if (s > 0) { buffer[s] = 0; *out += buffer; } else if (s == 0) { std::cout << "client quit" << std::endl; return false; } else { std::cout << "recv error" << std::endl; return false; } return true; } #define SEP "\r\n" #define SEP_LEN strlen(SEP) // len\r\nx_ op_ y_\r\n std::string Decode(std::string &buffer) { std::size_t pos = buffer.find(SEP); if (pos == std::string::npos) return ""; int size = atoi(buffer.substr(0, pos).c_str()); // 拿到len int surplus = buffer.size() - pos - 2 * SEP_LEN; // 去掉len和两个标记符的长度 if (surplus >= size) { // 至少有一个完整的报文 buffer.erase(0, pos + SEP_LEN); // 去掉len\r\n std::string s = buffer.substr(0, size); // 提取x_ op_ y_ buffer.erase(0, size + SEP_LEN); // 去掉x_ op_ y_\r\n return s; // 后面就是新的报文了 } else return ""; // 没有一个完整的报文 } void calculator(int sock) { // 1.读取数据 std::string inbuffer; while (true) { bool res = Recv(sock, &inbuffer); // 读到了一个请求 if (!res) break; // 读取成功 // 获得一个完整的报文 std::string package = Decode(inbuffer); if (package.empty()) continue; else { Request req; req.Deserialize(package); // 反序列化 Response resp = calculatorHelp(req); // 得到一个结构化的结果 std::string respString = resp.Serialize(); // 结果序列化 // 如果正在写入的时候对端关闭了,那就不能再写入了,此时OS回向进程发送SIGPIPE信号,所以我们设置忽略即可 Send(sock, respString); } } }
现在的流程就是这样的,我自己定义一个缓冲区,作为输出型参数传入Recv,这样就不需要Recv返回一个string了,只需要判断是否读取正确,而且调用recv接口的时候也是往我们自己定义的缓冲区中+=,这样就可以把传输层的缓冲区中的数据添加到后面,然后我们通过Decode函数拿到一个完整的报文,也要判断一下是否是一个完整的报文,之后的操作就是反序列化,处理数据返回Response结果,这一步也是需要重新定制协议的。
len\r\ncode_ result_\r\n
// len\r\nXXXXXXX\r\n std::string Encode(std::string& s) { std::string len = std::to_string(s.size()); std::string new_package = len; new_package += SEP; new_package += s; new_package += SEP; return new_package; } void calculator(int sock) { // 1.读取数据 std::string inbuffer; while (true) { // 1.读取数据 bool res = Recv(sock, &inbuffer); // 读到了一个请求 if (!res) break; std::cout << "inbuffer: " << inbuffer << std::endl; // 2.协议解析,获得一个完整的报文 std::string package = Decode(inbuffer); if (package.empty()) continue; std::cout << "package: " << package << std::endl; Request req; // 3.反序列化 req.Deserialize(package); // 4.业务逻辑,得到一个结构化的结果 Response resp = calculatorHelp(req); // 5.结果序列化 std::string respString = resp.Serialize(); std::cout << "respString: " << respString << std::endl; // 如果正在写入的时候对端关闭了,那就不能再写入了,此时OS回向进程发送SIGPIPE信号,所以我们设置忽略即可 // 6.在发送之前也要对结果加上len\r\ncode_ result_\r\n respString = Encode(respString); std::cout << "encode: " << respString << std::endl; // 7.发送也是有问题的,这个到后面再说 Send(sock, respString); } }
客户端当然也是要改的。
int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(1); } std::string server_ip = argv[1]; uint16_t server_port = atoi(argv[2]); Sock sock; int sockfd = sock.Socket(); // 创建套接字 if (!sock.Connect(sockfd, server_ip, server_port)) // 连接 { std::cerr << "Connect error" << std::endl; exit(2); } // 通信 bool quit = false; std::string s = ""; while (true) { // 1.获取 std::cout << "请输入(格式: x op y): "; std::getline(std::cin, s); if (s == "quit") { close(sockfd); break; } Request req; req.Deserialize(s); // 以上只是为了方便解析字符串才这样写的 // 2.序列化 s = req.Serialize(); // 3.添加报头 s = Encode(s); // 4.发送给服务端 Send(sockfd, s); // 5.读取 std::string buffer; while (true) { bool ret = Recv(sockfd, &buffer); if (!ret) { quit = true; break; } // 6.去掉报头 std::string package = Decode(buffer); if (package.empty()) continue; Response resp; resp.Deserialize(package); std::cout << "code: " << resp.code_ << std::endl; std::cout << "result: " << resp.result_ << std::endl; break; } } close(sockfd); return 0; }
这样我们的网络版本的计算器就已经完成了,输入输出的格式也可以自己修改。
守护进程
在这之前,我们运行起来的服务器都是前台进程
- 前台进程就是和终端相关联的进程
意思就是这个进程如果可以处理我输入的命令,那他就是一个前台进程,所以bash就是一个前台进程,我们可以在命令行中输入命令,并显示处理的结果。
如果我们把服务器启动,这是在输入ls这样的命令就不会再生效了,所以此时的bash就不再是前台进程了,而服务端就变成了前台进程。
- 我们使用shell登录的时候,只允许一个前台进程和多个后台进程
- 进程除了有pid和ppid,还要有组id
- 在命令行中通过管道同时启动多个进程,这些进程就是兄弟关系,父进程都是bash
在这之中有一行PGID,这就是组id,而且组id就是第一个进程的id。
- 同时被创建的多个进程可以成为一个进程组,组长就是第一个进程
在这之中还有一列就是SID,这叫做会话id。
- 任何一次登录的用户需要多个进程或进程组,来为用户提供服务(bash),用户也可以启动进程和进程组,那么这些进程或者服务都属于这一次会话
当我们登录后,系统会帮我们创建一次会话,创建bash和终端,之后就可以启动自己的进程或者进程组。退出登录后,系统要释放此次会话的资源,如果此时会话中还有正在运行的前台进程,那这个进程就有可能也要被释放,所以我们想要一个服务器一直运行,它就不能随着会话退出而终止。
我们使用Windows操作系统的时候,关机的位置会有一个注销的选项,注销就相当于退出登录,终止此次启动的服务和进程。
所以如果想要让服务器不随会话退出,那就需要把它拉出来,让他自成一个会话,原来的会话退出就不会影响它了,所以这就叫做守护进程。
setsid
返回值:成功返回这个进程的sid,失败返回-1
【注意】:setsid要成功调用,必须保证当前进程不是进程组的组长,进程组的组长就是启动的第一个进程,那让它不是第一个不就可以了吗。
守护进程函数
我们可以在服务器启动之前,调用我们自己写的函数,通过这个函数执行一系列操作。
- 忽略SIGPIPE、SIGCHLD信号
- fork创建子进程,让父进程直接退出
- 调用setsid
标准输入、输出、错误的重定向,它已经是一个独立的进程了,所以不能再向显示器打印
我们还要知道的就是在/dev目录中有一个null文件。
如果输出重定向到这个文件中,数据就会自动丢弃,输入重定向也读不到任何数据。
// Daemon.hpp #include <iostream> #include <signal.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> void MyDaemon() { // 1. 忽略信号,SIGPIPE,SIGCHLD signal(SIGPIPE, SIG_IGN); signal(SIGCHLD, SIG_IGN); // 2. 不要让自己成为进程组组长 if (fork() > 0) exit(0); // 父进程直接终止 // 那子进程就一定不是组长 // 3. 调用setsid setsid(); // 4. 标准输入、输出、错误的重定向,它已经是一个独立的进程了,所以不能再向显示器打印 int devnull = open("/dev/null", O_RDONLY | O_WRONLY); if (devnull > 0) { dup2(devnull, 0); dup2(devnull, 1); dup2(devnull, 2); close(devnull); } }
所以这就是我们为什么要写日志文件的原因,不能向显示器中打印,那就要向文件中打印。
当我们把服务端以守护进程的方式启动就会变成这样,pid、pgid、sid都是一样的,证明它自成一个会话,而它的ppid也变成了bash,所以守护进程也是孤儿进程的一种。
json库
我们上面自己写的字符串解析的方式一定会有问题的,所以我们也可以使用别人的库,使用sudo yum -y install jsoncpp-devel下载这个库,下面我们就来看看怎么使用吧,这个库中的数据是key-value结构的。
#include <iostream> #include <string> #include <jsoncpp/json/json.h> using namespace std; int main() { int x = 10; int y = 20; char z = '+'; Json::Value root; // 万能对象 root["x"] = x; // key-value root["y"] = y; root["op"] = z; Json::StyledWriter writer; // 以某种风格写入 string s = writer.write(root); cout << s << endl; return 0; }
使用StyledWriter输出的形式就是上面这样的,如果使用FastWriter就会是下面这样的。
序列化和反序列化
有了这个库,我们就可以重新编写一下序列化和反序列化的代码,虽然引入了库,但是需要改的地方只有序列化和反序列化,其他地方都不会变,添加报头和标记符都没有变。
#include <jsoncpp/json/json.h> class Request { public: std::string Serialize() // 序列化 { Json::Value root; // 万能对象 root["x"] = x_; root["y"] = y_; root["op"] = op_; Json::FastWriter writer; return writer.write(root); } bool Deserialize(const std::string& str) // 反序列化 { Json::Value root; Json::Reader reader; reader.parse(str, root); x_ = root["x"].asInt(); y_ = root["y"].asInt(); op_ = root["op"].asInt(); return true; } public: Request(){} Request(int x, int y, char op) :x_(x) ,y_(y) ,op_(op) {} ~Request(){} public: int x_; int y_; char op_; }; class Response { public: std::string Serialize() // 序列化 { Json::Value root; // 万能对象 root["code"] = code_; root["result"] = result_; Json::FastWriter writer; return writer.write(root); } bool Deserialize(const std::string& str) // 反序列化 { Json::Value root; Json::Reader reader; reader.parse(str, root); code_ = root["code"].asInt(); result_ = root["result"].asInt(); return true; } public: Response(){} Response(int result, int code) :result_(result) ,code_(code) {} ~Response(){} public: int result_; // 结果 int code_; // 结果状态码 };