From 4c603cd02c49419f4a962ccaba8c6a2d41ef1513 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Tue, 21 Mar 2023 14:06:01 +0800 Subject: [PATCH] Fix upsert msgid (#22839) Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/proxy/impl.go | 5 +++-- internal/proxy/task_delete.go | 11 ++++++++++- internal/proxy/task_test.go | 20 ++++++++++++-------- internal/proxy/task_upsert.go | 9 +++++++++ internal/util/merr/errors_test.go | 1 + 5 files changed, 35 insertions(+), 11 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 3894110044..51426be504 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2184,8 +2184,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) // RowData: transfer column based request to this }, }, - chMgr: node.chMgr, - chTicker: node.chTicker, + idAllocator: node.rowIDAllocator, + chMgr: node.chMgr, + chTicker: node.chTicker, } log.Debug("Enqueue delete request in Proxy", diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index da2e6b2c95..ac8cd12de4 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" + "github.com/cockroachdb/errors" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -12,6 +13,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" @@ -37,6 +39,7 @@ type deleteTask struct { vChannels []vChan pChannels []pChan + idAllocator *allocator.IDAllocator collectionID UniqueID schema *schemapb.CollectionSchema } @@ -252,10 +255,16 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { ts := dt.deleteMsg.Timestamps[index] _, ok := result[key] if !ok { + msgid, err := dt.idAllocator.AllocOne() + if err != nil { + return errors.Wrap(err, "failed to allocate MsgID of delete") + } sliceRequest := msgpb.DeleteRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Delete), - commonpbutil.WithMsgID(dt.deleteMsg.Base.MsgID), + // msgid of delete msg must be set + // or it will be seen as duplicated msg in mq + commonpbutil.WithMsgID(msgid), commonpbutil.WithTimeStamp(ts), commonpbutil.WithSourceID(proxyID), ), diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 0d3dcf6550..62b3339f1c 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -1425,8 +1425,9 @@ func TestTask_Int64PrimaryKey(t *testing.T) { PartitionName: partitionName, }, }, - deleteExpr: "int64 in [0, 1]", - ctx: ctx, + idAllocator: idAllocator, + deleteExpr: "int64 in [0, 1]", + ctx: ctx, result: &milvuspb.MutationResult{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1479,8 +1480,9 @@ func TestTask_Int64PrimaryKey(t *testing.T) { PartitionName: partitionName, }, }, - deleteExpr: "int64 not in [0, 1]", - ctx: ctx, + idAllocator: idAllocator, + deleteExpr: "int64 not in [0, 1]", + ctx: ctx, result: &milvuspb.MutationResult{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1768,8 +1770,9 @@ func TestTask_VarCharPrimaryKey(t *testing.T) { PartitionName: partitionName, }, }, - deleteExpr: "varChar in [\"milvus\", \"test\"]", - ctx: ctx, + idAllocator: idAllocator, + deleteExpr: "varChar in [\"milvus\", \"test\"]", + ctx: ctx, result: &milvuspb.MutationResult{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1822,8 +1825,9 @@ func TestTask_VarCharPrimaryKey(t *testing.T) { PartitionName: partitionName, }, }, - deleteExpr: "varChar not in [\"milvus\", \"test\"]", - ctx: ctx, + idAllocator: idAllocator, + deleteExpr: "varChar not in [\"milvus\", \"test\"]", + ctx: ctx, result: &milvuspb.MutationResult{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index cc4767ce88..5c4ae4de71 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" + "github.com/cockroachdb/errors" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -415,10 +416,18 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP ts := it.upsertMsg.DeleteMsg.Timestamps[index] _, ok := result[key] if !ok { + msgid, err := it.idAllocator.AllocOne() + if err != nil { + errors.Wrap(err, "failed to allocate MsgID for delete of upsert") + } sliceRequest := msgpb.DeleteRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Delete), commonpbutil.WithTimeStamp(ts), + // id of upsertTask were set as ts in scheduler + // msgid of delete msg must be set + // or it will be seen as duplicated msg in mq + commonpbutil.WithMsgID(msgid), commonpbutil.WithSourceID(proxyID), ), CollectionID: collectionID, diff --git a/internal/util/merr/errors_test.go b/internal/util/merr/errors_test.go index e27f017eb5..3330ceb4ea 100644 --- a/internal/util/merr/errors_test.go +++ b/internal/util/merr/errors_test.go @@ -108,6 +108,7 @@ func (s *ErrSuite) TestWrap() { // Metrics related s.ErrorIs(WrapErrMetricNotFound("unknown", "failed to get metric"), ErrMetricNotFound) + } func (s *ErrSuite) TestCombine() {