diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index ebefbc5631..242791de0d 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -524,6 +524,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg PartitionID: info.GetPartitionID(), NodeID: req.GetDstNodeID(), Version: req.GetVersion(), + Level: info.GetLevel(), } }) if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 8ec313ad46..7a96b3d919 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -509,6 +509,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() { PartitionID: 500, StartPosition: &msgpb.MsgPosition{Timestamp: 20000}, DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000}, + Level: datapb.SegmentLevel_L1, InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID), }, }, @@ -524,6 +525,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() { NodeID: 1, PartitionID: 500, TargetVersion: unreadableTargetVersion, + Level: datapb.SegmentLevel_L1, }, }, sealed[0].Segments) }) @@ -599,20 +601,21 @@ func (s *DelegatorDataSuite) TestLoadSegments() { }) s.NoError(err) - err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ - Base: commonpbutil.NewMsgBase(), - DstNodeID: 1, - CollectionID: s.collectionID, - Infos: []*querypb.SegmentLoadInfo{ - { - SegmentID: 200, - PartitionID: 500, - StartPosition: &msgpb.MsgPosition{Timestamp: 20000}, - DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000}, - InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID), - }, - }, - }) + // err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{ + // Base: commonpbutil.NewMsgBase(), + // DstNodeID: 1, + // CollectionID: s.collectionID, + // Infos: []*querypb.SegmentLoadInfo{ + // { + // SegmentID: 200, + // PartitionID: 500, + // StartPosition: &msgpb.MsgPosition{Timestamp: 20000}, + // DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000}, + // Level: datapb.SegmentLevel_L1, + // InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID), + // }, + // }, + // }) s.NoError(err) sealed, _ := s.delegator.GetSegmentInfo(false) @@ -624,12 +627,14 @@ func (s *DelegatorDataSuite) TestLoadSegments() { NodeID: 1, PartitionID: 500, TargetVersion: unreadableTargetVersion, + Level: datapb.SegmentLevel_L1, }, { SegmentID: 200, NodeID: 1, PartitionID: 500, TargetVersion: unreadableTargetVersion, + Level: datapb.SegmentLevel_L0, }, }, sealed[0].Segments) }) diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index 44fda25934..ae57b0fe04 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -23,6 +23,7 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -84,6 +85,7 @@ type SegmentEntry struct { PartitionID UniqueID Version int64 TargetVersion int64 + Level datapb.SegmentLevel } // NewDistribution creates a new distribution instance with all field initialized. @@ -114,9 +116,7 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh sealed, growing = current.Get(partitions...) version = current.version targetVersion := current.GetTargetVersion() - filterReadable := func(entry SegmentEntry, _ int) bool { - return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion - } + filterReadable := d.readableFilter(targetVersion) sealed, growing = d.filterSegments(sealed, growing, filterReadable) return } @@ -157,9 +157,7 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed if readable { targetVersion := current.GetTargetVersion() - filterReadable := func(entry SegmentEntry, _ int) bool { - return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion - } + filterReadable := d.readableFilter(targetVersion) sealed, growing = d.filterSegments(sealed, growing, filterReadable) return } @@ -382,6 +380,13 @@ func (d *distribution) genSnapshot() chan struct{} { return last.cleared } +func (d *distribution) readableFilter(targetVersion int64) func(entry SegmentEntry, _ int) bool { + return func(entry SegmentEntry, _ int) bool { + // segment L0 is not readable for now + return entry.Level != datapb.SegmentLevel_L0 && (entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion) + } +} + // getCleanup returns cleanup snapshots function. func (d *distribution) getCleanup(version int64) snapshotCleanup { return func() {