From ad83090925eb6c588a1a166d9cc2d7c861a6ac2c Mon Sep 17 00:00:00 2001 From: godchen Date: Thu, 25 Nov 2021 18:49:16 +0800 Subject: [PATCH] Merge delta channels when load partition (#12274) Signed-off-by: godchen --- internal/querycoord/task.go | 50 +++++++++++++------- internal/querycoord/task_test.go | 81 ++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 17 deletions(-) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 525216da78..d62e5329a2 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -771,16 +771,13 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } - for _, info := range recoveryInfo.Channels { // watch dml channels channel := info.ChannelName @@ -799,12 +796,13 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { } } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) @@ -1762,14 +1760,12 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } for _, channelInfo := range recoveryInfo.Channels { @@ -1816,12 +1812,13 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { } } } + mergedDeltaChannel := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannel, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) @@ -2216,3 +2213,22 @@ func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelI deltaChannel.DroppedSegments = nil return deltaChannel, nil } + +func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.VchannelInfo { + minPositions := make(map[string]int) + for index, info := range infos { + _, ok := minPositions[info.ChannelName] + if !ok { + minPositions[info.ChannelName] = index + } + minTimeStampIndex := minPositions[info.ChannelName] + if info.SeekPosition.GetTimestamp() < infos[minTimeStampIndex].SeekPosition.GetTimestamp() { + minPositions[info.ChannelName] = index + } + } + var result []*datapb.VchannelInfo + for _, index := range minPositions { + result = append(result, infos[index]) + } + return result +} diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index 7319926bf5..7f2964ace8 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/funcutil" ) @@ -1029,3 +1030,83 @@ func TestLoadBalanceIndexedSegmentsAfterNodeDown(t *testing.T) { err = removeAllSession() assert.Nil(t, err) } + +func TestMergeWatchDeltaChannelInfo(t *testing.T) { + infos := []*datapb.VchannelInfo{ + { + ChannelName: "test-1", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-1", + Timestamp: 9, + }, + }, + { + ChannelName: "test-2", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-2", + Timestamp: 10, + }, + }, + { + ChannelName: "test-1", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-1", + Timestamp: 15, + }, + }, + { + ChannelName: "test-2", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-2", + Timestamp: 16, + }, + }, + { + ChannelName: "test-1", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-1", + Timestamp: 5, + }, + }, + { + ChannelName: "test-2", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-2", + Timestamp: 4, + }, + }, + { + ChannelName: "test-1", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-1", + Timestamp: 3, + }, + }, + { + ChannelName: "test-2", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-2", + Timestamp: 5, + }, + }, + } + + results := mergeWatchDeltaChannelInfo(infos) + expected := []*datapb.VchannelInfo{ + { + ChannelName: "test-1", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-1", + Timestamp: 3, + }, + }, + { + ChannelName: "test-2", + SeekPosition: &internalpb.MsgPosition{ + ChannelName: "test-2", + Timestamp: 4, + }, + }, + } + assert.ElementsMatch(t, expected, results) +}