Merge delta channels when load partition (#12274)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-11-25 18:49:16 +08:00 committed by GitHub
parent 1fbe7af99b
commit ad83090925
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 17 deletions

View File

@ -771,16 +771,13 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel, err := generateWatchDeltaChannelInfo(info)
if err != nil {
return err
}
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
for _, info := range recoveryInfo.Channels {
deltaChannel, err := generateWatchDeltaChannelInfo(info)
if err != nil {
return err
}
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
for _, info := range recoveryInfo.Channels {
// watch dml channels
channel := info.ChannelName
@ -799,12 +796,13 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
}
}
mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels)
msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
Infos: mergedDeltaChannels,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
@ -1762,14 +1760,12 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel, err := generateWatchDeltaChannelInfo(info)
if err != nil {
return err
}
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
for _, info := range recoveryInfo.Channels {
deltaChannel, err := generateWatchDeltaChannelInfo(info)
if err != nil {
return err
}
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
for _, channelInfo := range recoveryInfo.Channels {
@ -1816,12 +1812,13 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
}
}
}
mergedDeltaChannel := mergeWatchDeltaChannelInfo(watchDeltaChannels)
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
Infos: mergedDeltaChannel,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
@ -2216,3 +2213,22 @@ func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelI
deltaChannel.DroppedSegments = nil
return deltaChannel, nil
}
func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.VchannelInfo {
minPositions := make(map[string]int)
for index, info := range infos {
_, ok := minPositions[info.ChannelName]
if !ok {
minPositions[info.ChannelName] = index
}
minTimeStampIndex := minPositions[info.ChannelName]
if info.SeekPosition.GetTimestamp() < infos[minTimeStampIndex].SeekPosition.GetTimestamp() {
minPositions[info.ChannelName] = index
}
}
var result []*datapb.VchannelInfo
for _, index := range minPositions {
result = append(result, infos[index])
}
return result
}

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
)
@ -1029,3 +1030,83 @@ func TestLoadBalanceIndexedSegmentsAfterNodeDown(t *testing.T) {
err = removeAllSession()
assert.Nil(t, err)
}
func TestMergeWatchDeltaChannelInfo(t *testing.T) {
infos := []*datapb.VchannelInfo{
{
ChannelName: "test-1",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-1",
Timestamp: 9,
},
},
{
ChannelName: "test-2",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-2",
Timestamp: 10,
},
},
{
ChannelName: "test-1",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-1",
Timestamp: 15,
},
},
{
ChannelName: "test-2",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-2",
Timestamp: 16,
},
},
{
ChannelName: "test-1",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-1",
Timestamp: 5,
},
},
{
ChannelName: "test-2",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-2",
Timestamp: 4,
},
},
{
ChannelName: "test-1",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-1",
Timestamp: 3,
},
},
{
ChannelName: "test-2",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-2",
Timestamp: 5,
},
},
}
results := mergeWatchDeltaChannelInfo(infos)
expected := []*datapb.VchannelInfo{
{
ChannelName: "test-1",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-1",
Timestamp: 3,
},
},
{
ChannelName: "test-2",
SeekPosition: &internalpb.MsgPosition{
ChannelName: "test-2",
Timestamp: 4,
},
},
}
assert.ElementsMatch(t, expected, results)
}