并发服务器框架——zinx

zinx框架

Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏服务器、即时通讯服务器等。

下面实现zinx的多个功能包括:路由、全局配置、消息封装、读写分离、消息队列、链接管理等。

在这里插入图片描述
utils包下GlobalObj.go

package utils

import (
	"datarace/zinx/ziface"
	"encoding/json"
	"io/ioutil"
)

/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {
	TcpServer        ziface.IServer //当前Zinx的全局Server对象
	Host             string         //当前服务器主机IP
	TcpPort          int            //当前服务器主机监听端口号
	Name             string         //当前服务器名称
	Version          string         //当前Zinx版本号
	MaxPacketSize    uint32         //都需数据包的最大值
	MaxConn          int            //当前服务器主机允许的最大链接个数
	WorkerPoolSize   uint32         //业务工作Worker池的数量
	MaxWorkerTaskLen uint32         //业务工作Worker对应负责的任务队列最大任务存储数量
	ConfFilePath     string
	MaxMsgChanLen    int
}

/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj

// 读取用户的配置文件
func (g *GlobalObj) Reload() {
	data, err := ioutil.ReadFile("zinx.json")
	if err != nil {
		panic(err)
	}
	//将json数据解析到struct中
	//fmt.Printf("json :%s\n", data)
	err = json.Unmarshal(data, &GlobalObject)
	if err != nil {
		panic(err)
	}
}
func init() {
	//初始化GlobalObject变量,设置一些默认值
	GlobalObject = &GlobalObj{
		Name:             "ZinxServerApp",
		Version:          "V0.4",
		TcpPort:          7777,
		Host:             "0.0.0.0",
		MaxConn:          12000,
		MaxPacketSize:    4096,
		ConfFilePath:     "conf/zinx.json",
		WorkerPoolSize:   10,
		MaxWorkerTaskLen: 1024,
	}
	//从配置文件中加载一些用户配置的参数
	GlobalObject.Reload()
}

ziface包

package ziface

type IConnManager interface {
	Add(conn IConnection)                   //添加链接
	Remove(conn IConnection)                //删除连接
	Get(connID uint32) (IConnection, error) //利用ConnID获取链接
	Len() int                               //获取当前连接
	ClearConn()                             //删除并停止所有链接
}

package ziface

import "net"

type IConnection interface {
	Start()
	Stop()
	GetConnID() uint32
	GetTCPConnection() *net.TCPConn
	RemoteAddr() net.Addr
	SendMsg(msgId uint32, data []byte) error
	//直接将Message数据发送给远程的TCP客户端(有缓冲)
	SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口
	//设置链接属性
	SetProperty(key string, value interface{})
	//获取链接属性
	GetProperty(key string) (interface{}, error)
	//移除链接属性
	RemoveProperty(key string)
}
type HandFunc func(*net.TCPConn, []byte, int) error

package ziface

type IDataPack interface {
	GetHeadLen() int32
	Pack(msg IMessage) ([]byte, error)
	Unpack([]byte) (IMessage, error)
}
package ziface

type IMessage interface {
	GetDataLen() uint32
	GetMsgId() uint32
	GetData() []byte
	SetMsgId(uint32)
	SetData([]byte)
	SetDataLen(uint32)
}

package ziface

type IMsgHandle interface {
	DoMsgHandler(request IRequest)          //马上以非阻塞方式处理消息
	AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
	StartWorkerPool()                       //启动worker工作池
	SendMsgToTaskQueue(request IRequest)    //将消息交给TaskQueue,由worker进行处理
}

package ziface

type IRequest interface {
	GetConnection() IConnection
	GetData() []byte
	GetMsgID() uint32
}

package ziface

type IRouter interface {
	PreHandle(req IRequest)
	Handle(req IRequest)
	PostHandle(req IRequest)
}

package ziface

type IServer interface {
	Start()
	Stop()
	Serve()
	AddRouter(msgId uint32, router IRouter)
	//得到链接管理
	GetConnMgr() IConnManager
	//设置该Server的连接创建时Hook函数
	SetOnConnStart(func(IConnection))
	//设置该Server的连接断开时的Hook函数
	SetOnConnStop(func(IConnection))
	//调用连接OnConnStart Hook函数
	CallOnConnStart(conn IConnection)
	//调用连接OnConnStop Hook函数
	CallOnConnStop(conn IConnection)
}

znet包
connection

package znet

import (
	"datarace/zinx/utils"
	"datarace/zinx/ziface"
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
)

type Connection struct {
	//当前Conn属于哪个Server
	TcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可
	//当前连接的socket TCP套接字
	Conn *net.TCPConn
	//当前连接的ID 也可以称作为SessionID,ID全局唯一
	ConnID uint32
	//当前连接的关闭状态
	isClosed bool
	//消息管理MsgId和对应处理方法的消息管理模块
	MsgHandler ziface.IMsgHandle
	//告知该链接已经退出/停止的channel
	ExitBuffChan chan bool
	//无缓冲管道,用于读、写两个goroutine之间的消息通信
	msgChan chan []byte
	//有关冲管道,用于读、写两个goroutine之间的消息通信
	msgBuffChan chan []byte
	//链接属性
	property map[string]interface{}
	//保护链接属性修改的锁
	propertyLock sync.RWMutex
}

// 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
	//初始化Conn属性
	c := &Connection{
		TcpServer:    server, //将隶属的server传递进来
		Conn:         conn,
		ConnID:       connID,
		isClosed:     false,
		MsgHandler:   msgHandler,
		ExitBuffChan: make(chan bool, 1),
		msgChan:      make(chan []byte),
		msgBuffChan:  make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化
		property:     make(map[string]interface{}),                        //对链接属性map初始化
	}
	//将新创建的Conn添加到链接管理中
	c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中
	return c
}

// 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {
	c.propertyLock.Lock()
	defer c.propertyLock.Unlock()
	c.property[key] = value
}

// 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {
	c.propertyLock.RLock()
	defer c.propertyLock.RUnlock()
	if value, ok := c.property[key]; ok {
		return value, nil
	} else {
		return nil, errors.New("no property found")
	}
}

// 移除链接属性
func (c *Connection) RemoveProperty(key string) {
	c.propertyLock.Lock()
	defer c.propertyLock.Unlock()
	delete(c.property, key)
}
func (c *Connection) startReader() {
	fmt.Println("[Reader Goroutine is running]")
	defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
	defer c.Stop()
	for {
		// 创建拆包解包的对象
		dp := NewDataPack()
		//读取客户端的Msg head
		headData := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
			fmt.Println("read msg head error ", err)
			break
		}
		//拆包,得到msgid 和 datalen 放在msg中
		msg, err := dp.Unpack(headData)
		if err != nil {
			fmt.Println("unpack error ", err)
			break
		}
		//根据 dataLen 读取 data,放在msg.Data中
		var data []byte
		if msg.GetDataLen() > 0 {
			data = make([]byte, msg.GetDataLen())
			if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
				fmt.Println("read msg data error ", err)
				continue
			}
		}
		msg.SetData(data)
		//得到当前客户端请求的Request数据
		req := Request{
			conn: c,
			msg:  msg,
		}
		//从绑定好的消息和对应的处理方法中执行对应的Handle方法
		if utils.GlobalObject.WorkerPoolSize > 0 {
			//已经启动工作池机制,将消息交给Worker处理
			c.MsgHandler.SendMsgToTaskQueue(&req)
		} else {
			//从绑定好的消息和对应的处理方法中执行对应的Handle方法
			go c.MsgHandler.DoMsgHandler(&req)
		}
	}
}

// 启动连接,让当前连接开始工作
func (c *Connection) Start() {
	//1 开启用户从客户端读取数据流程的Goroutine
	go c.startReader()
	//2 开启用于写回客户端数据流程的Goroutine
	go c.StartWriter()
	//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
	c.TcpServer.CallOnConnStart(c)
}

//func (c *Connection) Start() {
//	go c.startReader()
//	for {
//		select {
//		case <-c.ExitBuffChan:
//			return
//		}
//	}
//}

// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {
	fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
	//如果当前链接已经关闭
	if c.isClosed == true {
		return
	}
	c.isClosed = true
	//==================
	//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
	c.TcpServer.CallOnConnStop(c)
	//==================
	// 关闭socket链接
	c.Conn.Close()
	//关闭Writer
	c.ExitBuffChan <- true
	//将链接从连接管理器中删除
	c.TcpServer.GetConnMgr().Remove(c)
	//关闭该链接全部管道
	close(c.ExitBuffChan)
	close(c.msgBuffChan)
}

/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {
	fmt.Println("[Writer Goroutine is running]")
	defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
	for {
		select {
		case data := <-c.msgChan:
			//有数据要写给客户端
			if _, err := c.Conn.Write(data); err != nil {
				fmt.Println("Send Data error:, ", err, " Conn Writer exit")
				return
			}
			//针对有缓冲channel需要些的数据处理
		case data, ok := <-c.msgBuffChan:
			if ok {
				//有数据要写给客户端
				if _, err := c.Conn.Write(data); err != nil {
					fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
					return
				}
			} else {
				break
				fmt.Println("msgBuffChan is Closed")
			}
		case <-c.ExitBuffChan:
			return
		}
	}
}

// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
	if c.isClosed == true {
		return errors.New("Connection closed when send msg")
	}
	//将data封包,并且发送
	dp := NewDataPack()
	msg, err := dp.Pack(NewMsgPackage(msgId, data))
	if err != nil {
		fmt.Println("Pack error msg id = ", msgId)
		return errors.New("Pack error msg ")
	}
	//写回客户端
	c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取
	return nil
}

// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
	return c.Conn
}

// 获取当前连接ID
func (c *Connection) GetConnID() uint32 {
	return c.ConnID
}

// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
	return c.Conn.RemoteAddr()
}

// // 直接将Message数据发送数据给远程的TCP客户端
//
//	func (c *Connection) SendMsg(msgId uint32, data []byte) error {
//		if c.isClosed == true {
//			return errors.New("Connection closed when send msg")
//		}
//		//将data封包,并且发送
//		dp := NewDataPack()
//		msg, err := dp.Pack(NewMsgPackage(msgId, data))
//		if err != nil {
//			fmt.Println("Pack error msg id = ", msgId)
//			return errors.New("Pack error msg ")
//		}
//		//写回客户端
//		if _, err := c.Conn.Write(msg); err != nil {
//			fmt.Println("Write msg id ", msgId, " error ")
//			c.ExitBuffChan <- true
//			return errors.New("conn Write error")
//		}
//		return nil
//	}
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
	if c.isClosed == true {
		return errors.New("Connection closed when send buff msg")
	}
	//将data封包,并且发送
	dp := NewDataPack()
	msg, err := dp.Pack(NewMsgPackage(msgId, data))
	if err != nil {
		fmt.Println("Pack error msg id = ", msgId)
		return errors.New("Pack error msg ")
	}
	//写回客户端
	c.msgBuffChan <- msg
	return nil
}

ConnManager

package znet

import (
	"datarace/zinx/ziface"
	"errors"
	"fmt"
	"sync"
)

type ConnManager struct {
	connections map[uint32]ziface.IConnection
	connLock    sync.RWMutex
}

/*
创建一个链接管理
*/
func NewConnManager() *ConnManager {
	return &ConnManager{
		connections: make(map[uint32]ziface.IConnection),
	}
}

// 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
	//保护共享资源Map 加写锁
	connMgr.connLock.Lock()
	defer connMgr.connLock.Unlock()
	//将conn连接添加到ConnMananger中
	connMgr.connections[conn.GetConnID()] = conn
	fmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}

// 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
	//保护共享资源Map 加写锁
	connMgr.connLock.Lock()
	defer connMgr.connLock.Unlock()
	//删除连接信息
	delete(connMgr.connections, conn.GetConnID())
	fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}

// 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
	//保护共享资源Map 加读锁
	connMgr.connLock.RLock()
	defer connMgr.connLock.RUnlock()
	if conn, ok := connMgr.connections[connID]; ok {
		return conn, nil
	} else {
		return nil, errors.New("connection not found")
	}
}

// 获取当前连接
func (connMgr *ConnManager) Len() int {
	return len(connMgr.connections)
}

// 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {
	//保护共享资源Map 加写锁
	connMgr.connLock.Lock()
	defer connMgr.connLock.Unlock()
	//停止并删除全部的连接信息
	for connID, conn := range connMgr.connections {
		//停止
		conn.Stop()
		//删除
		delete(connMgr.connections, connID)
	}
	fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}

datapack

package znet

import (
	"bytes"
	"datarace/zinx/utils"
	"datarace/zinx/ziface"
	"encoding/binary"
	"errors"
)

type DataPack struct{}

func NewDataPack() *DataPack {
	return &DataPack{}
}
func (dp *DataPack) GetHeadLen() uint32 {
	//Id uint32(4字节) +  DataLen uint32(4字节)
	return 8
}

// 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
	//创建一个存放bytes字节的缓冲
	dataBuff := bytes.NewBuffer([]byte{})
	//写dataLen
	if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
		return nil, err
	}
	//写msgID
	if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
		return nil, err
	}
	//写data数据
	if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
		return nil, err
	}
	return dataBuff.Bytes(), nil
}
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
	//创建一个从输入二进制数据的ioReader
	dataBuff := bytes.NewReader(binaryData)
	//只解压head的信息,得到dataLen和msgID
	msg := &Message{}
	//读dataLen
	if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
		return nil, err
	}
	//读msgID
	if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
		return nil, err
	}
	//判断dataLen的长度是否超出我们允许的最大包长度
	if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {
		return nil, errors.New("Too large msg data recieved")
	}
	//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据
	return msg, nil
}

message

package znet

type Message struct {
	Id      uint32
	DataLen uint32
	Data    []byte
}

func NewMsgPackage(id uint32, data []byte) *Message {
	return &Message{
		Id:      id,
		DataLen: uint32(len(data)),
		Data:    data,
	}
}

// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {
	return msg.DataLen
}

// 获取消息ID
func (msg *Message) GetMsgId() uint32 {
	return msg.Id
}

// 获取消息内容
func (msg *Message) GetData() []byte {
	return msg.Data
}

// 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {
	msg.DataLen = len
}

// 设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {
	msg.Id = msgId
}

// 设计消息内容
func (msg *Message) SetData(data []byte) {
	msg.Data = data
}

msgHandler

package znet

import (
	"datarace/zinx/utils"
	"datarace/zinx/ziface"
	"fmt"
	"strconv"
)

type MsgHandle struct {
	Apis           map[uint32]ziface.IRouter
	WorkerPoolSize uint32
	TaskQueue      []chan ziface.IRequest
}

func NewMsgHandle() *MsgHandle {
	return &MsgHandle{
		Apis:           make(map[uint32]ziface.IRouter),
		WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,
		TaskQueue:      make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
	}
}

// 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
	handler, ok := mh.Apis[request.GetMsgID()]
	if !ok {
		fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")
		return
	}
	//执行对应处理方法
	handler.PreHandle(request)
	handler.Handle(request)
	handler.PostHandle(request)
}

// 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
	//1 判断当前msg绑定的API处理方法是否已经存在
	if _, ok := mh.Apis[msgId]; ok {
		panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
	}
	//2 添加msg与api的绑定关系
	mh.Apis[msgId] = router
	fmt.Println("Add api msgId = ", msgId)
}

// 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
	fmt.Println("Worker ID = ", workerID, " is started.")
	//不断的等待队列中的消息
	for {
		select {
		//有消息则取出队列的Request,并执行绑定的业务方法
		case request := <-taskQueue:
			mh.DoMsgHandler(request)
		}
	}
}

// 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
	//遍历需要启动worker的数量,依此启动
	for i := 0; i < int(mh.WorkerPoolSize); i++ {
		//一个worker被启动
		//给当前worker对应的任务队列开辟空间
		mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
		//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
		go mh.StartOneWorker(i, mh.TaskQueue[i])
	}
}

// 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
	//根据ConnID来分配当前的连接应该由哪个worker负责处理
	//轮询的平均分配法则
	//得到需要处理此条连接的workerID
	workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
	fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)
	//将请求消息发送给任务队列
	mh.TaskQueue[workerID] <- request
}

request

package znet

import (
	"datarace/zinx/ziface"
)

type Request struct {
	conn ziface.IConnection
	msg  ziface.IMessage
}

func (r *Request) GetConnection() ziface.IConnection {
	return r.conn
}

// 获取请求消息的数据
func (r *Request) GetData() []byte {
	return r.msg.GetData()
}

// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {
	return r.msg.GetMsgId()
}

router

package znet

import "datarace/zinx/ziface"

type BaseRouter struct{}

func (br *BaseRouter) PreHandle(request ziface.IRequest)  {}
func (br *BaseRouter) Handle(request ziface.IRequest)     {}
func (br *BaseRouter) PostHandle(request ziface.IRequest) {}

server

package znet

import (
	"datarace/zinx/utils"
	"datarace/zinx/ziface"
	"fmt"
	"net"
	"time"
)

// iServer 接口实现,定义一个Server服务类
type Server struct {
	//服务器的名称
	Name string
	//tcp4 or other
	IPVersion string
	//服务绑定的IP地址
	IP string
	//服务绑定的端口
	Port int
	//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
	msgHandler ziface.IMsgHandle
	//当前Server的链接管理器
	ConnMgr ziface.IConnManager
	//新增两个hook函数原型
	//该Server的连接创建时Hook函数
	OnConnStart func(conn ziface.IConnection)
	//该Server的连接断开时的Hook函数
	OnConnStop func(conn ziface.IConnection)
}

// 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {
	return s.ConnMgr
}

// ============== 实现 ziface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {
	fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)
	fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)
	fmt.Printf("[Zinx] Version: %s, MaxConn: %d,  MaxPacketSize: %d\n",
		utils.GlobalObject.Version,
		utils.GlobalObject.MaxConn,
		utils.GlobalObject.MaxPacketSize)
	//开启一个go去做服务端Linster业务
	go func() {
		//1 获取一个TCP的Addr
		s.msgHandler.StartWorkerPool()
		addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
		if err != nil {
			fmt.Println("resolve tcp addr err: ", err)
			return
		}
		//2 监听服务器地址
		listenner, err := net.ListenTCP(s.IPVersion, addr)
		if err != nil {
			fmt.Println("listen", s.IPVersion, "err", err)
			return
		}
		//已经监听成功
		fmt.Println("start Zinx server  ", s.Name, " succ, now listenning...")
		var cid uint32
		cid = 0
		//3 启动server网络连接业务
		for {
			//3.1 阻塞等待客户端建立连接请求
			conn, err := listenner.AcceptTCP()
			if err != nil {
				fmt.Println("Accept err ", err)
				continue
			}
			//=============
			//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
			if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
				conn.Close()
				continue
			}
			//=============
			//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
			dealConn := NewConntion(s, conn, cid, s.msgHandler)
			cid++
			//3.4 启动当前链接的处理业务
			go dealConn.Start()
			//go func() {
			//	//不断的循环从客户端获取数据
			//	for {
			//		buf := make([]byte, 512)
			//		cnt, err := conn.Read(buf)
			//		if err != nil {
			//			fmt.Println("recv buf err ", err)
			//			continue
			//		}
			//		//回显
			//		if _, err := conn.Write(buf[:cnt]); err != nil {
			//			fmt.Println("write back buf err ", err)
			//			continue
			//		}
			//	}
			//}()
		}
	}()
}
func (s *Server) Stop() {
	fmt.Println("[STOP] Zinx server , name ", s.Name)
	//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
	s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {
	s.Start()
	//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
	//阻塞,否则主Go退出, listenner的go将会退出
	for {
		time.Sleep(10 * time.Second)
	}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
	s.msgHandler.AddRouter(msgId, router)
	fmt.Println("Add Router SUCC!! msgID = ", msgId)
}

/*
创建一个服务器句柄
*/
func NewServer() *Server {
	utils.GlobalObject.Reload()
	s := &Server{
		Name:       utils.GlobalObject.Name,
		IPVersion:  "tcp4",
		IP:         utils.GlobalObject.Host,
		Port:       utils.GlobalObject.TcpPort,
		msgHandler: NewMsgHandle(),   //msgHandler 初始化
		ConnMgr:    NewConnManager(), //创建ConnManage
	}
	return s
}

// 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {
	s.OnConnStart = hookFunc
}

// 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {
	s.OnConnStop = hookFunc
}

// 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
	if s.OnConnStart != nil {
		fmt.Println("---> CallOnConnStart....")
		s.OnConnStart(conn)
	}
}

// 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
	if s.OnConnStop != nil {
		fmt.Println("---> CallOnConnStop....")
		s.OnConnStop(conn)
	}
}

客户端

package main

import (
	"datarace/zinx/znet"
	"fmt"
	"io"
	"net"
	"time"
)

/*
模拟客户端
*/
func main() {
	fmt.Println("Client Test ... start")
	//3秒之后发起测试请求,给服务端开启服务的机会
	time.Sleep(3 * time.Second)
	conn, err := net.Dial("tcp", "127.0.0.1:7777")
	if err != nil {
		fmt.Println("client start err, exit!")
		return
	}
	for {
		//发封包message消息
		dp := znet.NewDataPack()
		msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.8 Client0 Test Message")))
		_, err := conn.Write(msg)
		if err != nil {
			fmt.Println("write error err ", err)
			return
		}
		//先读出流中的head部分
		headData := make([]byte, dp.GetHeadLen())
		_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
		if err != nil {
			fmt.Println("read head error")
			break
		}
		//将headData字节流 拆包到msg中
		msgHead, err := dp.Unpack(headData)
		if err != nil {
			fmt.Println("server unpack err:", err)
			return
		}
		if msgHead.GetDataLen() > 0 {
			//msg 是有data数据的,需要再次读取data数据
			msg := msgHead.(*znet.Message)
			msg.Data = make([]byte, msg.GetDataLen())
			//根据dataLen从io中读取字节流
			_, err := io.ReadFull(conn, msg.Data)
			if err != nil {
				fmt.Println("server unpack data err:", err)
				return
			}
			fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
		}
		time.Sleep(1 * time.Second)
	}
}

服务端

package main

import (
	"datarace/zinx/ziface"
	"datarace/zinx/znet"
	"fmt"
)

// ping test 自定义路由
type PingRouter struct {
	znet.BaseRouter
}

// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
	fmt.Println("Call PingRouter Handle")
	//先读取客户端的数据,再回写ping...ping...ping
	fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
	err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))
	if err != nil {
		fmt.Println(err)
	}
}

type HelloZinxRouter struct {
	znet.BaseRouter
}

// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
	fmt.Println("Call HelloZinxRouter Handle")
	//先读取客户端的数据,再回写ping...ping...ping
	fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
	err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))
	if err != nil {
		fmt.Println(err)
	}
}

// 创建连接的时候执行
func DoConnectionBegin(conn ziface.IConnection) {
	fmt.Println("DoConnecionBegin is Called ... ")
	//=============设置两个链接属性,在连接创建之后===========
	fmt.Println("Set conn Name, Home done!")
	conn.SetProperty("Name", "Aceld")
	conn.SetProperty("Home", "https://www.jianshu.com/u/35261429b7f1")
	//===================================================
	err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))
	if err != nil {
		fmt.Println(err)
	}
}

// 连接断开的时候执行
func DoConnectionLost(conn ziface.IConnection) {
	//============在连接销毁之前,查询conn的Name,Home属性=====
	if name, err := conn.GetProperty("Name"); err == nil {
		fmt.Println("Conn Property Name = ", name)
	}
	if home, err := conn.GetProperty("Home"); err == nil {
		fmt.Println("Conn Property Home = ", home)
	}
	//===================================================
	fmt.Println("DoConneciotnLost is Called ... ")
}
func main() {
	//创建一个server句柄
	s := znet.NewServer()
	//注册链接hook回调函数
	s.SetOnConnStart(DoConnectionBegin)
	s.SetOnConnStop(DoConnectionLost)
	//配置路由
	s.AddRouter(0, &PingRouter{})
	s.AddRouter(1, &HelloZinxRouter{})
	//开启服务
	s.Serve()
}

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/948931.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端(API)学习笔记(CLASS 4):进阶

1、日期对象 日期对象&#xff1a;用来表示事件的对象 作用&#xff1a;可以得到当前系统时间 1、实例化 在代码中发现了new关键字&#xff0c;一般将这个操作称为实例化 创建一个时间对象并获取时间 获得当前时间 const datenew Date() 使用日志查看&#xff0c;得到的…

LeetCode:98.验证二叉搜索树

跟着carl学算法&#xff0c;本系列博客仅做个人记录&#xff0c;建议大家都去看carl本人的博客&#xff0c;写的真的很好的&#xff01; 代码随想录 LeetCode&#xff1a;98.验证二叉搜索树 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 …

k8s基础(1)—Kubernetes-Pod

一、Pod简介 Pod是Kubernetes&#xff08;k8s&#xff09;系统中可以创建和管理的最小单元&#xff0c;是资源对象模型中由用户创建或部署的最小资源对象模型‌。Pod是由一个或多个容器组成的&#xff0c;这些容器共享存储和网络资源&#xff0c;可以看作是一个逻辑的主机‌。…

vue 项目集成 electron 和 electron 打包及环境配置

vue electron 开发桌面端应用 安装 electron npm i electron -D记得加上-D&#xff0c;electron 需添加到devDependencies&#xff0c;如果添加到dependencies后面运行可能会报错 根目录创建electron文件夹&#xff0c;在electron文件夹创建main.js&#xff08;或者backgrou…

Rabbitmq追问1

如果消费端代码异常&#xff0c;未手动确认&#xff0c;那么这个消息去哪里 2024-12-31 21:19:12 如果消费端代码发生异常&#xff0c;未手动确认&#xff08;ACK&#xff09;的情况下&#xff0c;消息的处理行为取决于消息队列的实现和配置&#xff0c;以下是基于 RabbitMQ …

第二十八周机器学习笔记:PINN求正反解求PDE文献阅读——反问题、动手深度学习

第二十七周周报 一、文献阅读题目信息摘要Abstract网络架构实验——Data-driven discovery of partial differential equations&#xff08;偏微分方程的数据驱动发现&#xff09;1. Continuous time models&#xff08;连续时间模型&#xff09;例子&#xff1a;(Navier–Stok…

02-1堆的概念

2-1 堆的概念 开启实验步骤: 有两种创建工程的方式, 直接使用老师的文档, 然后进行调试 老师的工程 或者跳转我的步骤进行调试从零创建(ctrl 加鼠标左键,快速跳转) 视频观看地址: 韦东山freeRTOS快速入门视频教程 开始实验 (1) 我们定义一个空闲的数组heap_buf[1024], 它就…

【HarmonyOS之旅】ArkTS语法(四) -> 使用限制与扩展

目录 1 -> 在生成器函数中的使用限制 2 -> 变量的双向绑定 3 -> 自定义组件成员变量初始化的方式与约束 1 -> 在生成器函数中的使用限制 ArkTS语言的使用在生成器函数中存在一定的限制&#xff1a; 表达式仅允许在字符串(${expression})、if条件、ForEach的参…

大麦抢票科技狠活

仅供学习参考&#xff0c;切勿再令您所爱的人耗费高昂的价格去购置黄牛票 ⚠️核心内容参考: 据悉&#xff0c;于购票环节&#xff0c;大麦凭借恶意流量清洗技术&#xff0c;于网络层实时甄别并阻拦凭借自动化手段发起下单请求的流量&#xff0c;强化对刷票脚本、刷票软件以及…

【C++】B2106 矩阵转置

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目解析&#x1f4af;第一种实现方式&#xff1a;我的初始做法实现思路优缺点分析 &#x1f4af;第二种实现方式&#xff1a;我的优化做法实现思路优缺点分析 &#x1f4a…

五月天TV 1.1.0 | 频道丰富的娱乐向电视直播应用

五月天IPTV是一款提供多种频道的电视直播应用&#xff0c;包括体育、动漫、音乐等。虽然频道种类繁多&#xff0c;但正经频道较少&#xff0c;更适合追求娱乐和轻松内容的用户群体。软件在测试中发现存在一些小问题&#xff0c;频道质量参差不齐。 大小&#xff1a;15M 下载地…

高等数学 8.2 数量积 向量积 *混合积

文章目录 一、数量积二、向量积三、*混合积 一、数量积 对两个向量做运算 a \boldsymbol{a} a 和 b \boldsymbol{b} b &#xff0c;运算结果是一个数&#xff0c;它等于 ∣ a ∣ |\boldsymbol{a}| ∣a∣ &#xff0c; ∣ b ∣ |\boldsymbol{b}| ∣b∣ 及它们的夹角 θ \th…

Crosslink-NX应用连载(12):如何复用特殊功能管脚

作者&#xff1a;Hello,Panda 大家早上好。 昨天有朋友私信我&#xff0c;如何复用Crosslink-NX的特殊功能引脚如PROGRAMN、DONE、INITN诸如这些。熊猫君在这里简单介绍下&#xff1a; 以LIFCL-33U-8CTG104C为例&#xff0c;我们建立一个简单的指示灯LED周期闪烁的工程&…

Spring MVC和servlet

1.Spring MVC是Spring框架的一个扩展 2.Spring MVC工作流程 1、用户发送请求至前端控制器DispatcherServlet。 2、DispatcherServlet收到请求调用HandlerMapping处理器映射器。 3、处理器映射器找到具体的处理器(可以根据xml配置、注解进行查找)&#xff0c;生成处理器对象及…

TI毫米波雷达原始数据解析之Lane数据交换

TI毫米波雷达原始数据解析之Lane数据交换 背景Lane 定义Lane 确认确认LVDS Lane 数量的Matlab 代码数据格式参考 背景 解析使用mmWave Studio 抓取的ADC Data Lane 定义 芯片与DCA100之间的数据使用LVDS接口传输&#xff0c;使用mmWave Studio 配置过程中有一个选项是LVDS L…

VisionPro软件Image Stitch拼接算法

2D图像拼接的3种情景 1.一只相机取像位置固定&#xff0c;或者多只相机固定位置拍图&#xff0c;硬拷贝拼图&#xff0c;采用CopyRegion工具实现 2.一只或多只相机在多个位置拍照&#xff0c;相机视野互相重叠&#xff0c;基于Patmax特征定位后&#xff0c;无缝 拼图&#xff…

const修饰指针总结

作者简介&#xff1a; 一个平凡而乐于分享的小比特&#xff0c;中南民族大学通信工程专业研究生在读&#xff0c;研究方向无线联邦学习 擅长领域&#xff1a;驱动开发&#xff0c;嵌入式软件开发&#xff0c;BSP开发 作者主页&#xff1a;一个平凡而乐于分享的小比特的个人主页…

回归预测 | MATLAB实ELM-Adaboost多输入单输出回归预测

回归预测 | MATLAB实ELM-Adaboost多输入单输出回归预测 目录 回归预测 | MATLAB实ELM-Adaboost多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 一、极限学习机&#xff08;ELM&#xff09; 极限学习机是一种单层前馈神经网络&#xff0c;具有训练速…

centos,789使用mamba快速安装R及语言包devtools

如何进入R语言运行环境请参考&#xff1a;Centos7_miniconda_devtools安装_R语言入门之R包的安装_r语言devtools包怎么安装-CSDN博客 在R里面使用安装devtools经常遇到依赖问题&#xff0c;排除过程过于费时&#xff0c;使用conda安装包等待时间长等。下面演示centos,789都是一…

Pytest钩子函数,测试框架动态切换测试环境

在软件测试中&#xff0c;测试环境的切换是个令人头疼的问题。不同环境的配置不同&#xff0c;如何高效切换测试环境成为许多测试开发人员关注的重点。你是否希望在运行测试用例时&#xff0c;能够动态选择测试环境&#xff0c;而不是繁琐地手动修改配置&#xff1f; Pytest 测…