未验证 提交 08fd28b3 编写于 作者: X XuanYang-cn 提交者: GitHub

Only do gracefully stop when DN Stop (#26399)

Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 e69e200f
......@@ -161,6 +161,14 @@ func (dsService *dataSyncService) start() {
}
}
func (dsService *dataSyncService) GracefullyClose() {
if dsService.fg != nil {
log.Info("dataSyncService gracefully closing flowgraph")
dsService.fg.SetCloseMethod(flowgraph.CloseGracefully)
dsService.close()
}
}
func (dsService *dataSyncService) close() {
dsService.stopOnce.Do(func() {
log := log.Ctx(context.Background()).With(
......
......@@ -534,7 +534,7 @@ func TestDataSyncService_Close(t *testing.T) {
assert.Equal(t, 0, len(syncService.flushListener))
// close will trigger a force sync
syncService.close()
syncService.GracefullyClose()
assert.Eventually(t, func() bool { return len(syncService.flushListener) == 1 },
5*time.Second, 100*time.Millisecond)
flushPack, ok := <-syncService.flushListener
......@@ -547,7 +547,7 @@ func TestDataSyncService_Close(t *testing.T) {
<-syncService.ctx.Done()
// Double close is safe
syncService.close()
syncService.GracefullyClose()
<-syncService.ctx.Done()
}
......
......@@ -62,7 +62,15 @@ func newDmInputNode(dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPos
}
name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
node := flowgraph.NewInputNode(input, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism,
typeutil.DataNodeRole, paramtable.GetNodeID(), dmNodeConfig.collectionID, metrics.AllLabel)
node := flowgraph.NewInputNode(
input,
name,
dmNodeConfig.maxQueueLength,
dmNodeConfig.maxParallelism,
typeutil.DataNodeRole,
paramtable.GetNodeID(),
dmNodeConfig.collectionID,
metrics.AllLabel,
)
return node, nil
}
......@@ -216,7 +216,7 @@ func (fm *flowgraphManager) getFlowGraphNum() int {
func (fm *flowgraphManager) dropAll() {
log.Info("start drop all flowgraph resources in DataNode")
fm.flowgraphs.Range(func(key string, value *dataSyncService) bool {
value.close()
value.GracefullyClose()
fm.flowgraphs.GetAndRemove(key)
log.Info("successfully dropped flowgraph", zap.String("vChannelName", key))
......
......@@ -21,16 +21,18 @@ import (
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
)
// Flow Graph is no longer a graph rather than a simple pipeline, this simplified our code and increase recovery speed - xiaofan.
// TimeTickedFlowGraph flowgraph with input from tt msg stream
type TimeTickedFlowGraph struct {
nodeCtx map[NodeName]*nodeCtx
stopOnce sync.Once
startOnce sync.Once
closeWg *sync.WaitGroup
nodeCtx map[NodeName]*nodeCtx
stopOnce sync.Once
startOnce sync.Once
closeWg *sync.WaitGroup
closeGracefully *atomic.Bool
}
// AddNode add Node into flowgraph
......@@ -93,6 +95,14 @@ func (fg *TimeTickedFlowGraph) Unblock() {
}
}
func (fg *TimeTickedFlowGraph) SetCloseMethod(gracefully bool) {
for _, v := range fg.nodeCtx {
if v.node.IsInputNode() {
v.node.(*InputNode).SetCloseMethod(gracefully)
}
}
}
// Close closes all nodes in flowgraph
func (fg *TimeTickedFlowGraph) Close() {
fg.stopOnce.Do(func() {
......@@ -108,8 +118,9 @@ func (fg *TimeTickedFlowGraph) Close() {
// NewTimeTickedFlowGraph create timetick flowgraph
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
flowGraph := TimeTickedFlowGraph{
nodeCtx: make(map[string]*nodeCtx),
closeWg: &sync.WaitGroup{},
nodeCtx: make(map[string]*nodeCtx),
closeWg: &sync.WaitGroup{},
closeGracefully: atomic.NewBool(CloseImmediately),
}
return &flowGraph
......
......@@ -30,9 +30,15 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"go.uber.org/atomic"
"go.uber.org/zap"
)
const (
CloseGracefully bool = true
CloseImmediately bool = false
)
// InputNode is the entry point of flowgragh
type InputNode struct {
BaseNode
......@@ -43,7 +49,9 @@ type InputNode struct {
nodeID int64
collectionID int64
dataType string
closeOnce sync.Once
closeOnce sync.Once
closeGracefully *atomic.Bool
}
// IsInputNode returns whether Node is InputNode
......@@ -60,14 +68,27 @@ func (inNode *InputNode) Name() string {
return inNode.name
}
func (inNode *InputNode) SetCloseMethod(gracefully bool) {
inNode.closeGracefully.Store(gracefully)
log.Info("input node close method set",
zap.String("node", inNode.Name()),
zap.Int64("collection", inNode.collectionID),
zap.Any("gracefully", gracefully))
}
// Operate consume a message pack from msgstream and return
func (inNode *InputNode) Operate(in []Msg) []Msg {
msgPack, ok := <-inNode.input
if !ok {
log.Warn("input closed", zap.Any("input node", inNode.Name()))
if inNode.lastMsg != nil {
log.Info("trigger force sync",
zap.Int64("collection", inNode.collectionID),
log := log.With(
zap.String("node", inNode.Name()),
zap.Int64("collection", inNode.collectionID),
)
log.Info("input node message stream closed",
zap.Bool("closeGracefully", inNode.closeGracefully.Load()),
)
if inNode.lastMsg != nil && inNode.closeGracefully.Load() {
log.Info("input node trigger force sync",
zap.Any("position", inNode.lastMsg.EndPositions))
return []Msg{&MsgStreamMsg{
BaseMsg: NewBaseMsg(true),
......@@ -144,12 +165,13 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng
baseNode.SetMaxParallelism(maxParallelism)
return &InputNode{
BaseNode: baseNode,
input: input,
name: nodeName,
role: role,
nodeID: nodeID,
collectionID: collectionID,
dataType: dataType,
BaseNode: baseNode,
input: input,
name: nodeName,
role: role,
nodeID: nodeID,
collectionID: collectionID,
dataType: dataType,
closeGracefully: atomic.NewBool(CloseImmediately),
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册