提交 a71edc40 编写于 作者: U Ulric Qin

extract IamLeader function and fix repeat

上级 23b6cf1a
......@@ -104,9 +104,8 @@ func AlertMuteDel(ids []int64) error {
return DB().Where("id in ?", ids).Delete(new(AlertMute)).Error
}
func AlertMuteStatistics(cluster string, btime int64) (*Statistics, error) {
session := DB().Model(&AlertMute{}).Select("count(*) as total", "max(create_at) as last_updated").Where("btime <= ?", btime)
func AlertMuteStatistics(cluster string) (*Statistics, error) {
session := DB().Model(&AlertMute{}).Select("count(*) as total", "max(create_at) as last_updated")
if cluster != "" {
session = session.Where("cluster = ?", cluster)
}
......@@ -120,7 +119,7 @@ func AlertMuteStatistics(cluster string, btime int64) (*Statistics, error) {
return stats[0], nil
}
func AlertMuteGetsByCluster(cluster string, btime int64) ([]*AlertMute, error) {
func AlertMuteGetsByCluster(cluster string) ([]*AlertMute, error) {
// clean expired first
buf := int64(30)
err := DB().Where("etime < ?", time.Now().Unix()+buf).Delete(new(AlertMute)).Error
......@@ -129,7 +128,7 @@ func AlertMuteGetsByCluster(cluster string, btime int64) ([]*AlertMute, error) {
}
// get my cluster's mutes
session := DB().Model(&AlertMute{}).Where("btime <= ?", btime)
session := DB().Model(&AlertMute{})
if cluster != "" {
session = session.Where("cluster = ?", cluster)
}
......
......@@ -12,7 +12,6 @@ import (
"github.com/didi/nightingale/v5/src/pkg/httpx"
"github.com/didi/nightingale/v5/src/pkg/logx"
"github.com/didi/nightingale/v5/src/server/naming"
"github.com/didi/nightingale/v5/src/server/reader"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/didi/nightingale/v5/src/storage"
......@@ -77,7 +76,6 @@ func MustLoad(fpaths ...string) {
}
C.Heartbeat.Endpoint = fmt.Sprintf("%s:%d", C.Heartbeat.IP, C.HTTP.Port)
C.Heartbeat.Cluster = C.ClusterName
C.Alerting.RedisPub.ChannelKey = C.Alerting.RedisPub.ChannelPrefix + C.ClusterName
......@@ -93,7 +91,7 @@ type Config struct {
Log logx.Config
HTTP httpx.Config
BasicAuth gin.Accounts
Heartbeat naming.HeartbeatConfig
Heartbeat HeartbeatConfig
Alerting Alerting
NoData NoData
Redis storage.RedisConfig
......@@ -106,6 +104,12 @@ type Config struct {
Ibex Ibex
}
type HeartbeatConfig struct {
IP string
Interval int64
Endpoint string
}
type Alerting struct {
NotifyScriptPath string
NotifyConcurrency int
......
......@@ -4,10 +4,12 @@ import (
"context"
"time"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/server/naming"
)
func loopRepeat(ctx context.Context) {
......@@ -24,6 +26,17 @@ func loopRepeat(ctx context.Context) {
// 拉取未恢复的告警表中需要重复通知的数据
func repeat() {
isLeader, err := naming.IamLeader()
if err != nil {
logger.Errorf("repeat: %v", err)
return
}
if !isLeader {
logger.Info("repeat: i am not leader")
return
}
events, err := models.AlertCurEventNeedRepeat(config.C.ClusterName)
if err != nil {
logger.Errorf("repeat: AlertCurEventNeedRepeat: %v", err)
......@@ -35,35 +48,39 @@ func repeat() {
}
for i := 0; i < len(events); i++ {
event := events[i]
rule := memsto.AlertRuleCache.Get(event.RuleId)
rule := memsto.AlertRuleCache.Get(events[i].RuleId)
if rule == nil {
// 可能告警规则已经被删了,理论上不应该出现这种情况,因为删除告警规则的时候,会顺带删除活跃告警,无论如何,自保一下
continue
}
if rule.NotifyRepeatStep == 0 {
// 用户后来调整了这个字段,不让继续发送了
continue
}
event.DB2Mem()
repeatOne(events[i], rule)
// 重复通知的告警,应该用新的时间来判断是否生效和是否屏蔽,
// 不能使用TriggerTime,因为TriggerTime是触发时的时间,是一个比较老的时间
// 先发了告警,又做了屏蔽,本质是不想发了,如果继续用TriggerTime判断,就还是会发,不符合预期
if isNoneffective(event.NotifyRepeatNext, rule) {
continue
if err = events[i].IncRepeatStep(int64(rule.NotifyRepeatStep * 60)); err != nil {
logger.Errorf("repeat: IncRepeatStep: %v", err)
}
}
}
if isMuted(event, event.NotifyRepeatNext) {
continue
}
func repeatOne(event *models.AlertCurEvent, rule *models.AlertRule) {
if rule.NotifyRepeatStep == 0 {
// 用户后来调整了这个字段,不让继续发送了
return
}
fillUsers(event)
notify(event)
event.DB2Mem()
if err = event.IncRepeatStep(int64(rule.NotifyRepeatStep * 60)); err != nil {
logger.Errorf("repeat: IncRepeatStep: %v", err)
}
// 重复通知的告警,应该用新的时间来判断是否生效和是否屏蔽,
// 不能使用TriggerTime,因为TriggerTime是触发时的时间,是一个比较老的时间
// 先发了告警,又做了屏蔽,本质是不想发了,如果继续用TriggerTime判断,就还是会发,不符合预期
if isNoneffective(event.NotifyRepeatNext, rule) {
return
}
if isMuted(event, event.NotifyRepeatNext) {
return
}
fillUsers(event)
notify(event)
}
......@@ -187,6 +187,10 @@ func (r RuleEval) judge(vectors []Vector) {
alertingKeys := make(map[string]struct{})
now := time.Now().Unix()
for i := 0; i < count; i++ {
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
alertingKeys[hash] = struct{}{}
// rule disabled in this time span?
if isNoneffective(vectors[i].Timestamp, r.rule) {
continue
......@@ -226,10 +230,6 @@ func (r RuleEval) judge(vectors []Vector) {
continue
}
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
alertingKeys[hash] = struct{}{}
tagsArr := labelMapToArr(tagsMap)
sort.Strings(tagsArr)
......
......@@ -3,7 +3,6 @@ package idents
import (
"context"
"fmt"
"sort"
"strconv"
"time"
......@@ -92,20 +91,13 @@ func loopPushMetrics(ctx context.Context) {
}
func pushMetrics() {
servers, err := naming.ActiveServers(context.Background(), config.C.ClusterName)
isLeader, err := naming.IamLeader()
if err != nil {
logger.Errorf("handle_idents: failed to get active servers: %v", err)
logger.Errorf("handle_idents: %v", err)
return
}
if len(servers) == 0 {
logger.Errorf("handle_idents: active servers empty")
return
}
sort.Strings(servers)
if config.C.Heartbeat.Endpoint != servers[0] {
if !isLeader {
logger.Info("handle_idents: i am not leader")
return
}
......
......@@ -89,9 +89,8 @@ func loopSyncAlertMutes() {
func syncAlertMutes() error {
start := time.Now()
btime := start.Unix() - int64(30)
stat, err := models.AlertMuteStatistics(config.C.ClusterName, btime)
stat, err := models.AlertMuteStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteStatistics")
}
......@@ -103,7 +102,7 @@ func syncAlertMutes() error {
return nil
}
lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName, btime)
lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster")
}
......
......@@ -10,34 +10,28 @@ import (
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/storage"
)
// local servers
var localss string
type HeartbeatConfig struct {
IP string
Interval int64
Endpoint string
Cluster string
}
func Heartbeat(ctx context.Context, cfg HeartbeatConfig) error {
if err := heartbeat(ctx, cfg); err != nil {
func Heartbeat(ctx context.Context) error {
if err := heartbeat(ctx); err != nil {
fmt.Println("failed to heartbeat:", err)
return err
}
go loopHeartbeat(ctx, cfg)
go loopHeartbeat(ctx)
return nil
}
func loopHeartbeat(ctx context.Context, cfg HeartbeatConfig) {
interval := time.Duration(cfg.Interval) * time.Millisecond
func loopHeartbeat(ctx context.Context) {
interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := heartbeat(ctx, cfg); err != nil {
if err := heartbeat(ctx); err != nil {
logger.Warning(err)
}
}
......@@ -52,15 +46,15 @@ func redisKey(cluster string) string {
return fmt.Sprintf("/server/heartbeat/%s", cluster)
}
func heartbeat(ctx context.Context, cfg HeartbeatConfig) error {
func heartbeat(ctx context.Context) error {
now := time.Now().Unix()
key := redisKey(cfg.Cluster)
err := storage.Redis.HSet(ctx, key, cfg.Endpoint, now).Err()
key := redisKey(config.C.ClusterName)
err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()
if err != nil {
return err
}
servers, err := ActiveServers(ctx, cfg.Cluster)
servers, err := ActiveServers(ctx, config.C.ClusterName)
if err != nil {
return err
}
......
package naming
import (
"context"
"sort"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/toolkits/pkg/logger"
)
func IamLeader() (bool, error) {
servers, err := ActiveServers(context.Background(), config.C.ClusterName)
if err != nil {
logger.Errorf("failed to get active servers: %v", err)
return false, err
}
if len(servers) == 0 {
logger.Errorf("active servers empty")
return false, err
}
sort.Strings(servers)
return config.C.Heartbeat.Endpoint == servers[0], nil
}
......@@ -134,7 +134,7 @@ func (s Server) initialize() (func(), error) {
memsto.Sync()
// start heartbeat
if err = naming.Heartbeat(ctx, config.C.Heartbeat); err != nil {
if err = naming.Heartbeat(ctx); err != nil {
return fns.Ret(), err
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册