k8s client-go源码解析之informer三

news2025/7/18 12:42:01

Informer(三)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。
觉得文章不错请关注跟博客及github

本篇介绍DeltaFIFOindexer
informer大致工作流程如下:
在这里插入图片描述

DeltaFIFO

DeltaFIFO是一个先进先出的队列,负责暂存监听的数据,后被process取出消费,用于中转数据。

type DeltaFIFO struct {
	// 用于控制对items和queue的访问
	lock sync.RWMutex
	cond sync.Cond
	// 存放数据的map, map可以保证数据的唯一性
    // key由keyFunc生成, value为Deltas
	items map[string]Deltas
	// 存放items中的key,用于保证数据的顺序性
	queue []string
    // 用于生成key
	keyFunc KeyFunc
    // 省略部分代码...
}
type Delta struct {
    // 数据的类型,包括:Added, Updated等
	Type   DeltaType
    // 数据
	Object interface{}
}
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

DeltaFIFO中实现了Queue接口,包括AddUpdate等方法。
这些方法, 最终都是交给f.queueActionLocked()处理。

func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

queueActionLocked方法会将数据存储到items中,同时将key存储到queue中。操作完成后通知阻塞的协程。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	oldDeltas := f.items[id]
	newDeltas := append(oldDeltas, Delta{actionType, obj})
    // 判断是否为重复的事件。
    // 事件类型如果均为Delete的话,会保留一个信息最多的
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
        // 通知阻塞的协程,实际是Pop()方法
		f.cond.Broadcast()
	} else {
		// 省略部分代码...
	}
	return nil
}

从这里可以看出DeltaFIFO是一个先进先出的队列。
isInInitialList参数在之前的版本是没有的,但是在1.18版本中加入了。这个参数的作用是用于标识当前数据是否是第一次同步的数据。当你的事件方法不需要区分第一次同步的数据和后续的数据时,可以忽略这个参数。

func (f *DeltaFIFO) hasSynced_locked() bool {
    // populated为true,表示已经同步过一次数据
    // initialPopulationCount代表第一次同步的数据量
    // 在informer的场景中,启动数据变化监听时会先执行一次list获取全量数据。
    // initialPopulationCount代表第一list获取的数据量。
	return f.populated && f.initialPopulationCount == 0
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			if f.closed {
				return nil, ErrFIFOClosed
			}
            // 没有数据则阻塞,等待通知
			f.cond.Wait()
		}
		isInInitialList := !f.hasSynced_locked()
        // 队首弹出数据
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
        // 获取数据
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            // 如果获取不到,会获取下一顺位的数据。
            // 这也是为什么要套在for循环的原因。虽然这种情况永远不会发生。
			continue
		}
        // 删除数据
		delete(f.items, id)
		
		if depth > 10 {
			// 一些性能日志的打印...
		}
        // 调用process处理数据
        // informer场景中,process就是上一篇中的 processDeltas
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
            // 如果返回ErrRequeue,则重新加入队列
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}

Indexer(Store)

Indexer是一个存储数据的地方,可以通过key获取数据,也可以通过index获取数据。
它实现了Store接口,包括AddUpdateDelete等方法。
DeltaFIFO中的数据最终会被存储到Indexer中。

cache是一个实现了Indexer接口的结构体。它主要是代理了cacheStorage的方法。

type cache struct {
	// 线程安全的数据存储
	cacheStorage ThreadSafeStore
	// 生成key的方法,一般来说和DeltaFIFO中的keyFunc一致
    // 生成的key用于存储和索引数据
	keyFunc KeyFunc
}

cache的方法这里不赘述,主要看ThreadSafeStore的实现。
ThreadSafeStore是一个接口,粗略的来说,它主要定义了两部分:数据和索引

type ThreadSafeStore interface {
    // 数据操作
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
    // 索引操作
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexedValue string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	GetIndexers() Indexers
	AddIndexers(newIndexers Indexers) error
    // 弃用
	Resync() error

storeIndex实现了索引操作相关的功能。
这里的逻辑比较简单,不在赘述代码,举一个使用的例子说明一下:

比如我需要根据Podimage来获取Pod的列表,那么我需要实现一个IndexFunc,这个IndexFunc的作用是根据Podimage生成一个key。生成的image key对应的valuePodnamename实际是由cache.keyfunc生成的),存放在storeIndex.indices中。
(代码请看example)

type storeIndex struct {
	// 索引的名称和keyFunc的映射
    // 添加一个索引其实就是添加一个keyFunc
	indexers Indexers
	// keyFunc生成的key和数据标识的映射。这里是1对多的关系
    // 例如:index名称为image,keyFunc生成的key为"nginx:v1.0",数据标识为"kube-system/nginx",则indices["image"]["nginx:v1.0"] = ["kube-system/nginx"]
	indices Indices
}

storeIndex的Add、Update、Delete操作对应的都是updateIndices这个方法。

  • 对于创建,必须仅提供newObj
  • 对于更新,必须同时提供oldObj和newObj
  • 对于删除,必须仅提供oldObj
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	var oldIndexValues, indexValues []string
	var err error
	// 遍历注册的索引
	for name, indexFunc := range i.indexers {
		if oldObj != nil {
			oldIndexValues, err = indexFunc(oldObj)
		} else {
			oldIndexValues = oldIndexValues[:0]
		}
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

		if newObj != nil {
			indexValues, err = indexFunc(newObj)
		} else {
			indexValues = indexValues[:0]
		}
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}

		index := i.indices[name]
		if index == nil {
			index = Index{}
			i.indices[name] = index
		}

		if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
			continue
		}
		// 找到对应的索引进行操作
		for _, value := range oldIndexValues {
			i.deleteKeyFromIndex(key, value, index)
		}
		for _, value := range indexValues {
			i.addKeyToIndex(key, value, index)
		}
	}
}

threadSafeMap实现了数据操作相关的功能。
本质上一个加锁的mapmap的key为cache.keyfunc生成,value为数据本身。

type threadSafeMap struct {
	lock  sync.RWMutex
	// 存储数据
	items map[string]interface{}

	// 存储索引
	index *storeIndex
}

update为例,update方法会调用updateIndices方法,更新索引。

func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	// 更新索引
	c.index.updateIndices(oldObject, obj, key)
}

使用对应的索引进行查找时,需要指定索引名称和搜索值。

// 默认的索引不需要指定索引名称
// key是由cache.keyfunc生成的
// 如Get("kube-system/kube-proxy")
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	item, exists = c.items[key]
	return item, exists
}
// 通过索引查找指定索引名称
// 如ByIndex("image", "nginx:v1.0")
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	set, err := c.index.getKeysByIndex(indexName, indexedValue)
	if err != nil {
		return nil, err
	}
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}

ok,到这里,我们就知道了informer是如何实现索引的了。
我们再来复盘一下整体的流程:
在这里插入图片描述

  1. informer启动时,会调用informerRun方法,Run方法会启动informercontrollercontroller会启动reflector
  2. reflector会启动一个ListAndWatchgoroutine,将数据写入到DeltaFIFO中。
  3. controller还有一个processLoopgoroutine,从DeltaFIFO中读取数据,将数据写入到Store(indexer)中,并触发informerEventHandler
  4. DeltaFIFO是一个先进先出队列,使用list实现数据有序,使用map实现数据存储和去重。
  5. Store(indexer)本质是一个加了锁的map。自定义索引时通过IndexFunc生成索引键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

顺序表来喏!!!

前言:还记得前面的文章:《通讯录的实现》吗?通讯录的完成就借助了顺序表这种数据结构!!!那么今天我们就来介绍我们的顺序表介绍顺序表前,我们来了解一下线性表的概念线性表:线性表&a…

mysql笔试题18道

部门表、员工表、薪水等级表 1.取得每个部门最高薪水人员名称 第一步:取得每个部门最高薪水作为临时表t select deptno,max(sal) as maxSal from emp group by deptno 第二步:临时表t与emp表连接条件 e.deptnot.deptno and e.salt.maxSal select …

Spring - Spring IoC 容器相关面试题总结

文章目录01. Spring IoC 和依赖注入是什么?02. Spring IoC 的优点和缺点分别是什么?03. Spring IoC 有什么作用和功能?04. Spring 依赖注入的方式?05. Spring 构造器注入和 setter 方法注入的区别?06. Spring 依赖注入…

嵌入式系统实验——【玄武F103开发板】按key1熄灭两个LED灯、松开恢复点亮

这里写目录标题一、任务目标(一)分析二、设计思路(一)开启KEY1对应的GPIOx时钟1.找到KEY1(PE3)所在的GPIOx端口2.开启GPIOE端口时钟3.清空PE3的端口位4.设置PE3的端口位为输出模式的上拉模式5.一个易错点&a…

二分——力扣篇

二分——力扣篇搜索旋转排序数组搜索旋转排序数组II寻找旋转排序数组中的最小值寻找旋转排序数组中的最小值II搜索旋转排序数组 定理一:只有在顺序区间内才可以通过区间两端的数值判断target是否在其中。 定理二:判断顺序区间还是乱序区间,只…

案例学习20之内存长期占用导致系统缓慢

前言: 发现问题,解决问题,是贯穿整个项目开发过程的事情,能够处理更多的问题,随着经验的丰富,提前预知更多的问题,让问题不出现是最好的解决问题方式。 问题背景: 项目运行过程中出现…

基于redis实现点赞数,点击数,排行榜

使用场景 对于某些视频或者文章有点赞数和点击数, 通过这些数据就可以进行排行榜的功能了 使用异步队列 redis的集合 A.php //点击数 $redis->zIncrBy(click.:.date(Ymd),1,$videoId); //点赞数 $redis->zIncrBy(love.:.$videoId,1,$$user); //获取当前video的播放数…

PMP项目管理项目范围管理

目录1 项目范围管理概述2 规划范围管理3 收集需求4 定义范围5 创建 WBS6 确认范围7 控制范围1 项目范围管理概述 项目范围管理包括确保项目做且只做所需的全部工作,以成功完成项目的各 个过程。管理项目范围主要在于定义和控制哪些工作应在项目内,哪些工…

界面原型设计

引用锤子科技视觉设计总监——罗子雄在重庆TEDx活动上说的一小段话: 每当我们看到一些美妙的设计的时候,很多人心里面会有一种冲动,这种冲动会让你们想去创造一些新的东西,创造一些美妙的事物。 我们常说用户体验用户体验,用户使用你的软件,第一个会接触的是什么?没错,…

读WiscKey: Separating Keys from Values in SSD-conscious Storage

在我看来本论文的主要贡献在于相对减轻了传统LSM compact所带来的写放大问题。其核心设计在于使key、value分离以及gc只保持有效数据 key、value分离 作者对于key、value分离策略的观察主要来自于排序是以往LSM性能消耗最大的地方,但是真正影响排序的与占用大储存…

推荐五款宝藏软件,身为宝藏男孩和宝藏女孩的你,不试一下吗?

今天带来五款宝藏软件,身为宝藏男孩和宝藏女孩的你们,不试一下吗? 1.EPUB阅读器——Starrea Starrea 是一款Windows平台的EPUB电子书阅读器,它虽然只支持一个平台,但是提供了很多额外的功能,其中包括 文…

Java之线程总结一

Java之线程总结一 线程实现方式 官方文档说的是实现线程的方式有两种;本质上只有一种,就是构造Thread类,而实现线程执行单元的方式有两种: 继承Thread类,重写run方法;实现Runnable接口的run方法&#xf…

RHEL8.5解决libgdiplus绘图问题

最近有客户服务器使用RHEL8.5了。由于之前测试和编译的dotnetcore在Linux下绘图包libgdiplus都是在centos7.6编译的。把CentOS7.6编译的二进制程序之前试CentOS7.9使用没问题,然后RHEL8.5无法正常绘图。由于之前搞统信那些系统发现了包得在对应系统源码编译才行。所…

Java web基于SSM的停车场管理系统的设计与实现

1,项目介绍 Java web基于SSM的停车场管理系统拥有三种角色,分别为用户,管理员,超级管理员。 停车位管理(管理员,超级管理员)停车卡管理(用户,管理员,超级管…

Vue3视频播放器组件Vue3-video-play入门教程

Vue3-video-play适用于 Vue3 的 hls.js 播放器组件 | 并且支持MP4/WebM/Ogg格式。 1、支持快捷键操作 2、支持倍速播放设置 3、支持镜像画面设置 4、支持关灯模式设置 5、支持画中画模式播放 6、支持全屏/网页全屏播放 7、支持从固定时间开始播放 8、支持移动端,移动…

pyqt5环境搭建

1、打开Terminal ,用命令pip install pyqt5-tools 或者 pip install pyqt5-tools -i https://pypi.tuna.tsinghua.edu.cn/simple安装PyQt5安装成功后就可看到PyQt5版本2、同上方法,继续安装pyqt5-tools扩展工具,里面包括了QtDesigner等很好用的工具。3、…

二进制哈希码快速搜索:Multi-Index Hashing

前言 如果你对这篇文章感兴趣,可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」,查看完整博客分类与对应链接。 哈希方法通常包含两个部分: 【编码】将元素通过「data-dependent」或「data-independent」的方式映射为二进制…

C变量区域

C语言中有五大内存分区,分别是栈区、堆区、全局区/静态区、常量区和代码区。1.栈区:由编译器自动分配释放,存放函数的参数值、局部变量的值等。当调用函数的时候函数中定义的变量会被加到栈中,当函数离开的时候,被添加…

HTML看这一篇就够啦,HTML基础大全,可用于快速回顾知识,面试首选

HTML 1 基础 1.1 DOCTYPE <!DOCTYPE> 文档类型声明&#xff0c;作用就是告诉浏览器使用哪种HTML版本来显示网页。 <!DOCTYPE html> 这句代码的意思是: 当前页面采取的是 HTML5 版本来显示网页. 注意: 声明位于文档中的最前面的位置&#xff0c;处于 标签之前。 …

互联网新理念,对于WEB 3.0 你怎么看?

WEB 3.0 这个名词走进大众视野已经有一段时间了&#xff0c;也曾在各个圈子里火热一时&#xff0c;至今各大互联网企业任旧在 WEB 3.0 上不断探索。但关于 WEB 3.0 是什么这个问题&#xff0c;其实大部分人都没有一个比较明确的认知&#xff0c;包括区块链和元宇宙等相关行业的…