From bd222e58eb7976e012dfaaee055a9de86ed0d703 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 16 Aug 2024 15:26:54 +0800
Subject: [PATCH] enhance: [2.4] Exclude L0 segment from readable snapshot
 (#35510)

Cherry-pick from master
pr: #35507

L0 segments now contain no insert data and may confuse the query hook
optimizer if they are counted toward the sealed segment number. This PR
adds a segment level flag to the segment entry and excludes L0 segments
when building the readable segment snapshot.

Signed-off-by: Congqi Xia
---
 .../querynodev2/delegator/delegator_data.go   |  1 +
 .../delegator/delegator_data_test.go          | 33 +++++++++++--------
 .../querynodev2/delegator/distribution.go     | 17 ++++++----
 3 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index ebefbc5631..242791de0d 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -524,6 +524,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 			PartitionID: info.GetPartitionID(),
 			NodeID:      req.GetDstNodeID(),
 			Version:     req.GetVersion(),
+			Level:       info.GetLevel(),
 		}
 	})
 	if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index 8ec313ad46..7a96b3d919 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -509,6 +509,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
 					PartitionID:   500,
 					StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
 					DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
+					Level:         datapb.SegmentLevel_L1,
 					InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
 				},
 			},
@@ -524,6 +525,7 @@
 				NodeID:        1,
 				PartitionID:   500,
 				TargetVersion: unreadableTargetVersion,
+				Level:         datapb.SegmentLevel_L1,
 			},
 		}, sealed[0].Segments)
 	})
@@ -599,20 +601,21 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
 		})
 		s.NoError(err)
 
-		err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
-			Base:         commonpbutil.NewMsgBase(),
-			DstNodeID:    1,
-			CollectionID: s.collectionID,
-			Infos: []*querypb.SegmentLoadInfo{
-				{
-					SegmentID:     200,
-					PartitionID:   500,
-					StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
-					DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
-					InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
-				},
-			},
-		})
+		err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
+			Base:         commonpbutil.NewMsgBase(),
+			DstNodeID:    1,
+			CollectionID: s.collectionID,
+			Infos: []*querypb.SegmentLoadInfo{
+				{
+					SegmentID:     200,
+					PartitionID:   500,
+					StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
+					DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
+					Level:         datapb.SegmentLevel_L0,
+					InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
+				},
+			},
+		})
 		s.NoError(err)
 
 		sealed, _ := s.delegator.GetSegmentInfo(false)
@@ -624,12 +627,14 @@
 				NodeID:        1,
 				PartitionID:   500,
 				TargetVersion: unreadableTargetVersion,
+				Level:         datapb.SegmentLevel_L1,
 			},
 			{
 				SegmentID:     200,
 				NodeID:        1,
 				PartitionID:   500,
 				TargetVersion: unreadableTargetVersion,
+				Level:         datapb.SegmentLevel_L0,
 			},
 		}, sealed[0].Segments)
 	})
diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go
index 44fda25934..ae57b0fe04 100644
--- a/internal/querynodev2/delegator/distribution.go
+++ b/internal/querynodev2/delegator/distribution.go
@@ -23,6 +23,7 @@ import (
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -84,6 +85,7 @@ type SegmentEntry struct {
 	PartitionID   UniqueID
 	Version       int64
 	TargetVersion int64
+	Level         datapb.SegmentLevel
 }
 
 // NewDistribution creates a new distribution instance with all field initialized.
@@ -114,9 +116,7 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh
 	sealed, growing = current.Get(partitions...)
 	version = current.version
 	targetVersion := current.GetTargetVersion()
-	filterReadable := func(entry SegmentEntry, _ int) bool {
-		return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
-	}
+	filterReadable := d.readableFilter(targetVersion)
 	sealed, growing = d.filterSegments(sealed, growing, filterReadable)
 	return
 }
@@ -157,9 +157,7 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed
 
 	if readable {
 		targetVersion := current.GetTargetVersion()
-		filterReadable := func(entry SegmentEntry, _ int) bool {
-			return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
-		}
+		filterReadable := d.readableFilter(targetVersion)
 		sealed, growing = d.filterSegments(sealed, growing, filterReadable)
 		return
 	}
@@ -382,6 +380,13 @@ func (d *distribution) genSnapshot() chan struct{} {
 	return last.cleared
 }
 
+func (d *distribution) readableFilter(targetVersion int64) func(entry SegmentEntry, _ int) bool {
+	return func(entry SegmentEntry, _ int) bool {
+		// segment L0 is not readable for now
+		return entry.Level != datapb.SegmentLevel_L0 && (entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion)
+	}
+}
+
 // getCleanup returns cleanup snapshots function.
 func (d *distribution) getCleanup(version int64) snapshotCleanup {
 	return func() {
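
Editor's note, not part of the patch: the following is a minimal, self-contained Go sketch of the readable-snapshot filtering this change introduces. The names here (SegmentLevel, SegmentEntry, initialTargetVersion, readableFilter) are simplified stand-ins for the Milvus types and the helper added to distribution.go, so shapes and signatures are assumptions for illustration only.

package main

import "fmt"

// SegmentLevel mirrors the idea of datapb.SegmentLevel for this sketch only.
type SegmentLevel int

const (
	SegmentLevelL0 SegmentLevel = iota // delta-only segments, no insert data
	SegmentLevelL1                     // regular sealed segments
)

// SegmentEntry is a trimmed-down stand-in for the delegator's distribution entry.
type SegmentEntry struct {
	SegmentID     int64
	TargetVersion int64
	Level         SegmentLevel
}

// initialTargetVersion stands in for the "not yet assigned" target version.
const initialTargetVersion int64 = 0

// readableFilter returns a predicate that keeps only reader-visible entries:
// the entry must match the current (or initial) target version, and L0 entries
// are always excluded because they carry no insert data.
func readableFilter(targetVersion int64) func(entry SegmentEntry) bool {
	return func(entry SegmentEntry) bool {
		return entry.Level != SegmentLevelL0 &&
			(entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion)
	}
}

func main() {
	entries := []SegmentEntry{
		{SegmentID: 100, TargetVersion: 5, Level: SegmentLevelL1},
		{SegmentID: 200, TargetVersion: 5, Level: SegmentLevelL0}, // excluded: L0 level
		{SegmentID: 300, TargetVersion: 3, Level: SegmentLevelL1}, // excluded: stale target version
	}

	keep := readableFilter(5)
	var readable []SegmentEntry
	for _, e := range entries {
		if keep(e) {
			readable = append(readable, e)
		}
	}
	fmt.Println(readable) // only segment 100 remains readable
}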