提交 54c3b32d 编写于 作者: N ning

new ds

上级 df3f3ceb
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/ccfos/nightingale/v6/prom" "github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/pconf" "github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/tdengine"
) )
func Initialize(configDir string, cryptoKey string) (func(), error) { func Initialize(configDir string, cryptoKey string) (func(), error) {
...@@ -52,10 +53,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { ...@@ -52,10 +53,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats) userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat) promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors() externalProcessors := process.NewExternalProcessors()
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, userCache, userGroupCache) Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP) r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors) rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
...@@ -71,7 +73,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { ...@@ -71,7 +73,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
} }
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType, func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) { alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats) alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats) recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
...@@ -82,7 +85,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al ...@@ -82,7 +85,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
writers := writer.NewWriters(pushgwc) writers := writer.NewWriters(pushgwc)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats) record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats) eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, tdendgineClients, naming, ctx, alertStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx, alertStats) dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx, alertStats)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp) consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)
......
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
"github.com/ccfos/nightingale/v6/memsto" "github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/prom" "github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/tdengine"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
) )
...@@ -29,7 +31,8 @@ type Scheduler struct { ...@@ -29,7 +31,8 @@ type Scheduler struct {
alertMuteCache *memsto.AlertMuteCacheType alertMuteCache *memsto.AlertMuteCacheType
datasourceCache *memsto.DatasourceCacheType datasourceCache *memsto.DatasourceCacheType
promClients *prom.PromClientMap promClients *prom.PromClientMap
tdengineClients *tdengine.TdengineClientMap
naming *naming.Naming naming *naming.Naming
...@@ -38,8 +41,8 @@ type Scheduler struct { ...@@ -38,8 +41,8 @@ type Scheduler struct {
} }
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType, func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, promClients *prom.PromClientMap, naming *naming.Naming, busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType,
ctx *ctx.Context, stats *astats.Stats) *Scheduler { promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, naming *naming.Naming, ctx *ctx.Context, stats *astats.Stats) *Scheduler {
scheduler := &Scheduler{ scheduler := &Scheduler{
aconf: aconf, aconf: aconf,
alertRules: make(map[string]*AlertRuleWorker), alertRules: make(map[string]*AlertRuleWorker),
...@@ -52,8 +55,9 @@ func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcess ...@@ -52,8 +55,9 @@ func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcess
alertMuteCache: alertMuteCache, alertMuteCache: alertMuteCache,
datasourceCache: datasourceCache, datasourceCache: datasourceCache,
promClients: promClients, promClients: promClients,
naming: naming, tdengineClients: tdengineClients,
naming: naming,
ctx: ctx, ctx: ctx,
stats: stats, stats: stats,
...@@ -102,9 +106,9 @@ func (s *Scheduler) syncAlertRules() { ...@@ -102,9 +106,9 @@ func (s *Scheduler) syncAlertRules() {
logger.Debugf("datasource %d status is %s", dsId, ds.Status) logger.Debugf("datasource %d status is %s", dsId, ds.Status)
continue continue
} }
processor := process.NewProcessor(rule, dsId, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.promClients, s.ctx, s.stats) processor := process.NewProcessor(rule, dsId, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx) alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.tdengineClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule alertRuleWorkers[alertRule.Hash()] = alertRule
} }
} else if rule.IsHostRule() && s.ctx.IsCenter { } else if rule.IsHostRule() && s.ctx.IsCenter {
...@@ -112,8 +116,8 @@ func (s *Scheduler) syncAlertRules() { ...@@ -112,8 +116,8 @@ func (s *Scheduler) syncAlertRules() {
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) { if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue continue
} }
processor := process.NewProcessor(rule, 0, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.promClients, s.ctx, s.stats) processor := process.NewProcessor(rule, 0, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.ctx) alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.tdengineClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule alertRuleWorkers[alertRule.Hash()] = alertRule
} else { } else {
// 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule // 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule
...@@ -129,7 +133,7 @@ func (s *Scheduler) syncAlertRules() { ...@@ -129,7 +133,7 @@ func (s *Scheduler) syncAlertRules() {
logger.Debugf("datasource %d status is %s", dsId, ds.Status) logger.Debugf("datasource %d status is %s", dsId, ds.Status)
continue continue
} }
processor := process.NewProcessor(rule, dsId, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.promClients, s.ctx, s.stats) processor := process.NewProcessor(rule, dsId, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
externalRuleWorkers[processor.Key()] = processor externalRuleWorkers[processor.Key()] = processor
} }
} }
......
...@@ -11,8 +11,11 @@ import ( ...@@ -11,8 +11,11 @@ import (
"github.com/ccfos/nightingale/v6/alert/process" "github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/models" "github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/hash"
"github.com/ccfos/nightingale/v6/pkg/parser"
promsdk "github.com/ccfos/nightingale/v6/pkg/prom" promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/ccfos/nightingale/v6/prom" "github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/tdengine"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str" "github.com/toolkits/pkg/str"
...@@ -28,19 +31,21 @@ type AlertRuleWorker struct { ...@@ -28,19 +31,21 @@ type AlertRuleWorker struct {
processor *process.Processor processor *process.Processor
promClients *prom.PromClientMap promClients *prom.PromClientMap
ctx *ctx.Context tdengineClients *tdengine.TdengineClientMap
ctx *ctx.Context
} }
func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, processor *process.Processor, promClients *prom.PromClientMap, ctx *ctx.Context) *AlertRuleWorker { func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, ctx *ctx.Context) *AlertRuleWorker {
arw := &AlertRuleWorker{ arw := &AlertRuleWorker{
datasourceId: datasourceId, datasourceId: datasourceId,
quit: make(chan struct{}), quit: make(chan struct{}),
rule: rule, rule: rule,
processor: processor, processor: processor,
promClients: promClients, promClients: promClients,
ctx: ctx, tdengineClients: tdengineClients,
ctx: ctx,
} }
return arw return arw
...@@ -93,12 +98,15 @@ func (arw *AlertRuleWorker) Eval() { ...@@ -93,12 +98,15 @@ func (arw *AlertRuleWorker) Eval() {
arw.processor.Stats.CounterRuleEval.WithLabelValues().Inc() arw.processor.Stats.CounterRuleEval.WithLabelValues().Inc()
typ := cachedRule.GetRuleType() typ := cachedRule.GetRuleType()
var lst []common.AnomalyPoint var anomalyPoints []common.AnomalyPoint
var recoverPoints []common.AnomalyPoint
switch typ { switch typ {
case models.PROMETHEUS: case models.PROMETHEUS:
lst = arw.GetPromAnomalyPoint(cachedRule.RuleConfig) anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
case models.HOST: case models.HOST:
lst = arw.GetHostAnomalyPoint(cachedRule.RuleConfig) anomalyPoints = arw.GetHostAnomalyPoint(cachedRule.RuleConfig)
case models.TDENGINE:
anomalyPoints, recoverPoints = arw.GetTdengineAnomalyPoint(cachedRule, arw.processor.DatasourceId())
default: default:
return return
} }
...@@ -108,7 +116,11 @@ func (arw *AlertRuleWorker) Eval() { ...@@ -108,7 +116,11 @@ func (arw *AlertRuleWorker) Eval() {
return return
} }
arw.processor.Handle(lst, "inner", arw.inhibit) arw.processor.Handle(anomalyPoints, "inner", arw.inhibit)
for _, point := range recoverPoints {
str := fmt.Sprintf("%v", point.Value)
arw.processor.RecoverSingle(process.Hash(cachedRule.Id, arw.processor.DatasourceId(), point), point.Timestamp, &str)
}
} }
func (arw *AlertRuleWorker) Stop() { func (arw *AlertRuleWorker) Stop() {
...@@ -175,6 +187,110 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom ...@@ -175,6 +187,110 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
return lst return lst
} }
// GetTdengineAnomalyPoint evaluates a TDengine alert rule against datasource dsId.
// It runs every query in the rule config, groups the returned series by tag set,
// evaluates each trigger expression per group, and returns two slices:
// the points that triggered, and the points that evaluated false (the caller
// uses the latter to recover currently-firing events).
func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]common.AnomalyPoint, []common.AnomalyPoint) {
	points := []common.AnomalyPoint{}
	recoverPoints := []common.AnomalyPoint{}

	ruleConfig := strings.TrimSpace(rule.RuleConfig)
	if ruleConfig == "" {
		logger.Warningf("rule_eval:%d promql is blank", rule.Id)
		return points, recoverPoints
	}

	var ruleQuery models.RuleQuery
	if err := json.Unmarshal([]byte(ruleConfig), &ruleQuery); err != nil {
		logger.Warningf("rule_eval:%d promql parse error:%s", rule.Id, err.Error())
		return points, recoverPoints
	}

	if len(ruleQuery.Queries) == 0 {
		return points, recoverPoints
	}

	// The client lookup does not depend on the query, so do it once
	// instead of once per query (the original looked it up in the loop).
	cli := arw.tdengineClients.GetCli(dsId)
	if cli == nil {
		logger.Warningf("rule_eval:%d tdengine client is nil", rule.Id)
		return points, recoverPoints
	}

	// seriesStore: series hash -> series; seriesTagIndex: tag-set hash -> member series hashes.
	seriesStore := make(map[uint64]*models.DataResp)
	seriesTagIndex := make(map[uint64][]uint64)

	for _, query := range ruleQuery.Queries {
		series, err := cli.Query(query)
		if err != nil {
			logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
			continue
		}

		// This log line matters: it records the raw values the alert decision is based on.
		logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)

		for i := 0; i < len(series); i++ {
			serieHash := hash.GetHash(series[i].Metric, series[i].Ref)
			tagHash := hash.GetTagHash(series[i].Metric)
			seriesStore[serieHash] = series[i]
			// Group series that share the same tag set so one trigger
			// expression can combine values from multiple queries.
			seriesTagIndex[tagHash] = append(seriesTagIndex[tagHash], serieHash)
		}
	}

	for _, trigger := range ruleQuery.Triggers {
		for _, seriesHashes := range seriesTagIndex {
			m := make(map[string]float64)
			var ts int64
			var sample *models.DataResp
			var value float64

			for _, serieHash := range seriesHashes {
				series, exists := seriesStore[serieHash]
				if !exists {
					logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
					continue
				}
				t, v, exists := series.Last()
				if !exists {
					logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
					continue
				}
				if !strings.Contains(trigger.Exp, "$"+series.Ref) {
					// The expression does not reference this series' variable.
					continue
				}
				m["$"+series.Ref] = v
				m["$"+series.Ref+"."+series.MetricName()] = v
				ts = int64(t)
				sample = series
				value = v
			}

			// BUG FIX: if every series in the group was skipped above
			// (no last value, or the expression references none of them),
			// sample is still nil and sample.MetricName() below would panic.
			if sample == nil {
				continue
			}

			isTriggered := parser.Calc(trigger.Exp, m)
			// This log line matters: it records the values the alert decision is based on.
			logger.Debugf("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)

			point := common.AnomalyPoint{
				Key:       sample.MetricName(),
				Labels:    sample.Metric,
				Timestamp: ts,
				Value:     value,
				Severity:  trigger.Severity,
				Triggered: isTriggered,
			}

			if isTriggered {
				points = append(points, point)
			} else {
				recoverPoints = append(recoverPoints, point)
			}
		}
	}

	return points, recoverPoints
}
func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.AnomalyPoint { func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.AnomalyPoint {
var lst []common.AnomalyPoint var lst []common.AnomalyPoint
var severity int var severity int
......
...@@ -94,7 +94,7 @@ func (p *Processor) Hash() string { ...@@ -94,7 +94,7 @@ func (p *Processor) Hash() string {
} }
func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType, func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, promClients *prom.PromClientMap, ctx *ctx.Context, busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
stats *astats.Stats) *Processor { stats *astats.Stats) *Processor {
p := &Processor{ p := &Processor{
...@@ -107,9 +107,8 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me ...@@ -107,9 +107,8 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me
atertRuleCache: atertRuleCache, atertRuleCache: atertRuleCache,
datasourceCache: datasourceCache, datasourceCache: datasourceCache,
promClients: promClients, ctx: ctx,
ctx: ctx, Stats: stats,
Stats: stats,
HandleFireEventHook: func(event *models.AlertCurEvent) {}, HandleFireEventHook: func(event *models.AlertCurEvent) {},
HandleRecoverEventHook: func(event *models.AlertCurEvent) {}, HandleRecoverEventHook: func(event *models.AlertCurEvent) {},
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage" "github.com/ccfos/nightingale/v6/storage"
"github.com/ccfos/nightingale/v6/tdengine"
alertrt "github.com/ccfos/nightingale/v6/alert/router" alertrt "github.com/ccfos/nightingale/v6/alert/router"
centerrt "github.com/ccfos/nightingale/v6/center/router" centerrt "github.com/ccfos/nightingale/v6/center/router"
...@@ -79,9 +80,10 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { ...@@ -79,9 +80,10 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats) userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat) promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors() externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, userCache, userGroupCache) alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
writers := writer.NewWriters(config.Pushgw) writers := writer.NewWriters(config.Pushgw)
...@@ -89,7 +91,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { ...@@ -89,7 +91,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
go version.GetGithubVersion() go version.GetGithubVersion()
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors) alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, notifyConfigCache, promClients, redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache) centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, notifyConfigCache, promClients, tdengineClients,
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx) pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP) r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
......
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
"github.com/ccfos/nightingale/v6/prom" "github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/storage" "github.com/ccfos/nightingale/v6/storage"
"github.com/ccfos/nightingale/v6/tdengine"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/rakyll/statik/fs" "github.com/rakyll/statik/fs"
...@@ -36,6 +37,7 @@ type Router struct { ...@@ -36,6 +37,7 @@ type Router struct {
DatasourceCache *memsto.DatasourceCacheType DatasourceCache *memsto.DatasourceCacheType
NotifyConfigCache *memsto.NotifyConfigCacheType NotifyConfigCache *memsto.NotifyConfigCacheType
PromClients *prom.PromClientMap PromClients *prom.PromClientMap
TdendgineClients *tdengine.TdengineClientMap
Redis storage.Redis Redis storage.Redis
MetaSet *metas.Set MetaSet *metas.Set
IdentSet *idents.Set IdentSet *idents.Set
...@@ -49,7 +51,7 @@ type Router struct { ...@@ -49,7 +51,7 @@ type Router struct {
} }
func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operation, ds *memsto.DatasourceCacheType, ncc *memsto.NotifyConfigCacheType, func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operation, ds *memsto.DatasourceCacheType, ncc *memsto.NotifyConfigCacheType,
pc *prom.PromClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, metaSet *metas.Set, idents *idents.Set, tc *memsto.TargetCacheType, pc *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, metaSet *metas.Set, idents *idents.Set, tc *memsto.TargetCacheType,
uc *memsto.UserCacheType, ugc *memsto.UserGroupCacheType) *Router { uc *memsto.UserCacheType, ugc *memsto.UserGroupCacheType) *Router {
return &Router{ return &Router{
HTTP: httpConfig, HTTP: httpConfig,
...@@ -58,6 +60,7 @@ func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operatio ...@@ -58,6 +60,7 @@ func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operatio
DatasourceCache: ds, DatasourceCache: ds,
NotifyConfigCache: ncc, NotifyConfigCache: ncc,
PromClients: pc, PromClients: pc,
TdendgineClients: tdendgineClients,
Redis: redis, Redis: redis,
MetaSet: metaSet, MetaSet: metaSet,
IdentSet: idents, IdentSet: idents,
...@@ -166,11 +169,19 @@ func (rt *Router) Config(r *gin.Engine) { ...@@ -166,11 +169,19 @@ func (rt *Router) Config(r *gin.Engine) {
pages.POST("/query-range-batch", rt.promBatchQueryRange) pages.POST("/query-range-batch", rt.promBatchQueryRange)
pages.POST("/query-instant-batch", rt.promBatchQueryInstant) pages.POST("/query-instant-batch", rt.promBatchQueryInstant)
pages.GET("/datasource/brief", rt.datasourceBriefs) pages.GET("/datasource/brief", rt.datasourceBriefs)
pages.GET("/tdengine-databases", rt.tdengineDatabases)
pages.GET("/tdengine-tables", rt.tdengineTables)
pages.GET("/tdengine-columns", rt.tdengineColumns)
} else { } else {
pages.Any("/proxy/:id/*url", rt.auth(), rt.dsProxy) pages.Any("/proxy/:id/*url", rt.auth(), rt.dsProxy)
pages.POST("/query-range-batch", rt.auth(), rt.promBatchQueryRange) pages.POST("/query-range-batch", rt.auth(), rt.promBatchQueryRange)
pages.POST("/query-instant-batch", rt.auth(), rt.promBatchQueryInstant) pages.POST("/query-instant-batch", rt.auth(), rt.promBatchQueryInstant)
pages.GET("/datasource/brief", rt.auth(), rt.datasourceBriefs) pages.GET("/datasource/brief", rt.auth(), rt.datasourceBriefs)
pages.GET("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
pages.GET("/tdengine-tables", rt.auth(), rt.tdengineTables)
pages.GET("/tdengine-columns", rt.auth(), rt.tdengineColumns)
} }
pages.POST("/auth/login", rt.jwtMock(), rt.loginPost) pages.POST("/auth/login", rt.jwtMock(), rt.loginPost)
......
package router
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
// databasesQueryForm is the request payload for listing TDengine databases,
// e.g. {"cate": "tdengine", "datasource_id": 1}.
type databasesQueryForm struct {
	Cate         string `json:"cate" form:"cate"`
	DatasourceId int64  `json:"datasource_id" form:"datasource_id"`
}

// tdengineDatabases responds with the database list of the requested
// TDengine datasource, or 404 when no client exists for that id.
func (rt *Router) tdengineDatabases(c *gin.Context) {
	var form databasesQueryForm
	ginx.BindJSON(c, &form)

	cli := rt.TdendgineClients.GetCli(form.DatasourceId)
	if cli == nil {
		ginx.NewRender(c, http.StatusNotFound).Message("No such datasource")
		return
	}

	dbs, err := cli.GetDatabases()
	ginx.NewRender(c).Data(dbs, err)
}
// tablesQueryForm is the request payload for listing TDengine tables.
type tablesQueryForm struct {
	Cate         string `json:"cate"`
	DatasourceId int64  `json:"datasource_id" `
	Database     string `json:"database"`
	IsTable      bool   `json:"is_table"`
}

// tdengineTables responds with the tables (or super tables, per IsTable) of
// the given database, or 404 when no client exists for the datasource id.
func (rt *Router) tdengineTables(c *gin.Context) {
	var form tablesQueryForm
	ginx.BindJSON(c, &form)

	cli := rt.TdendgineClients.GetCli(form.DatasourceId)
	if cli == nil {
		ginx.NewRender(c, http.StatusNotFound).Message("No such datasource")
		return
	}

	tables, err := cli.GetTables(form.Database, form.IsTable)
	ginx.NewRender(c).Data(tables, err)
}
// columnsQueryForm is the request payload for listing TDengine table columns.
type columnsQueryForm struct {
	Cate         string `json:"cate"`
	DatasourceId int64  `json:"datasource_id" `
	Database     string `json:"database"`
	Table        string `json:"table"`
}

// tdengineColumns responds with the column list of the given table, or 404
// when no client exists for the datasource id.
func (rt *Router) tdengineColumns(c *gin.Context) {
	var form columnsQueryForm
	ginx.BindJSON(c, &form)

	cli := rt.TdendgineClients.GetCli(form.DatasourceId)
	if cli == nil {
		ginx.NewRender(c, http.StatusNotFound).Message("No such datasource")
		return
	}

	columns, err := cli.GetColumns(form.Database, form.Table)
	ginx.NewRender(c).Data(columns, err)
}
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ccfos/nightingale/v6/prom" "github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/tdengine"
alertrt "github.com/ccfos/nightingale/v6/alert/router" alertrt "github.com/ccfos/nightingale/v6/alert/router"
pushgwrt "github.com/ccfos/nightingale/v6/pushgw/router" pushgwrt "github.com/ccfos/nightingale/v6/pushgw/router"
...@@ -58,9 +59,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { ...@@ -58,9 +59,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats) userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat) promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors() externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, userCache, userGroupCache) alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache,
alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors) alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
......
...@@ -5,6 +5,7 @@ go 1.18 ...@@ -5,6 +5,7 @@ go 1.18
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc v2.2.1+incompatible
github.com/davecgh/go-spew v1.1.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gin-contrib/pprof v1.4.0 github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
...@@ -27,6 +28,7 @@ require ( ...@@ -27,6 +28,7 @@ require (
github.com/prometheus/prometheus v2.5.0+incompatible github.com/prometheus/prometheus v2.5.0+incompatible
github.com/rakyll/statik v0.1.7 github.com/rakyll/statik v0.1.7
github.com/redis/go-redis/v9 v9.0.2 github.com/redis/go-redis/v9 v9.0.2
github.com/spaolacci/murmur3 v1.1.0
github.com/tidwall/gjson v1.14.0 github.com/tidwall/gjson v1.14.0
github.com/toolkits/pkg v1.3.3 github.com/toolkits/pkg v1.3.3
golang.org/x/oauth2 v0.4.0 golang.org/x/oauth2 v0.4.0
...@@ -74,7 +76,6 @@ require ( ...@@ -74,7 +76,6 @@ require (
github.com/pquerna/cachecontrol v0.1.0 // indirect github.com/pquerna/cachecontrol v0.1.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
......
...@@ -20,6 +20,7 @@ const ( ...@@ -20,6 +20,7 @@ const (
HOST = "host" HOST = "host"
PROMETHEUS = "prometheus" PROMETHEUS = "prometheus"
TDENGINE = "tdengine"
) )
type AlertRule struct { type AlertRule struct {
...@@ -792,6 +793,10 @@ func (ar *AlertRule) IsHostRule() bool { ...@@ -792,6 +793,10 @@ func (ar *AlertRule) IsHostRule() bool {
return ar.Prod == HOST return ar.Prod == HOST
} }
func (ar *AlertRule) IsTdengineRule() bool {
return ar.Cate == TDENGINE
}
func (ar *AlertRule) GetRuleType() string { func (ar *AlertRule) GetRuleType() string {
if ar.Prod == METRIC { if ar.Prod == METRIC {
return ar.Cate return ar.Cate
......
package models
import "github.com/prometheus/common/model"
// DataResp is one series returned by a datasource query.
type DataResp struct {
	Ref    string       `json:"ref"`    // variable name referenced by trigger expressions (e.g. "$A" uses Ref "A")
	Metric model.Metric `json:"metric"` // label set of the series, including __name__
	Labels string       `json:"-"`      // not serialized; NOTE(review): unused in this view of the code — confirm callers
	Values [][]float64  `json:"values"` // samples; each element is a [timestamp, value] pair
}
// Last returns the timestamp and value of the most recent sample.
// The third result is false when the series is empty or its last
// element is not a well-formed [timestamp, value] pair.
func (d *DataResp) Last() (float64, float64, bool) {
	n := len(d.Values)
	if n == 0 {
		return 0, 0, false
	}
	if pair := d.Values[n-1]; len(pair) == 2 {
		return pair[0], pair[1], true
	}
	return 0, 0, false
}
// MetricName returns the series' __name__ label, or "" when it is absent.
func (d *DataResp) MetricName() string {
	return string(d.Metric["__name__"])
}
// RelationKey describes how to join two result sets: compare LeftKey
// against RightKey using operator OP.
type RelationKey struct {
	LeftKey  string `json:"left_key"`
	RightKey string `json:"right_key"`
	OP       string `json:"op"`
}
// QueryParam is a generic datasource query request.
type QueryParam struct {
	Cate         string `json:"cate"`          // datasource category, e.g. "tdengine"
	DatasourceId int64  `json:"datasource_id"` // target datasource id
	// Querys holds the per-datasource query payloads (shape depends on Cate).
	// NOTE(review): field name is misspelled but exported — renaming would break callers.
	Querys []interface{} `json:"query"`
}
// Series indexes query results: SeriesStore maps a series hash to its data,
// SeriesIndex maps a group key to the set of series hashes in that group.
type Series struct {
	// BUG FIX: the tag key was misspelled `josn`, so encoding/json ignored it
	// and serialized the field as "SeriesStore" instead of the intended "store".
	SeriesStore map[uint64]DataResp            `json:"store"`
	SeriesIndex map[string]map[uint64]struct{} `json:"index"`
}
package hash
import (
	"sort"
	"strings"

	prommodel "github.com/prometheus/common/model"

	"github.com/spaolacci/murmur3"
)
// GetHash returns a murmur3 hash over the metric's full label set plus ref.
// Label keys are sorted so the hash is deterministic for a given label set.
func GetHash(m prommodel.Metric, ref string) uint64 {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, string(k))
	}
	sort.Strings(keys)

	// Build "/key/value" pairs with a Builder instead of repeated string
	// concatenation (the original += loop was quadratic).
	var b strings.Builder
	for _, k := range keys {
		b.WriteString("/")
		b.WriteString(k)
		b.WriteString("/")
		b.WriteString(string(m[prommodel.LabelName(k)]))
	}
	b.WriteString("/")
	b.WriteString(ref)

	return murmur3.Sum64([]byte(b.String()))
}
// GetTagHash returns a murmur3 hash of the label set of m excluding
// __name__, i.e. a hash identifying the tag combination only. Label names
// are sorted first so the hash is deterministic.
func GetTagHash(m prommodel.Metric) uint64 {
	keys := make([]string, 0, len(m))
	for name := range m {
		if name == "__name__" {
			continue
		}
		keys = append(keys, string(name))
	}
	sort.Strings(keys)

	var s string
	for _, k := range keys {
		s += "/" + k + "/" + string(m[prommodel.LabelName(k)])
	}

	return murmur3.Sum64([]byte(s))
}
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hash
import (
"hash"
"github.com/davecgh/go-spew/spew"
)
// DeepHashObject writes specified object to hash using the spew library
// which follows pointers and prints actual values of the nested objects
// ensuring the hash does not change when a pointer changes.
// The hasher is reset first, so the result reflects only objectToWrite.
func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) {
	hasher.Reset()
	printer := spew.ConfigState{
		Indent:         " ",
		SortKeys:       true, // deterministic output for map keys
		DisableMethods: true, // ignore Stringer/error methods; dump raw fields
		SpewKeys:       true,
	}
	printer.Fprintf(hasher, "%#v", objectToWrite)
}
package hash
import (
	"sort"

	prommodel "github.com/prometheus/common/model"
	"github.com/toolkits/pkg/str"
)
// GetHash2 returns an MD5 hex digest identifying the full label set of m
// plus the query ref.
//
// BUG FIX: the original iterated the map directly; Go map iteration order
// is randomized, so the same metric produced different hashes between
// calls. Keys are now sorted (matching GetHash in this package) so the
// digest is deterministic.
func GetHash2(m prommodel.Metric, ref string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, string(k))
	}
	sort.Strings(keys)

	var s string
	for _, k := range keys {
		s += "/" + k + "/" + string(m[prommodel.LabelName(k)])
	}
	s += "/" + ref
	return str.MD5(s)
}
// GetTagHash2 returns an MD5 hex digest of the label set of m excluding
// __name__ (the tag combination only).
//
// BUG FIX: the original iterated the map directly; Go map iteration order
// is randomized, so the same tag set produced different hashes between
// calls. Keys are now sorted (matching GetTagHash in this package) so the
// digest is deterministic.
func GetTagHash2(m prommodel.Metric) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		if k == "__name__" {
			continue
		}
		keys = append(keys, string(k))
	}
	sort.Strings(keys)

	var s string
	for _, k := range keys {
		s += "/" + k + "/" + string(m[prommodel.LabelName(k)])
	}
	return str.MD5(s)
}
package parser
import (
"fmt"
"reflect"
"strings"
)
// Node is any expression AST node (number, identifier, or binary op).
type Node interface {
}

// NumberNode is a numeric literal; Type records the literal token kind
// (e.g. IntLiteral) and Lit the raw text.
type NumberNode struct {
	Type TokenType
	Lit  string
}

// IdentifierNode is a variable reference; Lit includes the '$' prefix.
type IdentifierNode struct {
	Lit string
}

// BinaryNode applies operator Type to the Left and Right subtrees.
type BinaryNode struct {
	Type  TokenType
	Left  Node
	Right Node
}
// formatNode pretty-prints an AST node (or a slice of nodes) to stdout,
// indented by ident levels. field, when non-empty, is the name of the
// struct field that holds this node.
func formatNode(node Node, field string, ident int) {
	if arr, ok := node.([]Node); ok {
		for _, v := range arr {
			formatNode(v, "", ident)
		}
		return
	}

	// BUG FIX: reflect.TypeOf(nil) returns nil and the original panicked
	// on typ.Kind(); nil interfaces and nil pointers are now printed.
	if node == nil {
		fmt.Printf("%s%s: <nil>\n", formatIdent(ident), field)
		return
	}

	typ := reflect.TypeOf(node)
	val := reflect.ValueOf(node)
	if typ.Kind() == reflect.Ptr {
		if val.IsNil() {
			fmt.Printf("%s%s: <nil>\n", formatIdent(ident), field)
			return
		}
		typ = typ.Elem()
		val = val.Elem()
	}

	if field != "" {
		fmt.Printf("%s%s: %s {\n", formatIdent(ident), field, typ.Name())
	} else {
		fmt.Printf("%s%s {\n", formatIdent(ident), typ.Name())
	}

	for i := 0; i < typ.NumField(); i++ {
		fieldTyp := typ.Field(i)
		fieldVal := val.Field(i)
		fieldKind := fieldTyp.Type.Kind()
		if fieldKind != reflect.Interface && fieldKind != reflect.Ptr {
			// plain value field: print inline
			fmt.Printf("%s%s: %s\n", formatIdent(ident+1), fieldTyp.Name, fieldVal.Interface())
		} else {
			// nested node: recurse one level deeper
			formatNode(fieldVal.Interface(), fieldTyp.Name, ident+1)
		}
	}
	fmt.Printf("%s}\n", formatIdent(ident))
}
// formatIdent returns the indentation prefix for nesting depth n:
// n copies of ". ".
func formatIdent(n int) string {
	var b strings.Builder
	for i := 0; i < n; i++ {
		b.WriteString(". ")
	}
	return b.String()
}
package parser
import (
"fmt"
"strconv"
"github.com/toolkits/pkg/logger"
)
// MathCalc parses the arithmetic expression s and evaluates it against
// the variable bindings in data, returning the numeric result.
//
// NOTE(review): only the first parsed statement is evaluated (the loop
// returns on its first iteration); when no statements parse it returns
// (0, nil). Parse errors recorded via reportError are not surfaced here —
// Parse() returns nil for them; confirm callers check p.Err() if needed.
func MathCalc(s string, data map[string]float64) (float64, error) {
	var err error
	p := NewParser([]rune(s))
	err = p.Parse()
	if err != nil {
		return 0, err
	}
	for _, stat := range p.Stats() {
		v, err := eval(stat, data)
		if err != nil {
			return 0, err
		}
		logger.Infof("exp:%s res:%v", s, v)
		return v, nil
	}
	return 0, err
}
// Calc parses the boolean expression s, evaluates each parsed statement
// against data, and reports whether any statement evaluates to a positive
// ("true") value. Parse and evaluation failures are logged and yield false.
func Calc(s string, data map[string]float64) bool {
	var err error
	p := NewParser([]rune(s))
	err = p.Parse()
	if err != nil {
		logger.Errorf("parse err:%v", err)
		return false
	}
	for _, stat := range p.Stats() {
		v, err := eval(stat, data)
		if err != nil {
			logger.Error("eval error:", err)
			return false
		}
		logger.Infof("exp:%s res:%v", s, v)
		// positive result encodes boolean true
		if v > 0.0 {
			return true
		}
	}
	return false
}
// eval evaluates an AST node against the variable bindings in data.
func eval(stat Node, data map[string]float64) (float64, error) {
	switch n := stat.(type) {
	case *BinaryNode:
		return evalBinary(n, data)
	case *IdentifierNode:
		return get(n.Lit, data)
	case *NumberNode:
		return evaluateNumber(n)
	}
	return 0, fmt.Errorf("invalid node: %v", stat)
}
// evaluateNumber converts a numeric-literal node to float64.
//
// Generalized: the original only accepted IntLiteral; UintLiteral and
// FloatLiteral (declared token kinds) are now accepted too. ParseFloat
// handles all of these literal spellings, so IntLiteral behavior is
// unchanged.
func evaluateNumber(node *NumberNode) (float64, error) {
	switch node.Type {
	case IntLiteral, UintLiteral, FloatLiteral:
		v, err := strconv.ParseFloat(node.Lit, 64)
		if err != nil {
			return 0, err
		}
		return v, nil
	}
	return 0, fmt.Errorf("invalid type: %v", node.Type)
}
// get looks up the variable name in data, returning an error when the
// variable is not bound.
func get(name string, data map[string]float64) (float64, error) {
	v, ok := data[name]
	if !ok {
		return 0, fmt.Errorf("%s not found", name)
	}
	return v, nil
}
// evalBinary evaluates a binary operator node: both operands are evaluated
// first (left then right), then the operator is applied. Logical and
// comparison operators encode booleans as 1/0 float64.
func evalBinary(node *BinaryNode, data map[string]float64) (float64, error) {
	left, err := eval(node.Left, data)
	if err != nil {
		return 0, err
	}
	right, err := eval(node.Right, data)
	if err != nil {
		return 0, err
	}
	switch node.Type {
	case AND:
		return and(left, right), nil
	case OR:
		return or(left, right), nil
	case Plus:
		return add(left, right), nil
	case Minus:
		return minus(left, right), nil
	case Star:
		return star(left, right), nil
	case Slash:
		// only division can fail (divide by zero)
		return slash(left, right)
	case GT:
		return gt(left, right), nil
	case GE:
		return ge(left, right), nil
	case LT:
		return lt(left, right), nil
	case LE:
		return le(left, right), nil
	case EQ:
		return eq(left, right), nil
	case NE:
		return ne(left, right), nil
	}
	return 0, fmt.Errorf("invalid operator: %v", node.Type)
}
// and is logical conjunction on the 1/0 float encoding: 1 when both
// operands are positive, 0 otherwise.
func and(left, right float64) float64 {
	if left > 0.0 {
		if right > 0.0 {
			return 1
		}
	}
	return 0
}

// or is logical disjunction: 1 when either operand is positive.
func or(left, right float64) float64 {
	switch {
	case left > 0.0, right > 0.0:
		return 1
	}
	return 0
}
// b2f converts a comparison result to the evaluator's 1/0 float encoding.
func b2f(ok bool) float64 {
	if ok {
		return 1
	}
	return 0
}

// gt: 1 when left > right.
func gt(left, right float64) float64 { return b2f(left > right) }

// ge: 1 when left >= right.
func ge(left, right float64) float64 { return b2f(left >= right) }

// lt: 1 when left < right.
func lt(left, right float64) float64 { return b2f(left < right) }

// le: 1 when left <= right.
func le(left, right float64) float64 { return b2f(left <= right) }

// eq: 1 when left == right.
func eq(left, right float64) float64 { return b2f(left == right) }

// ne: 1 when left != right.
func ne(left, right float64) float64 { return b2f(left != right) }
// add returns the sum of its operands.
func add(left, right float64) float64 { return left + right }

// minus returns left minus right.
func minus(left, right float64) float64 { return left - right }

// star returns the product of its operands.
func star(left, right float64) float64 { return left * right }

// slash divides left by right, rejecting division by zero with an error.
func slash(left, right float64) (float64, error) {
	if right == 0 {
		return 0, fmt.Errorf("right is zero")
	}
	return left / right, nil
}
package parser
import (
"errors"
"fmt"
)
var (
	// ErrorLexEOF signals normal end of input during lexing; callers treat
	// it as "no more tokens", not as a failure.
	ErrorLexEOF = errors.New("lex source EOF")
)
// TokenType enumerates the kinds of tokens produced by the lexer.
type TokenType int

const (
	EOF TokenType = iota
	AND
	OR
	EXP
	Identifier
	GT
	GE
	LT
	LE
	EQ
	NE
	Plus
	Minus
	Star
	Slash
	LeftParen
	RightParen
	IntLiteral
	UintLiteral
	FloatLiteral
)

// String returns a human-readable name for the token type, used in error
// messages and debug output.
//
// BUG FIX: EOF and UintLiteral previously fell through to
// "Unknown TokenType" even though they are declared token kinds.
func (t TokenType) String() string {
	switch t {
	case EOF:
		return "EOF"
	case Identifier:
		return "Identifier"
	case GT:
		return "GT"
	case GE:
		return "GE"
	case LT:
		return "LT"
	case LE:
		return "LE"
	case EQ:
		return "EQ"
	case NE:
		return "NE"
	case Plus:
		return "Plus"
	case Minus:
		return "Minus"
	case Star:
		return "Star"
	case Slash:
		return "Slash"
	case LeftParen:
		return "leftParen"
	case RightParen:
		return "rightParen"
	case AND:
		return "AND"
	case OR:
		return "OR"
	case EXP:
		return "expr"
	case IntLiteral:
		return "IntLiteral"
	case UintLiteral:
		return "UintLiteral"
	case FloatLiteral:
		return "FloatLiteral"
	default:
		return "Unknown TokenType"
	}
}
// Token is one lexeme: its kind and the runes it was built from.
type Token struct {
	typ TokenType
	buf []rune
}

// push appends a rune to the token's text.
func (t *Token) push(r rune) {
	t.buf = append(t.buf, r)
}

// String renders the token for debug output.
func (t *Token) String() string {
	return fmt.Sprintf("token: {%v: '%s'}", t.typ, string(t.buf))
}
// Lexer is a rune-wise scanner over an expression source.
type Lexer struct {
	buf []rune
	idx int // always point to the next rune
}

// newLexer returns a Lexer positioned at the start of buf.
func newLexer(buf []rune) *Lexer {
	return &Lexer{
		buf: buf,
		idx: 0,
	}
}
// isAlpha reports whether ch may appear in an identifier body: ASCII
// letters plus '.' (used in dotted names).
func isAlpha(ch rune) bool {
	switch {
	case ch >= 'a' && ch <= 'z', ch >= 'A' && ch <= 'Z', ch == '.':
		return true
	}
	return false
}

// isPrefix reports whether ch introduces an identifier ('$').
func isPrefix(ch rune) bool { return ch == '$' }

// isWhitespace reports whether ch is a space, tab, or carriage return;
// newline is handled separately by skipWhitespace.
func isWhitespace(ch rune) bool {
	switch ch {
	case ' ', '\t', '\r':
		return true
	}
	return false
}

// isDigit reports whether ch may appear in a numeric literal: ASCII
// digits or '.' (so "1.5" lexes as one token).
func isDigit(ch rune) bool {
	return (ch >= '0' && ch <= '9') || ch == '.'
}
// lex scans the whole input into a token stream. ErrorLexEOF marks the
// normal end of input; any other lexing error aborts the scan.
func (l *Lexer) lex() ([]*Token, error) {
	toks := make([]*Token, 0)
	for {
		tok, err := l.lexToken()
		if err == ErrorLexEOF {
			return toks, nil
		}
		if err != nil {
			return nil, err
		}
		toks = append(toks, tok)
	}
}
// lexToken skips leading whitespace and scans the next token, dispatching
// on the first rune. Returns ErrorLexEOF at end of input, or an error for
// an unrecognized rune.
func (l *Lexer) lexToken() (*Token, error) {
	l.skipWhitespace()
	ch, err := l.next()
	if err != nil {
		return nil, err
	}
	switch {
	case isPrefix(ch): // '$' starts an identifier
		return l.lexIdentifier(ch), nil
	case ch == '&':
		return l.lexAnd(ch), nil
	case ch == '|':
		return l.lexOR(ch), nil
	case ch == '>':
		return l.lexGT(ch), nil
	case ch == '<':
		return l.lexLT(ch), nil
	case ch == '!':
		return l.lexNE(ch), nil
	case ch == '=':
		return l.lexEQ(ch), nil
	case ch == '+':
		return l.lexPlus(ch), nil
	case ch == '-':
		return l.lexMinus(ch), nil
	case ch == '*':
		return l.lexStar(ch), nil
	case ch == '/':
		return l.lexSlash(ch), nil
	case ch == '(':
		return l.lexLeftParen(ch), nil
	case ch == ')':
		return l.lexRightParen(ch), nil
	case isDigit(ch):
		return l.lexDigital(ch), nil
	default:
		return nil, errors.New("not supported rune: " + string(ch))
	}
}
// lexIdentifier scans an identifier token starting at ch (the '$' prefix);
// the body may contain letters, digits, and '.'.
func (l *Lexer) lexIdentifier(ch rune) *Token {
	tok := &Token{typ: Identifier, buf: []rune{ch}}
	for {
		next, err := l.peek()
		if err != nil || !(isAlpha(next) || isDigit(next)) {
			return tok
		}
		l.mustNext()
		tok.push(next)
	}
}
// lexGT scans '>' and, when the next rune is '=', widens it to GE (">=").
func (l *Lexer) lexGT(ch rune) *Token {
	tok := &Token{typ: GT, buf: []rune{ch}}
	if next, err := l.peek(); err == nil && next == '=' {
		tok.typ = GE
		tok.push(next)
		l.mustNext()
	}
	return tok
}

// lexLT scans '<' and, when the next rune is '=', widens it to LE ("<=").
func (l *Lexer) lexLT(ch rune) *Token {
	tok := &Token{typ: LT, buf: []rune{ch}}
	if next, err := l.peek(); err == nil && next == '=' {
		tok.typ = LE
		tok.push(next)
		l.mustNext()
	}
	return tok
}
// lexEQ scans a token starting with '='; only "==" yields EQ.
//
// NOTE(review): a lone '=' leaves typ at its zero value (EOF) instead of
// reporting an error — the original (Chinese) comment said this case
// should be handled as an error. Confirm downstream handling.
func (l *Lexer) lexEQ(ch rune) *Token {
	tok := &Token{
		typ: 0,
		buf: []rune{ch},
	}
	ch, err := l.peek()
	if err != nil {
		return tok
	}
	if ch == '=' {
		tok.typ = EQ
		tok.buf = append(tok.buf, ch)
		l.mustNext()
	}
	// TODO: report an error when the next rune is not '='
	return tok
}

// lexNE scans a token starting with '!'; only "!=" yields NE.
//
// NOTE(review): like lexEQ, a lone '!' leaves typ at the zero value (EOF)
// rather than producing an error.
func (l *Lexer) lexNE(ch rune) *Token {
	tok := &Token{
		typ: 0,
		buf: []rune{ch},
	}
	ch, err := l.peek()
	if err != nil {
		return tok
	}
	if ch == '=' {
		tok.typ = NE
		tok.buf = append(tok.buf, ch)
		l.mustNext()
	}
	// TODO: report an error when the next rune is not '='
	return tok
}
// lexPlus emits a '+' operator token.
func (l *Lexer) lexPlus(ch rune) *Token {
	return &Token{typ: Plus, buf: []rune{ch}}
}

// lexMinus emits a '-' operator token.
func (l *Lexer) lexMinus(ch rune) *Token {
	return &Token{typ: Minus, buf: []rune{ch}}
}

// lexStar emits a '*' operator token.
func (l *Lexer) lexStar(ch rune) *Token {
	return &Token{typ: Star, buf: []rune{ch}}
}

// lexSlash emits a '/' operator token.
func (l *Lexer) lexSlash(ch rune) *Token {
	return &Token{typ: Slash, buf: []rune{ch}}
}
// lexAnd scans a token starting with '&'; only "&&" yields AND.
//
// NOTE(review): a lone '&' leaves typ at its zero value (EOF) instead of
// producing an error, mirroring lexEQ/lexNE.
func (l *Lexer) lexAnd(ch rune) *Token {
	tok := &Token{
		typ: 0,
		buf: []rune{ch},
	}
	ch, err := l.peek()
	if err != nil {
		return tok
	}
	if ch == '&' {
		tok.typ = AND
		tok.buf = append(tok.buf, ch)
		l.mustNext()
	}
	return tok
}

// lexOR scans a token starting with '|'; only "||" yields OR.
//
// NOTE(review): a lone '|' leaves typ at its zero value (EOF) instead of
// producing an error.
func (l *Lexer) lexOR(ch rune) *Token {
	tok := &Token{
		typ: 0,
		buf: []rune{ch},
	}
	ch, err := l.peek()
	if err != nil {
		return tok
	}
	if ch == '|' {
		tok.typ = OR
		tok.buf = append(tok.buf, ch)
		l.mustNext()
	}
	return tok
}
// lexLeftParen emits a '(' token.
func (l *Lexer) lexLeftParen(ch rune) *Token {
	return &Token{typ: LeftParen, buf: []rune{ch}}
}

// lexRightParen emits a ')' token.
func (l *Lexer) lexRightParen(ch rune) *Token {
	return &Token{typ: RightParen, buf: []rune{ch}}
}
// lexDigital scans a numeric literal starting at ch. All numbers are
// tagged IntLiteral, including ones containing '.' (isDigit accepts the
// dot); evaluateNumber later parses the text as a float.
//
// NOTE(review): malformed inputs like "1.2.3" are accepted here and only
// rejected at evaluation time — confirm this is intended.
func (l *Lexer) lexDigital(ch rune) *Token {
	tok := &Token{
		typ: IntLiteral,
		buf: []rune{ch},
	}
	for {
		ch, err := l.peek()
		if err != nil {
			return tok
		}
		if isDigit(ch) {
			l.mustNext()
			tok.push(ch)
			continue
		}
		return tok
	}
}
// skipWhitespace consumes spaces, tabs, carriage returns, and newlines,
// reporting whether anything was skipped. At end of input it returns
// false regardless of what was consumed.
func (l *Lexer) skipWhitespace() bool {
	skipped := false
	for {
		ch, err := l.peek()
		if err != nil {
			return false
		}
		if !isWhitespace(ch) && ch != '\n' {
			return skipped
		}
		l.mustNext()
		skipped = true
	}
}
// next consumes and returns the current rune; returns ErrorLexEOF at end
// of input.
func (l *Lexer) next() (rune, error) {
	ch, err := l.peek()
	if err != nil {
		return 0, err
	}
	l.idx++
	return ch, nil
}

// mustNext consumes and returns the current rune without a bounds check;
// it panics if called at end of input (callers must peek first).
func (l *Lexer) mustNext() rune {
	l.idx++
	return l.buf[l.idx-1]
}

// peek returns the current rune without consuming it; ErrorLexEOF at end
// of input.
func (l *Lexer) peek() (rune, error) {
	if l.idx >= len(l.buf) {
		return 0, ErrorLexEOF
	}
	return l.buf[l.idx], nil
}
package parser
import (
"fmt"
"github.com/toolkits/pkg/logger"
)
// Parser turns an expression source into a list of AST statement nodes.
type Parser struct {
	buf    []rune   // raw source runes
	tokens []*Token // token stream produced by the lexer
	idx    int      // cursor into tokens
	err    error    // last recorded parse error (see reportError)
	isEOF  bool     // set once peek runs past the last token
	stats  []Node   // parsed statement trees
}
/*
Grammar, one production per precedence level (loosest binding first):

	exp -> or | or = exp
	or -> and | or || and
	and -> equal | and && equal
	equal -> rel | equal == rel | equal != rel
	rel -> add | rel > add | rel < add | rel >= add | rel <= add
	add -> mul | add + mul | add - mul
	mul -> pri | mul * pri | mul / pri
	pri -> Id | Literal | (exp)
*/

// NewParser returns a Parser over the given source runes; call Parse to
// lex and build the statement list.
func NewParser(buf []rune) *Parser {
	return &Parser{
		buf: buf,
	}
}
// Parse lexes the source and parses it into statement nodes stored on the
// parser (see Stats). It returns lexing errors directly.
//
// NOTE(review): parse errors recorded via reportError terminate the loop
// but Parse still returns nil — callers must also check Err().
func (p *Parser) Parse() error {
	lexer := newLexer(p.buf)
	tokens, err := lexer.lex()
	if err != nil {
		return err
	}
	p.tokens = tokens
	p.stats = make([]Node, 0)
	for {
		node := p.parseStat()
		if node == nil {
			// either end of input or a recorded parse error
			return nil
		}
		if p.hasError() {
			return nil
		}
		nodes, ok := node.([]Node)
		if ok {
			p.stats = append(p.stats, nodes...)
		} else {
			p.stats = append(p.stats, node)
		}
	}
}
// PrintAST dumps every parsed statement tree to stdout for debugging.
func (p *Parser) PrintAST() {
	for _, node := range p.stats {
		formatNode(node, "", 0)
	}
}

// Stats returns the parsed statement nodes.
func (p *Parser) Stats() []Node { return p.stats }

// Err returns the last parse error recorded, if any.
func (p *Parser) Err() error { return p.err }
// parseStat parses one statement. Only Identifier and IntLiteral may start
// a statement; the token after it must be an operator (or EOF) for the
// statement to be handed to parseExpr. Returns nil at end of input or
// after recording an error.
func (p *Parser) parseStat() Node {
	if p.hasError() {
		return nil
	}
	tok, valid := p.peek()
	if !valid {
		// end of input
		return nil
	}
	switch tok.typ {
	case Identifier, IntLiteral:
		p.mustNext()
		opTok, valid := p.peek()
		if !valid {
			// lone operand statement: rewind and let parseExpr consume it
			p.back()
			return p.parseExpr()
		}
		switch opTok.typ {
		case Plus, Minus, Star, Slash, GE, GT, LE, LT, AND, OR, LeftParen:
			p.back()
			return p.parseExpr()
		default:
			p.reportError("invalid token: %v", tok)
			return nil
		}
	default:
		p.reportError("invalid token: %v", tok)
		return nil
	}
}
// mustNext consumes and returns the current token; it panics if called at
// end of input (callers must peek first).
func (p *Parser) mustNext() *Token {
	p.idx++
	return p.tokens[p.idx-1]
}

// back rewinds the token cursor by one, undoing a mustNext.
func (p *Parser) back() {
	p.idx--
}
// hasError reports whether a parse error has been recorded, logging it as
// a side effect (once per call, so error paths may log repeatedly).
func (p *Parser) hasError() bool {
	if p.err != nil {
		// BUG FIX: the format string had no verb, so logger.Errorf mangled
		// the error value ("parse err%!(EXTRA ...)").
		logger.Errorf("parse err: %v", p.err)
	}
	return p.err != nil
}
// peek returns the current token without consuming it. The second result
// is false at end of input, in which case p.isEOF is also set.
func (p *Parser) peek() (*Token, bool) {
	if p.idx >= len(p.tokens) {
		p.isEOF = true
		return nil, false
	}
	return p.tokens[p.idx], true
}
// exp -> or | or = exp
// parseExpr parses a top-level expression.
//
// NOTE(review): this level consumes AND tokens, not the '=' of the grammar
// comment. Since parseAnd already consumes every "&&" chain below it, the
// AND case here is effectively unreachable — confirm the intended top
// production.
func (p *Parser) parseExpr() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parseOr()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case AND:
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseExpr(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseExpr(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}
// or -> and | or || and
// parseOr parses a chain of "&&"-level expressions joined by "||".
func (p *Parser) parseOr() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parseAnd()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case OR:
			// BUG FIX: this case previously matched AND, so "||" was never
			// consumed here and OR expressions failed as "invalid token"
			// in parseStat.
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseOr(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseOr(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}
// and -> equal | and && equal
// parseAnd parses a chain of equality expressions joined by "&&".
//
// NOTE(review): parseAnd/parseEqual/parseRel/parseAdd/parseMul all share
// this shape: parse the left operand, then loop over the operator set.
// Because the right operand is produced by a recursive call that consumes
// every remaining operator at this level, operators associate to the
// right, and the non-firstTime branch is effectively unreachable.
func (p *Parser) parseAnd() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parseEqual()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case AND:
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseAnd(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseAnd(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}

// equal -> rel | equal == rel | equal != rel
// parseEqual parses a chain of relational expressions joined by == / !=.
func (p *Parser) parseEqual() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parseRel()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case EQ, NE:
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseEqual(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseEqual(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}

// rel -> add | rel > add | rel < add | rel >= add | rel <= add
// parseRel parses a chain of additive expressions joined by comparisons.
func (p *Parser) parseRel() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parseAdd()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case GE, GT, LE, LT:
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseRel(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseRel(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}

// add -> mul ( + mul)*
// parseAdd parses a chain of multiplicative expressions joined by + / -.
func (p *Parser) parseAdd() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parseMul()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case Plus, Minus:
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseAdd(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseAdd(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}

// mul -> pri | mul * pri | mul / pri
// parseMul parses a chain of primary expressions joined by * or /.
func (p *Parser) parseMul() Node {
	if p.hasError() {
		return nil
	}
	leftNode := p.parsePri()
	var binNode *BinaryNode
	firstTime := true
	for {
		opTok, valid := p.peek()
		if !valid {
			if firstTime {
				return leftNode
			}
			break
		}
		switch opTok.typ {
		case Star, Slash:
			p.mustNext()
		default:
			if firstTime {
				return leftNode
			}
			return binNode
		}
		if firstTime {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  leftNode,
				Right: p.parseMul(),
			}
			firstTime = false
		} else {
			binNode = &BinaryNode{
				Type:  opTok.typ,
				Left:  binNode,
				Right: p.parseMul(),
			}
		}
		if p.hasError() {
			return nil
		}
	}
	return binNode
}
// pri -> Id | Literal | (exp)
// parsePri parses a primary expression: an integer literal, an identifier,
// or a parenthesized expression.
func (p *Parser) parsePri() Node {
	if p.hasError() {
		return nil
	}
	tok, valid := p.peek()
	if !valid {
		p.reportError("unexpected EOF")
		return nil
	}
	if tok.typ == IntLiteral {
		p.mustNext()
		return &NumberNode{
			Type: tok.typ,
			Lit:  string(tok.buf),
		}
	}
	if tok.typ == Identifier {
		p.mustNext()
		return &IdentifierNode{
			Lit: string(tok.buf),
		}
	}
	if tok.typ == LeftParen {
		p.mustNext()
		node := p.parseExpr()
		if node == nil {
			p.reportError("expecting an additive expression inside parenthesis")
			return nil
		}
		tk, valid := p.peek()
		if !valid {
			p.reportError("unexpected EOF")
			return nil
		}
		if tk.typ != RightParen {
			p.reportError("expecting right parenthesis")
			return nil
		}
		p.mustNext()
		// BUG FIX: the original fell through to the error below and
		// returned nil even after a successful parse, so "(a + b)" always
		// failed.
		return node
	}
	p.reportError("expect int Identifier but met %v", tok)
	return nil
}
// reportError records a parse error on the parser. The first argument may
// be a format string (remaining args are its operands) or an error value;
// any other type panics. A later call overwrites an earlier error.
func (p *Parser) reportError(args ...interface{}) {
	if len(args) >= 1 {
		i := args[0]
		switch v := i.(type) {
		case string:
			p.err = fmt.Errorf(v, args[1:]...)
		case error:
			p.err = v
		default:
			panic(v)
		}
	}
}
package tdengine
import (
"encoding/base64"
"encoding/json"
"fmt"
"net"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
)
// TdengineQuery describes one tdengine SQL query issued by alerting.
// From/To/Interval are substituted into Query for the $from/$to/$interval
// placeholders (see tdengineClient.Query).
type TdengineQuery struct {
	From     string `json:"from"`
	Interval int64  `json:"interval"`
	Keys     Keys   `json:"keys"`
	Query    string `json:"query"` // SQL query text (may contain placeholders)
	Ref      string `json:"ref"`   // variable/reference name for this query
	To       string `json:"to"`
}

// Keys selects which result columns become metrics and which become labels.
type Keys struct {
	LabelKey  string `json:"labelKey"`  // label column names, space-separated
	MetricKey string `json:"metricKey"` // metric column names, space-separated
}
// TdengineClientMap holds one HTTP reader client per tdengine datasource
// id, guarded for concurrent access; it is kept in sync with the
// datasource table by a background loop (see InitReader).
type TdengineClientMap struct {
	sync.RWMutex
	ctx           *ctx.Context
	heartbeat     aconf.HeartbeatConfig
	ReaderClients map[int64]*tdengineClient
}
// Set registers the reader client for datasourceId; nil clients are ignored.
func (pc *TdengineClientMap) Set(datasourceId int64, r *tdengineClient) {
	if r == nil {
		return
	}
	pc.Lock()
	pc.ReaderClients[datasourceId] = r
	pc.Unlock()
}
// GetDatasourceIds returns the ids of all registered datasources.
func (pc *TdengineClientMap) GetDatasourceIds() []int64 {
	pc.RLock()
	defer pc.RUnlock()
	var ids []int64
	for id := range pc.ReaderClients {
		ids = append(ids, id)
	}
	return ids
}
// GetCli returns the client registered for datasourceId (nil when absent).
func (pc *TdengineClientMap) GetCli(datasourceId int64) *tdengineClient {
	pc.RLock()
	defer pc.RUnlock()
	return pc.ReaderClients[datasourceId]
}

// IsNil reports whether no usable client is registered for datasourceId.
func (pc *TdengineClientMap) IsNil(datasourceId int64) bool {
	pc.RLock()
	defer pc.RUnlock()
	return pc.ReaderClients[datasourceId] == nil
}
// Hit computes the effective datasource id list for a rule: the ids from
// datasourceIds that currently have a registered client. The single
// sentinel id models.DatasourceIdAll selects every registered datasource.
func (pc *TdengineClientMap) Hit(datasourceIds []int64) []int64 {
	pc.RLock()
	defer pc.RUnlock()
	dsIds := make([]int64, 0, len(pc.ReaderClients))
	if len(datasourceIds) == 1 && datasourceIds[0] == models.DatasourceIdAll {
		for c := range pc.ReaderClients {
			dsIds = append(dsIds, c)
		}
		return dsIds
	}

	for dsId := range pc.ReaderClients {
		for _, id := range datasourceIds {
			if id == dsId {
				dsIds = append(dsIds, id)
				// BUG FIX: this was `continue`, which kept scanning and
				// appended dsId once per duplicate in datasourceIds.
				break
			}
		}
	}
	return dsIds
}
// Reset drops every registered client.
func (pc *TdengineClientMap) Reset() {
	pc.Lock()
	pc.ReaderClients = make(map[int64]*tdengineClient)
	pc.Unlock()
}

// Del removes the client registered for datasourceId.
func (pc *TdengineClientMap) Del(datasourceId int64) {
	pc.Lock()
	delete(pc.ReaderClients, datasourceId)
	pc.Unlock()
}
// tdengineClient talks to one tdengine server over its REST API.
type tdengineClient struct {
	// cfg carries the datasource settings (URL etc.).
	// NOTE(review): newTdengine never assigns cfg, yet QueryTable reads
	// tc.cfg.HTTPJson.Url — confirm where cfg is populated or this will
	// nil-dereference.
	cfg    *models.Datasource
	client *http.Client
	header map[string][]string // headers applied to every request
}
// newTdengine builds an HTTP-based tdengine client from po.
//
// po.Headers is a flat key/value list ([k1, v1, k2, v2, ...]), which is
// how loadFromDatabase in this package produces it.
//
// BUG FIX: the original split each Headers element on ':' expecting
// "key:value" strings, which never matched the pairwise format, so every
// configured header (including the Taosd token) was silently dropped.
//
// NOTE(review): tc.cfg is left unset here; see tdengineClient.cfg.
func newTdengine(po TdengineOption) *tdengineClient {
	tc := &tdengineClient{}
	tc.client = &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   30 * time.Second,
				KeepAlive: 30 * time.Second,
			}).DialContext,
			IdleConnTimeout:       90 * time.Second,
			TLSHandshakeTimeout:   10 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
			DisableCompression:    true,
		},
	}

	tc.header = map[string][]string{
		"Connection": {"keep-alive"},
	}

	for i := 0; i+1 < len(po.Headers); i += 2 {
		tc.header[po.Headers[i]] = []string{po.Headers[i+1]}
	}

	if po.BasicAuthUser != "" {
		basic := base64.StdEncoding.EncodeToString([]byte(po.BasicAuthUser + ":" + po.BasicAuthPass))
		tc.header["Authorization"] = []string{fmt.Sprintf("Basic %s", basic)}
	}

	return tc
}
// APIResponse mirrors the tdengine REST /rest/sql response envelope.
type APIResponse struct {
	Code       int             `json:"code"`
	ColumnMeta [][]interface{} `json:"column_meta"` // per column: [name, type, size]
	Data       [][]interface{} `json:"data"`        // row-major result values
	Rows       int             `json:"rows"`
}
// QueryTable POSTs the SQL statement to the server's /rest/sql endpoint
// and decodes the tabular response.
//
// NOTE(review): the URL comes from tc.cfg, which is never assigned by
// newTdengine in this file — confirm initialization, otherwise this
// nil-dereferences.
func (tc *tdengineClient) QueryTable(query string) (APIResponse, error) {
	var apiResp APIResponse
	req, err := http.NewRequest("POST", tc.cfg.HTTPJson.Url+"/rest/sql", strings.NewReader(query))
	if err != nil {
		return apiResp, err
	}

	// apply the client-wide headers (auth, keep-alive, ...)
	for k, v := range tc.header {
		req.Header[k] = v
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := tc.client.Do(req)
	if err != nil {
		return apiResp, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return apiResp, fmt.Errorf("HTTP error, status: %s", resp.Status)
	}

	err = json.NewDecoder(resp.Body).Decode(&apiResp)
	return apiResp, err
}
// Query runs a TdengineQuery: the $from, $to, and $interval placeholders
// in the SQL are substituted, the statement is executed, and the result
// table is converted to time-series data according to q.Keys.
func (tc *tdengineClient) Query(query interface{}) ([]*models.DataResp, error) {
	q, ok := query.(TdengineQuery)
	if !ok {
		// BUG FIX: a bare type assertion panicked on unexpected input;
		// return an error instead.
		return nil, fmt.Errorf("unexpected query type: %T", query)
	}
	replacements := map[string]string{
		"$from":     q.From,
		"$to":       q.To,
		"$interval": strconv.FormatInt(q.Interval, 10),
	}

	for key, val := range replacements {
		q.Query = strings.ReplaceAll(q.Query, key, val)
	}

	data, err := tc.QueryTable(q.Query)
	if err != nil {
		return nil, err
	}

	return ConvertToTStData(data, q.Keys)
}
// GetDatabases lists the databases available on the tdengine server.
func (tc *tdengineClient) GetDatabases() ([]string, error) {
	var databases []string
	data, err := tc.QueryTable("show databases")
	if err != nil {
		return databases, err
	}

	for _, row := range data.Data {
		// first column is the database name; skip malformed rows instead
		// of panicking on an unchecked type assertion (BUG FIX)
		if len(row) == 0 {
			continue
		}
		if name, ok := row[0].(string); ok {
			databases = append(databases, name)
		}
	}
	return databases, nil
}
// GetTables lists the tables of database; when isStable is true it lists
// super tables (stables) instead.
func (tc *tdengineClient) GetTables(database string, isStable bool) ([]string, error) {
	var tables []string
	sql := fmt.Sprintf("show %s.tables", database)
	if isStable {
		sql = fmt.Sprintf("show %s.stables", database)
	}

	data, err := tc.QueryTable(sql)
	if err != nil {
		return tables, err
	}

	for _, row := range data.Data {
		// first column is the table name; skip malformed rows instead of
		// panicking on an unchecked type assertion (BUG FIX)
		if len(row) == 0 {
			continue
		}
		if name, ok := row[0].(string); ok {
			tables = append(tables, name)
		}
	}
	return tables, nil
}
// Column describes one column of a tdengine table.
type Column struct {
	Name string `json:"name"`
	Type string `json:"type"`
	Size int    `json:"size"`
}

// GetColumns returns the column metadata of database.table, discovered by
// selecting a single row and reading the response's column_meta.
func (tc *tdengineClient) GetColumns(database, table string) ([]Column, error) {
	var columns []Column
	sql := fmt.Sprintf("select * from %s.%s limit 1", database, table)
	data, err := tc.QueryTable(sql)
	if err != nil {
		return columns, err
	}

	for _, row := range data.ColumnMeta {
		// column_meta entries are [name, type, size]; skip malformed
		// entries instead of panicking on unchecked assertions (BUG FIX)
		if len(row) < 3 {
			continue
		}
		name, ok1 := row[0].(string)
		typ, ok2 := row[1].(string)
		size, ok3 := row[2].(float64) // JSON numbers decode as float64
		if !ok1 || !ok2 || !ok3 {
			continue
		}
		columns = append(columns, Column{
			Name: name,
			Type: typ,
			Size: int(size),
		})
	}
	return columns, nil
}
// {
// "code": 0,
// "column_meta": [
// ["ts", "TIMESTAMP", 8],
// ["count", "BIGINT", 8],
// ["endpoint", "VARCHAR", 45],
// ["status_code", "INT", 4],
// ["client_ip", "VARCHAR", 40],
// ["request_method", "VARCHAR", 15],
// ["request_uri", "VARCHAR", 128]
// ],
// "data": [
// [
// "2022-06-29T05:50:55.401Z",
// 2,
// "LAPTOP-NNKFTLTG:6041",
// 200,
// "172.23.208.1",
// "POST",
// "/rest/sql"
// ],
// [
// "2022-06-29T05:52:16.603Z",
// 1,
// "LAPTOP-NNKFTLTG:6041",
// 200,
// "172.23.208.1",
// "POST",
// "/rest/sql"
// ]
// ],
// "rows": 2
// }
// {
// "dat": [
// {
// "ref": "",
// "metric": {
// "__name__": "count",
// "host":"host1"
// },
// "values": [
// [
// 1693219500,
// 12
// ]
// ]
// }
// ],
// "err": ""
// }
// ConvertToTStData converts a tdengine REST result table into time-series
// samples: numeric columns named in key.MetricKey become metrics, string
// columns named in key.LabelKey become labels, and the TIMESTAMP column
// supplies the sample time (converted to unix milliseconds).
//
// NOTE(review): row[labelIdx].(string) and row[tsIdx].(string) are
// unchecked assertions and will panic on NULL values; also, when no
// TIMESTAMP column exists, tsIdx stays 0 and column 0 is misparsed as the
// timestamp — confirm inputs always include one.
func ConvertToTStData(src APIResponse, key Keys) ([]*models.DataResp, error) {
	metricIdxMap := make(map[string]int)
	labelIdxMap := make(map[string]int)

	metricMap := make(map[string]struct{})
	if key.MetricKey != "" {
		metricList := strings.Split(key.MetricKey, " ")
		for _, metric := range metricList {
			metricMap[metric] = struct{}{}
		}
	}

	labelMap := make(map[string]string)
	if key.LabelKey != "" {
		labelList := strings.Split(key.LabelKey, " ")
		for _, label := range labelList {
			labelMap[label] = label
		}
	}

	var tsIdx int
	for colIndex, colData := range src.ColumnMeta {
		// column type names: see https://docs.taosdata.com/taos-sql/data-type/
		colName := colData[0].(string)
		colType := colData[1].(string)
		switch colType {
		case "TIMESTAMP":
			tsIdx = colIndex
		case "BIGINT", "INT", "INT UNSIGNED", "BIGINT UNSIGNED", "FLOAT", "DOUBLE",
			"SMALLINT", "SMALLINT UNSIGNED", "TINYINT", "TINYINT UNSIGNED", "BOOL":
			// numeric column: only keep it if requested as a metric
			if _, ok := metricMap[colName]; !ok {
				continue
			}
			metricIdxMap[colName] = colIndex
		default:
			// non-numeric column: only keep it if requested as a label
			if _, ok := labelMap[colName]; !ok {
				continue
			}
			labelIdxMap[colName] = colIndex
		}
	}

	var result []*models.DataResp
	// m groups samples by rendered label set so one series accumulates
	// all its rows
	m := make(map[string]*models.DataResp)
	for _, row := range src.Data {
		for metricName, metricIdx := range metricIdxMap {
			value, err := interfaceToFloat64(row[metricIdx])
			if err != nil {
				logger.Warningf("parse %v value failed: %v", row, err)
				continue
			}

			metric := make(model.Metric)
			for labelName, labelIdx := range labelIdxMap {
				metric[model.LabelName(labelName)] = model.LabelValue(row[labelIdx].(string))
			}
			metric[model.MetricNameLabel] = model.LabelValue(metricName)

			// transfer 2022-06-29T05:52:16.603Z to unix timestamp
			t, err := time.Parse(time.RFC3339, row[tsIdx].(string))
			if err != nil {
				logger.Warningf("parse %v timestamp failed: %v", row, err)
				continue
			}
			timestamp := t.UnixNano() / 1e6

			if _, ok := m[metric.String()]; !ok {
				m[metric.String()] = &models.DataResp{
					Metric: metric,
					Values: [][]float64{
						{float64(timestamp), value},
					},
				}
			} else {
				m[metric.String()].Values = append(m[metric.String()].Values, []float64{float64(timestamp), value})
			}
		}
	}

	for _, v := range m {
		result = append(result, v)
	}

	return result, nil
}
// interfaceToFloat64 coerces a decoded JSON cell value to float64:
// numeric kinds convert directly, strings are parsed, and booleans map
// to 1/0. Unsupported kinds yield an error.
func interfaceToFloat64(input interface{}) (float64, error) {
	// BUG FIX: reflect.TypeOf(nil) returns nil and the original panicked
	// on .Kind(); NULL cells now produce an error instead.
	if input == nil {
		return 0, fmt.Errorf("unsupported type: nil")
	}
	// Check for the kind of the value first
	kind := reflect.TypeOf(input).Kind()
	switch kind {
	case reflect.Float64:
		return input.(float64), nil
	case reflect.Float32:
		return float64(input.(float32)), nil
	case reflect.Int, reflect.Int32, reflect.Int64, reflect.Int8, reflect.Int16:
		return float64(reflect.ValueOf(input).Int()), nil
	case reflect.Uint, reflect.Uint32, reflect.Uint64, reflect.Uint8, reflect.Uint16:
		return float64(reflect.ValueOf(input).Uint()), nil
	case reflect.String:
		return strconv.ParseFloat(input.(string), 64)
	case reflect.Bool:
		if input.(bool) {
			return 1.0, nil
		}
		return 0.0, nil
	default:
		return 0, fmt.Errorf("unsupported type: %T", input)
	}
}
package tdengine
import (
"sync"
"github.com/ccfos/nightingale/v6/pkg/tlsx"
)
// TdengineOption is the connection configuration for one tdengine
// datasource, derived from its database record (see loadFromDatabase).
type TdengineOption struct {
	DatasourceName string
	Url            string
	BasicAuthUser  string
	BasicAuthPass  string
	Token          string
	Timeout        int64 // units not visible here — TODO confirm (ms vs s)
	DialTimeout    int64
	MaxIdleConnsPerHost int
	// Headers is a flat key/value list: [k1, v1, k2, v2, ...]
	Headers []string
	tlsx.ClientConfig
}
// Equal reports whether target describes the same connection settings as
// po. Note the embedded tlsx.ClientConfig fields are not compared.
func (po *TdengineOption) Equal(target TdengineOption) bool {
	same := po.Url == target.Url &&
		po.BasicAuthUser == target.BasicAuthUser &&
		po.BasicAuthPass == target.BasicAuthPass &&
		po.Token == target.Token &&
		po.Timeout == target.Timeout &&
		po.DialTimeout == target.DialTimeout &&
		po.MaxIdleConnsPerHost == target.MaxIdleConnsPerHost &&
		len(po.Headers) == len(target.Headers)
	if !same {
		return false
	}
	for i, h := range po.Headers {
		if h != target.Headers[i] {
			return false
		}
	}
	return true
}
// TdengineOptionsStruct is a concurrency-safe map from datasource id to
// its TdengineOption.
type TdengineOptionsStruct struct {
	Data map[int64]TdengineOption
	sync.RWMutex
}

// Set stores the option for datasourceId.
func (pos *TdengineOptionsStruct) Set(datasourceId int64, po TdengineOption) {
	pos.Lock()
	defer pos.Unlock()
	pos.Data[datasourceId] = po
}

// Del removes the option for datasourceId.
func (pos *TdengineOptionsStruct) Del(datasourceId int64) {
	pos.Lock()
	defer pos.Unlock()
	delete(pos.Data, datasourceId)
}

// Get returns the option for datasourceId and whether it exists.
func (pos *TdengineOptionsStruct) Get(datasourceId int64) (TdengineOption, bool) {
	pos.RLock()
	defer pos.RUnlock()
	po, has := pos.Data[datasourceId]
	return po, has
}

// TdengineOptions holds the option for every known tdengine datasource,
// keyed by datasource id.
var TdengineOptions = &TdengineOptionsStruct{Data: make(map[int64]TdengineOption)}
package tdengine
import (
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
// NewTdengineClient builds the tdengine client map and starts the
// background loop that keeps it in sync with the datasource table.
func NewTdengineClient(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *TdengineClientMap {
	pc := &TdengineClientMap{
		// BUG FIX: ctx was never stored, so loadFromDatabase
		// nil-dereferenced pc.ctx on its first run.
		ctx:           ctx,
		ReaderClients: make(map[int64]*tdengineClient),
		heartbeat:     heartbeat,
	}

	pc.InitReader()
	return pc
}
// InitReader starts the background goroutine that refreshes ReaderClients
// from the datasource table once per second. It always returns nil.
//
// NOTE(review): the loop has no stop channel or context cancellation and
// the 1s interval ignores pc.heartbeat — confirm intended.
func (pc *TdengineClientMap) InitReader() error {
	go func() {
		for {
			pc.loadFromDatabase()
			time.Sleep(time.Second)
		}
	}()
	return nil
}
// loadFromDatabase reconciles ReaderClients with the tdengine datasources
// currently in the database (or, on edge nodes, fetched from the center
// via HTTP): new or changed datasources get (re)built clients, and clients
// whose datasource disappeared are removed.
func (pc *TdengineClientMap) loadFromDatabase() {
	var datasources []*models.Datasource
	var err error
	if !pc.ctx.IsCenter {
		// edge node: pull datasource list from the center API
		datasources, err = poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.TDENGINE)
		if err != nil {
			logger.Errorf("failed to get datasources, error: %v", err)
			return
		}
		for i := 0; i < len(datasources); i++ {
			datasources[i].FE2DB()
		}
	} else {
		datasources, err = models.GetDatasourcesGetsBy(pc.ctx, models.TDENGINE, "", "", "")
		if err != nil {
			logger.Errorf("failed to get datasources, error: %v", err)
			return
		}
	}

	newCluster := make(map[int64]struct{})
	for _, ds := range datasources {
		dsId := ds.Id
		// flatten the header map into a [k1, v1, k2, v2, ...] list
		var header []string
		for k, v := range ds.HTTPJson.Headers {
			header = append(header, k)
			header = append(header, v)
		}

		po := TdengineOption{
			DatasourceName:      ds.Name,
			Url:                 ds.HTTPJson.Url,
			BasicAuthUser:       ds.AuthJson.BasicAuthUser,
			BasicAuthPass:       ds.AuthJson.BasicAuthPassword,
			Timeout:             ds.HTTPJson.Timeout,
			DialTimeout:         ds.HTTPJson.DialTimeout,
			MaxIdleConnsPerHost: ds.HTTPJson.MaxIdleConnsPerHost,
			Headers:             header,
		}

		if strings.HasPrefix(ds.HTTPJson.Url, "https") {
			po.UseTLS = true
			po.InsecureSkipVerify = ds.HTTPJson.TLS.SkipTlsVerify
		}

		// a settings key containing "token" becomes a Taosd auth header
		if len(ds.SettingsJson) > 0 {
			for k, v := range ds.SettingsJson {
				if strings.Contains(k, "token") {
					po.Headers = append(po.Headers, "Authorization")
					po.Headers = append(po.Headers, "Taosd "+v.(string))
				}
			}
		}

		newCluster[dsId] = struct{}{}
		if pc.IsNil(dsId) {
			// first time
			if err = pc.setClientFromTdengineOption(dsId, po); err != nil {
				logger.Errorf("failed to setClientFromTdengineOption po:%+v err:%v", po, err)
				continue
			}
			logger.Info("setClientFromTdengineOption success: ", dsId)
			TdengineOptions.Set(dsId, po)
			continue
		}

		// rebuild the client only when the stored options changed
		localPo, has := TdengineOptions.Get(dsId)
		if !has || !localPo.Equal(po) {
			if err = pc.setClientFromTdengineOption(dsId, po); err != nil {
				logger.Errorf("failed to setClientFromTdengineOption: %v", err)
				continue
			}
			TdengineOptions.Set(dsId, po)
		}
	}

	// delete useless cluster
	oldIds := pc.GetDatasourceIds()
	for _, oldId := range oldIds {
		if _, has := newCluster[oldId]; !has {
			pc.Del(oldId)
			TdengineOptions.Del(oldId)
			logger.Info("delete cluster: ", oldId)
		}
	}
}
// setClientFromTdengineOption validates po and registers a freshly built
// reader client for datasourceId.
func (pc *TdengineClientMap) setClientFromTdengineOption(datasourceId int64, po TdengineOption) error {
	if datasourceId < 0 {
		// BUG FIX: message said "argument clusterName is blank" — a
		// copy-paste from the prometheus client; this checks the id.
		return fmt.Errorf("argument datasourceId is invalid: %d", datasourceId)
	}

	if po.Url == "" {
		// BUG FIX: message said "prometheus url is blank" for a tdengine
		// datasource.
		return fmt.Errorf("tdengine url is blank")
	}

	reader := newTdengine(po)
	logger.Debugf("setClientFromTdengineOption: %d, %+v", datasourceId, po)
	pc.Set(datasourceId, reader)

	return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册