From 2231aeab4db36dcd4a10bf7e2b52476a96c8f03c Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Fri, 27 Sep 2024 18:15:19 +0800 Subject: [PATCH] fix:[Cherry-Pick] Split delete task msg to MaxMessageSize (#36574) relate: https://github.com/milvus-io/milvus/issues/36089 pr: https://github.com/milvus-io/milvus/pull/36197 split delete task msg to MaxMessageSize to avoid mq message too large error Signed-off-by: aoiasd --- internal/proxy/task_delete.go | 144 ++++++++++++++++++++++------------ pkg/util/typeutil/schema.go | 38 +++++++++ 2 files changed, 134 insertions(+), 48 deletions(-) diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 4e2e202325..877456e966 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -142,28 +142,19 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { return err } - hashValues := typeutil.HashPK2Channels(dt.primaryKeys, dt.vChannels) - // repack delete msg by dmChannel - result := make(map[uint32]msgstream.TsMsg) - numRows := int64(0) - for index, key := range hashValues { - vchannel := dt.vChannels[key] - _, ok := result[key] - if !ok { - deleteMsg, err := dt.newDeleteMsg(ctx) - if err != nil { - return err - } - deleteMsg.ShardName = vchannel - result[key] = deleteMsg - } - curMsg := result[key].(*msgstream.DeleteMsg) - curMsg.HashValues = append(curMsg.HashValues, hashValues[index]) - curMsg.Timestamps = append(curMsg.Timestamps, dt.ts) - - typeutil.AppendIDs(curMsg.PrimaryKeys, dt.primaryKeys, index) - curMsg.NumRows++ - numRows++ + result, numRows, err := repackDeleteMsgByHash( + ctx, + dt.primaryKeys, + dt.vChannels, + dt.idAllocator, + dt.ts, + dt.collectionID, + dt.req.GetCollectionName(), + dt.partitionID, + dt.req.GetPartitionName(), + ) + if err != nil { + return err } // send delete request to log broker @@ -172,8 +163,8 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { EndTs: dt.EndTs(), } - for _, msg := range result { - if msg != nil { + for _, msgs := range result { + for _, msg := range msgs { msgPack.Msgs = append(msgPack.Msgs, msg) } } @@ -197,31 +188,88 @@ func (dt *deleteTask) PostExecute(ctx context.Context) error { return nil } -func (dt *deleteTask) newDeleteMsg(ctx context.Context) (*msgstream.DeleteMsg, error) { - msgid, err := dt.idAllocator.AllocOne() - if err != nil { - return nil, errors.Wrap(err, "failed to allocate MsgID of delete") +func repackDeleteMsgByHash( + ctx context.Context, + primaryKeys *schemapb.IDs, + vChannels []string, + idAllocator allocator.Interface, + ts uint64, + collectionID int64, + collectionName string, + partitionID int64, + partitionName string, +) (map[uint32][]*msgstream.DeleteMsg, int64, error) { + maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt() + hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels) + // repack delete msg by dmChannel + result := make(map[uint32][]*msgstream.DeleteMsg) + lastMessageSize := map[uint32]int{} + + numRows := int64(0) + numMessage := 0 + + createMessage := func(key uint32, vchannel string) *msgstream.DeleteMsg { + numMessage++ + lastMessageSize[key] = 0 + return &msgstream.DeleteMsg{ + BaseMsg: msgstream.BaseMsg{ + Ctx: ctx, + }, + DeleteRequest: &msgpb.DeleteRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_Delete), + // msgid of delete msg must be set later + // or it will be seen as duplicated msg in mq + commonpbutil.WithTimeStamp(ts), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + CollectionID: collectionID, + PartitionID: partitionID, + CollectionName: collectionName, + PartitionName: partitionName, + PrimaryKeys: &schemapb.IDs{}, + ShardName: vchannel, + }, + } } - return &msgstream.DeleteMsg{ - BaseMsg: msgstream.BaseMsg{ - Ctx: ctx, - }, - DeleteRequest: &msgpb.DeleteRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_Delete), - // msgid of delete msg must be set - // or it will be seen as duplicated msg in mq - commonpbutil.WithMsgID(msgid), - commonpbutil.WithTimeStamp(dt.ts), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - CollectionID: dt.collectionID, - PartitionID: dt.partitionID, - CollectionName: dt.req.GetCollectionName(), - PartitionName: dt.req.GetPartitionName(), - PrimaryKeys: &schemapb.IDs{}, - }, - }, nil + + for index, key := range hashValues { + vchannel := vChannels[key] + msgs, ok := result[key] + if !ok { + result[key] = make([]*msgstream.DeleteMsg, 1) + msgs = result[key] + result[key][0] = createMessage(key, vchannel) + } + curMsg := msgs[len(msgs)-1] + size, id := typeutil.GetId(primaryKeys, index) + if lastMessageSize[key]+16+size > maxSize { + curMsg = createMessage(key, vchannel) + result[key] = append(result[key], curMsg) + } + curMsg.HashValues = append(curMsg.HashValues, hashValues[index]) + curMsg.Timestamps = append(curMsg.Timestamps, ts) + + typeutil.AppendID(curMsg.PrimaryKeys, id) + lastMessageSize[key] += 16 + size + curMsg.NumRows++ + numRows++ + } + + // alloc messageID + start, _, err := idAllocator.Alloc(uint32(numMessage)) + if err != nil { + return nil, 0, err + } + + cnt := int64(0) + for _, msgs := range result { + for _, msg := range msgs { + msg.Base.MsgID = start + cnt + cnt++ + } + } + return result, numRows, nil } type deleteRunner struct { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 23f1c0b3cc..c01d7c270f 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -1249,6 +1249,44 @@ func AppendSystemFields(schema *schemapb.CollectionSchema) *schemapb.CollectionS return newSchema } +func GetId(src *schemapb.IDs, idx int) (int, any) { + switch src.IdField.(type) { + case *schemapb.IDs_IntId: + return 8, src.GetIntId().Data[idx] + case *schemapb.IDs_StrId: + return len(src.GetStrId().Data[idx]), src.GetStrId().Data[idx] + default: + panic("unknown pk type") + } +} + +func AppendID(dst *schemapb.IDs, src any) { + switch value := src.(type) { + case int64: + if dst.GetIdField() == nil { + dst.IdField = &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{value}, + }, + } + } else { + dst.GetIntId().Data = append(dst.GetIntId().Data, value) + } + case string: + if dst.GetIdField() == nil { + dst.IdField = &schemapb.IDs_StrId{ + StrId: &schemapb.StringArray{ + Data: []string{value}, + }, + } + } else { + dst.GetStrId().Data = append(dst.GetStrId().Data, value) + } + default: + // TODO + } +} + func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) { switch src.IdField.(type) { case *schemapb.IDs_IntId: