skynet的cluster集群

集群的使用

现在的游戏服务器框架中,分布式是一种常见的需求。一个游戏服务器组通常可以分成网关服务器、登录服务器、逻辑服务器、跨服服务器等等。
skynet中,我们可以通过cluster来组建一个集群,实现分布式的部署。

示例

我们先来看一个简单的例子,在这里我们实现了两个skynet结点(进程):一个称为center,一个称为game
center中启动一个data服务,然后game结点向center结点的data服务获取数据。
在这里插入图片描述

center结点有两个文件centerMaindataService

--centerMain.lua
local skynet = require("skynet")
require("skynet.manager")
local cluster = require("skynet.cluster")

skynet.start(function()
   --打开结点:表示当前进程是center结点,监听center端口
   cluster.open("center")

   --启动dataService服务
   local addr = skynet.newservice("dataService")

   --注册名字,以便其他结点访问
   skynet.name(".dataService", addr)
end)
--dataService.lua
local skynet = require "skynet"
local kv = {}

skynet.start(function()
   skynet.dispatch("lua", function(session, source, cmd, k, v)
      if cmd == "set" then
         kv[k] = v
      elseif cmd == "get" then
         skynet.retpack(kv[k])
      end
   end)
end)

game结点连接center结点,访问dataService服务:

local skynet = require("skynet")
require("skynet.manager")
local cluster = require("skynet.cluster")

skynet.start(function()
   --使用cluster.send/call
   cluster.send("center", ".data", "set", "k1", "v1")
   print(cluster.call("center", ".data", "get", "k1"))

   --使用skynet.send/call
   local addr = cluster.proxy("center", ".data")
   skynet.send(addr, "lua", "set", "k1", "v2")
   print(skynet.call(addr, "lua", "get", "k1"))
end)

在配置文件中,我们需要指定一个cluster文件:

--config
cluster = "./config/clusterConfig.lua"

--config/clusterConfig.lua
__nowaiting = true 
center="127.0.0.1:14880"
game="127.0.0.1:14881"

这样一个简单的skynet集群就搭建好了

cluster

在示例中,我们看到集群的监听和发送,都是通过cluster这个库来操作的。cluster是一个封装的用来进行集群相关操作的函数库,通过local cluster = require("skynet.cluster")引入
主要的函数有:

1. cluster.reload

cluster.reload用来加载配置。除了通过配置文件之外,我们也可以使用cluster.reload来加载配置:

cluster.reload({
   center="127.0.0.1:14880",
   game="127.0.0.1:14881",
})

如果不传参数的话,则表示重新从配置文件中加载:

cluster.reload()

reload是新加配置,对于旧的结点,如果没有被覆盖到则没有影响。想要清除旧的结点,可以配置node=nil

可以通过配置节点名字对应的地址为 false 来明确指出这个节点已经下线

2. cluster.open

cluster.open表示监听指定的端口。示例中,在center结点,我们使用cluster.open('center')来监听center端口,在这里是监听127.0.0.1:14880

这里会发一条listen指令给到clusterdclusterd启动gate服务,并通过gate服务来监听端口。

clusterd在收到listen时,会启动一个gate服务,所以如果你有自定义的网关服务,最好不要用gate这个服务名。

在一个进程里,我们可以open多个端口,如果其他结点不会主动连接本结点,那也可以不open任何端口。

3. cluster.send / cluster.call

sendcall可以向指定的结点发送lua消息。
其效果和在目标结点上,发送lua消息一样。
game结点上执行:
cluster.send("center", ".dataService", "set", "k1", "v1")
相当于在center结点上执行:
skynet.send(".dataService", "lua", "set", "k1", "v1")

4. cluster.proxy

cluster.proxy用于生成一个代理地址,使发送跨结点的消息看起来和发送本地消息一样:

local addr = cluster.proxy('center', '.dataService')
skynet.send(addr, 'lua', 'set', 'k1', 'v1')

和发送本地消息的区别在于,这种方式下,skynet.send只能支持lua消息。

5. cluster.register / cluster.query

cluster.register用于给某个服务在当前结点注册个名字。
其他结点,则可以使用cluster.query来判断目标结点是否存在某个服务。

clusterd服务

当我们引入cluster库时,会启动一个唯一服务clusterd,这是在cluster.lua文件中,通过skynet.init的方式启动的:

--cluster.lua
skynet.init(function()
    clusterd = skynet.uniqueservice("clusterd")
end)

cluster库的函数,基本上都是通过发送lua消息到clusterd服务来进行的。
在这里插入图片描述

clusterd会处理以下lua消息:

1. reload

clusterd维护一个node_address表,用来记录每个结点对应的IP地址和端口。

2. listen

收到listen消息时,clusterd会启动一个gate服务。然后根据结点名,获取结点地址,启动gate监听网络端口。
当这个gate服务收到网络连接时,会发送lua消息socketclusterd服务,然后由clusterd启动一个clusteragent服务,来处理这个连接的消息:

function command.socket(source, subcmd, fd, msg)
	if subcmd == "open" then
		skynet.error(string.format("socket accept from %s", msg))
		-- new cluster agent
		cluster_agent[fd] = false
		local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
		--...其他代码
	else
		--...关闭和错误处理
	end
end
3. register

clusterd维护一个register_name表,用来记录注册的服务名和地址。

4. proxy

clusterd维护一个proxy表,记录结点名.服务名对应的代理地址,当代理地址不存在时,则创建一个clusterproxy服务。

5. sender

clusterd维护一个node_channel表,记录结点对应的clustersender地址,这里会返回结点对应的clustersender地址,没有则先创建一个。

发送消息到其他结点

  • cluster.send
    cluster.send会通过一个clustersender服务来发送消息。先看看代码:
function cluster.send(node, address, ...)
	-- push is the same with req, but no response
	local s = sender[node]
	if not s then
		table.insert(task_queue[node], skynet.packstring(address, ...))
	else
		skynet.send(sender[node], "lua", "push", address, skynet.pack(...))
	end
end

cluster中维护着sender列表,在同一个服务里,sender记录每个node对应的clustersender服务。
sender[node]不存在的时候,没有直接创建一个clustersender,而是将当前的参数打包,然后插入到一个task_queue队列中,这是因为创建服务是一个阻塞的过程,在创建服务的过程中,可能会再次调用cluster.send,所以这里将所有创建过程中的参数都缓存起来,等clustersender创建完成后,再统一发送到目标结点。
而创建clustersender,又是发送消息到clusterd服务:local ok, c = pcall(skynet.call, clusterd, "lua", "sender", node)

skynet.packstring可以将多个参数(字符串,数字,表,布尔值)序列化成一个字符串。可以使用skynet.unpack反序列化将参数解出。

  • cluster.call
    cluster.call同样是通过clustersender服务来发送消息。
function cluster.call(node, address, ...)
	-- skynet.pack(...) will free by cluster.core.packrequest
	local s = sender[node]
	if not s then
		local task = skynet.packstring(address, ...)
		return skynet.call(get_sender(node), "lua", "req", repack(skynet.unpack(task)))
	end
	return skynet.call(s, "lua", "req", address, skynet.pack(...))
end

cluster.send使用task_queue可以立刻返回,不会阻塞,而cluster.call本身就是会阻塞的,所以可以直接使用get_sender,以阻塞的形式获取一个clustersender

注意这里先将参数序列化,等获取到sender之后才重新化序列化,这是因为get_sender这个过程是阻塞的,而参数有可能是个table,在阻塞的过程中,这个table中的值有可能发生变化,导致逻辑不符合预期,所以这里通过序列化来保证发送时的参数不会被改变。

clustersender服务

clustersender用来连接指定结点并发送数据。这个服务是在第一次发送数据时才创建的。
在集群中,结点A向结点B发送过消息,那么结点A就有一个指向结点B的clustersender服务,且只有一个

clustersender服务是在clusterd服务中创建的,clusterd是一个唯一服务,在这个服务的管理下,每个目标结点只有一个clustersender服务。

现在我们来看看clustersender服务是怎么发送数据的。

启动服务时,我们传过来四个参数
node: 连接的集群的结点名字
nodename: 主机的hostname
init_host: socket连接的地址
init_port: socket连接的端口

skynet.start时,创建了一个skynet.socketchannel,然后设置lua消息处理函数,这里主要处理两种服务:

  • push
    push对应的是cluster.send,只负责发送,不需要响应。
function command.push(addr, msg, sz)
	local request, new_session, padding = cluster.packpush(addr, session, msg, sz)
	if padding then	-- is multi push
		session = new_session
	end

	channel:request(request, nil, padding)
end

这里的clusterC层的库,不是lua层的库。
cluster.packpush按特定的协议,来打包数据。
cluster.packpush返回三个值:
request:打包后的二进制数据。
new_sessionsession+1
padding:如果数据过大,则将超过单包上限的二进制数据以table数组的形式放在padding里。

channel:request则是将request发送给对端主机,如果有padding,则分多个包发出去。这里第二个参数是response,这里为nil表示不需要响应。

  • req
    req对应的是cluster.call,需要等待响应的返回。
local function send_request(addr, msg, sz)
	-- msg is a local pointer, cluster.packrequest will free it
	local current_session = session
	local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
	session = new_session

	local tracetag = skynet.tracetag()
	if tracetag then
		if tracetag:sub(1,1) ~= "(" then
			-- add nodename
			local newtag = string.format("(%s-%s-%d)%s", nodename, node, session, tracetag)
			skynet.tracelog(tracetag, string.format("session %s", newtag))
			tracetag = newtag
		end
		skynet.tracelog(tracetag, string.format("cluster %s", node))
		channel:request(cluster.packtrace(tracetag))
	end
	return channel:request(request, current_session, padding)
end

function command.req(...)
	local ok, msg = pcall(send_request, ...)
	if ok then
		if type(msg) == "table" then
			skynet.ret(cluster.concat(msg))
		else
			skynet.ret(msg)
		end
	else
		skynet.error(msg)
		skynet.response()(false)
	end
end

command.req调用send_request,返回值如果是table表示这是一个大包切割成多个小包,需要使用cluster.concat连接起来再返回,如果是string则直接返回。
send_request中,cluster.packrequestcluster.packpush类似,来打包request类型的数据。
最后同样是交给channel:request来发送socket消息,和push不同的是,这里第二个参数传入了current_session,表示接收响应的会话ID。

这里简单的讲一下channel:request是怎么发送和接收数据的。
channel:request首先会检查当前的连接状态:

  • 如果socket连接还没建立,则先建立连接,再发送数据;
  • 如果socket连接已断开,则会抛出异常;
  • 如果socket连接正常,则直接发送数据。
    在发送数据的时候,如果数据包太大(超过32K),则会切分成多个包来发送。

如果需要等待响应数据,那么会调用socketchannel__response函数,这个函数是在socketchannel初始化的时候传入的,在socketsender这里,则是read_response函数:

local function read_response(sock)
	local sz = socket.header(sock:read(2))
	local msg = sock:read(sz)
	return cluster.unpackresponse(msg)	-- session, ok, data, padding
end

skynet.start(function()
	channel = sc.channel {
			host = init_host,
			port = tonumber(init_port),
			response = read_response,
			nodelay = true,
		}
	skynet.dispatch("lua", function(session , source, cmd, ...)
		local f = assert(command[cmd])
		f(...)
	end)
end)

read_response中,sock:read是一个阻塞函数,接收对端socket传回来的网络消息。

clsteragent服务

前面提到过,当clustersender第一次发送数据时,会先建立socket连接,而当socket连接建立时,对面的结点会创建一个clusteragent服务,来处理收到的数据。
clusteragent的创建是在clusterd服务中:

function command.socket(source, subcmd, fd, msg)
	if subcmd == "open" then
		skynet.error(string.format("socket accept from %s", msg))
		-- new cluster agent
		cluster_agent[fd] = false
		local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
        --...其他代码
	else
        --...其他代码
	end
end

可以看到,创建clusteragent的时候,传入了三个参数:clusterd地址、source地址(即gate地址)、fd文件描述符(代表这个socket)。
clusteragent在调用skynet.start的时候,设置gate服务的转发,将来自fd的网络消息,都转发到这个clusteragent地址,然后设置了对网络消息的处理:

skynet.register_protocol {
    name = "client",
    id = skynet.PTYPE_CLIENT,
    unpack = cluster.unpackrequest,
    dispatch = dispatch_request,
}

这里的cluster.unpackrequestclustersender传过来的网络数据进行解析,然后分配给dispatch_request处理:
dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
clustersender中会将服务地址addr打包,这里将addr解析出来,addr可以是字符串,也可以是数字。
addr是数字0的时候,表示查询某个注册名字的数字地址:

--结点A
cluster.register("name", addr)

--结点B
local addr = cluster.call("NodeA", 0, "name") --返回NodeA中注册的"name"的数字地址

addr是字符串或大于0的数字,则判断addr是不是通过cluster.register注册过的,如果是则addr转化成注册的地址。 然后再根据is_push,来执行skynet.rawcallskynet.rawsend,进行数据转发。 如果是call,最后还需要将数据通过socket返回给clustersender

clusterproxy服务

当我们调用cluster.proxy(node, addr)时,会向clusterd申请一个clusterproxy服务。
这里的参数,有三种形式:

  • cluster.proxy(“center”, “.data”)
  • cluster.proxy(“center.addr”) :等价于 cluster.proxy(“center”, “.data”)
  • cluster.proxy(“center@addr”) :等价于 cluster.proxy(“center”, “@data”)
    clusterd会以node .. "." .. name作为key,保证同一个key只有一个clusterproxy服务。

clusterproxy服务很简单:
它会向clusterd申请一个面向nodeclustersender,然后就收到的lua消息,转发到这个clustersender上,参数使用服务初始化时的addr
在这里插入图片描述

参考资料

  • github skynet wiki
  • skynet cluster 模块的设计与编码协议

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/902376.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Win11安装基于WSL2的Ubuntu

1. 概述 趁着还没有完全忘记,详细记录一下在Win11下安装基于WSL2的Ubuntu的详细过程。不得不说WSL2现在被微软开发的比较强大了,还是很值得安装和使用的,笔者就通过WSL2安装的Ubuntu成功搭建了ROS环境。 2. 详论 2.1 子系统安装 在Win11搜…

在Debian上安装向日葵

说明: 因为之前服务器上安装了 PVE (Proxmox VE),之前是用 Proxmox VE 进行服务器资源管理的。出于某些原因,现在不再通过 PVE构建的虚拟机来使用计算资源,而是通过 PVE 自带的 Debian 系统直接使用虚拟机资源(因为积…

NVR接入录像回放平台EasyCVR视频融合平台语音对讲配置

国标GB28181视频平台EasyCVR视频融合平台可拓展性强、视频能力灵活,平台可提供视频监控直播、云端录像、云存储、录像检索与回看、智能告警、平台级联、云台控制、语音对讲、智能分析接入等功能。其中,在语音对讲方面,NVR接入录像回放平台目前…

JavaEE初阶---多线程(三)---内存可见性/单例模式/wait,notify的使用解决线程饿死问题

文章目录 1.volatile关键字1.1保证内存的可见性--引入1.2保证内存的可见性--分析1.3保证内存的可见性--解决1.4内存可见性-JMM内存模型 2.notify和wait介绍2.1作用一:控制调度顺序2.2作用二:避免线程饿死2.3notify和notifyAll区分 3.单例模式--经典设计模…

数据库编程 SQLITE3 Linux环境

永久存储程序数据有两种方式: 用文件存储用数据库存储 对于多条记录的存储而言,采用文件时,插入、删除、查找的效率都会很差,为了提高这些操作的效率,有计算机科学家设计出了数据库存储方式 一、数据库 用来管理数据…

【Android】多渠道打包配置

目录 简介打包配置签名配置渠道配置配置打包出来的App名称正式包与测试包配置 打包方式开发工具打包命令行打包 优缺点 简介 多渠道打包 是指在打包一个 Android 应用时,一次编译生成多个 APK 文件,每个 APK 文件针对一个特定的渠道。不同的渠道可能代表…

Prompt提示词设计:如何让你的AI对话更智能?

Prompt设计:如何让你的AI对话更智能? 在人工智能的世界里,Prompt(提示词)就像是一把钥匙,能够解锁AI的潜力,让它更好地理解和响应你的需求。今天,我们就来聊聊如何通过精心设计的Pr…

厂房区域人员进出人数统计-实施方案

1.1 现状分析 传统的人流量统计方法往往依赖于人工计数或简单的视频监控系统,这些方法不仅效率低下,而且容易出错,无法满足现代仓库管理的需求。因此,我厂区决定引入先进的智能监控系统,通过集成高清摄像头、GPU服务器…

【Unity】仓库逻辑:拾取物体进仓库和扔掉物品

需求说明 目标:实现玩家移动过程中,拾取物体,物体被放入仓库;点击仓库中物体,重新扔回3D场景中逻辑。 逻辑分析: 需要玩家可以移动;需要检测玩家和物体的碰撞,并摧毁物体&#xf…

css知识点梳理2

1. 选择器拓展 在 CSS 中,可以根据选择器的类型把选择器分为基础选择器和复合选择器,复合选择器是建立在基础选择器之上,对基本选择器进行组合形成的。 ​ 复合选择器是由两个或多个基础选择器,通过不同的方式组合而成的&#xf…

【Flask】一、安装与第一个测试程序

目录 Flask简介 安装Flask 安装pip(Python包管理器) 使用pip安装Flask 验证安装 创建Flask程序 创建应用 运行 访问测试 Flask简介 Flask是一个用Python编写的轻量级Web应用框架。它被设计为易于使用和扩展,使其成为构建简单网站或复…

[项目][boost搜索引擎#4] cpp-httplib使用 | log.hpp | 前端 | 测试及总结

目录 编写http_server模块 1. 引入cpp-httplib到项目中 2. cpp-httplib的使用介绍 3. 正式编写http_server 九、添加日志到项目中 十、编写前端模块 十一. 详解传 gitee 十二、项目总结 项目的扩展 写在前面 项目 gitee 已经上传啦 (还是决定将学校和个人…

网络编程基础-Reactor线程模型-原理剖析

1、Reactor基本概念 Reactor线程模型其实是一种设计模式,其核心思想就是将输入多路复用和事件派发相结合,从而减少系统中活跃线程的数量。 像我们之前讲到的文章网络编程基础-IO模型深入理解_网络io-CSDN博客提到了其中网络IO模型(BIO、NIO…

asp.net core 入口 验证token,但有的接口要跳过验证

asp.net core 入口 验证token,但有的接口要跳过验证 在ASP.NET Core中,你可以使用中间件来验证token,并为特定的接口创建一个属性来标记是否跳过验证。以下是一个简化的例子: 创建一个自定义属性来标记是否跳过验证: public clas…

基于PHP的http字段查询与注册(V1)(持续迭代)

目录 版本说明: 实现环境(WAMP): 数据库链接 查询页面 php处理逻辑 字段添加 版本说明: 该查询功能以查询http首部字段为目的实现的字段属性、字段内容的查询,以及对新字段信息的数据注册。 v1实现…

python 制作 发货单 (生成 html, pdf)

起因, 目的: 某个小店,想做个发货单。 过程: 先写一个 html 模板。准备数据, 一般是从数据库读取,也可以是 json 格式,或是 python 字典。总之,是数据内容。使用 jinja2 来渲染模板。最终的结果可以是 h…

多线程进阶——线程池的实现

什么是池化技术 池化技术是一种资源管理策略,它通过重复利用已存在的资源来减少资源的消耗,从而提高系统的性能和效率。在计算机编程中,池化技术通常用于管理线程、连接、数据库连接等资源。 我们会将可能使用的资源预先创建好,…

WPF+MVVM案例实战(七)- 系统初始化界面字体描边效果实现

文章目录 1、案例效果展示2、项目准备3、功能实现1、资源获取2、界面代码3、后台代码 4 源代码获取 1、案例效果展示 2、项目准备 打开项目 Wpf_Examples,新建系统初始化界面 WelcomeWindow.xmal,如下所示: 3、功能实现 1、资源获取 案例中使用的CSD…

Java | Leetcode Java题解之第516题最长回文子序列

题目&#xff1a; 题解&#xff1a; class Solution {public int longestPalindromeSubseq(String s) {int n s.length();int[][] dp new int[n][n];for (int i n - 1; i > 0; i--) {dp[i][i] 1;char c1 s.charAt(i);for (int j i 1; j < n; j) {char c2 s.char…

【Java并发编程】信号量Semaphore详解

一、简介 Semaphore&#xff08;信号量&#xff09;&#xff1a;是用来控制同时访问特定资源的线程数量&#xff0c;它通过协调各个线程&#xff0c;以保证合理的使用公共资源。 Semaphore 一般用于流量的控制&#xff0c;特别是公共资源有限的应用场景。例如数据库的连接&am…