From 08fd28b30b85d78190a6f1b5be50d42a71915da2 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 21 Aug 2023 11:16:20 +0800 Subject: [PATCH] Only do gracefully stop when DN Stop (#26399) Signed-off-by: yangxuan --- internal/datanode/data_sync_service.go | 8 ++++ internal/datanode/data_sync_service_test.go | 4 +- .../flow_graph_dmstream_input_node.go | 12 ++++- internal/datanode/flow_graph_manager.go | 2 +- internal/util/flowgraph/flow_graph.go | 23 +++++++--- internal/util/flowgraph/input_node.go | 46 ++++++++++++++----- 6 files changed, 72 insertions(+), 23 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 4cabd999f3..a164a052e8 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -161,6 +161,14 @@ func (dsService *dataSyncService) start() { } } +func (dsService *dataSyncService) GracefullyClose() { + if dsService.fg != nil { + log.Info("dataSyncService gracefully closing flowgraph") + dsService.fg.SetCloseMethod(flowgraph.CloseGracefully) + dsService.close() + } +} + func (dsService *dataSyncService) close() { dsService.stopOnce.Do(func() { log := log.Ctx(context.Background()).With( diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 5b275c4e17..ef4e46758e 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -534,7 +534,7 @@ func TestDataSyncService_Close(t *testing.T) { assert.Equal(t, 0, len(syncService.flushListener)) // close will trigger a force sync - syncService.close() + syncService.GracefullyClose() assert.Eventually(t, func() bool { return len(syncService.flushListener) == 1 }, 5*time.Second, 100*time.Millisecond) flushPack, ok := <-syncService.flushListener @@ -547,7 +547,7 @@ func TestDataSyncService_Close(t *testing.T) { <-syncService.ctx.Done() // Double close is safe - syncService.close() + syncService.GracefullyClose() <-syncService.ctx.Done() } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index bfed86fd8c..21e3ecf56e 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -62,7 +62,15 @@ func newDmInputNode(dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPos } name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName) - node := flowgraph.NewInputNode(input, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism, - typeutil.DataNodeRole, paramtable.GetNodeID(), dmNodeConfig.collectionID, metrics.AllLabel) + node := flowgraph.NewInputNode( + input, + name, + dmNodeConfig.maxQueueLength, + dmNodeConfig.maxParallelism, + typeutil.DataNodeRole, + paramtable.GetNodeID(), + dmNodeConfig.collectionID, + metrics.AllLabel, + ) return node, nil } diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 0309eeaaf8..95b01b2ebe 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -216,7 +216,7 @@ func (fm *flowgraphManager) getFlowGraphNum() int { func (fm *flowgraphManager) dropAll() { log.Info("start drop all flowgraph resources in DataNode") fm.flowgraphs.Range(func(key string, value *dataSyncService) bool { - value.close() + value.GracefullyClose() fm.flowgraphs.GetAndRemove(key) log.Info("successfully dropped flowgraph", zap.String("vChannelName", key)) diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index f574bb2126..8f5e9777b4 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -21,16 +21,18 @@ import ( "sync" "github.com/cockroachdb/errors" + "go.uber.org/atomic" ) // Flow Graph is no longer a graph rather than a simple pipeline, this simplified our code and increase recovery speed - xiaofan. // TimeTickedFlowGraph flowgraph with input from tt msg stream type TimeTickedFlowGraph struct { - nodeCtx map[NodeName]*nodeCtx - stopOnce sync.Once - startOnce sync.Once - closeWg *sync.WaitGroup + nodeCtx map[NodeName]*nodeCtx + stopOnce sync.Once + startOnce sync.Once + closeWg *sync.WaitGroup + closeGracefully *atomic.Bool } // AddNode add Node into flowgraph @@ -93,6 +95,14 @@ func (fg *TimeTickedFlowGraph) Unblock() { } } +func (fg *TimeTickedFlowGraph) SetCloseMethod(gracefully bool) { + for _, v := range fg.nodeCtx { + if v.node.IsInputNode() { + v.node.(*InputNode).SetCloseMethod(gracefully) + } + } +} + // Close closes all nodes in flowgraph func (fg *TimeTickedFlowGraph) Close() { fg.stopOnce.Do(func() { @@ -108,8 +118,9 @@ func (fg *TimeTickedFlowGraph) Close() { // NewTimeTickedFlowGraph create timetick flowgraph func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph { flowGraph := TimeTickedFlowGraph{ - nodeCtx: make(map[string]*nodeCtx), - closeWg: &sync.WaitGroup{}, + nodeCtx: make(map[string]*nodeCtx), + closeWg: &sync.WaitGroup{}, + closeGracefully: atomic.NewBool(CloseImmediately), } return &flowGraph diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index d363043c5e..a847c98366 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -30,9 +30,15 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "go.uber.org/atomic" "go.uber.org/zap" ) +const ( + CloseGracefully bool = true + CloseImmediately bool = false +) + // InputNode is the entry point of flowgragh type InputNode struct { BaseNode @@ -43,7 +49,9 @@ type InputNode struct { nodeID int64 collectionID int64 dataType string - closeOnce sync.Once + + closeOnce sync.Once + closeGracefully *atomic.Bool } // IsInputNode returns whether Node is InputNode @@ -60,14 +68,27 @@ func (inNode *InputNode) Name() string { return inNode.name } +func (inNode *InputNode) SetCloseMethod(gracefully bool) { + inNode.closeGracefully.Store(gracefully) + log.Info("input node close method set", + zap.String("node", inNode.Name()), + zap.Int64("collection", inNode.collectionID), + zap.Any("gracefully", gracefully)) +} + // Operate consume a message pack from msgstream and return func (inNode *InputNode) Operate(in []Msg) []Msg { msgPack, ok := <-inNode.input if !ok { - log.Warn("input closed", zap.Any("input node", inNode.Name())) - if inNode.lastMsg != nil { - log.Info("trigger force sync", - zap.Int64("collection", inNode.collectionID), + log := log.With( + zap.String("node", inNode.Name()), + zap.Int64("collection", inNode.collectionID), + ) + log.Info("input node message stream closed", + zap.Bool("closeGracefully", inNode.closeGracefully.Load()), + ) + if inNode.lastMsg != nil && inNode.closeGracefully.Load() { + log.Info("input node trigger force sync", zap.Any("position", inNode.lastMsg.EndPositions)) return []Msg{&MsgStreamMsg{ BaseMsg: NewBaseMsg(true), @@ -144,12 +165,13 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng baseNode.SetMaxParallelism(maxParallelism) return &InputNode{ - BaseNode: baseNode, - input: input, - name: nodeName, - role: role, - nodeID: nodeID, - collectionID: collectionID, - dataType: dataType, + BaseNode: baseNode, + input: input, + name: nodeName, + role: role, + nodeID: nodeID, + collectionID: collectionID, + dataType: dataType, + closeGracefully: atomic.NewBool(CloseImmediately), } }