From c68c128e4754b0ba51fc7e4768b896a5ff8f4cb1 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 16 Jan 2024 14:40:53 +0800 Subject: [PATCH] fix: level 0 segments not loaded (#29908) the recent changes move the level 0 segments list to a new proto field, which leads to the QueryCoord can't see the level 0 segments, handle the new changes fix #29907 Signed-off-by: yah01 --- internal/querycoordv2/meta/target_manager.go | 10 ++++++++++ .../querycoordv2/meta/target_manager_test.go | 19 +++++++++++-------- internal/querycoordv2/task/executor.go | 3 ++- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index efd7e04fe1..dc220c07f3 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -24,6 +24,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -205,6 +206,15 @@ func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, ch for _, info := range vChannelInfos { channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) + for _, segmentID := range info.GetLevelZeroSegmentIds() { + segments[segmentID] = &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + InsertChannel: info.GetChannelName(), + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + } + } } partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...) diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 503dee4ec7..67cbcd7f00 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -44,10 +44,11 @@ type TargetManagerSuite struct { suite.Suite // Data - collections []int64 - partitions map[int64][]int64 - channels map[int64][]string - segments map[int64]map[int64][]int64 // CollectionID, PartitionID -> Segments + collections []int64 + partitions map[int64][]int64 + channels map[int64][]string + segments map[int64]map[int64][]int64 // CollectionID, PartitionID -> Segments + level0Segments []int64 // Derived data allChannels []string allSegments []int64 @@ -80,6 +81,7 @@ func (suite *TargetManagerSuite) SetupSuite() { 103: {7, 8}, }, } + suite.level0Segments = []int64{10000, 10001} suite.allChannels = make([]string, 0) suite.allSegments = make([]int64, 0) @@ -118,8 +120,9 @@ func (suite *TargetManagerSuite) SetupTest() { dmChannels := make([]*datapb.VchannelInfo, 0) for _, channel := range suite.channels[collection] { dmChannels = append(dmChannels, &datapb.VchannelInfo{ - CollectionID: collection, - ChannelName: channel, + CollectionID: collection, + ChannelName: channel, + LevelZeroSegmentIds: suite.level0Segments, }) } @@ -265,7 +268,7 @@ func (suite *TargetManagerSuite) TestRemovePartition() { suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)) suite.mgr.RemovePartition(collectionID, 100) - suite.assertSegments([]int64{3, 4}, suite.mgr.GetSealedSegmentsByCollection(collectionID, NextTarget)) + suite.assertSegments(append([]int64{3, 4}, suite.level0Segments...), suite.mgr.GetSealedSegmentsByCollection(collectionID, NextTarget)) suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(collectionID, NextTarget)) suite.assertSegments([]int64{}, suite.mgr.GetSealedSegmentsByCollection(collectionID, CurrentTarget)) suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)) @@ -310,7 +313,7 @@ func (suite *TargetManagerSuite) getAllSegment(collectionID int64, partitionIDs } } - return allSegments + return append(allSegments, suite.level0Segments...) } func (suite *TargetManagerSuite) assertChannels(expected []string, actual map[string]*DmChannel) bool { diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index cb4fed75ce..b08bc33815 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -190,6 +190,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { return err } segment := resp.GetInfos()[0] + log = log.With(zap.String("level", segment.GetLevel().String())) indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID()) if err != nil { @@ -224,7 +225,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { segmentIndex.IndexParams = funcutil.Map2KeyValuePair(params) } - loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes) + loadInfo := utils.PackSegmentLoadInfo(segment, channel.GetSeekPosition(), indexes) req := packLoadSegmentRequest( task,