未验证 提交 8e814a29 编写于 作者: C congqixia 提交者: GitHub

Make flowgraph Close only once (#7631)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 a3666c84
......@@ -18,12 +18,16 @@ import (
"errors"
)
// TimeTickedFlowGraph flowgraph with input from tt msg stream
type TimeTickedFlowGraph struct {
ctx context.Context
cancel context.CancelFunc
nodeCtx map[NodeName]*nodeCtx
ctx context.Context
cancel context.CancelFunc
nodeCtx map[NodeName]*nodeCtx
stopOnce sync.Once
startOnce sync.Once
}
// AddNode add Node into flowgraph
func (fg *TimeTickedFlowGraph) AddNode(node Node) {
nodeName := node.Name()
nodeCtx := nodeCtx{
......@@ -34,6 +38,7 @@ func (fg *TimeTickedFlowGraph) AddNode(node Node) {
fg.nodeCtx[nodeName] = &nodeCtx
}
// SetEdges set directed edges from in nodes to out nodes
func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string) error {
currentNode, ok := fg.nodeCtx[nodeName]
if !ok {
......@@ -69,30 +74,30 @@ func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []stri
return nil
}
// Start starts all nodes in timetick flowgragh
func (fg *TimeTickedFlowGraph) Start() {
wg := sync.WaitGroup{}
for _, v := range fg.nodeCtx {
wg.Add(1)
go v.Start(fg.ctx, &wg)
}
wg.Wait()
fg.startOnce.Do(func() {
wg := sync.WaitGroup{}
for _, v := range fg.nodeCtx {
wg.Add(1)
go v.Start(fg.ctx, &wg)
}
wg.Wait()
})
}
// Close closes all nodes in flowgraph
func (fg *TimeTickedFlowGraph) Close() {
for _, v := range fg.nodeCtx {
// close message stream
// if v.node.IsInputNode() {
// inStream, ok := v.node.(*InputNode)
// if !ok {
// log.Fatal("Invalid inputNode")
// }
// (*inStream.inStream).Close()
// }
v.Close()
}
fg.cancel()
fg.stopOnce.Do(func() {
for _, v := range fg.nodeCtx {
// maybe need to stop in order
v.Close()
}
fg.cancel()
})
}
// NewTimeTickedFlowGraph create timetick flowgraph
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
ctx1, cancel := context.WithCancel(ctx)
flowGraph := TimeTickedFlowGraph{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册