From 4c0ca83928dbc5389ff0122909bdf503e9d3fc34 Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 27 Dec 2023 23:24:46 +0800 Subject: [PATCH] enhance: speed up loading with many deletions (#29455) (#29520) the executor always fetches the latest segment info, so we could consume from the latest checkpoint, which could save much time while deleted many entities pr: #29455 Signed-off-by: yah01 Signed-off-by: yah01 --- .../querycoordv2/observers/leader_observer.go | 7 +++- .../observers/leader_observer_test.go | 6 ++-- internal/querycoordv2/task/executor.go | 7 +++- internal/querycoordv2/task/task_test.go | 14 ++++---- internal/querycoordv2/utils/types.go | 25 +++---------- internal/querycoordv2/utils/types_test.go | 35 +++++++------------ 6 files changed, 39 insertions(+), 55 deletions(-) diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 26e03d0bcf..713cfd0465 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -151,7 +151,12 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis log.Warn("failed to get segment info from DataCoord", zap.Error(err)) continue } - loadInfo := utils.PackSegmentLoadInfo(resp, nil) + + channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.NextTarget) + if channel == nil { + channel = o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTarget) + } + loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil) log.Debug("leader observer append a segment to set", zap.Int64("collectionID", leaderView.CollectionID), diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index 1e36388607..8f1ec27099 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -136,7 +136,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() { view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) observer.dist.LeaderViewManager.Update(2, view) - loadInfo := utils.PackSegmentLoadInfo(resp, nil) + loadInfo := utils.PackSegmentLoadInfo(resp, nil, nil) expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest { return &querypb.SyncDistributionRequest{ @@ -232,7 +232,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() { view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) observer.dist.LeaderViewManager.Update(2, view) - loadInfo := utils.PackSegmentLoadInfo(resp, nil) + loadInfo := utils.PackSegmentLoadInfo(resp, nil, nil) expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest { return &querypb.SyncDistributionRequest{ @@ -368,7 +368,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() { view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{}) view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) observer.dist.LeaderViewManager.Update(4, view2) - loadInfo := utils.PackSegmentLoadInfo(resp, nil) + loadInfo := utils.PackSegmentLoadInfo(resp, nil, nil) expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest { return &querypb.SyncDistributionRequest{ diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 9dc0337966..0c33fc4e5d 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -191,7 +191,12 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { indexes = nil } - loadInfo := utils.PackSegmentLoadInfo(resp, indexes) + channel := ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget) + if channel == nil { + channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.CurrentTarget) + } + + loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), indexes) // Get collection index info indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 1feeafdeb2..41cbe69afe 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -436,7 +436,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.target.UpdateCollectionNextTarget(suite.collection) segmentsNum := len(suite.loadSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) @@ -532,7 +532,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.target.UpdateCollectionNextTarget(suite.collection) segmentsNum := len(suite.loadSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) @@ -622,7 +622,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.target.UpdateCollectionNextTarget(suite.collection) segmentsNum := len(suite.loadSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) @@ -998,7 +998,7 @@ func (suite *TaskSuite) TestTaskCanceled() { } segmentsNum := len(suite.loadSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segmentInfos, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segmentInfos, nil) suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition)) suite.target.UpdateCollectionNextTarget(suite.collection) @@ -1084,7 +1084,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition)) suite.target.UpdateCollectionNextTarget(suite.collection) segmentsNum := len(suite.loadSegments) @@ -1120,7 +1120,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { bakExpectations := suite.broker.ExpectedCalls suite.broker.AssertExpectations(suite.T()) suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0] - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, 2)) suite.target.UpdateCollectionNextTarget(suite.collection) @@ -1326,7 +1326,7 @@ func (suite *TaskSuite) TestNoExecutor() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.target.UpdateCollectionNextTarget(suite.collection) segmentsNum := len(suite.loadSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) diff --git a/internal/querycoordv2/utils/types.go b/internal/querycoordv2/utils/types.go index 7920f10f64..1da7af9dac 100644 --- a/internal/querycoordv2/utils/types.go +++ b/internal/querycoordv2/utils/types.go @@ -72,34 +72,17 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met // packSegmentLoadInfo packs SegmentLoadInfo for given segment, // packs with index if withIndex is true, this fetch indexes from IndexCoord -func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo { - var ( - deltaPosition *msgpb.MsgPosition - positionSrc string - ) - +func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, checkpoint *msgpb.MsgPosition, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo { segment := resp.GetInfos()[0] - if resp.GetChannelCheckpoint() != nil && resp.ChannelCheckpoint[segment.InsertChannel] != nil { - deltaPosition = resp.ChannelCheckpoint[segment.InsertChannel] - positionSrc = "channelCheckpoint" - } else if segment.GetDmlPosition() != nil { - deltaPosition = segment.GetDmlPosition() - positionSrc = "segmentDMLPos" - } else { - deltaPosition = segment.GetStartPosition() - positionSrc = "segmentStartPos" - } - - posTime := tsoutil.PhysicalTime(deltaPosition.GetTimestamp()) + posTime := tsoutil.PhysicalTime(checkpoint.GetTimestamp()) tsLag := time.Since(posTime) if tsLag >= 10*time.Minute { log.Warn("delta position is quite stale", zap.Int64("collectionID", segment.GetCollectionID()), zap.Int64("segmentID", segment.GetID()), zap.String("channel", segment.InsertChannel), - zap.String("positionSource", positionSrc), - zap.Uint64("posTs", deltaPosition.GetTimestamp()), + zap.Uint64("posTs", checkpoint.GetTimestamp()), zap.Time("posTime", posTime), zap.Duration("tsLag", tsLag)) } @@ -114,7 +97,7 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb InsertChannel: segment.InsertChannel, IndexInfos: indexes, StartPosition: segment.GetStartPosition(), - DeltaPosition: deltaPosition, + DeltaPosition: checkpoint, } loadInfo.SegmentSize = calculateSegmentSize(loadInfo) return loadInfo diff --git a/internal/querycoordv2/utils/types_test.go b/internal/querycoordv2/utils/types_test.go index f9bc9d9489..12205db914 100644 --- a/internal/querycoordv2/utils/types_test.go +++ b/internal/querycoordv2/utils/types_test.go @@ -36,6 +36,13 @@ func Test_packLoadSegmentRequest(t *testing.T) { t1 := tsoutil.ComposeTSByTime(time.Now().Add(-8*time.Minute), 0) t2 := tsoutil.ComposeTSByTime(time.Now().Add(-5*time.Minute), 0) + channel := &datapb.VchannelInfo{ + SeekPosition: &msgpb.MsgPosition{ + ChannelName: mockPChannel, + Timestamp: t2, + }, + } + segmentInfo := &datapb.SegmentInfo{ ID: 0, InsertChannel: mockVChannel, @@ -43,43 +50,27 @@ func Test_packLoadSegmentRequest(t *testing.T) { ChannelName: mockPChannel, Timestamp: t1, }, - DmlPosition: &msgpb.MsgPosition{ - ChannelName: mockPChannel, - Timestamp: t2, - }, } - t.Run("test set deltaPosition from segment dmlPosition", func(t *testing.T) { + t.Run("test set deltaPosition from channel", func(t *testing.T) { resp := &datapb.GetSegmentInfoResponse{ Infos: []*datapb.SegmentInfo{ proto.Clone(segmentInfo).(*datapb.SegmentInfo), }, } - req := PackSegmentLoadInfo(resp, nil) + req := PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil) assert.NotNil(t, req.GetDeltaPosition()) assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName) assert.Equal(t, t2, req.GetDeltaPosition().Timestamp) }) - t.Run("test set deltaPosition from segment startPosition", func(t *testing.T) { - segInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo) - segInfo.DmlPosition = nil - resp := &datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{segInfo}, - } - req := PackSegmentLoadInfo(resp, nil) - assert.NotNil(t, req.GetDeltaPosition()) - assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName) - assert.Equal(t, t1, req.GetDeltaPosition().Timestamp) - }) - t.Run("test tsLag > 10minutes", func(t *testing.T) { - segInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo) - segInfo.DmlPosition.Timestamp = t0 + channel := proto.Clone(channel).(*datapb.VchannelInfo) + channel.SeekPosition.Timestamp = t0 resp := &datapb.GetSegmentInfoResponse{ - Infos: []*datapb.SegmentInfo{segInfo}, + Infos: []*datapb.SegmentInfo{segmentInfo}, } - req := PackSegmentLoadInfo(resp, nil) + req := PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil) assert.NotNil(t, req.GetDeltaPosition()) assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName) assert.Equal(t, t0, req.GetDeltaPosition().Timestamp)