From a55c371261d01a9afb6d406df0288ccf162d71de Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 5 Jun 2025 15:56:33 +0800 Subject: [PATCH] fix: querynode upgrade from 2.5 get stucked (#42503) issue: #42492 pr: #42502 - querynode graceful stop can be done if there's only L0 segment exists. Signed-off-by: chyezh --- internal/querynodev2/segments/segment_filter.go | 7 +++++++ internal/querynodev2/server.go | 8 ++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/internal/querynodev2/segments/segment_filter.go b/internal/querynodev2/segments/segment_filter.go index a9fec49219..e9401a7d15 100644 --- a/internal/querynodev2/segments/segment_filter.go +++ b/internal/querynodev2/segments/segment_filter.go @@ -140,3 +140,10 @@ func WithLevel(level datapb.SegmentLevel) SegmentFilter { return segment.Level() == level }) } + +// WithoutLevel is the segment filter for without segment level. +func WithoutLevel(level datapb.SegmentLevel) SegmentFilter { + return SegmentFilterFunc(func(segment Segment) bool { + return segment.Level() != level + }) +} diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index ff0dc17e52..90f20ffde0 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -68,6 +68,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/expr" "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/lifetime" @@ -479,12 +480,15 @@ func (node *QueryNode) Stop() error { channelNum = 0 ) if node.manager != nil { - sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) - growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) + sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed), segments.WithoutLevel(datapb.SegmentLevel_L0)) + growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing), segments.WithoutLevel(datapb.SegmentLevel_L0)) } if node.pipelineManager != nil { channelNum = node.pipelineManager.Num() } + if len(sealedSegments) == 0 && len(growingSegments) == 0 && channelNum == 0 { + break + } select { case <-timeoutCh: