Kubeedge:Metamanager源码速读(不定期更新)

Kubeedge源码版本:v1.15.1

在看Metamanager之前,先看一下Metamanager源码的目录结构(位于edge/pkg下)和官方文档:

目录结构如下面的两张图所示。请忽略绿色的文件高亮,这是Jetbrains goland对未提交修改的文件的标记。
在这里插入图片描述
在这里插入图片描述
然后简单地看一下官方文档。

官方文档宣称,Metamanager做的事情主要包括两类,分别是Edged和Edgehub之间的消息中介处理和消息持久化。

此外:

  • Metamanager抽象了一系列客户端接口,允许edged封装资源的变更信息并且发给Metamanager
  • Metamanager开启了一个HTTPServer用来接受k8s API的直连。

Metamanager基于下面的这些操作收发不同种类的message:

Insert
Update
Delete
Query
Response
NodeConnection
MetaSync

其中:

Insert操作(比如,新增一个pod)的主要流程图如下:
在这里插入图片描述

Update操作(比如云端给pod追加标签,或者edged检测到了pod的变更,向云上汇报)的主要流程图如下,根据变更的来源不同,有两种信息流动的流程:
在这里插入图片描述

Delete操作主要流程图如下:
在这里插入图片描述
在delete操作中,云端下达指令,edgehub转发,metamng会先把边缘里的数据删掉,然后把指令下发给edged。

Query操作主要允许edged查询本地的数据库缓存和云上(比如configmap/secret)的etcd。消息源可以拆成3个part(resKey/resType/resId),主要流程如下:
在这里插入图片描述

Response操作:就是上面那些图片里的请求对应的相应。

NodeConnection操作:edgehub会发送向边缘组件广播当前边缘节点的连接状态——告知云边是否连接。metamanager会在内存里维护这个信息,用于特定的操作(比如向云发送query)。

MetaSync操作:定期同步edge上pod的状态。

下面考察Metamanager的源码。从Start函数开始:

func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit() // 初始化资源版本(RV)
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}

	m.runMetaManager()
}

StorageInit做的事情主要是初始化RV。从边缘数据库metav2表里拿到最新的资源版本。

// StorageInit must be called before using imitator storage (run metaserver or metamanager)
func StorageInit() {
	m := new(v2.MetaV2)
	// get the most recent record as the init resource version
	_, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).OrderBy("-" + v2.RV).Limit(1).All(m)
	utilruntime.Must(err)
	DefaultV2Client.SetRevision(m.ResourceVersion)
}

然后,根据指定的配置生成一个metaserver:

func NewMetaServer() *MetaServer {
	ls := MetaServer{
		HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
		LongRunningFunc:       genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
		NegotiatedSerializer:  serializer.NewNegotiatedSerializer(),
		Factory:               handlerfactory.NewFactory(),
		Auth:                  buildAuth(),
	}
	return &ls
}

最后Start:

func (ls *MetaServer) Start(stopChan <-chan struct{}) {
	if kefeatures.DefaultFeatureGate.Enabled(kefeatures.RequireAuthorization) {
		ls.startHTTPSServer(stopChan)
	} else {
		ls.startHTTPServer(stopChan)
	}
}

HTTPSServer主要就是增加了openssl x509的那些密钥,用于构建安全的HTTP服务器。

为代码分析方便起见,我们忽略安全性部分,看一下HTTPServer里干了什么事情:

主要是指定了Handler,用于处理云端APIServer直连时收发的信息,最后启动了一个http服务器。

func (ls *MetaServer) startHTTPServer(stopChan <-chan struct{}) {
	h := ls.BuildBasicHandler()
	h = BuildHandlerChain(h, ls)
	s := http.Server{
		Addr:    metaserverconfig.Config.Server,
		Handler: h,
	}

	go func() { // 用于server的退出
		<-stopChan

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // 把退出的消息往下传
		defer cancel()
		if err := s.Shutdown(ctx); err != nil {
			klog.Errorf("Server shutdown failed: %s", err)
		}
	}()

	klog.Infof("[metaserver]start to listen and server at http://%v", s.Addr)
	utilruntime.HandleError(s.ListenAndServe())
	// When the MetaServer stops abnormally, other module services are stopped at the same time.
	beehiveContext.Cancel()
}

具体的handler函数逻辑见下:

func (ls *MetaServer) BuildBasicHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		reqInfo, ok := apirequest.RequestInfoFrom(ctx)
		//klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb)
		//klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header)
		if !ok {
			err := fmt.Errorf("invalid request")
			responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			return
		}

		if reqInfo.IsResourceRequest {
			switch {
			case reqInfo.Verb == "get":
				ls.Factory.Get().ServeHTTP(w, req)
			case reqInfo.Verb == "list", reqInfo.Verb == "watch":
				ls.Factory.List().ServeHTTP(w, req)
			case reqInfo.Verb == "create":
				ls.Factory.Create(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "delete":
				ls.Factory.Delete().ServeHTTP(w, req)
			case reqInfo.Verb == "update":
				ls.Factory.Update(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "patch":
				ls.Factory.Patch(reqInfo).ServeHTTP(w, req)
			default:
				err := fmt.Errorf("unsupported req verb")
				responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			}
			return
		}

		if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) {
			ls.Factory.PassThrough().ServeHTTP(w, req)
			return
		}

		err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb)
		responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
	})
}

最后,在Start函数里执行runMetaManager()

func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}

主要的逻辑就是不断地从beehive框架那里拿一个message然后进行处理。但是处理的过程比较长。重点是这个process函数:

func (m *metaManager) process(message model.Message) {
	operation := message.GetOperation()

	switch operation {
	case model.InsertOperation:
		m.processInsert(message)
	case model.UpdateOperation:
		m.processUpdate(message)
	case model.PatchOperation:
		m.processPatch(message)
	case model.DeleteOperation:
		m.processDelete(message)
	case model.QueryOperation:
		m.processQuery(message)
	case model.ResponseOperation:
		m.processResponse(message)
	case constants.CSIOperationTypeCreateVolume,
		constants.CSIOperationTypeDeleteVolume,
		constants.CSIOperationTypeControllerPublishVolume,
		constants.CSIOperationTypeControllerUnpublishVolume:
		m.processVolume(message)
	default:
		klog.Errorf("metamanager not supported operation: %v", operation)
	}
}

可以看到,process函数会处理Insert、Update、Patch、Delete、Query、Response信息,以及处理和卷相关的信息。

我们以m.processUpdate(message)为例考察process的处理逻辑。主要是看kubeedge的源码,跳过migration的部分:

func (m *metaManager) processUpdate(message model.Message) {
	imitator.DefaultV2Client.Inject(message)

	msgSource := message.GetSource()
	_, resType, _ := parseResource(&message)
	if msgSource == modules.EdgedModuleName && resType == model.ResourceTypeLease {
		// 来自于edged的消息(需要转发到云上)并且type为lease(用于节点心跳)
		// edged里的kubelet会定时地向云端发送心跳信息,但是在边缘设备里需要对消息做更进一步的包裹
		if !connect.IsConnected() { // 云边断连就直接返回
			klog.Warningf("process remote failed, req[%s], err: %v", msgDebugInfo(&message), errNotConnected)
			feedbackError(fmt.Errorf("failed to process remote: %s", errNotConnected), message) // 把错误消息返回给edged
			return
		}
		m.processRemote(message) // 这个函数相当于由metamng代替edged发送消息,并且处理自动回复,当然processRemote的功能不止于此
		return
	}

	// 如果不是“edged的心跳信息”,比如是pod的更新信息,那么咱们自己处理一手
	if err := m.handleMessage(&message); err != nil { // 拿到消息后先经过m.handleMessage函数记录到db中
		feedbackError(err, message)
		return
	}

	// 根据edged的模块名,自行决定转发路径
	switch msgSource {
	case modules.EdgedModuleName:
		// For pod status update message, we need to wait for the response message
		// to ensure that the pod status is correctly reported to the kube-apiserver
		sendToCloud(&message)
		resp := message.NewRespByMessage(&message, OK)
		sendToEdged(resp, message.IsSync())
	case cloudmodules.EdgeControllerModuleName, cloudmodules.DynamicControllerModuleName:
		sendToEdged(&message, message.IsSync())
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	case cloudmodules.DeviceControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
		message.SetRoute(modules.MetaGroup, modules.DeviceTwinModuleName)
		beehiveContext.Send(modules.DeviceTwinModuleName, message)
	case cloudmodules.PolicyControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	default:
		klog.Errorf("unsupport message source, %s", msgSource)
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/572021.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【教程】MySQL数据库学习笔记(五)——约束(持续更新)

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 【MySQL数据库学习】系列文章 第一章 《认识与环境搭建》 第二章 《数据类型》 第三章 《数据定义语言DDL》 第四章 《数据操…

【求助】西门子S7-200PLC定时中断+数据归档的使用

前言 已经经历了种种磨难来记录我的数据&#xff08;使用过填表程序、触摸屏的历史记录和数据归档&#xff09;之后&#xff0c;具体可以看看这篇文章&#xff1a;&#x1f6aa;西门子S7-200PLC的数据归档怎么用&#xff1f;&#xff0c;出现了新的问题。 问题的提出 最新的…

25 - MOV 指令

---- 整理自B站UP主 踌躇月光 的视频 文章目录 1. 指令系统设计2. MOV 指令3. 实现3.1 CPU 电路图3.2 代码实现3.3 实验过程3.4 实验结果3.5 实验工程 1. 指令系统设计 指令 IR 8 位程序状态字 4 位微程序周期 4 位 2. MOV 指令 MOV A, 5; 立即寻址 MOV A, B; 寄存器寻址 MO…

基于PaddlePaddle平台训练物体分类——猫狗分类

学习目标&#xff1a; 在百度的PaddlePaddle平台训练自己需要的模型&#xff0c;以训练一个猫狗分类模型为例 PaddlePaddle平台&#xff1a; 飞桨&#xff08;PaddlePaddle&#xff09;是百度开发的深度学习平台&#xff0c;具有动静统一框架、端到端开发套件等特性&#xf…

tailwindcss在使用cdn引入静态html的时候,vscode默认不会提示问题

1.首先确保vscode下载tailwind插件&#xff1a;Tailwind CSS IntelliSense 2.需要在根目录文件夹创建一个tailwind.config.js文件 export default {theme: {extend: {// 可根据需要自行配置&#xff0c;空配置项可以正常使用},}, }3.在html文件的标签中引入配置文件&#xf…

程序员到架构师,除了代码,还有文档和图

文章目录 前言一、书面设计文档文档应该作为代码和口头交流的补充文档应该注意鲜活 二、图——架构讨论的直观语言总结 前言 作为人类&#xff0c;我们天生就被视觉所吸引。在这个信息爆炸的时代&#xff0c;从精炼的代码到清晰的文档&#xff0c;再到直观的图&#xff0c;我们…

【数据结构】串(String)

文章目录 基本概念顺序存储结构比较当前串与串s的大小取子串插入删除其他构造函数拷贝构造函数扩大数组空间。重载重载重载重载[]重载>>重载<< 链式存储结构链式存储结构链块存储结构 模式匹配朴素的模式匹配算法(BF算法)KMP算法字符串的前缀、后缀和部分匹配值nex…

Parade Series - CoreAudio Reformating

// 获得音频播放设备格式信息CComHeapPtr<WAVEFORMATEX> pDeviceFormat;pAudioClient->GetMixFormat(&pDeviceFormat);constexpr int REFTIMES_PER_SEC 10000000; // 1 reference_time 100nsconstexpr int REFTIMES_PER_MILLISEC 10000;// Microsoftif (p…

Golang | Leetcode Golang题解之第49题字母异位词分组

题目&#xff1a; 题解&#xff1a; func groupAnagrams(strs []string) [][]string {mp : map[[26]int][]string{}for _, str : range strs {cnt : [26]int{}for _, b : range str {cnt[b-a]}mp[cnt] append(mp[cnt], str)}ans : make([][]string, 0, len(mp))for _, v : ra…

Alibaba 的fastjson源码详解

一、概述 Fastjson 是阿里巴巴开源的一个 Java 工具库&#xff0c;它常常被用来完成 Java 的对象与 JSON 格式的字符串的相互转化。 Fastjson 可以操作任何 Java 对象&#xff0c;即使是一些预先存在的没有源码的对象。 二、源码分析 1.首先以fastjson-1.2.70为例&#xff0c;…

nodejs

334 先下载zip文件&#xff0c;然后加上.zip,可以看到两个文件 在user中可以看到 输入即可得到flag。 335. 这里提到eval函数&#xff0c;eval中可以执行js代码&#xff0c;可以尝试使用这个函数进行测试 payload&#xff08;显示当前目录下的文件和文件夹列表&#xff09; …

基于emp的mysql查询

SQL命令 结构化查询语句&#xff1a;Structured Query Language 结构化查询语言是高级的非过程化变成语言&#xff0c;允许用户在高层数据结构上工作。是一种特殊目的的变成语言&#xff0c;是一种数据库查询和程序设计语言&#xff0c;用于存取数据以及查询、更新和管理关系数…

Python 网络与并发编程(四)

文章目录 协程Coroutines协程的核心(控制流的让出和恢复)协程和多线程比较协程的优点协程的缺点 asyncio实现协程(重点) 协程Coroutines 协程&#xff0c;全称是“协同程序”&#xff0c;用来实现任务协作。是一种在线程中&#xff0c;比线程更加轻量级的存在&#xff0c;由程…

android脱壳第二发:grpc-dumpdex加修复

上一篇我写的dex脱壳&#xff0c;写到银行类型的app的dex修复问题&#xff0c;因为dex中被抽取出来的函数的code_item_off 的偏移所在的内存&#xff0c;不在dex文件范围内&#xff0c;所以需要进行一定的修复&#xff0c;然后就停止了。本来不打算接着搞得&#xff0c;但是写了…

基础SQL DCL语句

DCL是数据控制语言&#xff0c;用来管理数据库用户&#xff0c;还有控制用户的访问权限 1.用户的查询 MySQL的用户信息存储在mysql数据库中&#xff0c;查询用户时&#xff0c;我们需要使用这个数据库。 后面&#xff0c;还有很多数据&#xff0c;因为篇幅的问题&#xff0c;就…

【FFmpeg】音视频录制 ② ( 使用 Screen Capturer Recorder 软件生成 ffmpeg 可录制的音视频设备 )

文章目录 一、使用 Screen Capturer Recorder 软件生成音视频设备1、设备查找问题 - 引入 Screen Capturer Recorder 软件2、下载安装 Screen Capturer Recorder 软件3、验证 Screen Capturer Recorder 生成的设备 一、使用 Screen Capturer Recorder 软件生成音视频设备 1、设…

【PyTorch】torch.gather() 用法

gather常被用于image做mask的操作中&#xff0c;对哪些地方进行赋值0/1 API&#xff1a; torch.gather — PyTorch 2.2 documentation torch.gather(input, dim, index, outNone) → Tensor gather()的意义&#xff1a; 顾名思义&#xff0c;聚集、集合&#xff1a;gather…

在mac上安装node.js及使用npm,yarn相关命令教程

1、安装node.js 官网&#xff1a;Node.js — Download Node.js 选择需要的版本&#xff0c;点击DownLoad 2、点击继续&#xff0c;直到安装成功。 2.1打开终端输入命令node -v 显示版本号则说明已安装成功 3、全局安装yarn命令 1、sudo npm install --global yarn &#xf…

Python构建学生信息管理系统:构建RESTful API - 学生信息管理系统的后端逻辑

在之前的博客里&#xff0c;我们已经完成了项目初始化&#xff0c;在本篇博客中&#xff0c;我们将深入探讨如何使用Flask框架实现学生信息管理系统的后端逻辑&#xff0c;特别是通过RESTful API来实现学生信息的增删改查&#xff08;CRUD&#xff09;操作。 Flask RESTful AP…