From ece0a0679805617a6883c52e22085f6883a85252 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Mon, 27 Sep 2021 10:35:58 +0800 Subject: [PATCH] Update repack delete msg (#8631) Signed-off-by: yudong.cai --- internal/msgstream/mq_msgstream.go | 4 +- internal/msgstream/mq_msgstream_test.go | 106 ++++++++++++------------ internal/msgstream/repack_func.go | 91 ++++++++++---------- 3 files changed, 98 insertions(+), 103 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 0ac42c50ac..1f1bbc405b 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -217,8 +217,8 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { switch msgType { case commonpb.MsgType_Insert: result, err = InsertRepackFunc(tsMsgs, reBucketValues) - //case commonpb.MsgType_Delete: - // result, err = DeleteRepackFunc(tsMsgs, reBucketValues) + case commonpb.MsgType_Delete: + result, err = DeleteRepackFunc(tsMsgs, reBucketValues) default: result, err = DefaultRepackFunc(tsMsgs, reBucketValues) } diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index 9f3e59537a..a22220a602 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -302,59 +302,59 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { (*outputStream).Close() } -//func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { -// pulsarAddress, _ := Params.Load("_PulsarAddress") -// c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) -// producerChannels := []string{c1, c2} -// consumerChannels := []string{c1, c2} -// consumerSubName := funcutil.RandomString(8) -// -// baseMsg := BaseMsg{ -// BeginTimestamp: 0, -// EndTimestamp: 0, -// HashValues: []uint32{1, 3}, -// } -// -// deleteRequest := internalpb.DeleteRequest{ -// Base: &commonpb.MsgBase{ -// MsgType: commonpb.MsgType_Delete, -// MsgID: 1, -// Timestamp: 1, -// SourceID: 1, -// }, -// CollectionName: "Collection", -// ChannelID: "1", -// Timestamps: []Timestamp{1, 1}, -// PrimaryKeys: []int64{1, 3}, -// } -// deleteMsg := &DeleteMsg{ -// BaseMsg: baseMsg, -// DeleteRequest: deleteRequest, -// } -// -// msgPack := MsgPack{} -// msgPack.Msgs = append(msgPack.Msgs, deleteMsg) -// -// factory := ProtoUDFactory{} -// pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) -// inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) -// inputStream.AsProducer(producerChannels) -// inputStream.Start() -// -// pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) -// outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) -// outputStream.AsConsumer(consumerChannels, consumerSubName) -// outputStream.Start() -// var output MsgStream = outputStream -// -// err := (*inputStream).Produce(&msgPack) -// if err != nil { -// log.Fatalf("produce error = %v", err) -// } -// receiveMsg(output, len(msgPack.Msgs)*2) -// (*inputStream).Close() -// (*outputStream).Close() -//} +func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) + + baseMsg := BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []uint32{1}, + } + + deleteRequest := internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: 1, + Timestamp: 1, + SourceID: 1, + }, + CollectionName: "Collection", + ChannelID: "1", + Timestamp: Timestamp(1), + ExprPlan: []byte{}, + } + deleteMsg := &DeleteMsg{ + BaseMsg: baseMsg, + DeleteRequest: deleteRequest, + } + + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, deleteMsg) + + factory := ProtoUDFactory{} + pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + inputStream.AsProducer(producerChannels) + inputStream.Start() + + pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) + outputStream.AsConsumer(consumerChannels, consumerSubName) + outputStream.Start() + var output MsgStream = outputStream + + err := (*inputStream).Produce(&msgPack) + if err != nil { + log.Fatalf("produce error = %v", err) + } + receiveMsg(output, len(msgPack.Msgs)*1) + (*inputStream).Close() + (*outputStream).Close() +} func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") diff --git a/internal/msgstream/repack_func.go b/internal/msgstream/repack_func.go index 83bc7295e6..56a2bf482b 100644 --- a/internal/msgstream/repack_func.go +++ b/internal/msgstream/repack_func.go @@ -74,54 +74,49 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e return result, nil } -//func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) { -// result := make(map[int32]*MsgPack) -// for i, request := range tsMsgs { -// if request.Type() != commonpb.MsgType_Delete { -// return nil, errors.New("msg's must be Delete") -// } -// deleteRequest := request.(*DeleteMsg) -// keys := hashKeys[i] -// -// timestampLen := len(deleteRequest.Timestamps) -// primaryKeysLen := len(deleteRequest.PrimaryKeys) -// keysLen := len(keys) -// -// if keysLen != timestampLen || keysLen != primaryKeysLen { -// return nil, errors.New("the length of hashValue, timestamps, primaryKeys are not equal") -// } -// -// for index, key := range keys { -// _, ok := result[key] -// if !ok { -// msgPack := MsgPack{} -// result[key] = &msgPack -// } -// -// sliceRequest := internalpb.DeleteRequest{ -// Base: &commonpb.MsgBase{ -// MsgType: commonpb.MsgType_Delete, -// MsgID: deleteRequest.Base.MsgID, -// Timestamp: deleteRequest.Timestamps[index], -// SourceID: deleteRequest.Base.SourceID, -// }, -// CollectionName: deleteRequest.CollectionName, -// ChannelID: deleteRequest.ChannelID, -// Timestamps: []uint64{deleteRequest.Timestamps[index]}, -// PrimaryKeys: []int64{deleteRequest.PrimaryKeys[index]}, -// } -// -// deleteMsg := &DeleteMsg{ -// BaseMsg: BaseMsg{ -// Ctx: request.TraceCtx(), -// }, -// DeleteRequest: sliceRequest, -// } -// result[key].Msgs = append(result[key].Msgs, deleteMsg) -// } -// } -// return result, nil -//} +func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) { + result := make(map[int32]*MsgPack) + for i, request := range tsMsgs { + if request.Type() != commonpb.MsgType_Delete { + return nil, errors.New("msg's must be Delete") + } + deleteRequest := request.(*DeleteMsg) + keys := hashKeys[i] + + if len(keys) != 1 { + return nil, errors.New("len(msg.hashValue) must equal 1, but it is: " + strconv.Itoa(len(keys))) + } + + key := keys[0] + _, ok := result[key] + if !ok { + msgPack := MsgPack{} + result[key] = &msgPack + } + + sliceRequest := internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: deleteRequest.Base.MsgID, + Timestamp: deleteRequest.Timestamp, + SourceID: deleteRequest.Base.SourceID, + }, + CollectionName: deleteRequest.CollectionName, + ChannelID: deleteRequest.ChannelID, + Timestamp: deleteRequest.Timestamp, + ExprPlan: deleteRequest.ExprPlan, + } + + deleteMsg := &DeleteMsg{ + BaseMsg: BaseMsg{ + Ctx: request.TraceCtx(), + }, + DeleteRequest: sliceRequest, + } + result[key].Msgs = append(result[key].Msgs, deleteMsg) + } + return result, nil +} func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) { result := make(map[int32]*MsgPack)