技术分享 | AlertManager 源码解析

news2025/7/7 1:37:28

作者:石蓓蓓

爱可生研发工程师,主要负责爱可生产品云DMP树产品的研发工作。

本文来源:原创投稿

*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。


AlertManager 是处理对应用程序的告警的,比如Promethus的服务端。对于输入的告警,会经过分组、抑制、静默、去重等步骤,最终并将告警发送到接受者(邮箱等)。

alertManager 的框架图如下:

今天主要是分享 AlertManager 中 Pipeline 相关的流程代码,pipeline 主要是用来处理分组后的告警,经过抑制、静默、去重,然后发送。

首先在创建 Pipeline 的时候,会创建 GossipSettleStage 、MuteStage(包含抑制和静默)、WaitStage 、DedupStage 、RetryStage 、SetNotifiesStage 。

// New returns a map of receivers to Stages.
func (pb *PipelineBuilder) New(
 receivers map[string][]Integration,
 wait func() time.Duration,
 inhibitor *inhibit.Inhibitor,
 silencer *silence.Silencer,
 notificationLog NotificationLog,
 peer *cluster.Peer,
) RoutingStage {
 rs := make(RoutingStage, len(receivers))

 ms := NewGossipSettleStage(peer)
 is := NewMuteStage(inhibitor)
 ss := NewMuteStage(silencer)

 for name := range receivers {
  st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
  rs[name] = MultiStage{ms, is, ss, st}
 }
 return rs
}

// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(
 name string,
 integrations []Integration,
 wait func() time.Duration,
 notificationLog NotificationLog,
 metrics *metrics,
) Stage {
 var fs FanoutStage
 for i := range integrations {
  recv := &nflogpb.Receiver{
   GroupName:   name,
   Integration: integrations[i].Name(),
   Idx:         uint32(integrations[i].Index()),
  }
  var s MultiStage
  s = append(s, NewWaitStage(wait))
  s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
  s = append(s, NewRetryStage(integrations[i], name, metrics))
  s = append(s, NewSetNotifiesStage(notificationLog, recv))

  fs = append(fs, s)
 }
 return fs
}

从上面的代码可以看到 AlertManager 在某一通道处理时会经过 GossipSettleStage 、MuteStage(包含抑制和静默)、WaitStage 、DedupStage 、RetryStage 、SetNotifiesStage 这7个 stage ,并且顺序执行。

Pipeline 的执行是遍历了所有的 stage ,每次执行 Exec 方法(见代码的第8行),且每次执行后返回的 alert 列表是下一步的参数(第8行的代码对传入的参数alerts赋予新的告警值,再下次执行Exec的时候传入的alerts的值是新的值),最终得到的alert列表是经过每次过滤后的告警列表

func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 var err error
 for _, s := range ms {
  if len(alerts) == 0 {
   return ctx, nil, nil
  }

  ctx, alerts, err = s.Exec(ctx, l, alerts...)
  if err != nil {
   return ctx, nil, err
  }
 }
 return ctx, alerts, nil
}

GossipSettle

等待集群准备完毕。

func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 if n.peer != nil {
  n.peer.WaitReady()
 }
 return ctx, alerts, nil
}

Inhibitor 抑制

抑制首先是会执行MuteStage的Exec,再匹配到后,就不会发送告警。主要是执行第6行的n.muter.Mutes方法来进行匹配:

func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 var filtered []*types.Alert
 for _, a := range alerts {
  // TODO(fabxc): increment total alerts counter.
  // Do not send the alert if muted.
  if !n.muter.Mutes(a.Labels) {
   filtered = append(filtered, a)
  } else {
   n.postMuteHandle(a)
  }
  // TODO(fabxc): increment muted alerts counter if muted.
 }
 return ctx, filtered, nil
}

抑制条件是如何匹配的呢?

我们在设置抑制规则时,会设置抑制源和抑制目标。在启动 Inhibitor 的时候,会先匹配抑制源(也就是Source),如果某条告警的 label 满足抑制源的条件,则会被放入 scache 中(第17行进行匹配,在21行时匹配成功写入 scache 中)。

func (ih *Inhibitor) run(ctx context.Context) {
 it := ih.alerts.Subscribe()
 defer it.Close()

 for {
  select {
  case <-ctx.Done():
   return
  case a := <-it.Next():
   if err := it.Err(); err != nil {
    level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
    continue
   }
   // Update the inhibition rules' cache.
   for _, r := range ih.rules {
    if r.IsExpressionMatch {
     if matched, err := r.SourceExpMatcher.Match(a.Labels); err != nil {
      level.Error(ih.logger).Log("msg", "Error expression match alerts", "err", err)
      continue
     } else if matched {
      if err := r.scache.Set(a); err != nil {
       level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
      }
     }
    } else if r.SourceMatchers.Match(a.Labels) {
     if err := r.scache.Set(a); err != nil {
      level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
     }
    }
   }
  }
 }
}

此时如果有新产生的告警正好满足抑制规则的抑制目标(也就是 target)规则,那么这条规则会被通过方法 SetInhibited 设置成为抑制。在被设置为抑制时,被抑制的告警也会被设置抑制源告警的指纹。

// Mutes returns true if the given label set is muted. It implements the Muter
// interface.
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()

 for _, r := range ih.rules {
if r.IsExpressionMatch {
 if targetMatched, err := r.TargetExpMatcher.Match(lset); err != nil {
  level.Error(ih.logger).Log("msg", "Error inhibitor expression match alerts", "err", err)
  continue
   } else {
  if !targetMatched {
  continue
    }
   }
  } else {
 if !r.TargetMatchers.Match(lset) {
   // If target side of rule doesn't match, we don't need to look any further.
   continue
  }
 }

 // If we are here, the target side matches. If the source side matches, too, we
 // need to exclude inhibiting alerts for which the same is true.
 sourceMatched := false
 if r.IsExpressionMatch {
  if matched, err := r.SourceExpMatcher.Match(lset); err != nil {
  level.Error(ih.logger).Log("msg", "Error inhibitor expression match alerts", "err", err)
 continue
   } else {
    sourceMatched = matched
   }
 } else {
  sourceMatched = r.SourceMatchers.Match(lset)
  }
  if inhibitedByFP, eq := r.hasEqual(ih.logger, lset, sourceMatched); eq {
   ih.marker.SetInhibited(fp, inhibitedByFP.String())
   return true
  }
 }
 ih.marker.SetInhibited(fp)

 return false
}

Silencer 静默

静默规则执行MuteStage的Exec,新的告警的labels匹配到静默规则的条件后,新的告警就会被静默,通过SetInhibited进行标记,同时会设置抑制源告警的指纹

// Mutes implements the Muter interface.
func (s *Silencer) Mutes(lset model.LabelSet) bool {
 fp := lset.Fingerprint()
 ids, markerVersion, _ := s.marker.Silenced(fp)

 var (
  err        error
  sils       []*pb.Silence
  newVersion = markerVersion
 )
 if markerVersion == s.silences.Version() {
  // No new silences added, just need to check which of the old
  // silences are still revelant.
  if len(ids) == 0 {
   // Super fast path: No silences ever applied to this
   // alert, none have been added. We are done.
   return false
  }
  // This is still a quite fast path: No silences have been added,
  // we only need to check which of the applicable silences are
  // currently active. Note that newVersion is left at
  // markerVersion because the Query call might already return a
  // newer version, which is not the version our old list of
  // applicable silences is based on.
  sils, _, err = s.silences.Query(
   QIDs(ids...),
   QState(types.SilenceStateActive),
  )
 } else {
  // New silences have been added, do a full query.
  sils, newVersion, err = s.silences.Query(
   QState(types.SilenceStateActive),
   QMatches(lset),
  )
 }
 if err != nil {
  level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
 }
 if len(sils) == 0 {
  s.marker.SetSilenced(fp, newVersion)
  return false
 }
 idsChanged := len(sils) != len(ids)
 if !idsChanged {
  // Length is the same, but is the content the same?
  for i, s := range sils {
   if ids[i] != s.Id {
    idsChanged = true
    break
   }
  }
 }
 if idsChanged {
  // Need to recreate ids.
  ids = make([]string, len(sils))
  for i, s := range sils {
   ids[i] = s.Id
  }
  sort.Strings(ids) // For comparability.
 }
 if idsChanged || newVersion != markerVersion {
  // Update marker only if something changed.
  s.marker.SetSilenced(fp, newVersion, ids...)
 }
 return true
}

WaitStage

WaitStage 表示向其他实例发送 Notification Log 的时间间隔,只是单纯的时间等待。

// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 select {
 case <-time.After(ws.wait()):
 case <-ctx.Done():
  return ctx, nil, ctx.Err()
 }
 return ctx, alerts, nil
}

DedupStage

DedupStage 主要是通过计算告警的hash值来起到去重的作用。

func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 gkey, ok := GroupKey(ctx)
 if !ok {
  return ctx, nil, fmt.Errorf("group key missing")
 }

 repeatInterval, ok := RepeatInterval(ctx)
 if !ok {
  return ctx, nil, fmt.Errorf("repeat interval missing")
 }

 firingSet := map[uint64]struct{}{}
 resolvedSet := map[uint64]struct{}{}
 firing := []uint64{}
 resolved := []uint64{}

 var hash uint64
 for _, a := range alerts {
  hash = n.hash(a)
  if a.Resolved() {
   resolved = append(resolved, hash)
   resolvedSet[hash] = struct{}{}
  } else {
   firing = append(firing, hash)
   firingSet[hash] = struct{}{}
  }
 }

 ctx = WithFiringAlerts(ctx, firing)
 ctx = WithResolvedAlerts(ctx, resolved)

 entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
 if err != nil && err != nflog.ErrNotFound {
  return ctx, nil, err
 }

 var entry *nflogpb.Entry
 switch len(entries) {
 case 0:
 case 1:
  entry = entries[0]
 default:
  return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
 }

 if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
  return ctx, alerts, nil
 }
 return ctx, nil, nil
}

RetryStage

主要是根据不同的通道来发送告警,如果失败,会进行重试。

func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 var sent []*types.Alert

 // If we shouldn't send notifications for resolved alerts, but there are only
 // resolved alerts, report them all as successfully notified (we still want the
 // notification log to log them for the next run of DedupStage).
 if !r.integration.SendResolved() {
  firing, ok := FiringAlerts(ctx)
 if !ok {
   return ctx, nil, fmt.Errorf("firing alerts missing")
  }
  if len(firing) == 0 {
   return ctx, alerts, nil
  }
  for _, a := range alerts {
   if a.Status() != model.AlertResolved {
    sent = append(sent, a)
   }
  }
 } else {
  sent = alerts
 }

 var (
  i    = 0
  b    = backoff.NewExponentialBackOff()
  tick = backoff.NewTicker(b)
  iErr error
 )
 defer tick.Stop()

 for {
  i++
  // Always check the context first to not notify again.
  select {
  case <-ctx.Done():
   if iErr != nil {
    return ctx, nil, iErr
   }

   return ctx, nil, ctx.Err()
  default:
  }

  select {
  case <-tick.C:
   now := time.Now()
   retry, err := r.integration.Notify(ctx, sent...)
   r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
   r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc()
   if err != nil {
    r.metrics.numFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
    level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err)
    if !retry {
     return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err)
    }

    // Save this error to be able to return the last seen error by an
    // integration upon context timeout.
    iErr = err
   } else {
    return ctx, alerts, nil
   }
  case <-ctx.Done():
   if iErr != nil {
    return ctx, nil, iErr
   }

   return ctx, nil, ctx.Err()
  }
 }
}

SetNotifiesStage

SetNotifiesStage 主要是用来确保告警已经发送给 了通道,并记录到 alertManager 的日志中。

func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}

firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, fmt.Errorf("firing alerts missing")
}

resolved, ok := ResolvedAlerts(ctx)
if !ok {
return ctx, nil, fmt.Errorf("resolved alerts missing")
}

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/9700.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ZIP压缩文件的打开密码和自动加密有什么不同?

ZIP是常用的压缩文件格式&#xff0c;对于重要的内容&#xff0c;我们还可以设置密码&#xff0c;从而达到保护文件内容的目的。 通过WinRAR给ZIP文件设置密码保护&#xff0c;可以设置“打开密码”和“自动加密”&#xff0c;那两者有什么不同呢&#xff1f; 设置打开密码是…

【附源码】Python计算机毕业设计万达影院售票管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

图相似度计算——SIMGnn源码解读

在运行代码的时候&#xff0c;需要首先指定参数&#xff0c;--histogram&#xff0c;表示使用直方图特征 1.数据集 数据集我们使用的是AIDS数据集&#xff0c;为内置的数据集&#xff0c;整个数据集大约700张图&#xff0c;每个图少于10个点&#xff0c;每个点由29维的向量组成…

多视角碰撞,探索 Serverless 企业落地更多可能性丨阿里云用户组厦门站

2022 年 9 月 24 日&#xff0c;阿里云在厦门举办了第 12 场阿里云用户组&#xff08;AUG&#xff09;活动&#xff0c;活动主题为“提效降本&#xff0c;Serverless 助力企业快速落地云原生”&#xff0c;吸引了众多技术从业者及企业管理者到场交流。 2009 年&#xff0c;伯克…

油气田工业控制系统现状

石油石化行业综述 石油石化行业分为上游、中游和下游。其中&#xff0c;上游从事的业务包括原油、天然气 的勘探、开发&#xff0c;中游主要是油气 的存储与运输&#xff0c;下游则涵盖炼油、化工、天然气加工等流程型业务及加油站零售等产品配送、销售型业务。通常情况下&…

常用 numpy 函数(长期更新)

文章目录np.where()np.zeros()np.zeros_like()np.divide()np.linalg.norm()np.uint8()np.clip()np.where() np.where有两种用法 np.where(condition,x,y) 当where内有三个参数时&#xff0c;第一个参数表示条件&#xff0c;当条件成立时where方法返回x&#xff0c;当条件不成…

超强功能WebSSH安装,解决Web远程SSH终端

项目地址&#xff1a;https://github.com/huashengdun/webssh 一个简单的 Web 应用程序&#xff0c;用作 ssh 客户端以连接到您的 ssh 服务器。它是用 Python 编写的&#xff0c;基于 tornado、paramiko 和 xterm.js。 特征&#xff1a; 支持SSH密码认证&#xff0c;包括空密…

Windows系统配置CUDA编程环境

像配置一个简单的可以进行CUDA编程的Windows系统环境&#xff0c;分别需要CUDA以及Visual stdio。 注意&#xff0c;如果是新配置的电脑&#xff0c;一定要先安装visual stdio再安装CUDA&#xff0c;否则后面在VS中创建.cu文件时容易出现找不到模块的情况。 一、安装Visual st…

动态规划--(回文子串,最长回文子序列)

代码随想录day 57 动态规划模块 回文子串,最长回文子序列 文章目录1.leetcode 647. 回文子串1.1 详细思路及解题步骤1.2 Java版代码示例2.leetcode 516. 最长回文子序列2.1 详细思路及解题步骤2.2 Java版代码示例1.leetcode 647. 回文子串 1.1 详细思路及解题步骤 该题用动态规…

2023最新SSM计算机毕业设计选题大全(附源码+LW)之java杨佑川音乐播放器908v6

大部分步骤是 1.确定选题 选题的确定需要查阅大量的资料&#xff0c;要搞清楚自己大概想要研究的方向是什么。可以选择自己感兴趣的学科或者强势的学科进行研究&#xff0c;同时要多和毕业指导老师多交流&#xff0c;征求老师的意见和建议&#xff0c;最后确立选题。计算机专…

【K8S系列】第九讲:Kubernetes 之探针

目录 一、探针是什么 二、探针类型 2.1 livenessProbe 2.1.1 容器重启策略 2.2 readinessProbe 2.3 startupProbe 2.4 总结 2.5 探针示例 2.6 配置字段介绍 三、探测机制 3.1 HTTP GET探针 3.2 TCP套接字探针 3.3 Exec探针 Tips 一、探针是什么 探针:是由 kub…

OpenCV众筹了一款ROS2机器人rae,开源、功能强、上手简单。来瞅瞅~

编辑&#xff1a;OAK中国 首发&#xff1a;oakchina.cn 喜欢的话&#xff0c;请多多&#x1f44d;⭐️✍ ▌前言 Hello&#xff0c;大家好&#xff0c;这里是OAK中国&#xff0c;我是助手君。 在2020年、2021年OpenCV分别在Kickstarter上众筹了两款OAK产品&#xff0c;均筹集…

设计模式——桥接模式

桥接&#xff08;Bridge&#xff09;模式 一、基本思想 当一个类内部具备两种或多种变化维度时&#xff0c;使用桥接模式可以解耦这些变化的维度&#xff0c;使高层代码架构稳定。 将抽象与实现分离&#xff0c;使它们可以独立变化。 用组合关系代替继承关系来实现&#xff0…

运维面试必问的中间件高频面试题

1. redis是单线程还是多线程&#xff1f; 这个问题已经被问过很多次了&#xff0c;从redis4.0开始引入多线程&#xff0c;redis 6.0 中&#xff0c;多线程主要用于网络 I/O 阶段&#xff0c;也就是接收命令和写回结果阶段&#xff0c;而在执行命令阶段&#xff0c;还是由单线程…

综述类论文_Machine Learning for Encrypted Malicious Traffic Detection(重要)

文章目录Machine Learning for Encrypted Malicious Traffic Detection: Approaches, Datasets and Comparative Study摘要存在的问题论文贡献1. 基于机器学习的加密流量检测模型的总体框架1.1 Research Target&#xff08;研究目标&#xff09;1.2 Traffic Dataset Collection…

Allegro给各种形式的板框导弧操作指导

Allegro给各种形式的板框导弧操作指导 Allegro可以给板框导弧,让加工出来的板框更加圆滑,具体操作步骤如下 板框是line形式的 选择Manufacture-Drafting-Fillet命令 在Options里面Radius输出导弧的半径,比如78.74 框选两个线段的部分 完成后的效果如下图 框选4个角落,…

Python基础学习

一、Python基础 1.Python介绍 2.发展史 3.Python 2 or 3? 4.安装 5.Hello World程序 6.变量 7.用户输入 8.模块初识 9. .pyc是个什么鬼&#xff1f; 10.数据类型初识 11.数据运算 12.表达式if ...else语句 13.表达式for 循环 14.break and continue 15.表达式while 循环…

最好的Python入门教材是哪本?这本书当之无愧

1.门槛低 适合编程零基础上手 《Python编程 从入门到实践&#xff08;第二版&#xff09;》的作者埃里克马瑟斯&#xff08;Eric Matthes&#xff09;是一名高中科学和数学老师&#xff0c;现居住在阿拉斯加&#xff0c;在当地讲授 Python 入门课程。他从 5 岁开始就一直在编写…

带你深度解析虚幻引擎4的照明和阴影知识

照明是渲染的重要组成部分。有静态光和动态光&#xff0c;它们往往很重并且需要大量计算。今天就让赞奇云工作站带领小伙伴们来学习一下虚幻引擎4中的光照和阴影的知识。 静态照明 静态光在编辑器中预先计算并保存在光照贴图中。 〇&#xff1a;良好的性能和质量&#xff08…

[go学习笔记.第十五章.反射,常量] 1.反射的基本介绍以及实践

一.反射的引入以及基本介绍 1.看两个问题 (1).对于结构体的序列化和反序列化&#xff0c;看一段代码 package mainimport("fmt" "encoding/josn" )type Monster struct {Name string json:"monsterName"Age int json:"monsterAge&quo…