diff --git a/go.mod b/go.mod index e808fb6019..b641f5e4c5 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669 github.com/minio/minio-go/v7 v7.0.61 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 diff --git a/go.sum b/go.sum index 73c0ca4ac2..9e22ac81cd 100644 --- a/go.sum +++ b/go.sum @@ -583,8 +583,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669 h1:yUtc+pVKVhmmnwTY9iyV8+EmhrNjZ74Hxm3y5QKCNyg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= diff --git a/internal/allocator/mock_allcoator.go b/internal/allocator/mock_allcoator.go new file mode 100644 index 0000000000..867e80de97 --- /dev/null +++ b/internal/allocator/mock_allcoator.go @@ -0,0 +1,142 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package allocator + +import mock "github.com/stretchr/testify/mock" + +// MockAllocator is an autogenerated mock type for the Interface type +type MockAllocator struct { + mock.Mock +} + +type MockAllocator_Expecter struct { + mock *mock.Mock +} + +func (_m *MockAllocator) EXPECT() *MockAllocator_Expecter { + return &MockAllocator_Expecter{mock: &_m.Mock} +} + +// Alloc provides a mock function with given fields: count +func (_m *MockAllocator) Alloc(count uint32) (int64, int64, error) { + ret := _m.Called(count) + + var r0 int64 + var r1 int64 + var r2 error + if rf, ok := ret.Get(0).(func(uint32) (int64, int64, error)); ok { + return rf(count) + } + if rf, ok := ret.Get(0).(func(uint32) int64); ok { + r0 = rf(count) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(uint32) int64); ok { + r1 = rf(count) + } else { + r1 = ret.Get(1).(int64) + } + + if rf, ok := ret.Get(2).(func(uint32) error); ok { + r2 = rf(count) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockAllocator_Alloc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Alloc' +type MockAllocator_Alloc_Call struct { + *mock.Call +} + +// Alloc is a helper method to define mock.On call +// - count uint32 +func (_e *MockAllocator_Expecter) Alloc(count interface{}) *MockAllocator_Alloc_Call { + return &MockAllocator_Alloc_Call{Call: _e.mock.On("Alloc", count)} +} + +func (_c *MockAllocator_Alloc_Call) Run(run func(count uint32)) *MockAllocator_Alloc_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint32)) + }) + return _c +} + +func (_c *MockAllocator_Alloc_Call) Return(_a0 int64, _a1 int64, _a2 error) *MockAllocator_Alloc_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockAllocator_Alloc_Call) RunAndReturn(run func(uint32) (int64, int64, error)) *MockAllocator_Alloc_Call { + _c.Call.Return(run) + return _c +} + +// AllocOne provides a mock function with given fields: +func (_m *MockAllocator) AllocOne() (int64, error) { + ret := _m.Called() + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func() (int64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockAllocator_AllocOne_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocOne' +type MockAllocator_AllocOne_Call struct { + *mock.Call +} + +// AllocOne is a helper method to define mock.On call +func (_e *MockAllocator_Expecter) AllocOne() *MockAllocator_AllocOne_Call { + return &MockAllocator_AllocOne_Call{Call: _e.mock.On("AllocOne")} +} + +func (_c *MockAllocator_AllocOne_Call) Run(run func()) *MockAllocator_AllocOne_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockAllocator_AllocOne_Call) Return(_a0 int64, _a1 error) *MockAllocator_AllocOne_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockAllocator_AllocOne_Call) RunAndReturn(run func() (int64, error)) *MockAllocator_AllocOne_Call { + _c.Call.Return(run) + return _c +} + +// NewMockAllocator creates a new instance of MockAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockAllocator(t interface { + mock.TestingT + Cleanup(func()) +}) *MockAllocator { + mock := &MockAllocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index f02118148a..0959c59ef4 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2397,20 +2397,19 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() - dt := &deleteTask{ - ctx: ctx, - Condition: NewTaskCondition(ctx), - req: request, - idAllocator: node.rowIDAllocator, - chMgr: node.chMgr, - chTicker: node.chTicker, - lb: node.lbPolicy, + + dr := &deleteRunner{ + req: request, + idAllocator: node.rowIDAllocator, + tsoAllocatorIns: node.tsoAllocator, + chMgr: node.chMgr, + chTicker: node.chTicker, + queue: node.sched.dmQueue, + lb: node.lbPolicy, } - log.Debug("Enqueue delete request in Proxy") - - // MsgID will be set by Enqueue() - if err := node.sched.dmQueue.Enqueue(dt); err != nil { + log.Debug("init delete runner in Proxy") + if err := dr.Init(ctx); err != nil { log.Error("Failed to enqueue delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() @@ -2420,25 +2419,26 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) }, nil } - log.Debug("Detail of delete request in Proxy") + log.Debug("Run delete in Proxy") - if err := dt.WaitToFinish(); err != nil { - log.Error("Failed to execute delete task in task scheduler: " + err.Error()) + if err := dr.Run(ctx); err != nil { + log.Error("Failed to enqueue delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.AbandonLabel).Inc() + return &milvuspb.MutationResult{ Status: merr.Status(err), }, nil } - receiveSize := proto.Size(dt.req) + receiveSize := proto.Size(dr.req) rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) - return dt.result, nil + return dr.result, nil } // Upsert upsert records into collection. diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index e8cc06f0d9..52782dea3a 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "go.opentelemetry.io/otel" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -37,8 +38,7 @@ type deleteTask struct { ctx context.Context tr *timerecord.TimeRecorder - req *milvuspb.DeleteRequest - result *milvuspb.MutationResult + req *milvuspb.DeleteRequest // channel chMgr channelsMgr @@ -46,17 +46,20 @@ type deleteTask struct { pChannels []pChan vChannels []vChan - idAllocator *allocator.IDAllocator - lb LBPolicy + idAllocator allocator.Interface // delete info - schema *schemapb.CollectionSchema - ts Timestamp - msgID UniqueID + primaryKeys *schemapb.IDs collectionID UniqueID partitionID UniqueID - count int partitionKeyMode bool + + // set by scheduler + ts Timestamp + msgID UniqueID + + // result + count int64 } func (dt *deleteTask) TraceCtx() context.Context { @@ -112,115 +115,14 @@ func (dt *deleteTask) getChannels() []pChan { return dt.pChannels } -func getExpr(plan *planpb.PlanNode) (bool, *planpb.Expr_TermExpr) { - // simple delete request need expr with "pk in [a, b]" - termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr) - if !ok { - return false, nil - } - - if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() { - return false, nil - } - return true, termExpr -} - -func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) { - res = &schemapb.IDs{} - rowNum = int64(len(termExpr.TermExpr.Values)) - switch termExpr.TermExpr.ColumnInfo.GetDataType() { - case schemapb.DataType_Int64: - ids := make([]int64, 0) - for _, v := range termExpr.TermExpr.Values { - ids = append(ids, v.GetInt64Val()) - } - res.IdField = &schemapb.IDs_IntId{ - IntId: &schemapb.LongArray{ - Data: ids, - }, - } - case schemapb.DataType_VarChar: - ids := make([]string, 0) - for _, v := range termExpr.TermExpr.Values { - ids = append(ids, v.GetStringVal()) - } - res.IdField = &schemapb.IDs_StrId{ - StrId: &schemapb.StringArray{ - Data: ids, - }, - } - default: - return res, 0, fmt.Errorf("invalid field data type specifyed in delete expr") - } - - return res, rowNum, nil -} - func (dt *deleteTask) PreExecute(ctx context.Context) error { - dt.result = &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: &schemapb.IDs{ - IdField: nil, - }, - Timestamp: dt.BeginTs(), - } - - log := log.Ctx(ctx) - collName := dt.req.GetCollectionName() - if err := validateCollectionName(collName); err != nil { - return ErrWithLog(log, "Invalid collection name", err) - } - collID, err := globalMetaCache.GetCollectionID(ctx, dt.req.GetDbName(), collName) - if err != nil { - return ErrWithLog(log, "Failed to get collection id", err) - } - dt.collectionID = collID - - dt.partitionKeyMode, err = isPartitionKeyMode(ctx, dt.req.GetDbName(), dt.req.GetCollectionName()) - if err != nil { - return ErrWithLog(log, "Failed to get partition key mode", err) - } - if dt.partitionKeyMode && len(dt.req.PartitionName) != 0 { - return errors.New("not support manually specifying the partition names if partition key mode is used") - } - - // If partitionName is not empty, partitionID will be set. - if len(dt.req.PartitionName) > 0 { - partName := dt.req.GetPartitionName() - if err := validatePartitionTag(partName, true); err != nil { - return ErrWithLog(log, "Invalid partition name", err) - } - partID, err := globalMetaCache.GetPartitionID(ctx, dt.req.GetDbName(), collName, partName) - if err != nil { - return ErrWithLog(log, "Failed to get partition id", err) - } - dt.partitionID = partID - } else { - dt.partitionID = common.InvalidPartitionID - } - - schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.req.GetDbName(), collName) - if err != nil { - return ErrWithLog(log, "Failed to get collection schema", err) - } - dt.schema = schema - - // hash primary keys to channels - channelNames, err := dt.chMgr.getVChannels(dt.collectionID) - if err != nil { - return ErrWithLog(log, "Failed to get primary keys from expr", err) - } - dt.vChannels = channelNames - - log.Debug("pre delete done", zap.Int64("collection_id", dt.collectionID)) - return nil } func (dt *deleteTask) Execute(ctx context.Context) (err error) { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute") defer sp.End() - log := log.Ctx(ctx) + // log := log.Ctx(ctx) if len(dt.req.GetExpr()) == 0 { return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression") @@ -232,162 +134,7 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { return err } - plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr) - if err != nil { - return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr()) - } - - isSimple, termExp := getExpr(plan) - if isSimple { - // if could get delete.primaryKeys from delete expr - err := dt.simpleDelete(ctx, termExp, stream) - if err != nil { - return err - } - } else { - // if get complex delete expr - // need query from querynode before delete - err = dt.complexDelete(ctx, plan, stream) - if err != nil { - log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr())) - return err - } - } - - return nil -} - -func (dt *deleteTask) PostExecute(ctx context.Context) error { - return nil -} - -func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream, plan *planpb.PlanNode) executeFunc { - return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channelIDs ...string) error { - var partitionIDs []int64 - - // optimize query when partitionKey on - if dt.partitionKeyMode { - expr, err := ParseExprFromPlan(plan) - if err != nil { - return err - } - partitionKeys := ParsePartitionKeys(expr) - hashedPartitionNames, err := assignPartitionKeys(ctx, dt.req.GetDbName(), dt.req.GetCollectionName(), partitionKeys) - if err != nil { - return err - } - partitionIDs, err = getPartitionIDs(ctx, dt.req.GetDbName(), dt.req.GetCollectionName(), hashedPartitionNames) - if err != nil { - return err - } - } else if dt.partitionID != common.InvalidFieldID { - partitionIDs = []int64{dt.partitionID} - } - - log := log.Ctx(ctx).With( - zap.Int64("collectionID", dt.collectionID), - zap.Int64s("partitionIDs", partitionIDs), - zap.Strings("channels", channelIDs), - zap.Int64("nodeID", nodeID)) - // set plan - _, outputFieldIDs := translatePkOutputFields(dt.schema) - outputFieldIDs = append(outputFieldIDs, common.TimeStampField) - plan.OutputFieldIds = outputFieldIDs - log.Debug("start query for delete") - - serializedPlan, err := proto.Marshal(plan) - if err != nil { - return err - } - - queryReq := &querypb.QueryRequest{ - Req: &internalpb.RetrieveRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_Retrieve), - commonpbutil.WithMsgID(dt.msgID), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - commonpbutil.WithTargetID(nodeID), - ), - MvccTimestamp: dt.ts, - ReqID: paramtable.GetNodeID(), - DbID: 0, // TODO - CollectionID: dt.collectionID, - PartitionIDs: partitionIDs, - SerializedExprPlan: serializedPlan, - OutputFieldsId: outputFieldIDs, - GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, commonpb.ConsistencyLevel_Bounded), - }, - DmlChannels: channelIDs, - Scope: querypb.DataScope_All, - } - - rc := timerecord.NewTimeRecorder("QueryStreamDelete") - client, err := qn.QueryStream(ctx, queryReq) - if err != nil { - log.Warn("query stream for delete create failed", zap.Error(err)) - return err - } - - for { - result, err := client.Recv() - if err != nil { - if err == io.EOF { - log.Debug("query stream for delete finished", zap.Int64("msgID", dt.msgID), zap.Duration("duration", rc.ElapseSpan())) - return nil - } - return err - } - - err = merr.Error(result.GetStatus()) - if err != nil { - log.Warn("query stream for delete get error status", zap.Int64("msgID", dt.msgID), zap.Error(err)) - return err - } - - err = dt.produce(ctx, stream, result.GetIds()) - if err != nil { - log.Warn("query stream for delete produce result failed", zap.Int64("msgID", dt.msgID), zap.Error(err)) - return err - } - } - } -} - -func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error { - err := dt.lb.Execute(ctx, CollectionWorkLoad{ - db: dt.req.GetDbName(), - collectionName: dt.req.GetCollectionName(), - collectionID: dt.collectionID, - nq: 1, - exec: dt.getStreamingQueryAndDelteFunc(stream, plan), - }) - if err != nil { - log.Warn("fail to get or create dml stream", zap.Error(err)) - return err - } - - return nil -} - -func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error { - primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp) - if err != nil { - log.Info("Failed to get primary keys from expr", zap.Error(err)) - return err - } - log.Debug("get primary keys from expr", - zap.Int64("len of primary keys", numRow), - zap.Int64("collectionID", dt.collectionID), - zap.Int64("partitionID", dt.partitionID)) - err = dt.produce(ctx, stream, primaryKeys) - if err != nil { - return err - } - return nil -} - -func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error { - hashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels) + hashValues := typeutil.HashPK2Channels(dt.primaryKeys, dt.vChannels) // repack delete msg by dmChannel result := make(map[uint32]msgstream.TsMsg) numRows := int64(0) @@ -406,7 +153,7 @@ func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, p curMsg.HashValues = append(curMsg.HashValues, hashValues[index]) curMsg.Timestamps = append(curMsg.Timestamps, dt.ts) - typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index) + typeutil.AppendIDs(curMsg.PrimaryKeys, dt.primaryKeys, index) curMsg.NumRows++ numRows++ } @@ -430,11 +177,15 @@ func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, p zap.Int64("taskID", dt.ID()), zap.Duration("prepare duration", dt.tr.RecordSpan())) - err := stream.Produce(msgPack) + err = stream.Produce(msgPack) if err != nil { return err } - dt.result.DeleteCnt += numRows + dt.count += numRows + return nil +} + +func (dt *deleteTask) PostExecute(ctx context.Context) error { return nil } @@ -465,3 +216,395 @@ func (dt *deleteTask) newDeleteMsg(ctx context.Context) (*msgstream.DeleteMsg, e DeleteRequest: sliceRequest, }, nil } + +type deleteRunner struct { + req *milvuspb.DeleteRequest + result *milvuspb.MutationResult + + // channel + chMgr channelsMgr + chTicker channelsTimeTicker + vChannels []vChan + + idAllocator allocator.Interface + tsoAllocatorIns tsoAllocator + + // delete info + schema *schemapb.CollectionSchema + collectionID UniqueID + partitionID UniqueID + partitionKeyMode bool + + // for query + msgID int64 + ts uint64 + lb LBPolicy + count atomic.Int64 + err error + + // task queue + queue *dmTaskQueue +} + +func (dr *deleteRunner) Init(ctx context.Context) error { + log := log.Ctx(ctx) + var err error + + collName := dr.req.GetCollectionName() + if err := validateCollectionName(collName); err != nil { + return ErrWithLog(log, "Invalid collection name", err) + } + dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName) + if err != nil { + return ErrWithLog(log, "Failed to get collection id", err) + } + + dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName) + if err != nil { + return ErrWithLog(log, "Failed to get collection schema", err) + } + + dr.partitionKeyMode = hasParitionKeyModeField(dr.schema) + // get prititionIDs of delete + dr.partitionID = common.InvalidPartitionID + if len(dr.req.PartitionName) > 0 { + if dr.partitionKeyMode { + return errors.New("not support manually specifying the partition names if partition key mode is used") + } + + partName := dr.req.GetPartitionName() + if err := validatePartitionTag(partName, true); err != nil { + return ErrWithLog(log, "Invalid partition name", err) + } + partID, err := globalMetaCache.GetPartitionID(ctx, dr.req.GetDbName(), collName, partName) + if err != nil { + return ErrWithLog(log, "Failed to get partition id", err) + } + dr.partitionID = partID + } + + // hash primary keys to channels + channelNames, err := dr.chMgr.getVChannels(dr.collectionID) + if err != nil { + return ErrWithLog(log, "Failed to get primary keys from expr", err) + } + dr.vChannels = channelNames + + dr.result = &milvuspb.MutationResult{ + Status: merr.Success(), + IDs: &schemapb.IDs{ + IdField: nil, + }, + } + return nil +} + +func (dr *deleteRunner) Run(ctx context.Context) error { + plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr) + if err != nil { + return fmt.Errorf("failed to create expr plan, expr = %s", dr.req.GetExpr()) + } + + isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema, plan) + if isSimple { + // if could get delete.primaryKeys from delete expr + err := dr.simpleDelete(ctx, pk, numRow) + if err != nil { + return err + } + } else { + // if get complex delete expr + // need query from querynode before delete + err = dr.complexDelete(ctx, plan) + if err != nil { + log.Warn("complex delete failed,but delete some data", zap.Int64("count", dr.result.DeleteCnt), zap.String("expr", dr.req.GetExpr())) + return err + } + } + return nil +} + +func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) (*deleteTask, error) { + task := &deleteTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + req: dr.req, + idAllocator: dr.idAllocator, + chMgr: dr.chMgr, + chTicker: dr.chTicker, + collectionID: dr.collectionID, + partitionID: dr.partitionID, + partitionKeyMode: dr.partitionKeyMode, + vChannels: dr.vChannels, + primaryKeys: primaryKeys, + } + + if err := dr.queue.Enqueue(task); err != nil { + log.Error("Failed to enqueue delete task: " + err.Error()) + return nil, err + } + + return task, nil +} + +// getStreamingQueryAndDelteFunc return query function used by LBPolicy +// make sure it concurrent safe +func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) executeFunc { + return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channelIDs ...string) error { + var partitionIDs []int64 + + // optimize query when partitionKey on + if dr.partitionKeyMode { + expr, err := ParseExprFromPlan(plan) + if err != nil { + return err + } + partitionKeys := ParsePartitionKeys(expr) + hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys) + if err != nil { + return err + } + partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames) + if err != nil { + return err + } + } else if dr.partitionID != common.InvalidFieldID { + partitionIDs = []int64{dr.partitionID} + } + + log := log.Ctx(ctx).With( + zap.Int64("collectionID", dr.collectionID), + zap.Int64s("partitionIDs", partitionIDs), + zap.Strings("channels", channelIDs), + zap.Int64("nodeID", nodeID)) + + // set plan + _, outputFieldIDs := translatePkOutputFields(dr.schema) + outputFieldIDs = append(outputFieldIDs, common.TimeStampField) + plan.OutputFieldIds = outputFieldIDs + + serializedPlan, err := proto.Marshal(plan) + if err != nil { + return err + } + + queryReq := &querypb.QueryRequest{ + Req: &internalpb.RetrieveRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_Retrieve), + commonpbutil.WithMsgID(dr.msgID), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + commonpbutil.WithTargetID(nodeID), + ), + MvccTimestamp: dr.ts, + ReqID: paramtable.GetNodeID(), + DbID: 0, // TODO + CollectionID: dr.collectionID, + PartitionIDs: partitionIDs, + SerializedExprPlan: serializedPlan, + OutputFieldsId: outputFieldIDs, + GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dr.ts, dr.ts, dr.req.GetConsistencyLevel()), + }, + DmlChannels: channelIDs, + Scope: querypb.DataScope_All, + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + log.Debug("start query for delete", zap.Int64("msgID", dr.msgID)) + client, err := qn.QueryStream(ctx, queryReq) + if err != nil { + log.Warn("query stream for delete create failed", zap.Error(err)) + return err + } + + taskCh := make(chan *deleteTask, 256) + go dr.receiveQueryResult(ctx, client, taskCh) + // wait all task finish + for task := range taskCh { + err := task.WaitToFinish() + if err != nil { + return err + } + dr.count.Add(task.count) + } + + // query or produce task failed + if dr.err != nil { + return dr.err + } + return nil + } +} + +func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) { + defer func() { + close(taskCh) + }() + + for { + result, err := client.Recv() + if err != nil { + if err == io.EOF { + log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID)) + return + } + dr.err = err + return + } + + err = merr.Error(result.GetStatus()) + if err != nil { + dr.err = err + log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err)) + return + } + + task, err := dr.produce(ctx, result.GetIds()) + if err != nil { + dr.err = err + log.Warn("produce delete task failed", zap.Error(err)) + return + } + + taskCh <- task + } +} + +func (dr *deleteRunner) complexDelete(ctx context.Context, plan *planpb.PlanNode) error { + rc := timerecord.NewTimeRecorder("QueryStreamDelete") + var err error + + dr.msgID, err = dr.idAllocator.AllocOne() + if err != nil { + return err + } + + dr.ts, err = dr.tsoAllocatorIns.AllocOne(ctx) + if err != nil { + return err + } + + err = dr.lb.Execute(ctx, CollectionWorkLoad{ + db: dr.req.GetDbName(), + collectionName: dr.req.GetCollectionName(), + collectionID: dr.collectionID, + nq: 1, + exec: dr.getStreamingQueryAndDelteFunc(plan), + }) + dr.result.DeleteCnt = dr.count.Load() + if err != nil { + log.Warn("fail to execute complex delete", + zap.Int64("deleteCnt", dr.result.GetDeleteCnt()), + zap.Duration("interval", rc.ElapseSpan()), + zap.Error(err)) + return err + } + + log.Info("complex delete finished", zap.Int64("deleteCnt", dr.result.GetDeleteCnt()), zap.Duration("interval", rc.ElapseSpan())) + return nil +} + +func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64) error { + log.Debug("get primary keys from expr", + zap.Int64("len of primary keys", numRow), + zap.Int64("collectionID", dr.collectionID), + zap.Int64("partitionID", dr.partitionID)) + + task, err := dr.produce(ctx, pk) + if err != nil { + log.Warn("produce delete task failed") + return err + } + + err = task.WaitToFinish() + if err == nil { + dr.result.DeleteCnt = task.count + } + return err +} + +func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (bool, *schemapb.IDs, int64) { + // simple delete request need expr with "pk in [a, b]" + termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr) + if ok { + if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() { + return false, nil, 0 + } + + ids, rowNum, err := getPrimaryKeysFromTermExpr(schema, termExpr) + if err != nil { + return false, nil, 0 + } + return true, ids, rowNum + } + + // simple delete if expr with "pk == a" + unaryRangeExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr) + if ok { + if unaryRangeExpr.UnaryRangeExpr.GetOp() != planpb.OpType_Equal || !unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetIsPrimaryKey() { + return false, nil, 0 + } + + ids, err := getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr) + if err != nil { + return false, nil, 0 + } + return true, ids, 1 + } + + return false, nil, 0 +} + +func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (res *schemapb.IDs, err error) { + res = &schemapb.IDs{} + switch unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetDataType() { + case schemapb.DataType_Int64: + res.IdField = &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{unaryRangeExpr.UnaryRangeExpr.GetValue().GetInt64Val()}, + }, + } + case schemapb.DataType_VarChar: + res.IdField = &schemapb.IDs_StrId{ + StrId: &schemapb.StringArray{ + Data: []string{unaryRangeExpr.UnaryRangeExpr.GetValue().GetStringVal()}, + }, + } + default: + return res, fmt.Errorf("invalid field data type specifyed in simple delete expr") + } + + return res, nil +} + +func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) { + res = &schemapb.IDs{} + rowNum = int64(len(termExpr.TermExpr.Values)) + switch termExpr.TermExpr.ColumnInfo.GetDataType() { + case schemapb.DataType_Int64: + ids := make([]int64, 0) + for _, v := range termExpr.TermExpr.Values { + ids = append(ids, v.GetInt64Val()) + } + res.IdField = &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: ids, + }, + } + case schemapb.DataType_VarChar: + ids := make([]string, 0) + for _, v := range termExpr.TermExpr.Values { + ids = append(ids, v.GetStringVal()) + } + res.IdField = &schemapb.IDs_StrId{ + StrId: &schemapb.StringArray{ + Data: ids, + }, + } + default: + return res, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr") + } + + return res, rowNum, nil +} diff --git a/internal/proxy/task_delete_test.go b/internal/proxy/task_delete_test.go index eddead02fc..cbd44cb8e8 100644 --- a/internal/proxy/task_delete_test.go +++ b/internal/proxy/task_delete_test.go @@ -26,7 +26,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -func Test_GetExpr(t *testing.T) { +func Test_getPrimaryKeysFromPlan(t *testing.T) { schema := &schemapb.CollectionSchema{ Name: "test_delete", Description: "", @@ -50,24 +50,56 @@ func Test_GetExpr(t *testing.T) { expr := "pk < 4" plan, err := planparserv2.CreateRetrievePlan(schema, expr) assert.NoError(t, err) - isSimple, _ := getExpr(plan) + isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan) assert.False(t, isSimple) }) t.Run("delete with no-pk field expr", func(t *testing.T) { - expr := "non_pk in [1, 2, 3]" + expr := "non_pk == 1" plan, err := planparserv2.CreateRetrievePlan(schema, expr) assert.NoError(t, err) - isSimple, _ := getExpr(plan) + isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan) assert.False(t, isSimple) }) - t.Run("delete with simple expr", func(t *testing.T) { + t.Run("delete with simple term expr", func(t *testing.T) { expr := "pk in [1, 2, 3]" plan, err := planparserv2.CreateRetrievePlan(schema, expr) assert.NoError(t, err) - isSimple, _ := getExpr(plan) + isSimple, _, rowNum := getPrimaryKeysFromPlan(schema, plan) assert.True(t, isSimple) + assert.Equal(t, int64(3), rowNum) + }) + + t.Run("delete failed with simple term expr", func(t *testing.T) { + expr := "pk in [1, 2, 3]" + plan, err := planparserv2.CreateRetrievePlan(schema, expr) + assert.NoError(t, err) + termExpr := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr) + termExpr.TermExpr.ColumnInfo.DataType = -1 + + isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan) + assert.False(t, isSimple) + }) + + t.Run("delete with simple equal expr", func(t *testing.T) { + expr := "pk == 1" + plan, err := planparserv2.CreateRetrievePlan(schema, expr) + assert.NoError(t, err) + isSimple, _, rowNum := getPrimaryKeysFromPlan(schema, plan) + assert.True(t, isSimple) + assert.Equal(t, int64(1), rowNum) + }) + + t.Run("delete failed with simple equal expr", func(t *testing.T) { + expr := "pk == 1" + plan, err := planparserv2.CreateRetrievePlan(schema, expr) + assert.NoError(t, err) + unaryRangeExpr := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr) + unaryRangeExpr.UnaryRangeExpr.ColumnInfo.DataType = -1 + + isSimple, _, _ := getPrimaryKeysFromPlan(schema, plan) + assert.False(t, isSimple) }) } @@ -98,147 +130,6 @@ func TestDeleteTask_GetChannels(t *testing.T) { assert.ElementsMatch(t, channels, dt.pChannels) } -func TestDeleteTask_PreExecute(t *testing.T) { - schema := &schemapb.CollectionSchema{ - Name: "test_delete", - Description: "", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - FieldID: common.StartOfUserFieldID, - Name: "pk", - IsPrimaryKey: true, - DataType: schemapb.DataType_Int64, - }, - { - FieldID: common.StartOfUserFieldID + 1, - Name: "non_pk", - IsPrimaryKey: false, - DataType: schemapb.DataType_Int64, - }, - }, - } - - t.Run("empty collection name", func(t *testing.T) { - dt := deleteTask{} - assert.Error(t, dt.PreExecute(context.Background())) - }) - - t.Run("fail to get collection id", func(t *testing.T) { - dt := deleteTask{ - req: &milvuspb.DeleteRequest{ - CollectionName: "foo", - }, - } - cache := NewMockCache(t) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(0), errors.New("mock GetCollectionID err")) - globalMetaCache = cache - assert.Error(t, dt.PreExecute(context.Background())) - }) - - t.Run("fail partition key mode", func(t *testing.T) { - dt := deleteTask{req: &milvuspb.DeleteRequest{ - CollectionName: "foo", - DbName: "db_1", - }} - cache := NewMockCache(t) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(10000), nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(nil, errors.New("mock GetCollectionSchema err")) - - globalMetaCache = cache - assert.Error(t, dt.PreExecute(context.Background())) - }) - - t.Run("invalid partition name", func(t *testing.T) { - dt := deleteTask{req: &milvuspb.DeleteRequest{ - CollectionName: "foo", - DbName: "db_1", - PartitionName: "aaa", - }} - cache := NewMockCache(t) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(10000), nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(&schemapb.CollectionSchema{ - Name: "test_delete", - Description: "", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - FieldID: common.StartOfUserFieldID, - Name: "pk", - IsPrimaryKey: true, - DataType: schemapb.DataType_Int64, - IsPartitionKey: true, - }, - }, - }, nil) - - globalMetaCache = cache - assert.Error(t, dt.PreExecute(context.Background())) - }) - - t.Run("invalie partition", func(t *testing.T) { - dt := deleteTask{ - req: &milvuspb.DeleteRequest{ - CollectionName: "foo", - DbName: "db_1", - PartitionName: "aaa", - Expr: "non_pk in [1, 2, 3]", - }, - } - cache := NewMockCache(t) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(10000), nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(schema, nil) - cache.On("GetPartitionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(0), errors.New("mock GetPartitionID err")) - - globalMetaCache = cache - assert.Error(t, dt.PreExecute(context.Background())) - - dt.req.PartitionName = "aaa" - assert.Error(t, dt.PreExecute(context.Background())) - - cache.On("GetPartitionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(100001), nil) - assert.Error(t, dt.PreExecute(context.Background())) - }) -} - func TestDeleteTask_Execute(t *testing.T) { collectionName := "test_delete" collectionID := int64(111) @@ -246,6 +137,102 @@ func TestDeleteTask_Execute(t *testing.T) { partitionID := int64(222) channels := []string{"test_channel"} dbName := "test_1" + pk := &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{1, 2}}}, + } + + t.Run("empty expr", func(t *testing.T) { + dt := deleteTask{} + assert.Error(t, dt.Execute(context.Background())) + }) + + t.Run("get channel failed", func(t *testing.T) { + mockMgr := NewMockChannelsMgr(t) + dt := deleteTask{ + chMgr: mockMgr, + req: &milvuspb.DeleteRequest{ + Expr: "pk in [1,2]", + }, + } + + mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(nil, errors.New("mock error")) + assert.Error(t, dt.Execute(context.Background())) + }) + + t.Run("alloc failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockMgr := NewMockChannelsMgr(t) + rc := mocks.NewMockRootCoordClient(t) + allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) + assert.NoError(t, err) + allocator.Close() + + dt := deleteTask{ + chMgr: mockMgr, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + idAllocator: allocator, + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + PartitionName: partitionName, + DbName: dbName, + Expr: "pk in [1,2]", + }, + primaryKeys: pk, + } + stream := msgstream.NewMockMsgStream(t) + mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) + + assert.Error(t, dt.Execute(context.Background())) + }) + + t.Run("delete produce failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockMgr := NewMockChannelsMgr(t) + rc := mocks.NewMockRootCoordClient(t) + rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return( + &rootcoordpb.AllocIDResponse{ + Status: merr.Success(), + ID: 0, + Count: 1, + }, nil) + allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) + allocator.Start() + assert.NoError(t, err) + + dt := deleteTask{ + chMgr: mockMgr, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + idAllocator: allocator, + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + PartitionName: partitionName, + DbName: dbName, + Expr: "pk in [1,2]", + }, + primaryKeys: pk, + } + stream := msgstream.NewMockMsgStream(t) + mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) + stream.EXPECT().Produce(mock.Anything).Return(errors.New("mock error")) + assert.Error(t, dt.Execute(context.Background())) + }) +} + +func TestDeleteRunner_Init(t *testing.T) { + collectionName := "test_delete" + collectionID := int64(111) + partitionName := "default" + partitionID := int64(222) + // channels := []string{"test_channel"} + dbName := "test_1" schema := &schemapb.CollectionSchema{ Name: collectionName, @@ -266,102 +253,265 @@ func TestDeleteTask_Execute(t *testing.T) { }, }, } - t.Run("empty expr", func(t *testing.T) { - dt := deleteTask{} - assert.Error(t, dt.Execute(context.Background())) + + t.Run("empty collection name", func(t *testing.T) { + dr := deleteRunner{} + assert.Error(t, dr.Init(context.Background())) }) - t.Run("get channel failed", func(t *testing.T) { - mockMgr := NewMockChannelsMgr(t) - dt := deleteTask{ - chMgr: mockMgr, + t.Run("fail to get collection id", func(t *testing.T) { + dr := deleteRunner{ req: &milvuspb.DeleteRequest{ - Expr: "pk in [1,2]", + CollectionName: collectionName, }, } - - mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(nil, errors.New("mock error")) - assert.Error(t, dt.Execute(context.Background())) + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(int64(0), errors.New("mock GetCollectionID err")) + globalMetaCache = cache + assert.Error(t, dr.Init(context.Background())) }) + t.Run("fail get collection schema", func(t *testing.T) { + dr := deleteRunner{req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + DbName: dbName, + }} + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(collectionID, nil) + cache.On("GetCollectionSchema", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(nil, errors.New("mock GetCollectionSchema err")) + + globalMetaCache = cache + assert.Error(t, dr.Init(context.Background())) + }) + + t.Run("partition key mode but delete with partition name", func(t *testing.T) { + dr := deleteRunner{req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + DbName: dbName, + PartitionName: partitionName, + }} + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(collectionID, nil) + cache.On("GetCollectionSchema", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(&schemapb.CollectionSchema{ + Name: collectionName, + Description: "", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.StartOfUserFieldID, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + }, + }, + }, nil) + + globalMetaCache = cache + assert.Error(t, dr.Init(context.Background())) + }) + + t.Run("invalid partition name", func(t *testing.T) { + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + DbName: dbName, + PartitionName: "???", + Expr: "non_pk in [1, 2, 3]", + }, + } + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(int64(10000), nil) + cache.On("GetCollectionSchema", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(schema, nil) + + globalMetaCache = cache + assert.Error(t, dr.Init(context.Background())) + }) + + t.Run("get partition id failed", func(t *testing.T) { + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + DbName: dbName, + PartitionName: partitionName, + Expr: "non_pk in [1, 2, 3]", + }, + } + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(collectionID, nil) + cache.On("GetCollectionSchema", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(schema, nil) + cache.On("GetPartitionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(int64(0), errors.New("mock GetPartitionID err")) + + globalMetaCache = cache + assert.Error(t, dr.Init(context.Background())) + }) + + t.Run("get vchannel failed", func(t *testing.T) { + chMgr := NewMockChannelsMgr(t) + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + DbName: dbName, + PartitionName: partitionName, + Expr: "non_pk in [1, 2, 3]", + }, + chMgr: chMgr, + } + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(collectionID, nil) + cache.On("GetCollectionSchema", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(schema, nil) + cache.On("GetPartitionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(partitionID, nil) + chMgr.On("getVChannels", mock.Anything).Return(nil, fmt.Errorf("mock error")) + + globalMetaCache = cache + assert.Error(t, dr.Init(context.Background())) + }) +} + +func TestDeleteRunner_Run(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + collectionName := "test_delete" + collectionID := int64(111) + partitionName := "default" + partitionID := int64(222) + channels := []string{"test_channel"} + dbName := "test_1" + tsoAllocator := &mockTsoAllocator{} + idAllocator := &mockIDAllocatorInterface{} + + queue, err := newTaskScheduler(ctx, tsoAllocator, nil) + assert.NoError(t, err) + queue.Start() + defer queue.Close() + + schema := &schemapb.CollectionSchema{ + Name: collectionName, + Description: "", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.StartOfUserFieldID, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.StartOfUserFieldID + 1, + Name: "non_pk", + IsPrimaryKey: false, + DataType: schemapb.DataType_Int64, + }, + }, + } + + metaCache := NewMockCache(t) + metaCache.EXPECT().GetCollectionID(mock.Anything, dbName, collectionName).Return(collectionID, nil).Maybe() + globalMetaCache = metaCache + defer func() { + globalMetaCache = nil + }() + t.Run("create plan failed", func(t *testing.T) { mockMgr := NewMockChannelsMgr(t) - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, + dr := deleteRunner{ + chMgr: mockMgr, req: &milvuspb.DeleteRequest{ Expr: "????", }, } - stream := msgstream.NewMockMsgStream(t) - mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) - assert.Error(t, dt.Execute(context.Background())) + assert.Error(t, dr.Run(context.Background())) }) - t.Run("alloc failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + t.Run("simple delete task failed", func(t *testing.T) { mockMgr := NewMockChannelsMgr(t) - rc := mocks.NewMockRootCoordClient(t) - allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) - assert.NoError(t, err) - allocator.Close() + lb := NewMockLBPolicy(t) - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - idAllocator: allocator, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: partitionName, - DbName: dbName, - Expr: "pk in [1,2]", - }, - } - stream := msgstream.NewMockMsgStream(t) - mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) - - assert.Error(t, dt.Execute(context.Background())) - }) - - t.Run("simple delete failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mockMgr := NewMockChannelsMgr(t) - rc := mocks.NewMockRootCoordClient(t) - rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return( - &rootcoordpb.AllocIDResponse{ + dr := deleteRunner{ + chMgr: mockMgr, + schema: schema, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + tsoAllocatorIns: tsoAllocator, + idAllocator: idAllocator, + queue: queue.dmQueue, + lb: lb, + result: &milvuspb.MutationResult{ Status: merr.Success(), - ID: 0, - Count: 1, - }, nil) - allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) - allocator.Start() - assert.NoError(t, err) - - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - idAllocator: allocator, + IDs: &schemapb.IDs{ + IdField: nil, + }, + }, req: &milvuspb.DeleteRequest{ CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk in [1,2]", + Expr: "pk in [1,2,3]", }, } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) - stream.EXPECT().Produce(mock.Anything).Return(errors.New("mock error")) - assert.Error(t, dt.Execute(context.Background())) + mockMgr.EXPECT().getChannels(collectionID).Return(channels, nil) + stream.EXPECT().Produce(mock.Anything).Return(fmt.Errorf("mock error")) + + assert.Error(t, dr.Run(context.Background())) + assert.Equal(t, int64(0), dr.result.DeleteCnt) }) t.Run("complex delete query rpc failed", func(t *testing.T) { @@ -369,13 +519,16 @@ func TestDeleteTask_Execute(t *testing.T) { qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - lb: lb, + dr := deleteRunner{ + idAllocator: idAllocator, + tsoAllocatorIns: tsoAllocator, + queue: queue.dmQueue, + chMgr: mockMgr, + schema: schema, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + lb: lb, result: &milvuspb.MutationResult{ Status: merr.Success(), IDs: &schemapb.IDs{ @@ -389,15 +542,13 @@ func TestDeleteTask_Execute(t *testing.T) { Expr: "pk < 3", }, } - stream := msgstream.NewMockMsgStream(t) - mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn) }) qn.EXPECT().QueryStream(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")) - assert.Error(t, dt.Execute(context.Background())) - assert.Equal(t, int64(0), dt.result.DeleteCnt) + assert.Error(t, dr.Run(context.Background())) + assert.Equal(t, int64(0), dr.result.DeleteCnt) }) t.Run("complex delete query failed", func(t *testing.T) { @@ -405,27 +556,19 @@ func TestDeleteTask_Execute(t *testing.T) { defer cancel() mockMgr := NewMockChannelsMgr(t) - rc := mocks.NewMockRootCoordClient(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) - rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return( - &rootcoordpb.AllocIDResponse{ - Status: merr.Success(), - ID: 0, - Count: 1, - }, nil) - allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) - allocator.Start() - assert.NoError(t, err) - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - idAllocator: allocator, - lb: lb, + dr := deleteRunner{ + queue: queue.dmQueue, + chMgr: mockMgr, + schema: schema, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + tsoAllocatorIns: tsoAllocator, + idAllocator: idAllocator, + lb: lb, result: &milvuspb.MutationResult{ Status: merr.Success(), IDs: &schemapb.IDs{ @@ -441,6 +584,9 @@ func TestDeleteTask_Execute(t *testing.T) { } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) + mockMgr.EXPECT().getChannels(collectionID).Return(channels, nil) + stream.EXPECT().Produce(mock.Anything).Return(nil) + lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn) }) @@ -466,11 +612,8 @@ func TestDeleteTask_Execute(t *testing.T) { }) return client }, nil) - stream.EXPECT().Produce(mock.Anything).Return(nil) - assert.Error(t, dt.Execute(context.Background())) - // query failed but still delete some data before failed. - assert.Equal(t, int64(3), dt.result.DeleteCnt) + assert.Error(t, dr.Run(ctx)) }) t.Run("complex delete produce failed", func(t *testing.T) { @@ -478,27 +621,19 @@ func TestDeleteTask_Execute(t *testing.T) { defer cancel() mockMgr := NewMockChannelsMgr(t) - rc := mocks.NewMockRootCoordClient(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) - rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return( - &rootcoordpb.AllocIDResponse{ - Status: merr.Success(), - ID: 0, - Count: 1, - }, nil) - allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) - allocator.Start() - assert.NoError(t, err) - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - idAllocator: allocator, - lb: lb, + dr := deleteRunner{ + chMgr: mockMgr, + queue: queue.dmQueue, + schema: schema, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + idAllocator: idAllocator, + tsoAllocatorIns: tsoAllocator, + lb: lb, result: &milvuspb.MutationResult{ Status: merr.Success(), IDs: &schemapb.IDs{ @@ -514,6 +649,7 @@ func TestDeleteTask_Execute(t *testing.T) { } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) + mockMgr.EXPECT().getChannels(collectionID).Return(channels, nil) lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn) }) @@ -538,8 +674,8 @@ func TestDeleteTask_Execute(t *testing.T) { }, nil) stream.EXPECT().Produce(mock.Anything).Return(errors.New("mock error")) - assert.Error(t, dt.Execute(context.Background())) - assert.Equal(t, int64(0), dt.result.DeleteCnt) + assert.Error(t, dr.Run(ctx)) + assert.Equal(t, int64(0), dr.result.DeleteCnt) }) t.Run("complex delete success", func(t *testing.T) { @@ -547,27 +683,19 @@ func TestDeleteTask_Execute(t *testing.T) { defer cancel() mockMgr := NewMockChannelsMgr(t) - rc := mocks.NewMockRootCoordClient(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) - rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return( - &rootcoordpb.AllocIDResponse{ - Status: merr.Success(), - ID: 0, - Count: 1, - }, nil) - allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) - allocator.Start() - assert.NoError(t, err) - dt := deleteTask{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - idAllocator: allocator, - lb: lb, + dr := deleteRunner{ + queue: queue.dmQueue, + chMgr: mockMgr, + schema: schema, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + idAllocator: idAllocator, + tsoAllocatorIns: tsoAllocator, + lb: lb, result: &milvuspb.MutationResult{ Status: merr.Success(), IDs: &schemapb.IDs{ @@ -583,6 +711,7 @@ func TestDeleteTask_Execute(t *testing.T) { } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) + mockMgr.EXPECT().getChannels(collectionID).Return(channels, nil) lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn) }) @@ -607,8 +736,8 @@ func TestDeleteTask_Execute(t *testing.T) { }, nil) stream.EXPECT().Produce(mock.Anything).Return(nil) - assert.NoError(t, dt.Execute(context.Background())) - assert.Equal(t, int64(3), dt.result.DeleteCnt) + assert.NoError(t, dr.Run(ctx)) + assert.Equal(t, int64(3), dr.result.DeleteCnt) }) schema.Fields[1].IsPartitionKey = true @@ -623,11 +752,11 @@ func TestDeleteTask_Execute(t *testing.T) { defer cancel() mockMgr := NewMockChannelsMgr(t) - rc := mocks.NewMockRootCoordClient(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) mockCache := NewMockCache(t) + mockCache.EXPECT().GetCollectionID(mock.Anything, dbName, collectionName).Return(collectionID, nil).Maybe() mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return( partitionMaps, nil) mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return( @@ -635,25 +764,17 @@ func TestDeleteTask_Execute(t *testing.T) { mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything). Return(indexedPartitions, nil) globalMetaCache = mockCache - defer func() { globalMetaCache = nil }() + defer func() { globalMetaCache = metaCache }() - rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return( - &rootcoordpb.AllocIDResponse{ - Status: merr.Success(), - ID: 0, - Count: 1, - }, nil) - allocator, err := allocator.NewIDAllocator(ctx, rc, paramtable.GetNodeID()) - allocator.Start() - assert.NoError(t, err) - - dt := deleteTask{ + dr := deleteRunner{ + queue: queue.dmQueue, chMgr: mockMgr, schema: schema, collectionID: collectionID, partitionID: int64(-1), vChannels: channels, - idAllocator: allocator, + idAllocator: idAllocator, + tsoAllocatorIns: tsoAllocator, lb: lb, partitionKeyMode: true, result: &milvuspb.MutationResult{ @@ -671,6 +792,7 @@ func TestDeleteTask_Execute(t *testing.T) { } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) + mockMgr.EXPECT().getChannels(collectionID).Return(channels, nil) lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn) }) @@ -695,16 +817,26 @@ func TestDeleteTask_Execute(t *testing.T) { }, nil) stream.EXPECT().Produce(mock.Anything).Return(nil) - assert.NoError(t, dt.Execute(context.Background())) - assert.Equal(t, int64(3), dt.result.DeleteCnt) + assert.NoError(t, dr.Run(ctx)) + assert.Equal(t, int64(3), dr.result.DeleteCnt) }) } -func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { +func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + collectionName := "test_delete" collectionID := int64(111) channels := []string{"test_channel"} dbName := "test_1" + tsoAllocator := &mockTsoAllocator{} + idAllocator := &mockIDAllocatorInterface{} + + queue, err := newTaskScheduler(ctx, tsoAllocator, nil) + assert.NoError(t, err) + queue.Start() + defer queue.Close() schema := &schemapb.CollectionSchema{ Name: "test_delete", @@ -737,8 +869,11 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dt := deleteTask{ + dr := deleteRunner{ schema: schema, + queue: queue.dmQueue, + tsoAllocatorIns: tsoAllocator, + idAllocator: idAllocator, collectionID: collectionID, partitionID: int64(-1), vChannels: channels, @@ -756,9 +891,9 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { Expr: "non_pk in [2, 3]", }, } - stream := msgstream.NewMockMsgStream(t) qn := mocks.NewMockQueryNodeClient(t) - queryFunc := dt.getStreamingQueryAndDelteFunc(stream, nil) + // witho out plan + queryFunc := dr.getStreamingQueryAndDelteFunc(nil) assert.Error(t, queryFunc(ctx, 1, qn)) }) @@ -766,8 +901,10 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dt := deleteTask{ + dr := deleteRunner{ schema: schema, + tsoAllocatorIns: tsoAllocator, + idAllocator: idAllocator, collectionID: collectionID, partitionID: int64(-1), vChannels: channels, @@ -785,7 +922,6 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { Expr: "non_pk in [2, 3]", }, } - stream := msgstream.NewMockMsgStream(t) qn := mocks.NewMockQueryNodeClient(t) mockCache := NewMockCache(t) @@ -794,9 +930,9 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { globalMetaCache = mockCache defer func() { globalMetaCache = nil }() - plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr) + plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr) assert.NoError(t, err) - queryFunc := dt.getStreamingQueryAndDelteFunc(stream, plan) + queryFunc := dr.getStreamingQueryAndDelteFunc(plan) assert.Error(t, queryFunc(ctx, 1, qn)) }) @@ -804,8 +940,10 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dt := deleteTask{ + dr := deleteRunner{ schema: schema, + tsoAllocatorIns: tsoAllocator, + idAllocator: idAllocator, collectionID: collectionID, partitionID: int64(-1), vChannels: channels, @@ -823,7 +961,6 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { Expr: "non_pk in [2, 3]", }, } - stream := msgstream.NewMockMsgStream(t) qn := mocks.NewMockQueryNodeClient(t) mockCache := NewMockCache(t) @@ -836,63 +973,9 @@ func TestDeleteTask_StreamingQueryAndDelteFunc(t *testing.T) { globalMetaCache = mockCache defer func() { globalMetaCache = nil }() - plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr) + plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr) assert.NoError(t, err) - queryFunc := dt.getStreamingQueryAndDelteFunc(stream, plan) + queryFunc := dr.getStreamingQueryAndDelteFunc(plan) assert.Error(t, queryFunc(ctx, 1, qn)) }) } - -func TestDeleteTask_SimpleDelete(t *testing.T) { - collectionName := "test_delete" - collectionID := int64(111) - partitionName := "default" - partitionID := int64(222) - dbName := "test_1" - - schema := &schemapb.CollectionSchema{ - Name: collectionName, - Description: "", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - FieldID: common.StartOfUserFieldID, - Name: "pk", - IsPrimaryKey: true, - DataType: schemapb.DataType_Int64, - }, - { - FieldID: common.StartOfUserFieldID + 1, - Name: "non_pk", - IsPrimaryKey: false, - DataType: schemapb.DataType_Int64, - }, - }, - } - - task := deleteTask{ - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: partitionName, - DbName: dbName, - }, - } - t.Run("get PK failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - expr := &planpb.Expr_TermExpr{ - TermExpr: &planpb.TermExpr{ - ColumnInfo: &planpb.ColumnInfo{ - DataType: schemapb.DataType_BinaryVector, - }, - }, - } - stream := msgstream.NewMockMsgStream(t) - err := task.simpleDelete(ctx, expr, stream) - assert.Error(t, err) - }) -} diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 9a35b6b7ec..445d8bd6c5 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -1671,19 +1671,13 @@ func TestTask_Int64PrimaryKey(t *testing.T) { }, idAllocator: idAllocator, ctx: ctx, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: nil, - SuccIndex: nil, - ErrIndex: nil, - Acknowledged: false, - InsertCnt: 0, - DeleteCnt: 0, - UpsertCnt: 0, - Timestamp: 0, + primaryKeys: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{0, 1}}}, }, - chMgr: chMgr, - chTicker: ticker, + chMgr: chMgr, + chTicker: ticker, + collectionID: collectionID, + vChannels: []string{"test-ch"}, } assert.NoError(t, task.OnEnqueue()) @@ -1703,51 +1697,6 @@ func TestTask_Int64PrimaryKey(t *testing.T) { assert.NoError(t, task.Execute(ctx)) assert.NoError(t, task.PostExecute(ctx)) }) - - t.Run("complex delete", func(t *testing.T) { - lb := NewMockLBPolicy(t) - task := &deleteTask{ - Condition: NewTaskCondition(ctx), - lb: lb, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: partitionName, - Expr: "int64 < 2", - }, - idAllocator: idAllocator, - ctx: ctx, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: nil, - SuccIndex: nil, - ErrIndex: nil, - Acknowledged: false, - InsertCnt: 0, - DeleteCnt: 0, - UpsertCnt: 0, - Timestamp: 0, - }, - chMgr: chMgr, - chTicker: ticker, - } - lb.EXPECT().Execute(mock.Anything, mock.Anything).Return(nil) - assert.NoError(t, task.OnEnqueue()) - assert.NotNil(t, task.TraceCtx()) - - id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) - task.SetID(id) - assert.Equal(t, id, task.ID()) - assert.Equal(t, commonpb.MsgType_Delete, task.Type()) - - ts := Timestamp(time.Now().UnixNano()) - task.SetTs(ts) - assert.Equal(t, ts, task.BeginTs()) - assert.Equal(t, ts, task.EndTs()) - - assert.NoError(t, task.PreExecute(ctx)) - assert.NoError(t, task.Execute(ctx)) - assert.NoError(t, task.PostExecute(ctx)) - }) } func TestTask_VarCharPrimaryKey(t *testing.T) { @@ -2003,19 +1952,13 @@ func TestTask_VarCharPrimaryKey(t *testing.T) { }, idAllocator: idAllocator, ctx: ctx, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: nil, - SuccIndex: nil, - ErrIndex: nil, - Acknowledged: false, - InsertCnt: 0, - DeleteCnt: 0, - UpsertCnt: 0, - Timestamp: 0, + chMgr: chMgr, + chTicker: ticker, + vChannels: []string{"test-channel"}, + primaryKeys: &schemapb.IDs{ + IdField: &schemapb.IDs_StrId{StrId: &schemapb.StringArray{Data: []string{"milvus", "test"}}}, }, - chMgr: chMgr, - chTicker: ticker, + collectionID: collectionID, } assert.NoError(t, task.OnEnqueue()) @@ -3432,24 +3375,15 @@ func TestPartitionKey(t *testing.T) { Expr: "int64_field in [0, 1]", }, ctx: ctx, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: nil, - SuccIndex: nil, - ErrIndex: nil, - Acknowledged: false, - InsertCnt: 0, - DeleteCnt: 0, - UpsertCnt: 0, - Timestamp: 0, + primaryKeys: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{0, 1}}}, }, - idAllocator: idAllocator, - chMgr: chMgr, - chTicker: ticker, + idAllocator: idAllocator, + chMgr: chMgr, + chTicker: ticker, + collectionID: collectionID, + vChannels: []string{"test-channel"}, } - // don't support specify partition name if use partition key - dt.req.PartitionName = partitionNames[0] - assert.Error(t, dt.PreExecute(ctx)) dt.req.PartitionName = "" assert.NoError(t, dt.PreExecute(ctx)) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 53a458f100..e5fc96ba59 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -220,7 +220,6 @@ func validatePartitionTag(partitionTag string, strictCheck bool) error { msg := invalidMsg + "Partition name should not be empty." return errors.New(msg) } - if len(partitionTag) > Params.ProxyCfg.MaxNameLength.GetAsInt() { msg := invalidMsg + "The length of a partition name must be less than " + Params.ProxyCfg.MaxNameLength.GetValue() + " characters." return errors.New(msg) @@ -1367,6 +1366,15 @@ func isPartitionKeyMode(ctx context.Context, dbName string, colName string) (boo return false, nil } +func hasParitionKeyModeField(schema *schemapb.CollectionSchema) bool { + for _, fieldSchema := range schema.GetFields() { + if fieldSchema.IsPartitionKey { + return true + } + } + return false +} + // getDefaultPartitionNames only used in partition key mode func getDefaultPartitionsInPartitionKeyMode(ctx context.Context, dbName string, collectionName string) ([]string, error) { partitions, err := globalMetaCache.GetPartitions(ctx, dbName, collectionName) diff --git a/internal/querynodev2/segments/retrieve.go b/internal/querynodev2/segments/retrieve.go index f4f99be45b..bfff4fd021 100644 --- a/internal/querynodev2/segments/retrieve.go +++ b/internal/querynodev2/segments/retrieve.go @@ -101,12 +101,14 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy return } - if err = svr.Send(&internalpb.RetrieveResults{ - Status: merr.Success(), - Ids: result.GetIds(), - FieldsData: result.GetFieldsData(), - }); err != nil { - errs[i] = err + if len(result.GetOffset()) != 0 { + if err = svr.Send(&internalpb.RetrieveResults{ + Status: merr.Success(), + Ids: result.GetIds(), + FieldsData: result.GetFieldsData(), + }); err != nil { + errs[i] = err + } } errs[i] = nil diff --git a/pkg/go.mod b/pkg/go.mod index 245cdde9d9..fbc9fbcc33 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,7 +14,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.5 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669 github.com/nats-io/nats-server/v2 v2.9.17 github.com/nats-io/nats.go v1.24.0 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/pkg/go.sum b/pkg/go.sum index 95976e7cd2..9d57db5a93 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -479,8 +479,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669 h1:yUtc+pVKVhmmnwTY9iyV8+EmhrNjZ74Hxm3y5QKCNyg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=