Kubeedge源码版本:v1.15.1
在看Metamanager之前,先看一下Metamanager源码的目录结构(位于edge/pkg下)和官方文档:
目录结构如下面的两张图所示。请忽略绿色的文件高亮,这是Jetbrains goland对未提交修改的文件的标记。
 
 
 然后简单地看一下官方文档。
官方文档宣称,Metamanager做的事情主要包括两类,分别是Edged和Edgehub之间的消息中介处理和消息持久化。
此外:
- Metamanager抽象了一系列客户端接口,允许edged封装资源的变更信息并且发给Metamanager
- Metamanager开启了一个HTTPServer用来接受k8s API的直连。
Metamanager基于下面的这些操作收发不同种类的message:
Insert
 Update
 Delete
 Query
 Response
 NodeConnection
 MetaSync
其中:
Insert操作(比如,新增一个pod)的主要流程图如下:
 
Update操作(比如云端给pod追加标签,或者edged检测到了pod的变更,向云上汇报)的主要流程图如下,根据变更的来源不同,有两种信息流动的流程:
 
Delete操作主要流程图如下:
 
 在delete操作中,云端下达指令,edgehub转发,metamng会先把边缘里的数据删掉,然后把指令下发给edged。
Query操作主要允许edged查询本地的数据库缓存和云上(比如configmap/secret)的etcd。消息源可以拆成3个part(resKey/resType/resId),主要流程如下:
 
Response操作:就是上面那些图片里的请求对应的相应。
NodeConnection操作:edgehub会发送向边缘组件广播当前边缘节点的连接状态——告知云边是否连接。metamanager会在内存里维护这个信息,用于特定的操作(比如向云发送query)。
MetaSync操作:定期同步edge上pod的状态。
下面考察Metamanager的源码。从Start函数开始:
func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit() // 初始化资源版本(RV)
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}
	m.runMetaManager()
}
StorageInit做的事情主要是初始化RV。从边缘数据库metav2表里拿到最新的资源版本。
// StorageInit must be called before using imitator storage (run metaserver or metamanager)
func StorageInit() {
	m := new(v2.MetaV2)
	// get the most recent record as the init resource version
	_, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).OrderBy("-" + v2.RV).Limit(1).All(m)
	utilruntime.Must(err)
	DefaultV2Client.SetRevision(m.ResourceVersion)
}
然后,根据指定的配置生成一个metaserver:
func NewMetaServer() *MetaServer {
	ls := MetaServer{
		HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
		LongRunningFunc:       genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
		NegotiatedSerializer:  serializer.NewNegotiatedSerializer(),
		Factory:               handlerfactory.NewFactory(),
		Auth:                  buildAuth(),
	}
	return &ls
}
最后Start:
func (ls *MetaServer) Start(stopChan <-chan struct{}) {
	if kefeatures.DefaultFeatureGate.Enabled(kefeatures.RequireAuthorization) {
		ls.startHTTPSServer(stopChan)
	} else {
		ls.startHTTPServer(stopChan)
	}
}
HTTPSServer主要就是增加了openssl x509的那些密钥,用于构建安全的HTTP服务器。
为代码分析方便起见,我们忽略安全性部分,看一下HTTPServer里干了什么事情:
主要是指定了Handler,用于处理云端APIServer直连时收发的信息,最后启动了一个http服务器。
func (ls *MetaServer) startHTTPServer(stopChan <-chan struct{}) {
	h := ls.BuildBasicHandler()
	h = BuildHandlerChain(h, ls)
	s := http.Server{
		Addr:    metaserverconfig.Config.Server,
		Handler: h,
	}
	go func() { // 用于server的退出
		<-stopChan
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // 把退出的消息往下传
		defer cancel()
		if err := s.Shutdown(ctx); err != nil {
			klog.Errorf("Server shutdown failed: %s", err)
		}
	}()
	klog.Infof("[metaserver]start to listen and server at http://%v", s.Addr)
	utilruntime.HandleError(s.ListenAndServe())
	// When the MetaServer stops abnormally, other module services are stopped at the same time.
	beehiveContext.Cancel()
}
具体的handler函数逻辑见下:
func (ls *MetaServer) BuildBasicHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		reqInfo, ok := apirequest.RequestInfoFrom(ctx)
		//klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb)
		//klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header)
		if !ok {
			err := fmt.Errorf("invalid request")
			responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			return
		}
		if reqInfo.IsResourceRequest {
			switch {
			case reqInfo.Verb == "get":
				ls.Factory.Get().ServeHTTP(w, req)
			case reqInfo.Verb == "list", reqInfo.Verb == "watch":
				ls.Factory.List().ServeHTTP(w, req)
			case reqInfo.Verb == "create":
				ls.Factory.Create(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "delete":
				ls.Factory.Delete().ServeHTTP(w, req)
			case reqInfo.Verb == "update":
				ls.Factory.Update(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "patch":
				ls.Factory.Patch(reqInfo).ServeHTTP(w, req)
			default:
				err := fmt.Errorf("unsupported req verb")
				responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			}
			return
		}
		if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) {
			ls.Factory.PassThrough().ServeHTTP(w, req)
			return
		}
		err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb)
		responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
	})
}
最后,在Start函数里执行runMetaManager():
func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}
主要的逻辑就是不断地从beehive框架那里拿一个message然后进行处理。但是处理的过程比较长。重点是这个process函数:
func (m *metaManager) process(message model.Message) {
	operation := message.GetOperation()
	switch operation {
	case model.InsertOperation:
		m.processInsert(message)
	case model.UpdateOperation:
		m.processUpdate(message)
	case model.PatchOperation:
		m.processPatch(message)
	case model.DeleteOperation:
		m.processDelete(message)
	case model.QueryOperation:
		m.processQuery(message)
	case model.ResponseOperation:
		m.processResponse(message)
	case constants.CSIOperationTypeCreateVolume,
		constants.CSIOperationTypeDeleteVolume,
		constants.CSIOperationTypeControllerPublishVolume,
		constants.CSIOperationTypeControllerUnpublishVolume:
		m.processVolume(message)
	default:
		klog.Errorf("metamanager not supported operation: %v", operation)
	}
}
可以看到,process函数会处理Insert、Update、Patch、Delete、Query、Response信息,以及处理和卷相关的信息。
我们以m.processUpdate(message)为例考察process的处理逻辑。主要是看kubeedge的源码,跳过migration的部分:
func (m *metaManager) processUpdate(message model.Message) {
	imitator.DefaultV2Client.Inject(message)
	msgSource := message.GetSource()
	_, resType, _ := parseResource(&message)
	if msgSource == modules.EdgedModuleName && resType == model.ResourceTypeLease {
		// 来自于edged的消息(需要转发到云上)并且type为lease(用于节点心跳)
		// edged里的kubelet会定时地向云端发送心跳信息,但是在边缘设备里需要对消息做更进一步的包裹
		if !connect.IsConnected() { // 云边断连就直接返回
			klog.Warningf("process remote failed, req[%s], err: %v", msgDebugInfo(&message), errNotConnected)
			feedbackError(fmt.Errorf("failed to process remote: %s", errNotConnected), message) // 把错误消息返回给edged
			return
		}
		m.processRemote(message) // 这个函数相当于由metamng代替edged发送消息,并且处理自动回复,当然processRemote的功能不止于此
		return
	}
	// 如果不是“edged的心跳信息”,比如是pod的更新信息,那么咱们自己处理一手
	if err := m.handleMessage(&message); err != nil { // 拿到消息后先经过m.handleMessage函数记录到db中
		feedbackError(err, message)
		return
	}
	// 根据edged的模块名,自行决定转发路径
	switch msgSource {
	case modules.EdgedModuleName:
		// For pod status update message, we need to wait for the response message
		// to ensure that the pod status is correctly reported to the kube-apiserver
		sendToCloud(&message)
		resp := message.NewRespByMessage(&message, OK)
		sendToEdged(resp, message.IsSync())
	case cloudmodules.EdgeControllerModuleName, cloudmodules.DynamicControllerModuleName:
		sendToEdged(&message, message.IsSync())
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	case cloudmodules.DeviceControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
		message.SetRoute(modules.MetaGroup, modules.DeviceTwinModuleName)
		beehiveContext.Send(modules.DeviceTwinModuleName, message)
	case cloudmodules.PolicyControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	default:
		klog.Errorf("unsupport message source, %s", msgSource)
	}
}



















