Kubeedge源码版本:v1.15.1
在看Metamanager之前,先看一下Metamanager源码的目录结构(位于edge/pkg
下)和官方文档:
目录结构如下面的两张图所示。请忽略绿色的文件高亮,这是Jetbrains goland对未提交修改的文件的标记。
然后简单地看一下官方文档。
官方文档宣称,Metamanager做的事情主要包括两类,分别是Edged和Edgehub之间的消息中介处理和消息持久化。
此外:
- Metamanager抽象了一系列客户端接口,允许edged封装资源的变更信息并且发给Metamanager
- Metamanager开启了一个HTTPServer用来接受k8s API的直连。
Metamanager基于下面的这些操作收发不同种类的message:
Insert
Update
Delete
Query
Response
NodeConnection
MetaSync
其中:
Insert操作(比如,新增一个pod)的主要流程图如下:
Update操作(比如云端给pod追加标签,或者edged检测到了pod的变更,向云上汇报)的主要流程图如下,根据变更的来源不同,有两种信息流动的流程:
Delete操作主要流程图如下:
在delete操作中,云端下达指令,edgehub转发,metamng会先把边缘里的数据删掉,然后把指令下发给edged。
Query操作主要允许edged查询本地的数据库缓存和云上(比如configmap/secret)的etcd。消息源可以拆成3个part(resKey/resType/resId),主要流程如下:
Response操作:就是上面那些图片里的请求对应的相应。
NodeConnection操作:edgehub会发送向边缘组件广播当前边缘节点的连接状态——告知云边是否连接。metamanager会在内存里维护这个信息,用于特定的操作(比如向云发送query)。
MetaSync操作:定期同步edge上pod的状态。
下面考察Metamanager的源码。从Start函数开始:
func (m *metaManager) Start() {
if metaserverconfig.Config.Enable {
imitator.StorageInit() // 初始化资源版本(RV)
go metaserver.NewMetaServer().Start(beehiveContext.Done())
}
m.runMetaManager()
}
StorageInit
做的事情主要是初始化RV。从边缘数据库metav2
表里拿到最新的资源版本。
// StorageInit must be called before using imitator storage (run metaserver or metamanager)
func StorageInit() {
m := new(v2.MetaV2)
// get the most recent record as the init resource version
_, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).OrderBy("-" + v2.RV).Limit(1).All(m)
utilruntime.Must(err)
DefaultV2Client.SetRevision(m.ResourceVersion)
}
然后,根据指定的配置生成一个metaserver:
func NewMetaServer() *MetaServer {
ls := MetaServer{
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
NegotiatedSerializer: serializer.NewNegotiatedSerializer(),
Factory: handlerfactory.NewFactory(),
Auth: buildAuth(),
}
return &ls
}
最后Start:
func (ls *MetaServer) Start(stopChan <-chan struct{}) {
if kefeatures.DefaultFeatureGate.Enabled(kefeatures.RequireAuthorization) {
ls.startHTTPSServer(stopChan)
} else {
ls.startHTTPServer(stopChan)
}
}
HTTPSServer主要就是增加了openssl x509的那些密钥,用于构建安全的HTTP服务器。
为代码分析方便起见,我们忽略安全性部分,看一下HTTPServer里干了什么事情:
主要是指定了Handler,用于处理云端APIServer直连时收发的信息,最后启动了一个http服务器。
func (ls *MetaServer) startHTTPServer(stopChan <-chan struct{}) {
h := ls.BuildBasicHandler()
h = BuildHandlerChain(h, ls)
s := http.Server{
Addr: metaserverconfig.Config.Server,
Handler: h,
}
go func() { // 用于server的退出
<-stopChan
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // 把退出的消息往下传
defer cancel()
if err := s.Shutdown(ctx); err != nil {
klog.Errorf("Server shutdown failed: %s", err)
}
}()
klog.Infof("[metaserver]start to listen and server at http://%v", s.Addr)
utilruntime.HandleError(s.ListenAndServe())
// When the MetaServer stops abnormally, other module services are stopped at the same time.
beehiveContext.Cancel()
}
具体的handler函数逻辑见下:
func (ls *MetaServer) BuildBasicHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
reqInfo, ok := apirequest.RequestInfoFrom(ctx)
//klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb)
//klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header)
if !ok {
err := fmt.Errorf("invalid request")
responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
return
}
if reqInfo.IsResourceRequest {
switch {
case reqInfo.Verb == "get":
ls.Factory.Get().ServeHTTP(w, req)
case reqInfo.Verb == "list", reqInfo.Verb == "watch":
ls.Factory.List().ServeHTTP(w, req)
case reqInfo.Verb == "create":
ls.Factory.Create(reqInfo).ServeHTTP(w, req)
case reqInfo.Verb == "delete":
ls.Factory.Delete().ServeHTTP(w, req)
case reqInfo.Verb == "update":
ls.Factory.Update(reqInfo).ServeHTTP(w, req)
case reqInfo.Verb == "patch":
ls.Factory.Patch(reqInfo).ServeHTTP(w, req)
default:
err := fmt.Errorf("unsupported req verb")
responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
}
return
}
if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) {
ls.Factory.PassThrough().ServeHTTP(w, req)
return
}
err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb)
responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
})
}
最后,在Start
函数里执行runMetaManager()
:
func (m *metaManager) runMetaManager() {
go func() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("MetaManager main loop stop")
return
default:
}
msg, err := beehiveContext.Receive(m.Name())
if err != nil {
klog.Errorf("get a message %+v: %v", msg, err)
continue
}
klog.V(2).Infof("get a message %+v", msg)
m.process(msg)
}
}()
}
主要的逻辑就是不断地从beehive框架那里拿一个message然后进行处理。但是处理的过程比较长。重点是这个process
函数:
func (m *metaManager) process(message model.Message) {
operation := message.GetOperation()
switch operation {
case model.InsertOperation:
m.processInsert(message)
case model.UpdateOperation:
m.processUpdate(message)
case model.PatchOperation:
m.processPatch(message)
case model.DeleteOperation:
m.processDelete(message)
case model.QueryOperation:
m.processQuery(message)
case model.ResponseOperation:
m.processResponse(message)
case constants.CSIOperationTypeCreateVolume,
constants.CSIOperationTypeDeleteVolume,
constants.CSIOperationTypeControllerPublishVolume,
constants.CSIOperationTypeControllerUnpublishVolume:
m.processVolume(message)
default:
klog.Errorf("metamanager not supported operation: %v", operation)
}
}
可以看到,process函数会处理Insert、Update、Patch、Delete、Query、Response信息,以及处理和卷相关的信息。
我们以m.processUpdate(message)
为例考察process的处理逻辑。主要是看kubeedge的源码,跳过migration的部分:
func (m *metaManager) processUpdate(message model.Message) {
imitator.DefaultV2Client.Inject(message)
msgSource := message.GetSource()
_, resType, _ := parseResource(&message)
if msgSource == modules.EdgedModuleName && resType == model.ResourceTypeLease {
// 来自于edged的消息(需要转发到云上)并且type为lease(用于节点心跳)
// edged里的kubelet会定时地向云端发送心跳信息,但是在边缘设备里需要对消息做更进一步的包裹
if !connect.IsConnected() { // 云边断连就直接返回
klog.Warningf("process remote failed, req[%s], err: %v", msgDebugInfo(&message), errNotConnected)
feedbackError(fmt.Errorf("failed to process remote: %s", errNotConnected), message) // 把错误消息返回给edged
return
}
m.processRemote(message) // 这个函数相当于由metamng代替edged发送消息,并且处理自动回复,当然processRemote的功能不止于此
return
}
// 如果不是“edged的心跳信息”,比如是pod的更新信息,那么咱们自己处理一手
if err := m.handleMessage(&message); err != nil { // 拿到消息后先经过m.handleMessage函数记录到db中
feedbackError(err, message)
return
}
// 根据edged的模块名,自行决定转发路径
switch msgSource {
case modules.EdgedModuleName:
// For pod status update message, we need to wait for the response message
// to ensure that the pod status is correctly reported to the kube-apiserver
sendToCloud(&message)
resp := message.NewRespByMessage(&message, OK)
sendToEdged(resp, message.IsSync())
case cloudmodules.EdgeControllerModuleName, cloudmodules.DynamicControllerModuleName:
sendToEdged(&message, message.IsSync())
resp := message.NewRespByMessage(&message, OK)
sendToCloud(resp)
case cloudmodules.DeviceControllerModuleName:
resp := message.NewRespByMessage(&message, OK)
sendToCloud(resp)
message.SetRoute(modules.MetaGroup, modules.DeviceTwinModuleName)
beehiveContext.Send(modules.DeviceTwinModuleName, message)
case cloudmodules.PolicyControllerModuleName:
resp := message.NewRespByMessage(&message, OK)
sendToCloud(resp)
default:
klog.Errorf("unsupport message source, %s", msgSource)
}
}