k8s client-go源码解析之informer 一

news2025/8/6 16:20:46

Informer(一)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇为先导篇, 介绍informer的入口工厂函数。
在这里插入图片描述

informer目录结构 (仅展示部分目录,省略的目录相似)

client-go|master⚡ ⇒ tree informers -L 2
informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── core
│   ├── interface.go
│   └── v1
├── doc.go
├── factory.go
├── flowcontrol
├── generic.go
├── node
│   ├── interface.go
│   ├── v1
│   ├── v1alpha1
│   └── v1beta1
└── storage
    ├── interface.go
    ├── v1
    ├── v1alpha1
    └── v1beta1

65 directories, 23 files

可以看到,factory.go为工厂函数的文件,作为调用的入口。每个资源类型为单独的文件夹, 按照版本号划分子文件夹。

factory

type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration
   
   // 存放对应资源类型的informer
	informers map[reflect.Type]cache.SharedIndexInformer
   // informer启动状态
	startedInformers map[reflect.Type]bool
	// 用于等待多个资源类型的informer启动
	wg sync.WaitGroup
	
	shuttingDown bool
}

对应资源的监听实现,通过InformerFor方法传入并记录。
sharedInformer 将多种资源放在map中保存。
重复监听相同资源的动作是安全的。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	// 如果资源已经监听过了,则什么都不做
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

多次调用Start()是安全的

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.shuttingDown {
		return
	}

	for informerType, informer := range f.informers {
	   // 只会run新的资源类型
		if !f.startedInformers[informerType] {
			f.wg.Add(1)
			informer := informer
			go func() {
				defer f.wg.Done()
				informer.Run(stopCh)
			}()
			f.startedInformers[informerType] = true
		}
	}
}

调用对应资源方法,对应的实现在上面的资源目录

func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {
	return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
}

func (f *sharedInformerFactory) Apps() apps.Interface {
	return apps.New(f, f.namespace, f.tweakListOptions)
}

resource

以apps目录举例

informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2

apps在当前的存在3个版本,故对应三个文件夹。
interface.go为当前资源入口。

type group struct {
   // 传入的工厂对象
	factory          internalinterfaces.SharedInformerFactory
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
   // 当调用factory.Apps()时,工厂对象是传入的,不会创建新的工厂,也就不会创建新的liste/watch连接。
   // SharedInformer中的shared就是指这个。
	return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {
	return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

// V1beta1 returns a new v1beta1.Interface.
func (g *group) V1beta1() v1beta1.Interface {
	return v1beta1.New(g.factory, g.namespace, g.tweakListOptions)
}

// V1beta2 returns a new v1beta2.Interface.
func (g *group) V1beta2() v1beta2.Interface {
	return v1beta2.New(g.factory, g.namespace, g.tweakListOptions)
}

当我们调用factory.Apps().V1().Deployments(),实现文件为:

informers
├── apps
│   ├── interface.go
│   ├── v1
        ├── deployment.go

调用factory.Apps().V1().Deployments().Informer(), 会触发工厂函数的InformerFor()方法监听资源。
重复:InformerFor()方法,重复监听相同资源的动作是安全的。

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

任意资源有自己的监听函数的实现, Deployments的为:

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
	   // 定义 list/watch规则。
	   // 实际上所有资源类型的informer, 最终都会走到cache.SharedIndexInforme。
	   // 根据不同的ListWatch对象决定监听不同的资源。 这是informer实现的基础。
	   // 这里 ListWatch 监听的是 AppsV1().Deployments
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}
// 上面Informer()函数中传入的方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8VqWCMFv-1678153878970)(/Users/chenyang/Projects/github.com/MyNotes/images/informer-1.png)]
informers/factory.go为工厂方法实现文件。
“工厂”产出不同资源类型的informer。
资源通过InformerFor记录到“工厂”中,通过Start方法启动监听。这两个方法重复调用均为安全操作。
最终的数据处理由cache.SharedIndexInforme实现。

Informer(一)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇为先导篇, 介绍informer的入口工厂函数。

informer目录结构 (仅展示部分目录,省略的目录相似)

client-go|master⚡ ⇒ tree informers -L 2
informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── core
│   ├── interface.go
│   └── v1
├── doc.go
├── factory.go
├── flowcontrol
├── generic.go
├── node
│   ├── interface.go
│   ├── v1
│   ├── v1alpha1
│   └── v1beta1
└── storage
    ├── interface.go
    ├── v1
    ├── v1alpha1
    └── v1beta1

65 directories, 23 files

可以看到,factory.go为工厂函数的文件,作为调用的入口。每个资源类型为单独的文件夹, 按照版本号划分子文件夹。

factory

type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration
   
   // 存放对应资源类型的informer
	informers map[reflect.Type]cache.SharedIndexInformer
   // informer启动状态
	startedInformers map[reflect.Type]bool
	// 用于等待多个资源类型的informer启动
	wg sync.WaitGroup
	
	shuttingDown bool
}

对应资源的监听实现,通过InformerFor方法传入并记录。
sharedInformer 将多种资源放在map中保存。
重复监听相同资源的动作是安全的。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	// 如果资源已经监听过了,则什么都不做
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

多次调用Start()是安全的

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.shuttingDown {
		return
	}

	for informerType, informer := range f.informers {
	   // 只会run新的资源类型
		if !f.startedInformers[informerType] {
			f.wg.Add(1)
			informer := informer
			go func() {
				defer f.wg.Done()
				informer.Run(stopCh)
			}()
			f.startedInformers[informerType] = true
		}
	}
}

调用对应资源方法,对应的实现在上面的资源目录

func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {
	return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
}

func (f *sharedInformerFactory) Apps() apps.Interface {
	return apps.New(f, f.namespace, f.tweakListOptions)
}

resource

以apps目录举例

informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2

apps在当前的存在3个版本,故对应三个文件夹。
interface.go为当前资源入口。

type group struct {
   // 传入的工厂对象
	factory          internalinterfaces.SharedInformerFactory
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
   // 当调用factory.Apps()时,工厂对象是传入的,不会创建新的工厂,也就不会创建新的liste/watch连接。
   // SharedInformer中的shared就是指这个。
	return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {
	return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

// V1beta1 returns a new v1beta1.Interface.
func (g *group) V1beta1() v1beta1.Interface {
	return v1beta1.New(g.factory, g.namespace, g.tweakListOptions)
}

// V1beta2 returns a new v1beta2.Interface.
func (g *group) V1beta2() v1beta2.Interface {
	return v1beta2.New(g.factory, g.namespace, g.tweakListOptions)
}

当我们调用factory.Apps().V1().Deployments(),实现文件为:

informers
├── apps
│   ├── interface.go
│   ├── v1
        ├── deployment.go

调用factory.Apps().V1().Deployments().Informer(), 会触发工厂函数的InformerFor()方法监听资源。
重复:InformerFor()方法,重复监听相同资源的动作是安全的。

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

任意资源有自己的监听函数的实现, Deployments的为:

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
	   // 定义 list/watch规则。
	   // 实际上所有资源类型的informer, 最终都会走到cache.SharedIndexInforme。
	   // 根据不同的ListWatch对象决定监听不同的资源。 这是informer实现的基础。
	   // 这里 ListWatch 监听的是 AppsV1().Deployments
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}
// 上面Informer()函数中传入的方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

总结

informers/factory.go为工厂方法实现文件。
“工厂”产出不同资源类型的informer。
资源通过InformerFor记录到“工厂”中,通过Start方法启动监听。这两个方法重复调用均为安全操作。
最终的数据处理由cache.SharedIndexInforme实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/394099.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目实战典型案例27——对生产环境以及生产数据的敬畏之心

对生产环境以及生产数据的敬畏之心一&#xff1a;背景介绍总结升华一&#xff1a;背景介绍 本篇博客是对项目开发中出现的对生产环境以及生产数据的敬畏之心行的总结并进行的改进。目的是将经历转变为自己的经验。通过博客的方式分享给大家&#xff0c;大家一起共同进步和提高…

SpringCloud之 Gateway路由网关

文章目录Gateway 路由网关一、部署网关&#x1f34d;①添加依赖&#x1f34d;②设置配置文件&#x1f34d;③创建启动类&#x1f34d;④路由功能配置&#x1f34d;⑤路由访问服务二、路由过滤器2.1 单个过滤器: 配置文件2.2 全局过滤器: 自定义类提示&#xff1a;以下是本篇文章…

【数据结构初阶】详解链表OJ题

目录一.删除链表中等于给定值的节点二.合并有序链表并返回三.链表的回文结构1.反转单链表2.返回非空链表的中间节点四.输出链表倒数第K个节点五.基于给定值x分割单链表六.返回两个链表的第一个中间节点一.删除链表中等于给定值的节点 我们先来看第一题(题目链接): 因为我们需…

王道《操作系统》学习(二)—— 进程管理(一)

2.1 进程的概念、组成、特征、组织 2.1.1 进程的概念 这里像QQ这个程序执行了多次&#xff0c;虽然名字一样&#xff0c;但是它们的PID不同。 2.1.2 进程的组成 &#xff08;1&#xff09;PCB &#xff08;2&#xff09;程序段 和 数据段 小例子&#xff1a;程序是如何执行的&…

Nacos实现配置中心

文章目录配置中心提供的基础功能Nacos实现配置中心1、在配置文件增加配置&#xff08;application.yml&#xff09;2、使用Value来引用配置使用配置中心&#xff0c;怎样的配置方式呢一、安装nacos二、启动服务发现1、引入依赖2、配置文件3、开启服务注册发现功能4、启动服务三…

quarkus 生产环境与k8s集成总结

quarkus 生产环境与k8s集成总结 大纲 基础准备quarkus2.13.7脚手架工程配置GraalVM-java11 安装配置配置maven3.8.7linux环境下云原生二进制文件打包环境搭建编译运行quarkus二进制文件quarkus二进制文件制作为docker镜像并运行使用k8s部署quarkus二进制文件 基础准备 生产…

手把手交叉编译mysql

1.下载mysql&#xff08;注意下载boost版本&#xff0c;这样会少一步编译&#xff09; 下载mysql的时候一定要看好交叉编译工具链的版本。因为mysql 8.0需要的工具链版本较高&#xff0c;所以有可能不支持 查看链接如下&#xff1a; MySQL :: MySQL 8.0 Reference Manual :: …

InstructGPT方法简读

InstructGPT方法简读 引言 仅仅通过增大模型规模和数据规模来训练更大的模型并不能使得大模型更好地理解用户意图。由于数据的噪声极大&#xff0c;并且现在的大多数大型语言模型均为基于深度学习的“黑箱模型”&#xff0c;几乎不具有可解释性和可控性&#xff0c;因此&…

「SAP ABAP」OPEN SQL(四)【FROM语句】

&#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学会计学专业大二本科在读&#xff0c;同时任汉硕云&#xff08;广东&#xff09;科技有限公司ABAP开发顾问。在学习工作中&#xff0c;我通常使用偏后…

CIMCAI port ai shipping ai artificial intelligence smart port

上海人工智能独角兽中集集团高科技中集飞瞳&#xff0c;是全球应用落地最广&#xff0c;规模最大&#xff0c;最先进的的港航人工智能高科技企业&#xff0c;工业级成熟港航人工智能产品全球规模化落地应用&#xff0c;全球前三大船公司及港口码头应用落地。上海人工智能独角兽…

3.4 按键控制LED灯光敏传感器控制蜂鸣器

按键控制LED灯1.1 按键连接示意图1.2 代码设计1.21 设计思路我们要实现按键控制led&#xff0c;我们需要完成LED和按键驱动代码&#xff0c;但如果把这两部分代码都混在主函数里面&#xff0c;那么代码显得过于杂乱&#xff0c;不容易管理和移植&#xff0c;所以对于这种驱动代…

记一次反射型XSS

记一次反射型XSS1.反射型XSS1.1.前言1.2.测试过程1.3.实战演示1.3.1.输入框1.3.2.插入代码1.3.3.跳转链接2.总结1.反射型XSS 1.1.前言 关于这个反射型XSS&#xff0c;利用的方式除了钓鱼&#xff0c;可能更多的就是自娱自乐&#xff0c;那都说是自娱自乐了&#xff0c;并且对系…

Maxscale读写分离实施文档

Maxscale介绍 MaxScale是maridb开发的一个mysql数据中间件&#xff0c;其配置简单&#xff0c;能够实现读写分离&#xff0c;并且可以根据主从状态实现写库的自动切换。 使用Maxscale无需对业务代码进行修改&#xff0c;其自带的读写分离模块&#xff0c;能够解析SQL语句&…

DD-1/40 10-40mA型【接地继电器】

系列型号&#xff1a; DD-1/40接地继电器 DD-1/50接地继电器 DD-1/60接地继电器 一、 用途及工作原理 DD-1型接地继电器为瞬时动作的过电流继电器&#xff0c;用作小电流接地电力系统高电压三相交流发电机和电动机的接地零序过电流保护。继电器线圈接零序电流互感器(电缆式、母…

Vue动态粒子特效插件(背景线条吸附动画)

目录 效果图&#xff1a; 一、安装&#xff1a; 二、引入 main.js 文件&#xff1a; 三、使用&#xff1a; 四、属性说明&#xff1a; 效果图&#xff1a; 一、安装&#xff1a; npm install vue-particles --save 二、引入 main.js 文件&#xff1a; import VueParticles…

【C++】30h速成C++从入门到精通(多态)

多态的概念多态&#xff1a;通俗来说就是多种心态&#xff0c;具体点就是去完成某个行为&#xff0c;当不同的对象去完成时会产生出不同的状态。多态的定义及实现多态的构成条件多态是在不同继承关系的类对象&#xff0c;去调用同意函数&#xff0c;产生了不同的行为&#xff0…

C/C++每日一练(20230307)

目录 1. 国名排序 ★★ 2. 重复的DNA序列 ★★★ 3. 买卖股票的最佳时机 III ★★★ &#x1f31f; 每日一练刷题专栏 C/C 每日一练 ​专栏 Python 每日一练 ​专栏 1. 国名排序 小李在准备明天的广交会&#xff0c;明天有来自世界各国的客房跟他们谈生意&#xff0c…

结合基于规则和机器学习的方法构建强大的混合系统

经过这些年的发展&#xff0c;我们都确信ML即使不能表现得更好&#xff0c;至少也可以在几乎所有地方与前ML时代的解决方案相匹配。比如说一些规则约束&#xff0c;我们都会想到能否把它们替换为基于树的ml模型。但是世界并不总是黑白分明的&#xff0c;虽然机器学习在解决问题…

spring boot actuator 动态修改日志级别

1 日志级别 Spring Boot Actuator包括在运行时查看和配置应用程序日志级别的功能。您可以查看整个列表&#xff0c;也可以查看单个记录器的配置&#xff0c;该配置由显式配置的日志级别和日志框架给出的有效日志级别组成。这些级别可以是: TRACEDEBUGINFOWARNERRORFATALOFFnu…

ruoyi-pro 代码生成api,swagger扫描不到

背景 重新创建一个新的maven工程&#xff0c;按照芋道源码ruoyi-pro官方文档生成代码后&#xff0c;新的maven工程目录下的接口不能被swagger扫描到&#xff0c;swagger-ui不显示新增的maven工程模块下的api。 解决方法 新增maven工程类中&#xff0c;新增swagger扫描配置类…