前篇说到告警写入后被分发到dispatcher
的aggrGroupsPerRoute
中的aggrGroup
里,然后每个aggrGroup
会启动一个自己的goroutine
按照group_wait
和group_interval
两种频率来定时调用dispatcher.stage.Exec
方法来处理告警,实际上dispatcher.stage
中存储的就是由多种处理函数编排成的一个告警处理流水线,也就是架构图中的下面这部分:
pipeline
的构建是在main
函数中创建dispatcher
的时候,很容易找到,这里不赘述了,我们看看 pipeline 是怎样定义自己的 Exec 方法的,
// pipeline 就是 RoutingStage 类型,
// 它是基于 ctx 中的 receiver 进入这个 receiver 的 Stage
type RoutingStage map[string]Stage
// 看看流水线构建函数是如何为每个 receiver 配置 Stage 的
func (pb *PipelineBuilder) New(
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
muteTimes map[string][]timeinterval.TimeInterval,
notificationLog NotificationLog,
peer Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))
ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer)
tms := NewTimeMuteStage(muteTimes)
// 基于 receiver 的 name 编排了一个包含多个 Stage 对象的 MultiStage
for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
rs[name] = MultiStage{ms, is, tms, ss, st}
}
return rs
}
// 实现了 Exec 方法实际上是实现了 Stage 接口
// 流水线就是由各种 Stage 对象组合成的,后面再说 Stage 的设计,
// 先看看 RoutingStage 做了什么
func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
// 从 context 中获取当前路由下配置的告警接收器
receiver, ok := ReceiverName(ctx)
if !ok {
return ctx, nil, errors.New("receiver missing")
}
// 从 RoutingStage 中找到对应的 MultiStage 执行 MultiStage.Exec
s, ok := rs[receiver]
if !ok {
return ctx, nil, errors.New("stage for receiver missing")
}
return s.Exec(ctx, l, alerts...)
}
// 这个时候 aggGroup 经过 RoutingStage
// 为这些 Alerts 找到了 MultiStage
// 我们看看 MultiStage
// MultiStage 就是个包含了多个 Stage 的数组
type MultiStage []Stage
func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var err error
// 顺序执行使用数组中的每个 Stage.Exec()
for _, s := range ms {
if len(alerts) == 0 {
return ctx, nil, nil
}
ctx, alerts, err = s.Exec(ctx, l, alerts...)
if err != nil {
return ctx, nil, err
}
}
return ctx, alerts, nil
}
复制代码
到这里总结一下,Dispatcher
下的每个aggGroup
先按照自己的receiver.Name
通过调用RoutingStage.Exec
中找到对应的MultiStage
,然后顺序调用其中的每个Stage.Exec
,接下来看下Stage
的设计:
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context context.Context, []*types.Alert, error)
}
// 举几个具体的 Stage 类型
type FanoutStage []Stage
type GossipSettleStage struct { peer Peer }
type MuteStage struct { muter types.Muter }
复制代码
Stage
这里是一个只约定了Exec
函数的接口,所以任何一个对象只要定义了相同签名的Exec
函数就是Stage
类型,你会在源码中很容找到各种Stage
,然后在对应的Exec
方法中就知道告警在当前Stage
中会被怎样处理,Exec
的入参数中alerts
表示哪些告警进入这个Stage
,然后出参中的alerts
就是经过当前Stage
处理还剩哪些告警,ctx
可以很方便各个Stage
获取当前流水线上的参数,当然也可以写入参数让后面的Stage
使用。
前面RoutingStage.Exec
和MultiStage.Exec
已经看过了我这里再找几个Stage
看看里面的具体行为:
// 负责并发的执行一些 Stage
type FanoutStage []Stage
func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
wg sync.WaitGroup
me types.MultiError
)
wg.Add(len(fs))
// FanoutStage 和 MultiStage 使用的相同结构 []Stage
// 但是 FanoutStage 是并发的执行
for _, s := range fs {
go func(s Stage) {
if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
me.Add(err)
}
wg.Done()
}(s)
}
wg.Wait()
if me.Len() > 0 {
return ctx, alerts, &me
}
return ctx, alerts, nil
}
func createReceiverStage(
name string,
integrations []Integration,
wait func() time.Duration,
notificationLog NotificationLog,
metrics *Metrics,
) Stage {
// 这个是 FanoutStage 构建时
// 里面是多个可以并发的 MultiStage
var fs FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
s = append(s, NewRetryStage(integrations[i], name, metrics))
s = append(s, NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)
}
return fs
}
复制代码
再看看静默和抑制的Stage
type MuteStage struct {
muter types.Muter
}
func NewMuteStage(m types.Muter) *MuteStage {
return &MuteStage{muter: m}
}
func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
// 检查每个 alert 的 labelset 是否跟静默规则或者抑制规则的 labelSet 匹配
// 如果 alert 的 Labels 不匹配 Mute 就保留下来, 传给下一个 stage
for _, a := range alerts {
if !n.muter.Mutes(a.Labels) {
filtered = append(filtered, a)
}
}
return ctx, filtered, nil
}
复制代码
MuteStage
被用来实现SilenceStage
和InhibitStage
, 它包含了一个 muter
,MuteStage.Exec
最重要的就是调用muter.Mutes
方法,那么muter
就是一个包含Mutes
方法的接口,Silencer
和Inhibitor
实现各自的 Mutes
方法就可以作为MuteStage
,那我们再看看它们各自是怎样实现Mutes
方法的:
// 这个就是 Inhibitor 实现的 Muter 接口
// 抑制功能设计是这样的:
// 对于 a, b 两个 alert
// 如果 a 满足 SourceMatchers
// b 满足 TargetMatchers
// 则 Equal 成立时, 用 a 抑制 b
// Equal 成立的两个极端情况:
// 1. a 和 b 都没有 Equal 中的 labels, 成立
// 2. a 和 b 都有 Equal 中的 labels, 且都为空值, 成立
// 关于抑制不生效的极端情况:
// 1. a 同时满足 SourceMatchers, TargetMatchers, b 同时满足 SourceMatchers, TargetMatchers, 且 Equal 成立, 不生效
// 抑制不生效的极端情况是为了避免告警的自抑制
// 所以,告警写入阶段 Inhibitor 会通过 Sub 的方式监听新的 alert 并判断 source 侧是否匹配,
// 匹配的话表示这个 alert 可能会抑制其他的 alert, 就会被缓存起来
// 在 Inhibitor 对应的 MuteStage 中取出来检查
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
// 检查内存中所有 rule 是否匹配 lset
for _, r := range ih.rules {
// target 不匹配就没必要计算了
// 因为我们就是为了抑制 target
if !r.TargetMatchers.Matches(lset) {
continue
}
// target 匹配就检查 source, 如果 source 也匹配
// 那么就需要排除两端都匹配的情况
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
return true
}
}
// 这个位置没传 ids, 那么这个 alert 被置为 "active"
ih.marker.SetInhibited(fp)
return false
}
// 调用这个函数之前, 被检查 alert 已经满足了规则的 target,
// 而 scache 中的 alert 已经满足了规则的 source
// 剩下要确认的是:
// scache 中的 alert 有没有标签和被检查 alert 标签一致的,
// 再避免 alert 自我抑制的场景就可以了
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
Outer:
for _, a := range r.scache.List() {
// The cache might be stale and contain resolved alerts.
if a.Resolved() {
continue
}
// 检查规则标签
for n := range r.Equal {
if a.Labels[n] != lset[n] {
continue Outer
}
}
// a 在加入 r.scache 的时候已经满足了 r.Source, 如果再通过 target 检查, 那么 scache 中的这个 a 同时满足 source 和 target
// 而 excludeTwoSidedMatch 如果为 true, 表示当前 dispatcher 处理的 alert 在 source 和 target 都满足
// 所以这个条件变成了:
// 如果被检查的 alert 标签还和 a 标签相同, 即抑制规则生效, 而且 a 和被检查的 alert 都同时满足 source 和 target,
// 就忽略 a 对被检查 alert 的抑制, 这里防止了一个告警自己抑制自己情况
if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
continue Outer
}
// 出现一个抑制生效, 剩下的就不继续检查
return a.Fingerprint(), true
}
return model.Fingerprint(0), false
}
// 这个就是 Silencer 实现的 Muter 接口
func (s *Silencer) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp)
var (
err error
allSils []*pb.Silence
newVersion = markerVersion
)
// 找出现在正在生效的静默规则
// 用来判断当前 alerts 哪些需要被静默掉
// version 相同表示 fp 标记时的 silences 到现在没有新的静默规则加入
if markerVersion == s.silences.Version() {
totalSilences := len(activeIDs) + len(pendingIDs)
if totalSilences == 0 {
return false
}
allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...)
allSils, _, err = s.silences.Query(
// 静默规则是用户在 web 端写入
// 这个位置使用 ids 和两种状态来过滤出需要判断的静默规则
// 这个 query 的封装也很特别,我后面会在golang代码设计的文章中聊
QIDs(allIDs...),
QState(types.SilenceStateActive, types.SilenceStatePending),
)
} else {
allSils, newVersion, err = s.silences.Query(
QState(types.SilenceStateActive, types.SilenceStatePending),
QMatches(lset),
)
}
if err != nil {
level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
}
if len(allSils) == 0 {
s.marker.SetSilenced(fp, newVersion, nil, nil)
return false
}
activeIDs, pendingIDs = nil, nil
now := s.silences.now()
// 这里仅根据搜索结果的数量就判断是否需要静默当前的 alert
// 并没有计算 silence 的时间区间和当前时间是否重合,
// 因为 silence 有效的计算是在 Maintenance 过程中使用 GC 来维护的
// 所以匹配的一定是现在就生效的
for _, sil := range allSils {
switch getState(sil, now) {
case types.SilenceStatePending:
pendingIDs = append(pendingIDs, sil.Id)
case types.SilenceStateActive:
activeIDs = append(activeIDs, sil.Id)
default:
// Do nothing, silence has expired in the meantime.
}
}
level.Debug(s.logger).Log(
"msg", "determined current silences state",
"now", now,
"total", len(allSils),
"active", len(activeIDs),
"pending", len(pendingIDs),
)
sort.Strings(activeIDs)
sort.Strings(pendingIDs)
// activeIDs 为空且没有 inhibitBy 的话, fp 仍然会是 active 的
// pendingIDs 不会对 fp 的状态有影响
s.marker.SetSilenced(fp, newVersion, activeIDs, pendingIDs)
return len(activeIDs) > 0
}
复制代码
到这里,流水线的大致情况就介绍的差不多了,总结一下:
- 先约定
Stage
接口, - 再定义一些控制流程的
Stage
,比如RoutingStage
,MultiStage
,FanoutStage
等 - 然后根据需要定义一些对
alerts
做真正处理的的Stage
,比如InhibitStage
,SilenceStage
,TimeMuteStage
等 - 最后把这些处理
alerts
的Stage
使用流程控制的Stage
进行编排,就成了流水线
近期评论