diff --git a/internal/querynodev2/segments/segment_filter.go b/internal/querynodev2/segments/segment_filter.go index a9fec49219..e9401a7d15 100644 --- a/internal/querynodev2/segments/segment_filter.go +++ b/internal/querynodev2/segments/segment_filter.go @@ -140,3 +140,10 @@ func WithLevel(level datapb.SegmentLevel) SegmentFilter { return segment.Level() == level }) } + +// WithoutLevel is the segment filter for without segment level. +func WithoutLevel(level datapb.SegmentLevel) SegmentFilter { + return SegmentFilterFunc(func(segment Segment) bool { + return segment.Level() != level + }) +} diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index ff0dc17e52..90f20ffde0 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -68,6 +68,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/expr" "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/lifetime" @@ -479,12 +480,15 @@ func (node *QueryNode) Stop() error { channelNum = 0 ) if node.manager != nil { - sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) - growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) + sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed), segments.WithoutLevel(datapb.SegmentLevel_L0)) + growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing), segments.WithoutLevel(datapb.SegmentLevel_L0)) } if node.pipelineManager != nil { channelNum = node.pipelineManager.Num() } + if len(sealedSegments) == 0 && len(growingSegments) == 0 && channelNum == 0 { + break + } select { case <-timeoutCh: