提交 7830e60c 编写于 作者: 7 710leo

fix log

上级 56feba9b
......@@ -66,7 +66,7 @@ func UpdateConfigsLoop() {
}
}
func GetLatestTmsAndDelay(filepath string) (int64, int64, bool) {
func GetLatestTmsAndDelay(filepath string) (int64, int32, bool) {
ManagerJobLock.RLock()
job, ok := ManagerJob[filepath]
ManagerJobLock.RUnlock()
......@@ -74,6 +74,7 @@ func GetLatestTmsAndDelay(filepath string) (int64, int64, bool) {
if !ok {
return 0, 0, false
}
latest, delay := job.w.GetLatestTmsAndDelay()
return latest, delay, true
}
......
......@@ -106,13 +106,13 @@ func tmsNeedPush(tms int64, filePath string, step int64, waitPush int) bool {
// 为解决日志时间戳乱序的最大等待时间, hard code
// delay == 0时, 不用额外等待, 进而提高时效性
if delay > 0 {
var maxDelay int64
var maxDelay int32
if step <= 10 {
maxDelay = step * 3
maxDelay = int32(step) * 3
} else if step > 10 && step <= 30 {
maxDelay = step * 2
maxDelay = int32(step) * 2
} else {
maxDelay = step
maxDelay = int32(step)
}
if delay > maxDelay {
delay = maxDelay
......@@ -129,7 +129,7 @@ func tmsNeedPush(tms int64, filePath string, step int64, waitPush int) bool {
return true
}
if tms < AlignStepTms(step, latest-delay) {
if tms < AlignStepTms(step, latest-int64(delay)) {
return true
}
......
......@@ -16,7 +16,7 @@ import (
"github.com/didi/nightingale/src/modules/agent/stra"
)
type callbackHandler func(int64, int64)
type callbackHandler func(int64, int32)
//单个worker对象
type Worker struct {
......@@ -35,7 +35,7 @@ type Worker struct {
type WorkerGroup struct {
WorkerNum int
LatestTms int64 //日志文件最新处理的时间戳
MaxDelay int64 //日志文件存在的时间戳乱序最大差值
MaxDelay int32 //日志文件存在的时间戳乱序最大差值
ResetTms int64 //maxDelay上次重置的时间
Workers []*Worker
TimeFormatStrategy string
......@@ -71,11 +71,11 @@ func NewWorkerGroup(filePath string, stream chan string) *WorkerGroup {
return wg
}
func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) {
func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int32) {
return wg.LatestTms, wg.MaxDelay
}
func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int32) {
latest := atomic.LoadInt64(&wg.LatestTms)
if latest < tms {
......@@ -89,9 +89,9 @@ func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
return
}
newest := atomic.LoadInt64(&wg.MaxDelay)
newest := atomic.LoadInt32(&wg.MaxDelay)
if newest < delay {
atomic.CompareAndSwapInt64(&wg.MaxDelay, newest, delay)
atomic.CompareAndSwapInt32(&wg.MaxDelay, newest, delay)
}
}
......@@ -112,7 +112,7 @@ func (wg *WorkerGroup) ResetMaxDelay() {
ts := time.Now().Unix()
if ts-wg.ResetTms > 86400 {
wg.ResetTms = ts
atomic.StoreInt64(&wg.MaxDelay, 0)
atomic.StoreInt32(&wg.MaxDelay, 0)
}
}
......@@ -253,7 +253,7 @@ func (w *Worker) producer(line string, strategy *stra.Strategy) (*AnalysPoint, e
delay = w.LatestTms - tmsUnix
}
if updateLatest || delay > 0 {
w.Callback(tmsUnix, delay)
w.Callback(tmsUnix, int32(delay))
}
//处理用户正则
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册