文章目录
- 前言
- 一、自定义协议
- 传结构体对象
- 序列化和反序列化
- 什么是序列化?
- 反序列化
- 二、计算器服务端(线程池版本)
- 1.main.cc
- 2.Socket.hpp
- 3.protocol.hpp
- 4.Calculator.hpp
- 5.serverCal.hpp
- 6.threadPool.hpp
- 7.Task.hpp
- 8. log.hpp
- 客户端
- Windows客户端
- 运行
前言
我们已经学会了Tcp、Udp网络传输协议,并且之前我们也实现了简易的聊天室和翻译器。
我们知道,传输层是OS系统里就给我们写好的, 应用层才是我们自己需要去编写的,现在我们对应用层来进行一些初步的了解。
一、自定义协议
在之前我们使用Tcp和Udp服务进行网络通信,我们一直都是以字符串的形式互相发送消息。
那么我们就只能发送字符串吗? 当然不是,Udp是基于数据流进行网络通信,Udp是基于字节流进行网络通信,虽然我们对字节流和数据流并没有一个特别清晰的认识,但是我们可以知道的是,我们其实是可以传各种各样的类型进行通行的。
这里就提出一种方案
传结构体对象
从我们之前写的代码来看,只要我们在发送数据和接收数据时制定一个协议,每次只读写一个结构体对象的大小,那么我们就可以将该结构体进行通信。 而实际上,底层的很多库也确实是这么做的,但是这并不代表这种方式不存在明显弊端。
最大的弊端就是,你能保证在不同环境之下,同样的结构体类型在不同主机、不同环境下的大小是一样的吗。 结构体的大小涉及到许多,比如说结构体字节对齐,32位和64位系统下内置类型大小可能不同…
所以我们并不推崇这种方案。
而实际上我们其实更推崇传字符串的方式。
序列化和反序列化
什么是序列化?
在现实生活中,我们介绍自己,有些人习惯先说自己的名字,有些人习惯先说自己来自于哪一个城市。 如果你需要去统计大量的人的信息,最好就是先列一个表格,然后让他们严格按照表格上的个人信息顺序去介绍自己,这就是序列化。
再比如说我们今天要写一个网络版本的计算器,我可以写成1+1,也可以写成一加一,再也可以写写成1 + 1,中间带几个空格。 那么这样的话,我们服务器接受到的数据就是形形色色的,不利于我们去解析。
于是我们就制定一个协议,你必须要写成"1 + 1"的形式,否则就是违反协议!
反序列化
反序列化很简单,就比如说我们收到了一段数据,它是一个字符串形式的"1 + 1",我们就需要将该字符串进行解析,反序列化成int x = 1, char op = ‘+’ , int y = 1.
这也是一种反序列化。
二、计算器服务端(线程池版本)
1.main.cc
#include "serverCal.hpp"
void Usage(const char *mes)
{
std::cout << "Usage: " << mes << " port[8080-9000]" << std::endl;
}
const std::string default_ip = "0.0.0.0";
enum{
Usage_Err = 1
};
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage("./serverCal");
exit(Usage_Err);
}
ServerCal sc;
sc.Init(AF_INET, default_ip, atoi(argv[1]));
sc.Run();
return 0;
}
2.Socket.hpp
对套接字进行封装
#pragma once
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include "log.hpp"
enum
{
Socket_Err = 1,
Bind_Err,
Listen_Err
};
extern Log lg;
const int backlog = 10;
class Socket
{
public:
Socket()
: _sockfd(-1)
{
}
int Getfd()
{
return _sockfd;
}
void Init()
{
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd < 0)
{
lg(Fatal, "Socket Create Failed...");
exit(Socket_Err);
}
lg(Info, "Socket Create Succeeded...");
_sockfd = socket_fd;
}
void Bind(const int sinfamily, const std::string &ip, const uint16_t port)
{
memset(&_sockaddr, 0, sizeof _sockaddr);
switch (sinfamily)
{
case AF_INET:
_sockaddr.sin_family = AF_INET;
break;
case AF_INET6:
_sockaddr.sin_family = AF_INET6;
break;
}
_sockaddr.sin_port = htons(port);
inet_aton(ip.c_str(), &(_sockaddr.sin_addr));
int n = bind(_sockfd, (const struct sockaddr *)&_sockaddr, sizeof _sockaddr);
if (n < 0)
{
lg(Fatal, "Bind Failed...");
exit(Bind_Err);
}
lg(Info, "Bind Succeeded..., port: %d", port);
}
void Listen()
{
int n = listen(_sockfd, backlog);
if (n < 0)
{
lg(Fatal, "Listen Failed...");
exit(Listen_Err);
}
lg(Info, "Listen Succeeded...");
}
int Accept(struct sockaddr_in *clientsock, socklen_t *len)
{
int fd = accept(_sockfd, (sockaddr *)clientsock, len);
if (fd < 0)
{
lg(Warning, "Accept Failed...");
return -1;
}
lg(Info, "Accept Succeeded..., Get A new Link, fd: %d", fd);
return fd;
}
int Connect(const std::string &ip, const std::string &port)
{
struct sockaddr_in serversock;
serversock.sin_port = htons(atoi(port.c_str()));
serversock.sin_family = AF_INET;
inet_aton(ip.c_str(), &serversock.sin_addr);
int n = connect(_sockfd, (const struct sockaddr *)&serversock, sizeof serversock);
if (n < 0)
{
lg(Warning, "Accept Failed...");
return n;
}
lg(Info, "Connect Succeeded...");
return n;
}
~Socket()
{
close(_sockfd);
}
private:
int _sockfd;
struct sockaddr_in _sockaddr;
};
3.protocol.hpp
序列化和反序列化的协议制定
#pragma once
#include <iostream>
#include <string>
#include "log.hpp"
extern Log lg;
const char blank_space_sep = ' ';
const char protocol_sep = '\n';
enum Code
{
Div_Zero_Err = 1,
Mod_Zeor_Err,
Operatorr_Err
};
class Request
{
public:
Request() {} // 提供一个无参构造
Request(int x, int y, char op)
: _x(x), _y(y), _operator(op) {}
bool serialize(std::string *out_str)
{
// 协议规定 字符串格式应序列化为"len\n""_x + _y\n"
std::string main_body = std::to_string(_x);
main_body += blank_space_sep;
main_body += _operator;
main_body += blank_space_sep;
main_body += std::to_string(_y);
*out_str = std::to_string(main_body.size());
*out_str += protocol_sep;
*out_str += main_body;
*out_str += protocol_sep;
return true;
}
bool deserialize(std::string &in_str)
{
// 协议规定 in_str的格式应为"len\n""_x + _y\n..."
size_t pos = in_str.find(protocol_sep);
if (pos == std::string::npos)
{
// 说明没找到'\n'
lg(Warning, "Message Format Error..., No Found The First Second \\n");
return false;
}
std::string sl = in_str.substr(0, pos);
int len = std::stoi(sl); // 如果这里的sl不是一串数字,stoi就会抛异常! BUG
int total_len = sl.size() + 1 + len + 1;
if (in_str.size() < total_len)
{
lg(Warning, "Message Format Error..., Lenth Error");
return false;
}
if (in_str[total_len - 1] != '\n')
{
lg(Warning, "Message Format Error..., No Found The Second \\n");
return false;
}
std::string main_body = in_str.substr(pos + 1, len);
// main_body"_x + _y"
int left = main_body.find(blank_space_sep);
if (left == std::string::npos)
{
// 说明没找到' '
lg(Warning, "Message Format Error..., No Found The First ' '");
return false;
}
int right = main_body.rfind(blank_space_sep);
if (left == right)
{
// 说明只有一个' '
lg(Warning, "Message Format Error...,No Found The Second ' '");
return false;
}
_x = std::stoi(main_body.substr(0, left)); // 如果这里的sl不是一串数字,stoi就会抛异常! BUG
_y = std::stoi(main_body.substr(right + 1)); // 如果这里的sl不是一串数字,stoi就会抛异常! BUG
_operator = main_body[left + 1];
in_str.erase(0, total_len);
return true;
}
void print()
{
std::cout << _x << " " << _operator << " " << _y << std::endl;
}
~Request() {}
public:
int _x;
int _y;
char _operator;
};
class Respond
{
public:
Respond() {} // 提供一个无参构造
Respond(int result, int code)
: _result(result), _code(code) {}
bool serialize(std::string *out_str)
{
// 协议规定 字符串格式应序列化为"_result _code"
*out_str = std::to_string(_result);
*out_str += blank_space_sep;
*out_str += std::to_string(_code);
return true;
}
bool deserialize(const std::string &in_str)
{
// 协议规定 in_str的格式应为"_result _code"
size_t pos = in_str.find(blank_space_sep);
if (pos == std::string::npos)
{
// 没找到字符' '
lg(Warning, "Result Message Error...");
return false;
}
_result = std::stoi(in_str.substr(0, pos));
_code = std::stoi(in_str.substr(pos + 1));
return true;
}
void print()
{
std::cout << _result << " " << _code << std::endl;
}
~Respond() {}
public:
int _result;
int _code; // 表示结果可信度 0表示可信
};
4.Calculator.hpp
计算器功能接口函数
#pragma once
#include"protocol.hpp"
class Calculator{
public:
Calculator() {}
Respond calculate(const Request& rq)
{
Respond rs;
switch (rq._operator)
{
case '+':
rs._result = rq._x + rq._y;
break;
case '-':
rs._result = rq._x - rq._y;
break;
case '*':
rs._result = rq._x * rq._y;
break;
case '/':
if(rq._y == 0)
{
lg(Warning,"Found Div Zero Error...");
rs._code = Div_Zero_Err;
}
rs._result = rq._x / rq._y;
break;
case '%':
if(rq._y == 0)
{
lg(Warning,"Found Mod Zero Error...");
rs._code = Mod_Zeor_Err;
}
rs._result = rq._x - rq._y;
break;
default:
lg(Warning,"Found Operator Error...");
rs._code = Operatorr_Err;
break;
}
return rs;
}
};
5.serverCal.hpp
代码如下(示例):
#pragma once
#include "Socket.hpp"
#include "protocol.hpp"
#include "threadPool.hpp"
#include "Task.hpp"
class ServerCal
{
public:
ServerCal()
{
}
void Init(const int sinfamily, const std::string &ip, const uint16_t port)
{
_listensock.Init();
_listensock.Bind(sinfamily, ip, port);
_listensock.Listen();
}
void Run()
{
ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance();
tp->Start();
struct sockaddr_in client;
while (true)
{
memset(&client, 0, sizeof client);
socklen_t len;
int socketfd = _listensock.Accept(&client, &len);
if (socketfd < 0)
continue;
tp->Push(socketfd);
}
}
private:
Socket _listensock;
};
6.threadPool.hpp
很熟悉的线程池封装
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 10;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(const pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
static ThreadPool<T> *GetInstance()
{
if (nullptr == tp_) // ???
{
pthread_mutex_lock(&lock_);
if (nullptr == tp_)
{
// std::cout << "log: singleton create done first!" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
std::vector<ThreadInfo> threads_;
std::queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
7.Task.hpp
派发给线程池的任务
#pragma once
#include "Socket.hpp"
#include "protocol.hpp"
#include "Calculator.hpp"
class Task
{
public:
Task(int socket_fd)
: _socket_fd(socket_fd)
{
}
void run()
{
char in_buffer[1024];
Calculator cal;
std::string message = "";
while (true)
{
memset(in_buffer, 0, sizeof in_buffer);
//std:: cout << "开始等待读取..." <<std::endl;
int n = read(_socket_fd, (void *)in_buffer, sizeof in_buffer - 1);
//std::cout << n << " " << strerror(errno) <<std::endl;
//std::cout << "读取到的有效字符为" << n << std::endl;
if (n == 0)
{
lg(Warning, "Connection closed by foreign host, socketfd[%d] closed...", _socket_fd);
break;
}
else if (n < 0)
{
lg(Warning, "Read Error, socketfd[%d]...", _socket_fd);
break;
}
in_buffer[n] = 0;
message += in_buffer;
//std::cout << "报文大小: "<< message.size() <<" ,报文内容: "<< message << std::endl;
Request rq;
if(!rq.deserialize(message)) continue;
Respond rs = cal.calculate(rq);
std::string res;
rs.serialize(&res);
printf("%d %c %d = %d\n",rq._x,rq._operator,rq._y,rs._result);
write(_socket_fd, res.c_str(), res.size());
}
}
void operator()()
{
run();
close(_socket_fd);
}
~Task()
{}
private:
int _socket_fd;
};
8. log.hpp
输出日志消息
#pragma once
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
void printLog(int level, const std::string &logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string &logname, const std::string &logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
printOneFile(filename, logtxt);
}
~Log()
{
}
void operator()(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暂时打印
printLog(level, logtxt);
}
private:
int printMethod;
std::string path;
};
Log lg;
客户端
#include "Socket.hpp"
#include "protocol.hpp"
void Usage(const char *mes)
{
std::cout << "Usage: " << mes << " ip[xxx.xxx.xxx.xxx] port[8080-9000]" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
Usage("./clientCal");
}
Socket local;
local.Init();
int n = local.Connect(argv[1], argv[2]);
if (n < 0)
{
return 1;
}
int localfd = local.Getfd();
std::cout << " 简易计算器, 目前仅支持\" + - * / %\"运算符 " << std::endl;
std::cout << " 数字和运算符请用空格或回车隔开" << std::endl;
Request rq;
Respond rs;
std::string message;
char buffer[1024];
while (true)
{
memset(buffer, 0, sizeof buffer);
std::cout << "请输入您的算式@ ";
std::cin >> rq._x >> rq._operator >> rq._y;
rq.serialize(&message);
write(localfd, message.c_str(), message.size());
// 开始等待结果
n = read(localfd, buffer, sizeof buffer - 1);
if (n == 0)
{
lg(Warning, "Connection closed by foreign host, socketfd[%d] closed...",localfd);
break;
}
else if (n < 0)
{
lg(Warning, "Read Error, socketfd[%d]...", localfd);
break;
}
buffer[n] = 0;
std::string res = buffer;
rs.deserialize(res);
if(rs._code != 0)
{
switch(rs._code)
{
case 1:
std::cout << "出现除0错误" << std::endl;
break;
case 2:
std::cout << "出现模0错误" << std::endl;
break;
case 3:
std::cout << "使用了除 + - * / % 以外的运算符" << std::endl;
break;
default:
std::cout << "发生未知错误" <<std::endl;
break;
}
continue;
}
printf("%d %c %d = %d\n",rq._x,rq._operator,rq._y,rs._result);
}
return 0;
}
Windows客户端
#include<iostream>
#include<string>
#include<WinSock2.h>
#include<Windows.h>
#include<functional>
#include<stdlib.h>
#include"protocol.hpp"
#pragma comment(lib, "ws2_32.lib")
#pragma warning(disable:4996)
#pragma execution_character_set("utf-8")
const int server_port = 8889;
const std::string server_ip = "43.143.58.29";
int main()
{
//初始化网络环境
WSADATA wsd;
WSAStartup(MAKEWORD(2, 2), &wsd);
system("chcp 65001");
//申请套接字
SOCKET socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == SOCKET_ERROR)
{
perror("Socket Error");
exit(1);
}
//创建并初始化Server端sockaddr_in结构体
struct sockaddr_in server;
memset(&server, 0, sizeof server);
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
server.sin_port = htons(server_port);
//开始连接服务器
int n = connect(socket_fd, (const struct sockaddr*)&server, sizeof server);
if (n < 0)
{
//连接失败
std::cout << "Connect Failed" << std::endl;
return 1;
}
std::cout << " 简易计算器, 目前仅支持\" + - * / %\"运算符 " << std::endl;
std::cout << " 数字和运算符请用空格或回车隔开" << std::endl;
Request rq;
Respond rs;
std::string message;
char buffer[1024];
while (true)
{
memset(buffer, 0, sizeof buffer);
std::cout << "请输入您的算式@ ";
std::cin >> rq._x >> rq._operator >> rq._y;
rq.serialize(&message);
send(socket_fd, message.c_str(), (int)message.size(),0);
// 开始等待结果
n = recv(socket_fd, buffer, sizeof buffer - 1,0);
if (n == 0)
{
lg(Warning, "Connection closed by foreign host, socketfd[%d] closed...", socket_fd);
break;
}
else if (n < 0)
{
lg(Warning, "Read Error, socketfd[%d]...", socket_fd);
break;
}
buffer[n] = 0;
std::string res = buffer;
rs.deserialize(res);
if (rs._code != 0)
{
switch (rs._code)
{
case 1:
std::cout << "出现除0错误" << std::endl;
break;
case 2:
std::cout << "出现模0错误" << std::endl;
break;
case 3:
std::cout << "使用了除 + - * / % 以外的运算符" << std::endl;
break;
default:
std::cout << "发生未知错误" << std::endl;
break;
}
continue;
}
printf("%d %c %d = %d\n", rq._x, rq._operator, rq._y, rs._result);
}
//清理环境
closesocket(socket_fd);
WSACleanup();
return 0;
}