alertmanager源码分析三:流水线

前篇说到告警写入后被分发到dispatcheraggrGroupsPerRoute中的aggrGroup里,然后每个aggrGroup会启动一个自己的goroutine按照group_waitgroup_interval两种频率来定时调用dispatcher.stage.Exec方法来处理告警,实际上dispatcher.stage中存储的就是由多种处理函数编排成的一个告警处理流水线,也就是架构图中的下面这部分:

截屏2021-11-19 17.07.37.png

pipeline的构建是在main函数中创建dispatcher的时候,很容易找到,这里不赘述了,我们看看 pipeline 是怎样定义自己的 Exec 方法的,

// pipeline 就是 RoutingStage 类型,
// 它是基于 ctx 中的 receiver 进入这个 receiver 的 Stage
type RoutingStage map[string]Stage

// 看看流水线构建函数是如何为每个 receiver 配置 Stage 的
func (pb *PipelineBuilder) New(
   receivers map[string][]Integration,
   wait func() time.Duration,
   inhibitor *inhibit.Inhibitor,
   silencer *silence.Silencer,
   muteTimes map[string][]timeinterval.TimeInterval,
   notificationLog NotificationLog,
   peer Peer,
) RoutingStage {
   rs := make(RoutingStage, len(receivers))

   ms := NewGossipSettleStage(peer)
   is := NewMuteStage(inhibitor)
   ss := NewMuteStage(silencer)
   tms := NewTimeMuteStage(muteTimes)
   // 基于 receiver 的 name 编排了一个包含多个 Stage 对象的 MultiStage
   for name := range receivers {
      st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
      rs[name] = MultiStage{ms, is, tms, ss, st}
   }
   return rs
}

// 实现了 Exec 方法实际上是实现了 Stage 接口
// 流水线就是由各种 Stage 对象组合成的,后面再说 Stage 的设计,
// 先看看 RoutingStage 做了什么
func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   // 从 context 中获取当前路由下配置的告警接收器
   receiver, ok := ReceiverName(ctx)
   if !ok {
      return ctx, nil, errors.New("receiver missing")
   }
   // 从 RoutingStage 中找到对应的 MultiStage 执行 MultiStage.Exec
   s, ok := rs[receiver]
   if !ok {
      return ctx, nil, errors.New("stage for receiver missing")
   }
   return s.Exec(ctx, l, alerts...)
}

// 这个时候 aggGroup 经过 RoutingStage 
// 为这些 Alerts 找到了 MultiStage
// 我们看看 MultiStage
// MultiStage 就是个包含了多个 Stage 的数组
type MultiStage []Stage

func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   var err error
   // 顺序执行使用数组中的每个 Stage.Exec()
   for _, s := range ms {
      if len(alerts) == 0 {
         return ctx, nil, nil
      }
      ctx, alerts, err = s.Exec(ctx, l, alerts...)
      if err != nil {
         return ctx, nil, err
      }
   }
   return ctx, alerts, nil
}
复制代码

到这里总结一下,Dispatcher下的每个aggGroup先按照自己的receiver.Name通过调用RoutingStage.Exec中找到对应的MultiStage,然后顺序调用其中的每个Stage.Exec,接下来看下Stage的设计:

type Stage interface {
   Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context context.Context, []*types.Alert, error)
}
// 举几个具体的 Stage 类型
type FanoutStage []Stage
type GossipSettleStage struct { peer Peer }
type MuteStage struct { muter types.Muter }
复制代码

Stage这里是一个只约定了Exec函数的接口,所以任何一个对象只要定义了相同签名的Exec函数就是Stage类型,你会在源码中很容找到各种Stage,然后在对应的Exec方法中就知道告警在当前Stage中会被怎样处理,Exec的入参数中alerts表示哪些告警进入这个Stage,然后出参中的alerts就是经过当前Stage处理还剩哪些告警,ctx可以很方便各个Stage获取当前流水线上的参数,当然也可以写入参数让后面的Stage使用。

前面RoutingStage.ExecMultiStage.Exec已经看过了我这里再找几个Stage看看里面的具体行为:

// 负责并发的执行一些 Stage
type FanoutStage []Stage
func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   var (
      wg sync.WaitGroup
      me types.MultiError
   )
   wg.Add(len(fs))
   // FanoutStage 和 MultiStage 使用的相同结构 []Stage
   // 但是 FanoutStage 是并发的执行
   for _, s := range fs {
      go func(s Stage) {
         if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
            me.Add(err)
         }
         wg.Done()
      }(s)
   }
   wg.Wait()

   if me.Len() > 0 {
      return ctx, alerts, &me
   }
   return ctx, alerts, nil
}

func createReceiverStage(
   name string,
   integrations []Integration,
   wait func() time.Duration,
   notificationLog NotificationLog,
   metrics *Metrics,
) Stage {
   // 这个是 FanoutStage 构建时
   // 里面是多个可以并发的 MultiStage
   var fs FanoutStage
   for i := range integrations {
      recv := &nflogpb.Receiver{
         GroupName:   name,
         Integration: integrations[i].Name(),
         Idx:         uint32(integrations[i].Index()),
      }
      var s MultiStage
      s = append(s, NewWaitStage(wait))
      s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
      s = append(s, NewRetryStage(integrations[i], name, metrics))
      s = append(s, NewSetNotifiesStage(notificationLog, recv))

      fs = append(fs, s)
   }
   return fs
}
复制代码

再看看静默和抑制的Stage

type MuteStage struct {
   muter types.Muter
}
func NewMuteStage(m types.Muter) *MuteStage {
   return &MuteStage{muter: m}
}
func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   var filtered []*types.Alert
   // 检查每个 alert 的 labelset 是否跟静默规则或者抑制规则的 labelSet 匹配
   // 如果 alert 的 Labels 不匹配 Mute 就保留下来, 传给下一个 stage
   for _, a := range alerts {
      if !n.muter.Mutes(a.Labels) {
         filtered = append(filtered, a)
      }
   }
   return ctx, filtered, nil
}
复制代码

MuteStage被用来实现SilenceStageInhibitStage, 它包含了一个 muterMuteStage.Exec最重要的就是调用muter.Mutes方法,那么muter就是一个包含Mutes方法的接口,SilencerInhibitor实现各自的 Mutes方法就可以作为MuteStage,那我们再看看它们各自是怎样实现Mutes方法的:

// 这个就是 Inhibitor 实现的 Muter 接口
// 抑制功能设计是这样的:
// 对于 a, b 两个 alert
// 如果 a 满足 SourceMatchers
// b 满足 TargetMatchers
// 则 Equal 成立时, 用 a 抑制 b
// Equal 成立的两个极端情况:
//    1. a 和 b 都没有 Equal 中的 labels, 成立
//    2. a 和 b 都有 Equal 中的 labels, 且都为空值, 成立
// 关于抑制不生效的极端情况:
//    1. a 同时满足 SourceMatchers, TargetMatchers, b 同时满足 SourceMatchers, TargetMatchers, 且 Equal 成立, 不生效
// 抑制不生效的极端情况是为了避免告警的自抑制
// 所以,告警写入阶段 Inhibitor 会通过 Sub 的方式监听新的 alert 并判断 source 侧是否匹配, 
// 匹配的话表示这个 alert 可能会抑制其他的 alert, 就会被缓存起来
// 在 Inhibitor 对应的 MuteStage 中取出来检查
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
   fp := lset.Fingerprint()
   // 检查内存中所有 rule 是否匹配 lset
   for _, r := range ih.rules {
      // target 不匹配就没必要计算了
      // 因为我们就是为了抑制 target
      if !r.TargetMatchers.Matches(lset) {
         continue
      }
      // target 匹配就检查 source, 如果 source 也匹配
      // 那么就需要排除两端都匹配的情况
      if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
         ih.marker.SetInhibited(fp, inhibitedByFP.String())
         return true
      }
   }
   // 这个位置没传 ids, 那么这个 alert 被置为 "active"
   ih.marker.SetInhibited(fp)
   return false
}

// 调用这个函数之前, 被检查 alert 已经满足了规则的 target,
// 而 scache 中的 alert 已经满足了规则的 source
// 剩下要确认的是:
//   scache 中的 alert 有没有标签和被检查 alert 标签一致的,
//   再避免 alert 自我抑制的场景就可以了
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
Outer:
   for _, a := range r.scache.List() {
      // The cache might be stale and contain resolved alerts.
      if a.Resolved() {
         continue
      }

      // 检查规则标签
      for n := range r.Equal {
         if a.Labels[n] != lset[n] {
            continue Outer
         }
      }
      // a 在加入 r.scache 的时候已经满足了 r.Source, 如果再通过 target 检查, 那么 scache 中的这个 a 同时满足 source 和 target
      // 而 excludeTwoSidedMatch 如果为 true, 表示当前 dispatcher 处理的 alert 在 source 和 target 都满足
      // 所以这个条件变成了:
      // 如果被检查的 alert 标签还和 a 标签相同, 即抑制规则生效, 而且 a 和被检查的 alert 都同时满足 source 和 target,
      // 就忽略 a 对被检查 alert 的抑制, 这里防止了一个告警自己抑制自己情况
      if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
         continue Outer
      }
      // 出现一个抑制生效, 剩下的就不继续检查
      return a.Fingerprint(), true
   }
   return model.Fingerprint(0), false
}

// 这个就是 Silencer 实现的 Muter 接口
func (s *Silencer) Mutes(lset model.LabelSet) bool {
   fp := lset.Fingerprint()
   activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp)

   var (
      err        error
      allSils    []*pb.Silence
      newVersion = markerVersion
   )
   // 找出现在正在生效的静默规则
   // 用来判断当前 alerts 哪些需要被静默掉
   // version 相同表示 fp 标记时的 silences 到现在没有新的静默规则加入
   if markerVersion == s.silences.Version() {
      totalSilences := len(activeIDs) + len(pendingIDs)
      if totalSilences == 0 {
         return false
      }
      allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...)
      allSils, _, err = s.silences.Query(
         // 静默规则是用户在 web 端写入
         // 这个位置使用 ids 和两种状态来过滤出需要判断的静默规则
         // 这个 query 的封装也很特别,我后面会在golang代码设计的文章中聊
         QIDs(allIDs...),
         QState(types.SilenceStateActive, types.SilenceStatePending),
      )
   } else {
      allSils, newVersion, err = s.silences.Query(
         QState(types.SilenceStateActive, types.SilenceStatePending),
         QMatches(lset),
      )
   }
   if err != nil {
      level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
   }
   if len(allSils) == 0 {
      s.marker.SetSilenced(fp, newVersion, nil, nil)
      return false
   }
   activeIDs, pendingIDs = nil, nil
   now := s.silences.now()

   // 这里仅根据搜索结果的数量就判断是否需要静默当前的 alert
   // 并没有计算 silence 的时间区间和当前时间是否重合,
   // 因为 silence 有效的计算是在 Maintenance 过程中使用 GC 来维护的
   // 所以匹配的一定是现在就生效的
   for _, sil := range allSils {
      switch getState(sil, now) {
      case types.SilenceStatePending:
         pendingIDs = append(pendingIDs, sil.Id)
      case types.SilenceStateActive:
         activeIDs = append(activeIDs, sil.Id)
      default:
         // Do nothing, silence has expired in the meantime.
      }
   }
   level.Debug(s.logger).Log(
      "msg", "determined current silences state",
      "now", now,
      "total", len(allSils),
      "active", len(activeIDs),
      "pending", len(pendingIDs),
   )
   sort.Strings(activeIDs)
   sort.Strings(pendingIDs)

   // activeIDs 为空且没有 inhibitBy 的话, fp 仍然会是 active 的
   // pendingIDs 不会对 fp 的状态有影响
   s.marker.SetSilenced(fp, newVersion, activeIDs, pendingIDs)
   return len(activeIDs) > 0
}
复制代码

到这里,流水线的大致情况就介绍的差不多了,总结一下:

  1. 先约定Stage接口,
  2. 再定义一些控制流程的Stage,比如RoutingStageMultiStageFanoutStage
  3. 然后根据需要定义一些对alerts做真正处理的的Stage,比如InhibitStageSilenceStageTimeMuteStage
  4. 最后把这些处理alertsStage使用流程控制的Stage进行编排,就成了流水线