diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go
index 95e15b92ea..2db0c78fe3 100644
--- a/internal/proxy/task_delete.go
+++ b/internal/proxy/task_delete.go
@@ -166,8 +166,8 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
 		EndTs:   dt.EndTs(),
 	}
 
-	for _, msg := range result {
-		if msg != nil {
+	for _, msgs := range result {
+		for _, msg := range msgs {
 			msgPack.Msgs = append(msgPack.Msgs, msg)
 		}
 	}
@@ -202,75 +202,90 @@ func repackDeleteMsgByHash(
 	collectionName string,
 	partitionID int64,
 	partitionName string,
-) (map[uint32]*msgstream.DeleteMsg, int64, error) {
+) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
+	// A message is closed once its estimated size would exceed maxSize;
+	// further keys of that channel then go into a new message.
+	maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
 	hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
 	// repack delete msg by dmChannel
-	result := make(map[uint32]*msgstream.DeleteMsg)
+	result := make(map[uint32][]*msgstream.DeleteMsg)
+	// estimated size of the newest message of every channel
+	lastMessageSize := map[uint32]int{}
+
 	numRows := int64(0)
+	numMessage := 0
+
+	// createMessage returns an empty delete message for vchannel, resets the
+	// size estimate kept for key and counts the message for ID allocation.
+	createMessage := func(key uint32, vchannel string) *msgstream.DeleteMsg {
+		numMessage++
+		lastMessageSize[key] = 0
+		return &msgstream.DeleteMsg{
+			BaseMsg: msgstream.BaseMsg{
+				Ctx: ctx,
+			},
+			DeleteRequest: &msgpb.DeleteRequest{
+				Base: commonpbutil.NewMsgBase(
+					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
+					// msgid of delete msg must be set later
+					// or it will be seen as duplicated msg in mq
+					commonpbutil.WithTimeStamp(ts),
+					commonpbutil.WithSourceID(paramtable.GetNodeID()),
+				),
+				CollectionID:   collectionID,
+				PartitionID:    partitionID,
+				CollectionName: collectionName,
+				PartitionName:  partitionName,
+				PrimaryKeys:    &schemapb.IDs{},
+				ShardName:      vchannel,
+			},
+		}
+	}
+
 	for index, key := range hashValues {
 		vchannel := vChannels[key]
-		_, ok := result[key]
+		msgs, ok := result[key]
 		if !ok {
-			deleteMsg, err := newDeleteMsg(
-				ctx,
-				idAllocator,
-				ts,
-				collectionID,
-				collectionName,
-				partitionID,
-				partitionName,
-			)
-			if err != nil {
-				return nil, 0, err
-			}
-			deleteMsg.ShardName = vchannel
-			result[key] = deleteMsg
+			msgs = []*msgstream.DeleteMsg{createMessage(key, vchannel)}
+			result[key] = msgs
+		}
+		curMsg := msgs[len(msgs)-1]
+		size, id := typeutil.GetId(primaryKeys, index)
+		// 16 bytes are budgeted per row on top of the primary key itself.
+		// Only roll over when the current message already holds rows, so a
+		// single oversized key never leaves an empty message behind.
+		if curMsg.NumRows > 0 && lastMessageSize[key]+16+size > maxSize {
+			curMsg = createMessage(key, vchannel)
+			result[key] = append(result[key], curMsg)
 		}
-		curMsg := result[key]
 		curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
 		curMsg.Timestamps = append(curMsg.Timestamps, ts)
 
-		typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)
+		typeutil.AppendID(curMsg.PrimaryKeys, id)
+		lastMessageSize[key] += 16 + size
 		curMsg.NumRows++
 		numRows++
 	}
-	return result, numRows, nil
-}
 
-func newDeleteMsg(
-	ctx context.Context,
-	idAllocator allocator.Interface,
-	ts uint64,
-	collectionID int64,
-	collectionName string,
-	partitionID int64,
-	partitionName string,
-) (*msgstream.DeleteMsg, error) {
-	msgid, err := idAllocator.AllocOne()
+	if numMessage == 0 {
+		// no message was built, so there is no ID to allocate
+		return result, numRows, nil
+	}
+
+	// alloc messageID
+	start, _, err := idAllocator.Alloc(uint32(numMessage))
 	if err != nil {
-		return nil, errors.Wrap(err, "failed to allocate MsgID of delete")
+		return nil, 0, errors.Wrap(err, "failed to allocate MsgID of delete")
 	}
-	sliceRequest := &msgpb.DeleteRequest{
-		Base: commonpbutil.NewMsgBase(
-			commonpbutil.WithMsgType(commonpb.MsgType_Delete),
-			// msgid of delete msg must be set
-			// or it will be seen as duplicated msg in mq
-			commonpbutil.WithMsgID(msgid),
-			commonpbutil.WithTimeStamp(ts),
-			commonpbutil.WithSourceID(paramtable.GetNodeID()),
-		),
-		CollectionID:   collectionID,
-		PartitionID:    partitionID,
-		CollectionName: collectionName,
-		PartitionName:  partitionName,
-		PrimaryKeys:    &schemapb.IDs{},
+
+	cnt := int64(0)
+	for _, msgs := range result {
+		for _, msg := range msgs {
+			msg.Base.MsgID = start + cnt
+			cnt++
+		}
 	}
-	return &msgstream.DeleteMsg{
-		BaseMsg: msgstream.BaseMsg{
-			Ctx: ctx,
-		},
-		DeleteRequest: sliceRequest,
-	}, nil
+	return result, numRows, nil
 }
 
 type deleteRunner struct {
diff --git a/internal/proxy/task_delete_streaming.go b/internal/proxy/task_delete_streaming.go
index cc46130ea8..5e9e107d72 100644
--- a/internal/proxy/task_delete_streaming.go
+++ b/internal/proxy/task_delete_streaming.go
@@ -46,19 +46,21 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
 	}
 
 	var msgs []message.MutableMessage
-	for hashKey, deleteMsg := range result {
+	for hashKey, deleteMsgs := range result {
 		vchannel := dt.vChannels[hashKey]
-		msg, err := message.NewDeleteMessageBuilderV1().
-			WithHeader(&message.DeleteMessageHeader{
-				CollectionId: dt.collectionID,
-			}).
-			WithBody(deleteMsg.DeleteRequest).
-			WithVChannel(vchannel).
-			BuildMutable()
-		if err != nil {
-			return err
+		for _, deleteMsg := range deleteMsgs {
+			msg, err := message.NewDeleteMessageBuilderV1().
+				WithHeader(&message.DeleteMessageHeader{
+					CollectionId: dt.collectionID,
+				}).
+				WithBody(deleteMsg.DeleteRequest).
+				WithVChannel(vchannel).
+				BuildMutable()
+			if err != nil {
+				return err
+			}
+			msgs = append(msgs, msg)
 		}
-		msgs = append(msgs, msg)
 	}
 
 	log.Debug("send delete request to virtual channels",
diff --git a/internal/proxy/task_upsert_streaming.go b/internal/proxy/task_upsert_streaming.go
index 2aba567034..12cb78ba93 100644
--- a/internal/proxy/task_upsert_streaming.go
+++ b/internal/proxy/task_upsert_streaming.go
@@ -122,19 +122,21 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) (
 	}
 
 	var msgs []message.MutableMessage
-	for hashKey, deleteMsg := range result {
+	for hashKey, deleteMsgs := range result {
 		vchannel := vChannels[hashKey]
-		msg, err := message.NewDeleteMessageBuilderV1().
-			WithHeader(&message.DeleteMessageHeader{
-				CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
-			}).
-			WithBody(deleteMsg.DeleteRequest).
-			WithVChannel(vchannel).
-			BuildMutable()
-		if err != nil {
-			return nil, err
+		for _, deleteMsg := range deleteMsgs {
+			msg, err := message.NewDeleteMessageBuilderV1().
+				WithHeader(&message.DeleteMessageHeader{
+					CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
+				}).
+				WithBody(deleteMsg.DeleteRequest).
+				WithVChannel(vchannel).
+				BuildMutable()
+			if err != nil {
+				return nil, err
+			}
+			msgs = append(msgs, msg)
 		}
-		msgs = append(msgs, msg)
 	}
 
 	log.Debug("Proxy Upsert deleteExecute done",
diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go
index 2a3f54c612..edd0fcd0f7 100644
--- a/pkg/util/typeutil/schema.go
+++ b/pkg/util/typeutil/schema.go
@@ -1269,6 +1269,44 @@ func AppendSystemFields(schema *schemapb.CollectionSchema) *schemapb.CollectionS
 	return newSchema
 }
 
+// GetId returns the size in bytes and the value of the primary key at
+// position idx of src: 8 and the int64 for integer keys, len(key) and the
+// string for string keys. It panics on any other ID field type.
+func GetId(src *schemapb.IDs, idx int) (int, any) {
+	switch src.IdField.(type) {
+	case *schemapb.IDs_IntId:
+		return 8, src.GetIntId().Data[idx]
+	case *schemapb.IDs_StrId:
+		id := src.GetStrId().Data[idx]
+		return len(id), id
+	default:
+		panic("unknown pk type")
+	}
+}
+
+// AppendID appends the single primary key src to dst, creating the ID array
+// inside dst on first use. src must be an int64 or a string; a value of any
+// other dynamic type is ignored.
+func AppendID(dst *schemapb.IDs, src any) {
+	switch value := src.(type) {
+	case int64:
+		if dst.GetIdField() == nil {
+			dst.IdField = &schemapb.IDs_IntId{IntId: &schemapb.LongArray{}}
+		}
+		ids := dst.GetIntId()
+		ids.Data = append(ids.Data, value)
+	case string:
+		if dst.GetIdField() == nil {
+			dst.IdField = &schemapb.IDs_StrId{StrId: &schemapb.StringArray{}}
+		}
+		ids := dst.GetStrId()
+		ids.Data = append(ids.Data, value)
+	default:
+		// Unsupported key types are silently ignored; GetId only ever
+		// yields int64 or string values.
+	}
+}
+
 func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) {
 	switch src.IdField.(type) {
 	case *schemapb.IDs_IntId: