From 4c0e36d28c851a429d49c53eeba16c4662e96e57 Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 6 Jul 2023 10:04:25 +0800 Subject: [PATCH] Stop blocking until all channels & segments released (#25328) Signed-off-by: yah01 --- internal/querynodev2/server.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 5e04b84921..485a493c60 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -374,11 +374,23 @@ func (node *QueryNode) Stop() error { timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)) outer: - for node.manager == nil || node.manager.Segment.Empty() { + for (node.manager == nil || node.manager.Segment.Empty()) && + (node.pipelineManager == nil || node.pipelineManager.Num() == 0) { select { case <-timeoutCh: - sealedSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) - growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) + var ( + sealedSegments = []segments.Segment{} + growingSegments = []segments.Segment{} + channelNum = 0 + ) + if node.manager != nil { + sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) + growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) + } + if node.pipelineManager != nil { + channelNum = node.pipelineManager.Num() + } + log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()), zap.Int64s("sealedSegments", lo.Map[segments.Segment, int64](sealedSegments, func(s segments.Segment, i int) int64 { return s.ID() @@ -386,6 +398,7 @@ func (node *QueryNode) Stop() error { zap.Int64s("growingSegments", lo.Map[segments.Segment, int64](growingSegments, func(t segments.Segment, i int) int64 { return t.ID() })), + zap.Int("channelNum", channelNum), ) break outer