diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index d6d8abc869..c5716a96ad 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -113,6 +113,7 @@ type DataNode struct { //call once initOnce sync.Once startOnce sync.Once + stopOnce sync.Once wg sync.WaitGroup sessionMu sync.Mutex // to fix data race session *sessionutil.Session @@ -583,34 +584,32 @@ func (node *DataNode) ReadyToFlush() error { // Stop will release DataNode resources and shutdown datanode func (node *DataNode) Stop() error { - // https://github.com/milvus-io/milvus/issues/12282 - node.UpdateStateCode(commonpb.StateCode_Abnormal) - node.flowgraphManager.dropAll() - node.flowgraphManager.stop() + node.stopOnce.Do(func() { + node.cancel() + // https://github.com/milvus-io/milvus/issues/12282 + node.UpdateStateCode(commonpb.StateCode_Abnormal) + node.flowgraphManager.close() - node.cancel() - node.wg.Wait() - node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool { - m.Close() - return true - }) + node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool { + m.Close() + return true + }) - if node.allocator != nil { - log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole)) - node.allocator.Close() - } - - if node.closer != nil { - err := node.closer.Close() - if err != nil { - return err + if node.allocator != nil { + log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole)) + node.allocator.Close() } - } - if node.session != nil { - node.session.Stop() - } + if node.closer != nil { + node.closer.Close() + } + if node.session != nil { + node.session.Stop() + } + + node.wg.Wait() + }) return nil } diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 43abf6d7ab..0e5da3f371 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -61,7 +61,8 @@ func (fm *flowgraphManager) start() { } } -func (fm *flowgraphManager) stop() { +func (fm *flowgraphManager) close() { + fm.dropAll() fm.closeOnce.Do(func() { close(fm.closeCh) }) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index b5628384e8..4e512614e2 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -70,7 +70,7 @@ var rateCol *ratelimitutil.RateCollector // Proxy of milvus type Proxy struct { ctx context.Context - cancel func() + cancel context.CancelFunc wg sync.WaitGroup initParams *internalpb.InitParams diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 475b16f6d1..d363043c5e 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -66,7 +66,9 @@ func (inNode *InputNode) Operate(in []Msg) []Msg { if !ok { log.Warn("input closed", zap.Any("input node", inNode.Name())) if inNode.lastMsg != nil { - log.Info("trigger force sync", zap.Int64("collection", inNode.collectionID), zap.Any("position", inNode.lastMsg)) + log.Info("trigger force sync", + zap.Int64("collection", inNode.collectionID), + zap.Any("position", inNode.lastMsg.EndPositions)) return []Msg{&MsgStreamMsg{ BaseMsg: NewBaseMsg(true), tsMessages: []msgstream.TsMsg{},