diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 5d15fad49f..abd4b714d7 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -332,3 +332,16 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr return client.Delete(ctx, req) }) } + +// DeleteBatch is the API to apply same delete data into multiple segments. +// it's basically same as `Delete` but cost less memory pressure. +func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest, _ ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(c.nodeID), + ) + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.DeleteBatchResponse, error) { + return client.DeleteBatch(ctx, req) + }) +} diff --git a/internal/distributed/querynode/client/client_test.go b/internal/distributed/querynode/client/client_test.go index fb6857c44f..e24a8b59a2 100644 --- a/internal/distributed/querynode/client/client_test.go +++ b/internal/distributed/querynode/client/client_test.go @@ -108,6 +108,9 @@ func Test_NewClient(t *testing.T) { r20, err := client.SearchSegments(ctx, nil) retCheck(retNotNil, r20, err) + r21, err := client.DeleteBatch(ctx, nil) + retCheck(retNotNil, r21, err) + // stream rpc client, err := client.QueryStream(ctx, nil) retCheck(retNotNil, client, err) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index e66884681a..31a3074b12 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -383,3 +383,9 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) { return s.querynode.Delete(ctx, req) } + +// DeleteBatch is the API to apply same delete data into multiple segments. +// it's basically same as `Delete` but cost less memory pressure. +func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + return s.querynode.DeleteBatch(ctx, req) +} diff --git a/internal/distributed/querynode/service_test.go b/internal/distributed/querynode/service_test.go index caa69c14e6..bfedc5ad58 100644 --- a/internal/distributed/querynode/service_test.go +++ b/internal/distributed/querynode/service_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -270,6 +271,15 @@ func Test_NewServer(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) + t.Run("DeleteBatch", func(t *testing.T) { + mockQN.EXPECT().DeleteBatch(mock.Anything, mock.Anything).Return(&querypb.DeleteBatchResponse{ + Status: merr.Success(), + }, nil) + + resp, err := server.DeleteBatch(ctx, &querypb.DeleteBatchRequest{}) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + }) + err = server.Stop() assert.NoError(t, err) } diff --git a/internal/mocks/mock_querynode.go b/internal/mocks/mock_querynode.go index 451f651a7e..a8c32577fa 100644 --- a/internal/mocks/mock_querynode.go +++ b/internal/mocks/mock_querynode.go @@ -89,6 +89,65 @@ func (_c *MockQueryNode_Delete_Call) RunAndReturn(run func(context.Context, *que return _c } +// DeleteBatch provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNode) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for DeleteBatch") + } + + var r0 *querypb.DeleteBatchResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*querypb.DeleteBatchResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNode_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch' +type MockQueryNode_DeleteBatch_Call struct { + *mock.Call +} + +// DeleteBatch is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *querypb.DeleteBatchRequest +func (_e *MockQueryNode_Expecter) DeleteBatch(_a0 interface{}, _a1 interface{}) *MockQueryNode_DeleteBatch_Call { + return &MockQueryNode_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", _a0, _a1)} +} + +func (_c *MockQueryNode_DeleteBatch_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeleteBatchRequest)) *MockQueryNode_DeleteBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest)) + }) + return _c +} + +func (_c *MockQueryNode_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNode_DeleteBatch_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNode_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockQueryNode_DeleteBatch_Call { + _c.Call.Return(run) + return _c +} + // GetAddress provides a mock function with given fields: func (_m *MockQueryNode) GetAddress() string { ret := _m.Called() diff --git a/internal/mocks/mock_querynode_client.go b/internal/mocks/mock_querynode_client.go index fcd8ea591d..3b3d465610 100644 --- a/internal/mocks/mock_querynode_client.go +++ b/internal/mocks/mock_querynode_client.go @@ -150,6 +150,80 @@ func (_c *MockQueryNodeClient_Delete_Call) RunAndReturn(run func(context.Context return _c } +// DeleteBatch provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for DeleteBatch") + } + + var r0 *querypb.DeleteBatchResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) (*querypb.DeleteBatchResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) *querypb.DeleteBatchResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*querypb.DeleteBatchResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeClient_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch' +type MockQueryNodeClient_DeleteBatch_Call struct { + *mock.Call +} + +// DeleteBatch is a helper method to define mock.On call +// - ctx context.Context +// - in *querypb.DeleteBatchRequest +// - opts ...grpc.CallOption +func (_e *MockQueryNodeClient_Expecter) DeleteBatch(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_DeleteBatch_Call { + return &MockQueryNodeClient_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryNodeClient_DeleteBatch_Call) Run(run func(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_DeleteBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockQueryNodeClient_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNodeClient_DeleteBatch_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeClient_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) (*querypb.DeleteBatchResponse, error)) *MockQueryNodeClient_DeleteBatch_Call { + _c.Call.Return(run) + return _c +} + // GetComponentStates provides a mock function with given fields: ctx, in, opts func (_m *MockQueryNodeClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index ef5c8d8d1b..8a5d688c89 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -172,6 +172,10 @@ service QueryNode { } rpc Delete(DeleteRequest) returns (common.Status) { } + // DeleteBatch is the API to apply same delete data into multiple segments. + // it's basically same as `Delete` but cost less memory pressure. + rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) { + } } // --------------------QueryCoord grpc request and response proto------------------ @@ -777,6 +781,25 @@ message DeleteRequest { DataScope scope = 8; } +message DeleteBatchRequest { + common.MsgBase base = 1; + int64 collection_id = 2; + int64 partition_id = 3; + string vchannel_name = 4; + repeated int64 segment_ids = 5; + schema.IDs primary_keys = 6; + repeated uint64 timestamps = 7; + DataScope scope = 8; +} + +// DeleteBatchResponse returns failed/missing segment ids +// cannot just using common.Status to handle partial failure logic +message DeleteBatchResponse { + common.Status status = 1; + repeated int64 failed_ids = 2; + repeated int64 missing_ids = 3; +} + message ActivateCheckerRequest { common.MsgBase base = 1; int32 checkerID = 2; diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index 4bf70286ac..c161c66309 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -88,6 +88,65 @@ func (_c *MockQueryNodeServer_Delete_Call) RunAndReturn(run func(context.Context return _c } +// DeleteBatch provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNodeServer) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for DeleteBatch") + } + + var r0 *querypb.DeleteBatchResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*querypb.DeleteBatchResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeServer_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch' +type MockQueryNodeServer_DeleteBatch_Call struct { + *mock.Call +} + +// DeleteBatch is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *querypb.DeleteBatchRequest +func (_e *MockQueryNodeServer_Expecter) DeleteBatch(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_DeleteBatch_Call { + return &MockQueryNodeServer_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", _a0, _a1)} +} + +func (_c *MockQueryNodeServer_DeleteBatch_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeleteBatchRequest)) *MockQueryNodeServer_DeleteBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest)) + }) + return _c +} + +func (_c *MockQueryNodeServer_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNodeServer_DeleteBatch_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeServer_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockQueryNodeServer_DeleteBatch_Call { + _c.Call.Return(run) + return _c +} + // GetComponentStates provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNodeServer) GetComponentStates(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/querynodev2/cluster/mock_worker.go b/internal/querynodev2/cluster/mock_worker.go index 0eb5104b90..b9c6f27238 100644 --- a/internal/querynodev2/cluster/mock_worker.go +++ b/internal/querynodev2/cluster/mock_worker.go @@ -73,6 +73,65 @@ func (_c *MockWorker_Delete_Call) RunAndReturn(run func(context.Context, *queryp return _c } +// DeleteBatch provides a mock function with given fields: ctx, req +func (_m *MockWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + ret := _m.Called(ctx, req) + + if len(ret) == 0 { + panic("no return value specified for DeleteBatch") + } + + var r0 *querypb.DeleteBatchResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok { + return rf(ctx, req) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*querypb.DeleteBatchResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWorker_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch' +type MockWorker_DeleteBatch_Call struct { + *mock.Call +} + +// DeleteBatch is a helper method to define mock.On call +// - ctx context.Context +// - req *querypb.DeleteBatchRequest +func (_e *MockWorker_Expecter) DeleteBatch(ctx interface{}, req interface{}) *MockWorker_DeleteBatch_Call { + return &MockWorker_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", ctx, req)} +} + +func (_c *MockWorker_DeleteBatch_Call) Run(run func(ctx context.Context, req *querypb.DeleteBatchRequest)) *MockWorker_DeleteBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest)) + }) + return _c +} + +func (_c *MockWorker_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockWorker_DeleteBatch_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWorker_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockWorker_DeleteBatch_Call { + _c.Call.Return(run) + return _c +} + // GetStatistics provides a mock function with given fields: ctx, req func (_m *MockWorker) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) { ret := _m.Called(ctx, req) diff --git a/internal/querynodev2/cluster/worker.go b/internal/querynodev2/cluster/worker.go index 9548b05b3a..47752e81df 100644 --- a/internal/querynodev2/cluster/worker.go +++ b/internal/querynodev2/cluster/worker.go @@ -39,6 +39,7 @@ type Worker interface { LoadSegments(context.Context, *querypb.LoadSegmentsRequest) error ReleaseSegments(context.Context, *querypb.ReleaseSegmentsRequest) error Delete(ctx context.Context, req *querypb.DeleteRequest) error + DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error @@ -141,6 +142,52 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e return nil } +func (w *remoteWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + log := log.Ctx(ctx).With( + zap.Int64("workerID", req.GetBase().GetTargetID()), + ) + client := w.getClient() + resp, err := client.DeleteBatch(ctx, req) + if err := merr.CheckRPCCall(resp, err); err != nil { + if errors.Is(err, merr.ErrServiceUnimplemented) { + log.Warn("invoke legacy querynode DeleteBatch method, fallback to ") + return w.splitDeleteBatch(ctx, req) + } + return nil, err + } + return resp, nil +} + +func (w *remoteWorker) splitDeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + sReq := &querypb.DeleteRequest{ + CollectionId: req.GetCollectionId(), + PartitionId: req.GetPartitionId(), + VchannelName: req.GetVchannelName(), + PrimaryKeys: req.GetPrimaryKeys(), + Timestamps: req.GetTimestamps(), + Scope: req.GetScope(), + } + // do fallback without parallel, to protect the mem limit + var missingIDs []int64 + var failedIDs []int64 + for _, segmentID := range req.GetSegmentIds() { + sReq.SegmentId = segmentID + err := w.Delete(ctx, sReq) + switch { + case errors.Is(err, merr.ErrSegmentNotFound): + missingIDs = append(missingIDs, segmentID) + case err != nil: + failedIDs = append(failedIDs, segmentID) + default: + } + } + return &querypb.DeleteBatchResponse{ + Status: merr.Success(), + FailedIds: failedIDs, + MissingIds: missingIDs, + }, nil +} + func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { client := w.getClient() ret, err := client.SearchSegments(ctx, req) diff --git a/internal/querynodev2/cluster/worker_test.go b/internal/querynodev2/cluster/worker_test.go index 084fa3ead1..d224a14d0a 100644 --- a/internal/querynodev2/cluster/worker_test.go +++ b/internal/querynodev2/cluster/worker_test.go @@ -193,6 +193,70 @@ func (s *RemoteWorkerSuite) TestDelete() { }) } +func (s *RemoteWorkerSuite) TestDeleteBatch() { + s.Run("normal_run", func() { + defer func() { s.mockClient.ExpectedCalls = nil }() + + s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")). + Return(&querypb.DeleteBatchResponse{Status: merr.Success()}, nil).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{ + SegmentIds: []int64{100, 200}, + }) + s.NoError(merr.CheckRPCCall(resp, err)) + }) + + s.Run("client_return_error", func() { + defer func() { s.mockClient.ExpectedCalls = nil }() + + s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")). + Return(nil, merr.WrapErrServiceInternal("mocked")).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{ + SegmentIds: []int64{100, 200}, + }) + + s.Error(merr.CheckRPCCall(resp, err)) + }) + + s.Run("client_return_fail_status", func() { + defer func() { s.mockClient.ExpectedCalls = nil }() + + s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")). + Return(&querypb.DeleteBatchResponse{ + Status: merr.Status(merr.WrapErrServiceUnavailable("mocked")), + }, nil).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{ + SegmentIds: []int64{100, 200}, + }) + + s.Error(merr.CheckRPCCall(resp, err)) + }) + + s.Run("batch_delete_unimplemented", func() { + defer func() { s.mockClient.ExpectedCalls = nil }() + + s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")). + Return(nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "mocked grpc unimplemented"))) + s.mockClient.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.Success(), nil).Times(2) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{ + SegmentIds: []int64{100, 200}, + }) + + s.NoError(merr.CheckRPCCall(resp, err)) + }) +} + func (s *RemoteWorkerSuite) TestSearch() { s.Run("normal_run", func() { defer func() { s.mockClient.ExpectedCalls = nil }() diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index 63220a8122..af21af0d83 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -19,8 +19,10 @@ package delegator import ( "context" "fmt" + "runtime" "time" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -36,8 +38,10 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -276,50 +280,43 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData sealed, growing, version := sd.distribution.PinOnlineSegments(partitions...) defer sd.distribution.Unpin(version) - for _, item := range group { - deleteData := *item - for _, entry := range sealed { - entry := entry - worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID) - if err != nil { - log.Warn("failed to get worker", - zap.Int64("nodeID", entry.NodeID), - zap.Error(err), - ) - // skip if node down - // delete will be processed after loaded again - continue - } - // forward to non level0 segment only - segments := lo.Filter(entry.Segments, func(segmentEntry SegmentEntry, _ int) bool { - return segmentEntry.Level != datapb.SegmentLevel_L0 - }) - - eg.Go(func() error { - offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) { - return deleteData, true - }, segments, querypb.DataScope_Historical)...) - return nil - }) + for _, entry := range sealed { + entry := entry + worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID) + if err != nil { + log.Warn("failed to get worker", + zap.Int64("nodeID", entry.NodeID), + zap.Error(err), + ) + // skip if node down + // delete will be processed after loaded again + continue } + // forward to non level0 segment only + segments := lo.Filter(entry.Segments, func(segmentEntry SegmentEntry, _ int) bool { + return segmentEntry.Level != datapb.SegmentLevel_L0 + }) - if len(growing) > 0 { - worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID()) - if err != nil { - log.Error("failed to get worker(local)", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.Error(err), - ) - // panic here, local worker shall not have error - panic(err) - } - eg.Go(func() error { - offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) { - return deleteData, true - }, growing, querypb.DataScope_Streaming)...) - return nil - }) + eg.Go(func() error { + offlineSegments.Upsert(sd.applyDeleteBatch(ctx, entry.NodeID, worker, group, segments, querypb.DataScope_Historical)...) + return nil + }) + } + + if len(growing) > 0 { + worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID()) + if err != nil { + log.Error("failed to get worker(local)", + zap.Int64("nodeID", paramtable.GetNodeID()), + zap.Error(err), + ) + // panic here, local worker shall not have error + panic(err) } + eg.Go(func() error { + offlineSegments.Upsert(sd.applyDeleteBatch(ctx, paramtable.GetNodeID(), worker, group, growing, querypb.DataScope_Streaming)...) + return nil + }) } return nil }) @@ -336,3 +333,72 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds())) } + +// applyDeleteBatch handles delete record and apply them to corresponding workers in batch. +func (sd *shardDelegator) applyDeleteBatch(ctx context.Context, + nodeID int64, + worker cluster.Worker, + data []*DeleteData, + entries []SegmentEntry, + scope querypb.DataScope, +) []int64 { + offlineSegments := typeutil.NewConcurrentSet[int64]() + log := sd.getLogger(ctx) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4) + defer pool.Release() + + var futures []*conc.Future[struct{}] + for _, delData := range data { + delData := delData + segmentIDs := lo.Map(entries, func(entry SegmentEntry, _ int) int64 { + return entry.SegmentID + }) + future := pool.Submit(func() (struct{}, error) { + log.Debug("delegator plan to applyDelete via worker") + err := retry.Handle(ctx, func() (bool, error) { + if sd.Stopped() { + return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing") + } + + resp, err := worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{ + Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)), + CollectionId: sd.collectionID, + PartitionId: delData.PartitionID, + VchannelName: sd.vchannelName, + SegmentIds: segmentIDs, + PrimaryKeys: storage.ParsePrimaryKeys2IDs(delData.PrimaryKeys), + Timestamps: delData.Timestamps, + Scope: scope, + }) + if errors.Is(err, merr.ErrNodeNotFound) { + log.Warn("try to delete data on non-exist node") + // cancel other request + cancel() + return false, err + } + // grpc/network error + if err != nil { + return true, err + } + if len(resp.GetMissingIds()) > 0 { + log.Warn("try to delete data of released segment", zap.Int64s("ids", resp.GetMissingIds())) + } + if len(resp.GetFailedIds()) > 0 { + log.Warn("apply delete for segment failed, marking it offline") + offlineSegments.Upsert(resp.GetFailedIds()...) + } + return false, nil + }, retry.Attempts(10)) + + return struct{}{}, err + }) + futures = append(futures, future) + } + + conc.AwaitAll(futures...) + return offlineSegments.Collect() +} diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index 9a53fbb7c2..e9661a53bc 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -243,12 +244,11 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() { deletedSegment := typeutil.NewConcurrentSet[int64]() mockWorker := cluster.NewMockWorker(s.T()) s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil) - mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error { - s.T().Log(dr.GetSegmentId()) - deletedSegment.Insert(dr.SegmentId) + mockWorker.EXPECT().DeleteBatch(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + deletedSegment.Upsert(dr.GetSegmentIds()...) s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData()) s.ElementsMatch([]uint64{10}, dr.GetTimestamps()) - return nil + return &querypb.DeleteBatchResponse{Status: merr.Success()}, nil }) delegator.ProcessDelete([]*DeleteData{ diff --git a/internal/querynodev2/local_worker.go b/internal/querynodev2/local_worker.go index cc05af99dd..06ab6b26f6 100644 --- a/internal/querynodev2/local_worker.go +++ b/internal/querynodev2/local_worker.go @@ -53,6 +53,10 @@ func (w *LocalWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) er return merr.CheckRPCCall(status, err) } +func (w *LocalWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + return w.node.DeleteBatch(ctx, req) +} + func (w *LocalWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { return w.node.SearchSegments(ctx, req) } diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 0a881a6dcf..4796a5f310 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -87,6 +87,22 @@ func (f SegmentIDFilter) SegmentIDs() ([]int64, bool) { return []int64{int64(f)}, true } +type SegmentIDsFilter struct { + segmentIDs typeutil.Set[int64] +} + +func (f SegmentIDsFilter) Filter(segment Segment) bool { + return f.segmentIDs.Contain(segment.ID()) +} + +func (f SegmentIDsFilter) SegmentType() (SegmentType, bool) { + return commonpb.SegmentState_SegmentStateNone, false +} + +func (f SegmentIDsFilter) SegmentIDs() ([]int64, bool) { + return f.segmentIDs.Collect(), true +} + type SegmentTypeFilter SegmentType func (f SegmentTypeFilter) Filter(segment Segment) bool { @@ -133,6 +149,12 @@ func WithID(id int64) SegmentFilter { return SegmentIDFilter(id) } +func WithIDs(ids ...int64) SegmentFilter { + return SegmentIDsFilter{ + segmentIDs: typeutil.NewSet(ids...), + } +} + func WithLevel(level datapb.SegmentLevel) SegmentFilter { return SegmentFilterFunc(func(segment Segment) bool { return segment.Level() == level diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 77b2d1c4ee..317127603e 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -19,6 +19,7 @@ package querynodev2 import ( "context" "fmt" + "runtime" "strconv" "sync" "time" @@ -48,6 +49,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" @@ -1404,6 +1406,84 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( return merr.Success(), nil } +// DeleteBatch is the API to apply same delete data into multiple segments. +// it's basically same as `Delete` but cost less memory pressure. +func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", req.GetCollectionId()), + zap.String("channel", req.GetVchannelName()), + zap.Int64s("segmentIDs", req.GetSegmentIds()), + zap.String("scope", req.GetScope().String()), + ) + + // check node healthy + if err := node.lifetime.Add(merr.IsHealthy); err != nil { + return &querypb.DeleteBatchResponse{ + Status: merr.Status(err), + }, nil + } + defer node.lifetime.Done() + + // log.Debug("QueryNode received worker delete detail", zap.Stringer("info", &deleteRequestStringer{DeleteRequest: req})) + + filters := []segments.SegmentFilter{ + segments.WithIDs(req.GetSegmentIds()...), + } + + // do not add filter for Unknown & All scope, for backward cap + switch req.GetScope() { + case querypb.DataScope_Historical: + filters = append(filters, segments.WithType(segments.SegmentTypeSealed)) + case querypb.DataScope_Streaming: + filters = append(filters, segments.WithType(segments.SegmentTypeGrowing)) + } + + segs := node.manager.Segment.GetBy(filters...) + + hitIDs := lo.Map(segs, func(segment segments.Segment, _ int) int64 { + return segment.ID() + }) + // calculate missing ids, continue to delete existing ones. + missingIDs := typeutil.NewSet(req.GetSegmentIds()...).Complement(typeutil.NewSet(hitIDs...)) + if missingIDs.Len() > 0 { + log.Warn("Delete batch find missing ids", zap.Int64s("missing_ids", missingIDs.Collect())) + } + + pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys()) + + // control the execution batch parallel with P number + // maybe it shall be lower in case of heavy CPU usage may impacting search/query + pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0)) + futures := make([]*conc.Future[struct{}], 0, len(segs)) + errSet := typeutil.NewConcurrentSet[int64]() + + for _, segment := range segs { + segment := segment + futures = append(futures, pool.Submit(func() (struct{}, error) { + // TODO @silverxia, add interface to use same data struct for segment delete + // current implementation still copys pks into protobuf(or arrow) struct + err := segment.Delete(ctx, pks, req.GetTimestamps()) + if err != nil { + errSet.Insert(segment.ID()) + log.Warn("segment delete failed", + zap.Int64("segmentID", segment.ID()), + zap.Error(err)) + return struct{}{}, err + } + return struct{}{}, nil + })) + } + + // ignore error returned, since error segment is recorded into error set + _ = conc.AwaitAll(futures...) + + // return merr.Success(), nil + return &querypb.DeleteBatchResponse{ + Status: merr.Success(), + FailedIds: errSet.Collect(), + }, nil +} + type deleteRequestStringer struct { *querypb.DeleteRequest } diff --git a/internal/util/mock/grpc_querynode_client.go b/internal/util/mock/grpc_querynode_client.go index e20dc0d635..dadfb31578 100644 --- a/internal/util/mock/grpc_querynode_client.go +++ b/internal/util/mock/grpc_querynode_client.go @@ -130,6 +130,10 @@ func (m *GrpcQueryNodeClient) Delete(ctx context.Context, in *querypb.DeleteRequ return &commonpb.Status{}, m.Err } +func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) { + return &querypb.DeleteBatchResponse{}, m.Err +} + func (m *GrpcQueryNodeClient) Close() error { return m.Err } diff --git a/internal/util/wrappers/qn_wrapper.go b/internal/util/wrappers/qn_wrapper.go index 63147c0116..def2e64f01 100644 --- a/internal/util/wrappers/qn_wrapper.go +++ b/internal/util/wrappers/qn_wrapper.go @@ -148,6 +148,10 @@ func (qn *qnServerWrapper) Delete(ctx context.Context, in *querypb.DeleteRequest return qn.QueryNode.Delete(ctx, in) } +func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) { + return qn.QueryNode.DeleteBatch(ctx, in) +} + func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient { return &qnServerWrapper{ QueryNode: qn, diff --git a/internal/util/wrappers/qn_wrapper_test.go b/internal/util/wrappers/qn_wrapper_test.go index 94719ee2da..9299e63058 100644 --- a/internal/util/wrappers/qn_wrapper_test.go +++ b/internal/util/wrappers/qn_wrapper_test.go @@ -246,6 +246,17 @@ func (s *QnWrapperSuite) TestDelete() { s.NoError(err) } +func (s *QnWrapperSuite) TestDeleteBatch() { + s.qn.EXPECT().DeleteBatch(mock.Anything, mock.Anything). + Return(&querypb.DeleteBatchResponse{ + Status: merr.Status(nil), + }, nil) + + resp, err := s.client.DeleteBatch(context.Background(), &querypb.DeleteBatchRequest{}) + err = merr.CheckRPCCall(resp, err) + s.NoError(err) +} + // Race caused by mock parameter check on once /* func (s *QnWrapperSuite) TestQueryStream() {