fix:[Cherry-Pick] Split delete task msg to MaxMessageSize (#36574)

relate: https://github.com/milvus-io/milvus/issues/36089
pr: https://github.com/milvus-io/milvus/pull/36197
split delete task msg by MaxMessageSize to avoid the mq "message too large" error

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd 2024-09-27 18:15:19 +08:00 committed by GitHub
parent e5a6c5b31d
commit 2231aeab4d
2 changed files with 134 additions and 48 deletions
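
The patch caps each repacked DeleteMsg at the configured Pulsar MaxMessageSize. Below is a minimal, self-contained sketch of the size-capping idea; the names are illustrative, not the Milvus API, and the fixed 16 bytes of per-row overhead mirrors the constant used in the patch:

```go
package main

import "fmt"

// maxSize stands in for Params.PulsarCfg.MaxMessageSize.
const maxSize = 64

func main() {
	// Per-row primary-key sizes for one channel; the patch charges a
	// fixed 16 bytes per row on top of the key size.
	rowSizes := []int{8, 8, 24, 8, 8, 8}

	var batches [][]int // each inner slice maps to one DeleteMsg
	var cur []int
	curSize := 0
	for _, s := range rowSizes {
		// Start a fresh batch once the next row would push us over the cap.
		if len(cur) > 0 && curSize+16+s > maxSize {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, s)
		curSize += 16 + s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	fmt.Println(batches) // [[8 8] [24 8] [8 8]]
}
```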


@@ -142,28 +142,19 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
 		return err
 	}
-	hashValues := typeutil.HashPK2Channels(dt.primaryKeys, dt.vChannels)
-	// repack delete msg by dmChannel
-	result := make(map[uint32]msgstream.TsMsg)
-	numRows := int64(0)
-	for index, key := range hashValues {
-		vchannel := dt.vChannels[key]
-		_, ok := result[key]
-		if !ok {
-			deleteMsg, err := dt.newDeleteMsg(ctx)
-			if err != nil {
-				return err
-			}
-			deleteMsg.ShardName = vchannel
-			result[key] = deleteMsg
-		}
-		curMsg := result[key].(*msgstream.DeleteMsg)
-		curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
-		curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)
-		typeutil.AppendIDs(curMsg.PrimaryKeys, dt.primaryKeys, index)
-		curMsg.NumRows++
-		numRows++
-	}
+	result, numRows, err := repackDeleteMsgByHash(
+		ctx,
+		dt.primaryKeys,
+		dt.vChannels,
+		dt.idAllocator,
+		dt.ts,
+		dt.collectionID,
+		dt.req.GetCollectionName(),
+		dt.partitionID,
+		dt.req.GetPartitionName(),
+	)
+	if err != nil {
+		return err
+	}
 
 	// send delete request to log broker
@@ -172,8 +163,8 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
 		EndTs:   dt.EndTs(),
 	}
-	for _, msg := range result {
-		if msg != nil {
+	for _, msgs := range result {
+		for _, msg := range msgs {
 			msgPack.Msgs = append(msgPack.Msgs, msg)
 		}
 	}
@@ -197,31 +188,88 @@ func (dt *deleteTask) PostExecute(ctx context.Context) error {
 	return nil
 }
 
-func (dt *deleteTask) newDeleteMsg(ctx context.Context) (*msgstream.DeleteMsg, error) {
-	msgid, err := dt.idAllocator.AllocOne()
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to allocate MsgID of delete")
+func repackDeleteMsgByHash(
+	ctx context.Context,
+	primaryKeys *schemapb.IDs,
+	vChannels []string,
+	idAllocator allocator.Interface,
+	ts uint64,
+	collectionID int64,
+	collectionName string,
+	partitionID int64,
+	partitionName string,
+) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
+	maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
+	hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
+	// repack delete msg by dmChannel
+	result := make(map[uint32][]*msgstream.DeleteMsg)
+	lastMessageSize := map[uint32]int{}
+	numRows := int64(0)
+	numMessage := 0
+	createMessage := func(key uint32, vchannel string) *msgstream.DeleteMsg {
+		numMessage++
+		lastMessageSize[key] = 0
+		return &msgstream.DeleteMsg{
+			BaseMsg: msgstream.BaseMsg{
+				Ctx: ctx,
+			},
+			DeleteRequest: &msgpb.DeleteRequest{
+				Base: commonpbutil.NewMsgBase(
+					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
+					// msgid of delete msg must be set later
+					// or it will be seen as duplicated msg in mq
+					commonpbutil.WithTimeStamp(ts),
+					commonpbutil.WithSourceID(paramtable.GetNodeID()),
+				),
+				CollectionID:   collectionID,
+				PartitionID:    partitionID,
+				CollectionName: collectionName,
+				PartitionName:  partitionName,
+				PrimaryKeys:    &schemapb.IDs{},
+				ShardName:      vchannel,
+			},
+		}
 	}
-	return &msgstream.DeleteMsg{
-		BaseMsg: msgstream.BaseMsg{
-			Ctx: ctx,
-		},
-		DeleteRequest: &msgpb.DeleteRequest{
-			Base: commonpbutil.NewMsgBase(
-				commonpbutil.WithMsgType(commonpb.MsgType_Delete),
-				// msgid of delete msg must be set
-				// or it will be seen as duplicated msg in mq
-				commonpbutil.WithMsgID(msgid),
-				commonpbutil.WithTimeStamp(dt.ts),
-				commonpbutil.WithSourceID(paramtable.GetNodeID()),
-			),
-			CollectionID:   dt.collectionID,
-			PartitionID:    dt.partitionID,
-			CollectionName: dt.req.GetCollectionName(),
-			PartitionName:  dt.req.GetPartitionName(),
-			PrimaryKeys:    &schemapb.IDs{},
-		},
-	}, nil
+
+	for index, key := range hashValues {
+		vchannel := vChannels[key]
+		msgs, ok := result[key]
+		if !ok {
+			result[key] = make([]*msgstream.DeleteMsg, 1)
+			msgs = result[key]
+			result[key][0] = createMessage(key, vchannel)
+		}
+		curMsg := msgs[len(msgs)-1]
+		size, id := typeutil.GetId(primaryKeys, index)
+		if lastMessageSize[key]+16+size > maxSize {
+			curMsg = createMessage(key, vchannel)
+			result[key] = append(result[key], curMsg)
+		}
+		curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
+		curMsg.Timestamps = append(curMsg.Timestamps, ts)
+		typeutil.AppendID(curMsg.PrimaryKeys, id)
+		lastMessageSize[key] += 16 + size
+		curMsg.NumRows++
+		numRows++
+	}
+
+	// alloc messageID
+	start, _, err := idAllocator.Alloc(uint32(numMessage))
+	if err != nil {
+		return nil, 0, err
+	}
+	cnt := int64(0)
+	for _, msgs := range result {
+		for _, msg := range msgs {
+			msg.Base.MsgID = start + cnt
+			cnt++
+		}
+	}
+	return result, numRows, nil
 }
 
 type deleteRunner struct {
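
Because one delete task can now produce several messages, MsgID assignment moves from one AllocOne call per message to a single Alloc for the whole batch, with consecutive IDs handed out afterwards. A sketch of that pattern follows; fakeAlloc is a hypothetical stand-in for the allocator interface, which in Milvus is an RPC-backed ID service:

```go
package main

import "fmt"

// fakeAlloc is a hypothetical stand-in for an allocator that reserves a
// contiguous range [start, end) of IDs in one call.
func fakeAlloc(count uint32) (start, end int64, err error) {
	return 1000, 1000 + int64(count), nil
}

func main() {
	// shard key -> number of split messages for that shard
	msgsPerShard := map[uint32]int{0: 3, 1: 2}

	total := uint32(0)
	for _, n := range msgsPerShard {
		total += uint32(n)
	}

	start, _, err := fakeAlloc(total) // one allocator round-trip for all messages
	if err != nil {
		panic(err)
	}
	cnt := int64(0)
	for shard, n := range msgsPerShard {
		for i := 0; i < n; i++ {
			fmt.Printf("shard %d, msg %d -> MsgID %d\n", shard, i, start+cnt)
			cnt++
		}
	}
}
```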


@@ -1249,6 +1249,44 @@ func AppendSystemFields(schema *schemapb.CollectionSchema) *schemapb.CollectionSchema {
 	return newSchema
 }
 
+func GetId(src *schemapb.IDs, idx int) (int, any) {
+	switch src.IdField.(type) {
+	case *schemapb.IDs_IntId:
+		return 8, src.GetIntId().Data[idx]
+	case *schemapb.IDs_StrId:
+		return len(src.GetStrId().Data[idx]), src.GetStrId().Data[idx]
+	default:
+		panic("unknown pk type")
+	}
+}
+
+func AppendID(dst *schemapb.IDs, src any) {
+	switch value := src.(type) {
+	case int64:
+		if dst.GetIdField() == nil {
+			dst.IdField = &schemapb.IDs_IntId{
+				IntId: &schemapb.LongArray{
+					Data: []int64{value},
+				},
+			}
+		} else {
+			dst.GetIntId().Data = append(dst.GetIntId().Data, value)
+		}
+	case string:
+		if dst.GetIdField() == nil {
+			dst.IdField = &schemapb.IDs_StrId{
+				StrId: &schemapb.StringArray{
+					Data: []string{value},
+				},
+			}
+		} else {
+			dst.GetStrId().Data = append(dst.GetStrId().Data, value)
+		}
+	default:
+		// TODO
+	}
+}
+
 func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) {
 	switch src.IdField.(type) {
 	case *schemapb.IDs_IntId:
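
The two helpers added above can be exercised directly. A minimal usage sketch, assuming this commit is applied and that the import paths follow the Milvus repo layout:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func main() {
	src := &schemapb.IDs{
		IdField: &schemapb.IDs_IntId{
			IntId: &schemapb.LongArray{Data: []int64{1, 2, 3}},
		},
	}
	dst := &schemapb.IDs{}

	payload := 0
	for i := range src.GetIntId().GetData() {
		size, id := typeutil.GetId(src, i) // 8 bytes per int64 key
		typeutil.AppendID(dst, id)
		payload += 16 + size // same accounting as repackDeleteMsgByHash
	}
	fmt.Println(dst.GetIntId().GetData(), payload) // [1 2 3] 72
}
```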