From 93bc805933590ec7cbd77571a7f87cbcc0c48e57 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 23 Mar 2023 19:43:57 +0800 Subject: [PATCH] Enhance ID allocator in DataNode (#22905) Signed-off-by: yangxuan --- Makefile | 2 + internal/allocator/id_allocator.go | 14 + internal/allocator/id_allocator_test.go | 14 + internal/datanode/allocator.go | 104 ----- internal/datanode/allocator/allocator.go | 81 ++++ internal/datanode/allocator/allocator_test.go | 92 +++++ internal/datanode/allocator/mock_allocator.go | 278 +++++++++++++ internal/datanode/allocator_test.go | 104 ----- internal/datanode/binlog_io.go | 31 +- internal/datanode/binlog_io_test.go | 386 +++++++++--------- internal/datanode/compactor.go | 25 +- internal/datanode/compactor_test.go | 32 +- internal/datanode/data_node.go | 30 +- internal/datanode/data_node_test.go | 1 - internal/datanode/data_sync_service.go | 11 +- internal/datanode/data_sync_service_test.go | 25 +- internal/datanode/flow_graph_delete_node.go | 3 +- .../datanode/flow_graph_delete_node_test.go | 31 +- .../datanode/flow_graph_insert_buffer_node.go | 3 +- .../flow_graph_insert_buffer_node_test.go | 41 +- internal/datanode/flow_graph_manager.go | 9 +- internal/datanode/flush_manager.go | 19 +- internal/datanode/flush_manager_test.go | 17 +- internal/datanode/mock_test.go | 54 --- internal/datanode/services.go | 6 +- internal/datanode/services_test.go | 13 + .../querycoordv2/balance/mock_balancer.go | 8 +- internal/querycoordv2/meta/mock_broker.go | 24 +- internal/querycoordv2/meta/mock_store.go | 2 +- internal/querycoordv2/mocks/mock_querynode.go | 2 +- internal/querycoordv2/session/mock_cluster.go | 2 +- internal/querycoordv2/task/mock_scheduler.go | 16 +- internal/querynode/mock_tsafe_replica_test.go | 12 +- internal/rootcoord/mocks/garbage_collector.go | 22 +- internal/rootcoord/mocks/meta_table.go | 172 ++++---- internal/types/mock_querycoord.go | 2 +- internal/types/mock_querynode.go | 2 +- 37 files changed, 969 insertions(+), 721 deletions(-) delete mode 100644 internal/datanode/allocator.go create mode 100644 internal/datanode/allocator/allocator.go create mode 100644 internal/datanode/allocator/allocator_test.go create mode 100644 internal/datanode/allocator/mock_allocator.go delete mode 100644 internal/datanode/allocator_test.go diff --git a/Makefile b/Makefile index 57d958a68f..14750445dd 100644 --- a/Makefile +++ b/Makefile @@ -347,6 +347,8 @@ mock-tnx-kv: mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=TxnKV.go --with-expecter generate-mockery: getdeps + # internal/datanode + $(PWD)/bin/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator/ --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage # internal/querycoordv2 $(PWD)/bin/mockery --name=QueryNodeServer --dir=$(PWD)/internal/proto/querypb/ --output=$(PWD)/internal/querycoordv2/mocks --filename=mock_querynode.go --with-expecter --structname=MockQueryNodeServer $(PWD)/bin/mockery --name=Broker --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=meta diff --git a/internal/allocator/id_allocator.go b/internal/allocator/id_allocator.go index 941d07ebf6..99dba1f560 100644 --- a/internal/allocator/id_allocator.go +++ b/internal/allocator/id_allocator.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/commonpbutil" @@ -151,6 +152,9 @@ func (ia *IDAllocator) AllocOne() (UniqueID, error) { // Alloc allocates the id of the count number. func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { + if ia.closed() { + return 0, 0, errors.New("fail to allocate ID, closed allocator") + } req := &IDRequest{BaseRequest: BaseRequest{Done: make(chan error), Valid: false}} req.count = count @@ -162,3 +166,13 @@ func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { start, count := req.id, req.count return start, start + int64(count), nil } + +// preventing alloc from a closed allocator stucking forever +func (ia *IDAllocator) closed() bool { + select { + case <-ia.Ctx.Done(): + return true + default: + return false + } +} diff --git a/internal/allocator/id_allocator_test.go b/internal/allocator/id_allocator_test.go index 9f9d1df221..03c47006d1 100644 --- a/internal/allocator/id_allocator_test.go +++ b/internal/allocator/id_allocator_test.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type mockIDAllocator struct { @@ -65,3 +66,16 @@ func TestIDAllocator(t *testing.T) { assert.Nil(t, err) assert.Equal(t, id, int64(20002)) } + +func TestIDAllocatorClose(t *testing.T) { + a, err := NewIDAllocator(context.TODO(), newMockIDAllocator(), 1) + require.NoError(t, err) + + err = a.Start() + assert.NoError(t, err) + + a.Close() + + _, _, err = a.Alloc(10) + assert.Error(t, err) +} diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go deleted file mode 100644 index 924655b30b..0000000000 --- a/internal/datanode/allocator.go +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -import ( - "context" - - "github.com/cockroachdb/errors" - - "github.com/milvus-io/milvus/internal/util/commonpbutil" - "github.com/milvus-io/milvus/internal/util/metautil" - "github.com/milvus-io/milvus/internal/util/paramtable" - - "github.com/milvus-io/milvus-proto/go-api/commonpb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" - "github.com/milvus-io/milvus/internal/types" -) - -type allocatorInterface interface { - allocID() (UniqueID, error) - allocIDBatch(count uint32) (UniqueID, uint32, error) - genKey(ids ...UniqueID) (key string, err error) -} - -type allocator struct { - rootCoord types.RootCoord -} - -// check if allocator implements allocatorInterface -var _ allocatorInterface = &allocator{} - -func newAllocator(s types.RootCoord) *allocator { - return &allocator{ - rootCoord: s, - } -} - -// allocID allocates one ID from rootCoord -func (alloc *allocator) allocID() (UniqueID, error) { - ctx := context.TODO() - resp, err := alloc.rootCoord.AllocID(ctx, &rootcoordpb.AllocIDRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_RequestID), - commonpbutil.WithMsgID(1), // GOOSE TODO - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - Count: 1, - }) - if err != nil { - return 0, err - } - - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return 0, errors.New(resp.GetStatus().GetReason()) - } - - return resp.ID, nil -} - -// allocIDBatch allocates IDs in batch from rootCoord -func (alloc *allocator) allocIDBatch(count uint32) (UniqueID, uint32, error) { - ctx := context.Background() - resp, err := alloc.rootCoord.AllocID(ctx, &rootcoordpb.AllocIDRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_RequestID), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - Count: count, - }) - - if err != nil { - return 0, 0, err - } - - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return 0, 0, errors.New(resp.GetStatus().GetReason()) - } - - return resp.GetID(), resp.GetCount(), nil -} - -// genKey gives a valid key string for lists of UniqueIDs: -func (alloc *allocator) genKey(ids ...UniqueID) (string, error) { - idx, err := alloc.allocID() - if err != nil { - return "", err - } - ids = append(ids, idx) - return metautil.JoinIDPath(ids...), nil -} diff --git a/internal/datanode/allocator/allocator.go b/internal/datanode/allocator/allocator.go new file mode 100644 index 0000000000..478d5238f5 --- /dev/null +++ b/internal/datanode/allocator/allocator.go @@ -0,0 +1,81 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package allocator + +import ( + "context" + + gAllocator "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type UniqueID = typeutil.UniqueID + +type Allocator interface { + Start() error + Close() + AllocOne() (UniqueID, error) + Alloc(count uint32) (UniqueID, UniqueID, error) + GetGenerator(count int, done <-chan struct{}) (<-chan UniqueID, error) + GetIDAlloactor() *gAllocator.IDAllocator +} + +var _ Allocator = (*Impl)(nil) + +type Impl struct { + // Start() error + // Close() error + // AllocOne() (UniqueID, error) + // Alloc(count uint32) (UniqueID, UniqueID, error) + *gAllocator.IDAllocator +} + +func New(ctx context.Context, rootCoord types.RootCoord, peerID UniqueID) (Allocator, error) { + idAlloc, err := gAllocator.NewIDAllocator(ctx, rootCoord, peerID) + if err != nil { + return nil, err + } + return &Impl{idAlloc}, nil +} + +func (a *Impl) GetIDAlloactor() *gAllocator.IDAllocator { + return a.IDAllocator +} + +func (a *Impl) GetGenerator(count int, done <-chan struct{}) (<-chan UniqueID, error) { + + idStart, _, err := a.Alloc(uint32(count)) + if err != nil { + return nil, err + } + + rt := make(chan UniqueID) + go func(rt chan<- UniqueID) { + for i := 0; i < count; i++ { + select { + case <-done: + close(rt) + return + case rt <- idStart + UniqueID(i): + } + } + close(rt) + }(rt) + + return rt, nil +} diff --git a/internal/datanode/allocator/allocator_test.go b/internal/datanode/allocator/allocator_test.go new file mode 100644 index 0000000000..9cd91179fd --- /dev/null +++ b/internal/datanode/allocator/allocator_test.go @@ -0,0 +1,92 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package allocator + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/types" +) + +func TestGetGenerator(t *testing.T) { + tests := []struct { + isvalid bool + innumber int + + expectedNo int + description string + }{ + {true, 1, 1, "valid input n 1"}, + {true, 3, 3, "valid input n 3 with cancel"}, + } + + for _, test := range tests { + rc := &RootCoordFactory{ID: 11111} + alloc, err := New(context.TODO(), rc, 100) + require.NoError(t, err) + err = alloc.Start() + require.NoError(t, err) + + t.Run(test.description, func(t *testing.T) { + done := make(chan struct{}) + gen, err := alloc.GetGenerator(test.innumber, done) + assert.NoError(t, err) + + r := make([]UniqueID, 0) + for i := range gen { + r = append(r, i) + } + + assert.Equal(t, test.expectedNo, len(r)) + + if test.innumber > 1 { + donedone := make(chan struct{}) + gen, err := alloc.GetGenerator(test.innumber, donedone) + assert.NoError(t, err) + + _, ok := <-gen + assert.True(t, ok) + + donedone <- struct{}{} + + _, ok = <-gen + assert.False(t, ok) + } + }) + } +} + +type RootCoordFactory struct { + types.RootCoord + ID UniqueID +} + +func (m *RootCoordFactory) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { + resp := &rootcoordpb.AllocIDResponse{ + ID: m.ID, + Count: in.GetCount(), + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }} + return resp, nil +} diff --git a/internal/datanode/allocator/mock_allocator.go b/internal/datanode/allocator/mock_allocator.go new file mode 100644 index 0000000000..a15474a81e --- /dev/null +++ b/internal/datanode/allocator/mock_allocator.go @@ -0,0 +1,278 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package allocator + +import ( + internalallocator "github.com/milvus-io/milvus/internal/allocator" + mock "github.com/stretchr/testify/mock" +) + +// MockAllocator is an autogenerated mock type for the Allocator type +type MockAllocator struct { + mock.Mock +} + +type MockAllocator_Expecter struct { + mock *mock.Mock +} + +func (_m *MockAllocator) EXPECT() *MockAllocator_Expecter { + return &MockAllocator_Expecter{mock: &_m.Mock} +} + +// Alloc provides a mock function with given fields: count +func (_m *MockAllocator) Alloc(count uint32) (int64, int64, error) { + ret := _m.Called(count) + + var r0 int64 + if rf, ok := ret.Get(0).(func(uint32) int64); ok { + r0 = rf(count) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 int64 + if rf, ok := ret.Get(1).(func(uint32) int64); ok { + r1 = rf(count) + } else { + r1 = ret.Get(1).(int64) + } + + var r2 error + if rf, ok := ret.Get(2).(func(uint32) error); ok { + r2 = rf(count) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockAllocator_Alloc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Alloc' +type MockAllocator_Alloc_Call struct { + *mock.Call +} + +// Alloc is a helper method to define mock.On call +// - count uint32 +func (_e *MockAllocator_Expecter) Alloc(count interface{}) *MockAllocator_Alloc_Call { + return &MockAllocator_Alloc_Call{Call: _e.mock.On("Alloc", count)} +} + +func (_c *MockAllocator_Alloc_Call) Run(run func(count uint32)) *MockAllocator_Alloc_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint32)) + }) + return _c +} + +func (_c *MockAllocator_Alloc_Call) Return(_a0 int64, _a1 int64, _a2 error) *MockAllocator_Alloc_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +// AllocOne provides a mock function with given fields: +func (_m *MockAllocator) AllocOne() (int64, error) { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockAllocator_AllocOne_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocOne' +type MockAllocator_AllocOne_Call struct { + *mock.Call +} + +// AllocOne is a helper method to define mock.On call +func (_e *MockAllocator_Expecter) AllocOne() *MockAllocator_AllocOne_Call { + return &MockAllocator_AllocOne_Call{Call: _e.mock.On("AllocOne")} +} + +func (_c *MockAllocator_AllocOne_Call) Run(run func()) *MockAllocator_AllocOne_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockAllocator_AllocOne_Call) Return(_a0 int64, _a1 error) *MockAllocator_AllocOne_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockAllocator) Close() { + _m.Called() +} + +// MockAllocator_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockAllocator_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockAllocator_Expecter) Close() *MockAllocator_Close_Call { + return &MockAllocator_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockAllocator_Close_Call) Run(run func()) *MockAllocator_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockAllocator_Close_Call) Return() *MockAllocator_Close_Call { + _c.Call.Return() + return _c +} + +// GetGenerator provides a mock function with given fields: count, done +func (_m *MockAllocator) GetGenerator(count int, done <-chan struct{}) (<-chan int64, error) { + ret := _m.Called(count, done) + + var r0 <-chan int64 + if rf, ok := ret.Get(0).(func(int, <-chan struct{}) <-chan int64); ok { + r0 = rf(count, done) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan int64) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(int, <-chan struct{}) error); ok { + r1 = rf(count, done) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockAllocator_GetGenerator_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGenerator' +type MockAllocator_GetGenerator_Call struct { + *mock.Call +} + +// GetGenerator is a helper method to define mock.On call +// - count int +// - done <-chan struct{} +func (_e *MockAllocator_Expecter) GetGenerator(count interface{}, done interface{}) *MockAllocator_GetGenerator_Call { + return &MockAllocator_GetGenerator_Call{Call: _e.mock.On("GetGenerator", count, done)} +} + +func (_c *MockAllocator_GetGenerator_Call) Run(run func(count int, done <-chan struct{})) *MockAllocator_GetGenerator_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int), args[1].(<-chan struct{})) + }) + return _c +} + +func (_c *MockAllocator_GetGenerator_Call) Return(_a0 <-chan int64, _a1 error) *MockAllocator_GetGenerator_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetIDAlloactor provides a mock function with given fields: +func (_m *MockAllocator) GetIDAlloactor() *internalallocator.IDAllocator { + ret := _m.Called() + + var r0 *internalallocator.IDAllocator + if rf, ok := ret.Get(0).(func() *internalallocator.IDAllocator); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internalallocator.IDAllocator) + } + } + + return r0 +} + +// MockAllocator_GetIDAlloactor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIDAlloactor' +type MockAllocator_GetIDAlloactor_Call struct { + *mock.Call +} + +// GetIDAlloactor is a helper method to define mock.On call +func (_e *MockAllocator_Expecter) GetIDAlloactor() *MockAllocator_GetIDAlloactor_Call { + return &MockAllocator_GetIDAlloactor_Call{Call: _e.mock.On("GetIDAlloactor")} +} + +func (_c *MockAllocator_GetIDAlloactor_Call) Run(run func()) *MockAllocator_GetIDAlloactor_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockAllocator_GetIDAlloactor_Call) Return(_a0 *internalallocator.IDAllocator) *MockAllocator_GetIDAlloactor_Call { + _c.Call.Return(_a0) + return _c +} + +// Start provides a mock function with given fields: +func (_m *MockAllocator) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockAllocator_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockAllocator_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +func (_e *MockAllocator_Expecter) Start() *MockAllocator_Start_Call { + return &MockAllocator_Start_Call{Call: _e.mock.On("Start")} +} + +func (_c *MockAllocator_Start_Call) Run(run func()) *MockAllocator_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockAllocator_Start_Call) Return(_a0 error) *MockAllocator_Start_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMockAllocator interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockAllocator creates a new instance of MockAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockAllocator(t mockConstructorTestingTNewMockAllocator) *MockAllocator { + mock := &MockAllocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/allocator_test.go b/internal/datanode/allocator_test.go deleted file mode 100644 index e69d41e0f8..0000000000 --- a/internal/datanode/allocator_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAllocator_Basic(t *testing.T) { - ms := &RootCoordFactory{} - allocator := newAllocator(ms) - - t.Run("Test allocID", func(t *testing.T) { - ms.setID(666) - _, err := allocator.allocID() - assert.NoError(t, err) - - ms.setID(-1) - _, err = allocator.allocID() - assert.Error(t, err) - }) - - t.Run("Test alloc ID batch", func(t *testing.T) { - // If id == 0, AllocID will return not successful status - // If id == -1, AllocID will return err with nil status - ms.setID(666) - _, count, err := allocator.allocIDBatch(10) - assert.NoError(t, err) - assert.EqualValues(t, 10, count) - - ms.setID(0) - _, _, err = allocator.allocIDBatch(10) - assert.Error(t, err) - - ms.setID(-1) - _, _, err = allocator.allocIDBatch(10) - assert.Error(t, err) - }) - - t.Run("Test genKey", func(t *testing.T) { - ms.setID(666) - - type Test struct { - inIDs []UniqueID - outKey string - - description string - } - - tests := []Test{ - {[]UniqueID{}, "666", "genKey with empty input ids"}, - {[]UniqueID{1}, "1/666", "genKey with 1 input id"}, - {[]UniqueID{1, 2, 3}, "1/2/3/666", "genKey with input 3 ids"}, - {[]UniqueID{2, 2, 2}, "2/2/2/666", "genKey with input 3 ids"}, - } - - for i, test := range tests { - key, err := allocator.genKey(test.inIDs...) - assert.NoError(t, err) - assert.Equalf(t, test.outKey, key, "#%d", i) - } - - // Status.ErrorCode != Success - ms.setID(0) - tests = []Test{ - {[]UniqueID{}, "", "error rpc status"}, - {[]UniqueID{1}, "", "error rpc status"}, - } - - for _, test := range tests { - k, err := allocator.genKey(test.inIDs...) - assert.Error(t, err) - assert.Equal(t, test.outKey, k) - } - - // Grpc error - ms.setID(-1) - tests = []Test{ - {[]UniqueID{}, "", "error rpc"}, - {[]UniqueID{1}, "", "error rpc"}, - } - for _, test := range tests { - k, err := allocator.genKey(test.inIDs...) - assert.Error(t, err) - assert.Equal(t, test.outKey, k) - } - }) -} diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index aec058230b..11c2c15a25 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -60,7 +61,7 @@ type uploader interface { type binlogIO struct { storage.ChunkManager - allocatorInterface + allocator.Allocator } var _ downloader = (*binlogIO)(nil) @@ -241,10 +242,11 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI return "", nil, err } - k, err := b.genKey(collID, partID, segID) + idx, err := b.AllocOne() if err != nil { return "", nil, err } + k := metautil.JoinIDPath(collID, partID, segID, idx) key := path.Join(b.ChunkManager.RootPath(), common.SegmentDeltaLogPath, k) @@ -268,7 +270,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta notifyGenIdx := make(chan struct{}) defer close(notifyGenIdx) - generator, err := b.idxGenerator(len(inlogs)+len(statslogs), notifyGenIdx) + generator, err := b.GetGenerator(len(inlogs)+len(statslogs), notifyGenIdx) if err != nil { return nil, nil, nil, err } @@ -309,29 +311,6 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta return kvs, inpaths, statspaths, nil } -func (b *binlogIO) idxGenerator(n int, done <-chan struct{}) (<-chan UniqueID, error) { - - idStart, _, err := b.allocIDBatch(uint32(n)) - if err != nil { - return nil, err - } - - rt := make(chan UniqueID) - go func(rt chan<- UniqueID) { - for i := 0; i < n; i++ { - select { - case <-done: - close(rt) - return - case rt <- idStart + UniqueID(i): - } - } - close(rt) - }(rt) - - return rt, nil -} - func (b *binlogIO) uploadInsertLog( ctx context.Context, segID UniqueID, diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index 35f3ae8a91..3a5ac9e1e6 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -18,32 +18,43 @@ package datanode import ( "context" + "fmt" "path" "testing" "time" - "github.com/cockroachdb/errors" - "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/milvus-io/milvus/internal/datanode/allocator" + + "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" ) var binlogTestDir = "/tmp/milvus_test/test_binlog_io" +var validGeneratorFn = func(count int, done <-chan struct{}) <-chan UniqueID { + ret := make(chan UniqueID, count) + for i := 0; i < count; i++ { + ret <- int64(100 + i) + } + return ret +} + func TestBinlogIOInterfaceMethods(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - alloc := NewAllocatorFactory() cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - b := &binlogIO{cm, alloc} t.Run("Test upload", func(t *testing.T) { f := &MetaFactory{} meta := f.GetCollectionMeta(UniqueID(10001), "uploads", schemapb.DataType_Int64) @@ -56,50 +67,82 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { Pks: []primaryKey{pk}, Tss: []uint64{666666}, } + t.Run("Test upload one iData", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.NoError(t, err) - assert.Equal(t, 12, len(p.inPaths)) - assert.Equal(t, 1, len(p.statsPaths)) - assert.Equal(t, 1, len(p.inPaths[0].GetBinlogs())) - assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs())) - assert.NotNil(t, p.deltaInfo) + b := &binlogIO{cm, alloc} + p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) + assert.NoError(t, err) + assert.Equal(t, 12, len(p.inPaths)) + assert.Equal(t, 1, len(p.statsPaths)) + assert.Equal(t, 1, len(p.inPaths[0].GetBinlogs())) + assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs())) + assert.NotNil(t, p.deltaInfo) + }) - p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta) - assert.NoError(t, err) - assert.Equal(t, 12, len(p.inPaths)) - assert.Equal(t, 1, len(p.statsPaths)) - assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs())) - assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs())) - assert.NotNil(t, p.deltaInfo) + t.Run("Test upload two iData", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - ctx, cancel := context.WithCancel(context.Background()) + b := &binlogIO{cm, alloc} + p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta) + assert.NoError(t, err) + assert.Equal(t, 12, len(p.inPaths)) + assert.Equal(t, 1, len(p.statsPaths)) + assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs())) + assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs())) + assert.NotNil(t, p.deltaInfo) - in, stats, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.NoError(t, err) - assert.Equal(t, 12, len(in)) - assert.Equal(t, 1, len(in[0].GetBinlogs())) - assert.Equal(t, 1, len(stats)) + }) - deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.NoError(t, err) - assert.NotNil(t, deltas) - assert.Equal(t, 1, len(deltas[0].GetBinlogs())) + t.Run("Test uploadInsertLog", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) - cancel() + b := &binlogIO{cm, alloc} - p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) - assert.EqualError(t, err, errUploadToBlobStorage.Error()) - assert.Nil(t, p) + in, stats, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) + assert.NoError(t, err) + assert.Equal(t, 12, len(in)) + assert.Equal(t, 1, len(in[0].GetBinlogs())) + assert.Equal(t, 1, len(stats)) + }) + t.Run("Test uploadDeltaLog", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - in, _, err = b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.EqualError(t, err, errUploadToBlobStorage.Error()) - assert.Nil(t, in) + b := &binlogIO{cm, alloc} + deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) + assert.NoError(t, err) + assert.NotNil(t, deltas) + assert.Equal(t, 1, len(deltas[0].GetBinlogs())) + }) - deltas, err = b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.EqualError(t, err, errUploadToBlobStorage.Error()) - assert.Nil(t, deltas) + t.Run("Test context Done", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) + + b := &binlogIO{cm, alloc} + + p, err := b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) + assert.EqualError(t, err, errUploadToBlobStorage.Error()) + assert.Nil(t, p) + + in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) + assert.EqualError(t, err, errUploadToBlobStorage.Error()) + assert.Nil(t, in) + + deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) + assert.EqualError(t, err, errUploadToBlobStorage.Error()) + assert.Nil(t, deltas) + }) }) t.Run("Test upload error", func(t *testing.T) { @@ -110,85 +153,74 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { Tss: []uint64{}, } - iData := genEmptyInsertData() - p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.NoError(t, err) - assert.Empty(t, p.inPaths) - assert.Empty(t, p.statsPaths) - assert.Empty(t, p.deltaInfo) + t.Run("Test upload empty insertData", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + b := &binlogIO{cm, alloc} - iData = &InsertData{Data: make(map[int64]storage.FieldData)} - p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.NoError(t, err) - assert.Empty(t, p.inPaths) - assert.Empty(t, p.statsPaths) - assert.Empty(t, p.deltaInfo) + iData := genEmptyInsertData() + p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) + assert.NoError(t, err) + assert.Empty(t, p.inPaths) + assert.Empty(t, p.statsPaths) + assert.Empty(t, p.deltaInfo) - iData = genInsertData() - dData = &DeleteData{ - Pks: []primaryKey{}, - Tss: []uint64{1}, - RowCount: 1, - } - p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) - assert.Error(t, err) - assert.Empty(t, p) + iData = &InsertData{Data: make(map[int64]storage.FieldData)} + p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) + assert.NoError(t, err) + assert.Empty(t, p.inPaths) + assert.Empty(t, p.statsPaths) + assert.Empty(t, p.deltaInfo) + }) - mkc := &mockCm{errMultiSave: true} - bin := &binlogIO{mkc, alloc} - iData = genInsertData() - pk := newInt64PrimaryKey(1) - dData = &DeleteData{ - Pks: []primaryKey{pk}, - Tss: []uint64{1}, - RowCount: 1, - } - ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond) - p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) - assert.Error(t, err) - assert.Empty(t, p) + t.Run("Test deleta data not match", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + b := &binlogIO{cm, alloc} + iData := genInsertData() + dData := &DeleteData{ + Pks: []primaryKey{}, + Tss: []uint64{1}, + RowCount: 1, + } + p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) + assert.Error(t, err) + assert.Empty(t, p) + }) - in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.Error(t, err) - assert.Empty(t, in) + t.Run("Test multisave error", func(t *testing.T) { + mkc := &mockCm{errMultiSave: true} + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) + var ( + b = &binlogIO{mkc, alloc} + iData = genInsertData() + pk = newInt64PrimaryKey(1) + dData = &DeleteData{ + Pks: []primaryKey{pk}, + Tss: []uint64{1}, + RowCount: 1, + } + ) + ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond) + p, err := b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) + assert.Error(t, err) + assert.Empty(t, p) - deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.Error(t, err) - assert.Empty(t, deltas) + in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta) + assert.Error(t, err) + assert.Empty(t, in) - alloc.isvalid = false - p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) - assert.Error(t, err) - assert.Empty(t, p) - - in, _, err = b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.Error(t, err) - assert.Empty(t, in) - - deltas, err = b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.Error(t, err) - assert.Empty(t, deltas) - - alloc.isvalid = true - for _, field := range meta.GetSchema().GetFields() { - field.IsPrimaryKey = false - } - p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) - assert.Error(t, err) - assert.Empty(t, p) - - in, _, err = b.uploadInsertLog(ctx, 1, 10, iData, meta) - assert.Error(t, err) - assert.Empty(t, in) - - deltas, err = b.uploadDeltaLog(ctx, 1, 10, dData, meta) - assert.Error(t, err) - assert.Empty(t, deltas) - - cancel() + deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta) + assert.Error(t, err) + assert.Empty(t, deltas) + cancel() + }) }) t.Run("Test download", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + b := &binlogIO{cm, alloc} tests := []struct { isvalid bool ks []string // for preparation @@ -232,6 +264,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { t.Run("Test download twice", func(t *testing.T) { mkc := &mockCm{errMultiLoad: true} + alloc := allocator.NewMockAllocator(t) b := &binlogIO{mkc, alloc} ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*20) @@ -259,15 +292,14 @@ func prepareBlob(cm storage.ChunkManager, key string) ([]byte, string, error) { func TestBinlogIOInnerMethods(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - alloc := NewAllocatorFactory() cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - b := &binlogIO{ - cm, - alloc, - } t.Run("Test genDeltaBlobs", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) + + b := &binlogIO{cm, alloc} f := &MetaFactory{} meta := f.GetCollectionMeta(UniqueID(10002), "test_gen_blobs", schemapb.DataType_Int64) @@ -302,23 +334,34 @@ func TestBinlogIOInnerMethods(t *testing.T) { t.Run("Test genDeltaBlobs error", func(t *testing.T) { pk := newInt64PrimaryKey(1) - k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []primaryKey{pk}, Tss: []uint64{}}, 1, 1, 1) - assert.Error(t, err) - assert.Empty(t, k) - assert.Empty(t, v) - errAlloc := NewAllocatorFactory() - errAlloc.isvalid = false + t.Run("Test serialize error", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + b := &binlogIO{cm, alloc} + k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []primaryKey{pk}, Tss: []uint64{}}, 1, 1, 1) + assert.Error(t, err) + assert.Empty(t, k) + assert.Empty(t, v) + }) - bin := binlogIO{cm, errAlloc} - k, v, err = bin.genDeltaBlobs(&DeleteData{Pks: []primaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1) - assert.Error(t, err) - assert.Empty(t, k) - assert.Empty(t, v) + t.Run("Test AllocOne error", func(t *testing.T) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(0), fmt.Errorf("mock AllocOne error")) + bin := binlogIO{cm, alloc} + k, v, err := bin.genDeltaBlobs(&DeleteData{Pks: []primaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1) + assert.Error(t, err) + assert.Empty(t, k) + assert.Empty(t, v) + + }) }) t.Run("Test genInsertBlobs", func(t *testing.T) { f := &MetaFactory{} + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + b := binlogIO{cm, alloc} + tests := []struct { pkType schemapb.DataType description string @@ -353,81 +396,36 @@ func TestBinlogIOInnerMethods(t *testing.T) { }) t.Run("Test genInsertBlobs error", func(t *testing.T) { - kvs, pin, pstats, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil) - assert.Error(t, err) - assert.Empty(t, kvs) - assert.Empty(t, pin) - assert.Empty(t, pstats) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - f := &MetaFactory{} - meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) + t.Run("serialize error", func(t *testing.T) { + bin := &binlogIO{cm, allocator.NewMockAllocator(t)} + kvs, pin, pstats, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, nil) - kvs, pin, pstats, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta) - assert.Error(t, err) - assert.Empty(t, kvs) - assert.Empty(t, pin) - assert.Empty(t, pstats) + assert.Error(t, err) + assert.Empty(t, kvs) + assert.Empty(t, pin) + assert.Empty(t, pstats) + }) - errAlloc := NewAllocatorFactory() - errAlloc.errAllocBatch = true - bin := &binlogIO{cm, errAlloc} - kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta) + t.Run("GetGenerator error", func(t *testing.T) { + f := &MetaFactory{} + meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) - assert.Error(t, err) - assert.Empty(t, kvs) - assert.Empty(t, pin) - assert.Empty(t, pstats) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error")) + bin := &binlogIO{cm, alloc} + kvs, pin, pstats, err := bin.genInsertBlobs(genInsertData(), 10, 1, meta) + + assert.Error(t, err) + assert.Empty(t, kvs) + assert.Empty(t, pin) + assert.Empty(t, pstats) + }) }) - - t.Run("Test idxGenerator", func(t *testing.T) { - tests := []struct { - isvalid bool - innumber int - - expectedNo int - description string - }{ - {false, 0, 0, "Invalid input count n"}, - {true, 1, 1, "valid input n 1"}, - {true, 3, 3, "valid input n 3 with cancel"}, - } - - for _, test := range tests { - t.Run(test.description, func(t *testing.T) { - done := make(chan struct{}) - if test.isvalid { - gen, err := b.idxGenerator(test.innumber, done) - assert.NoError(t, err) - - r := make([]UniqueID, 0) - for i := range gen { - r = append(r, i) - } - - assert.Equal(t, test.expectedNo, len(r)) - - if test.innumber > 1 { - donedone := make(chan struct{}) - gen, err := b.idxGenerator(test.innumber, donedone) - assert.NoError(t, err) - - _, ok := <-gen - assert.True(t, ok) - - donedone <- struct{}{} - - _, ok = <-gen - assert.False(t, ok) - } - } else { - gen, err := b.idxGenerator(test.innumber, done) - assert.Error(t, err) - assert.Nil(t, gen) - } - }) - } - }) - } type mockCm struct { diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 470c6096d8..7d36ae30dd 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -75,7 +76,7 @@ type compactionTask struct { compactor Channel flushManager - allocatorInterface + allocator.Allocator plan *datapb.CompactionPlan @@ -97,7 +98,7 @@ func newCompactionTask( ul uploader, channel Channel, fm flushManager, - alloc allocatorInterface, + alloc allocator.Allocator, plan *datapb.CompactionPlan, chunkManager storage.ChunkManager) *compactionTask { @@ -106,15 +107,15 @@ func newCompactionTask( ctx: ctx1, cancel: cancel, - downloader: dl, - uploader: ul, - Channel: channel, - flushManager: fm, - allocatorInterface: alloc, - plan: plan, - tr: timerecord.NewTimeRecorder("compactionTask"), - chunkManager: chunkManager, - done: make(chan struct{}, 1), + downloader: dl, + uploader: ul, + Channel: channel, + flushManager: fm, + Allocator: alloc, + plan: plan, + tr: timerecord.NewTimeRecorder("compactionTask"), + chunkManager: chunkManager, + done: make(chan struct{}, 1), } } @@ -445,7 +446,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { return nil, errIllegalCompactionPlan case t.plan.GetType() == datapb.CompactionType_MergeCompaction || t.plan.GetType() == datapb.CompactionType_MixCompaction: - targetSegID, err = t.allocID() + targetSegID, err = t.AllocOne() if err != nil { log.Warn("compact wrong", zap.Error(err)) return nil, err diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 963626874b..9b4f97c325 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -38,6 +38,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/paramtable" + + "github.com/milvus-io/milvus/internal/datanode/allocator" ) var compactTestDir = "/tmp/milvus_test/compact" @@ -276,8 +278,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) { Schema: meta.GetSchema(), }, nil) channel := newChannel("a", collectionID, meta.GetSchema(), rc, nil) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) t.Run("Merge without expiration", func(t *testing.T) { - alloc := NewAllocatorFactory(1) + mockbIO := &binlogIO{cm, alloc} paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") iData := genInsertDataWithExpiredTS() @@ -309,7 +313,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) { assert.Equal(t, 1, len(statsPaths)) }) t.Run("Merge without expiration2", func(t *testing.T) { - alloc := NewAllocatorFactory(1) mockbIO := &binlogIO{cm, alloc} paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize @@ -347,7 +350,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }) t.Run("Merge with expiration", func(t *testing.T) { - alloc := NewAllocatorFactory(1) mockbIO := &binlogIO{cm, alloc} iData := genInsertDataWithExpiredTS() @@ -390,7 +392,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }) t.Run("Merge with meta error", func(t *testing.T) { - alloc := NewAllocatorFactory(1) mockbIO := &binlogIO{cm, alloc} paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") iData := genInsertDataWithExpiredTS() @@ -427,7 +428,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }) t.Run("Merge with meta type param error", func(t *testing.T) { - alloc := NewAllocatorFactory(1) mockbIO := &binlogIO{cm, alloc} paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") iData := genInsertDataWithExpiredTS() @@ -574,12 +574,14 @@ func TestCompactorInterfaceMethods(t *testing.T) { paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") // Turn off auto expiration t.Run("Test compact invalid", func(t *testing.T) { - invalidAlloc := NewAllocatorFactory(-1) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) ctx, cancel := context.WithCancel(context.TODO()) emptyTask := &compactionTask{ - ctx: ctx, - cancel: cancel, - done: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + done: make(chan struct{}, 1), + Channel: &ChannelMeta{}, } plan := &datapb.CompactionPlan{ @@ -596,7 +598,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.Error(t, err) plan.Type = datapb.CompactionType_MergeCompaction - emptyTask.allocatorInterface = invalidAlloc + emptyTask.Allocator = alloc plan.SegmentBinlogs = notEmptySegmentBinlogs _, err = emptyTask.compact() assert.Error(t, err) @@ -606,7 +608,9 @@ func TestCompactorInterfaceMethods(t *testing.T) { }) t.Run("Test typeII compact valid", func(t *testing.T) { - alloc := NewAllocatorFactory(1) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Call.Return(int64(19530), nil) type testCase struct { pkType schemapb.DataType iData1 storage.FieldData @@ -702,7 +706,6 @@ func TestCompactorInterfaceMethods(t *testing.T) { Channel: "channelname", } - alloc.random = false // generated ID = 19530 task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan, nil) result, err := task.compact() assert.NoError(t, err) @@ -775,7 +778,9 @@ func TestCompactorInterfaceMethods(t *testing.T) { // Both pk = 1 rows of the two segments are compacted. var collID, partID, segID1, segID2 UniqueID = 1, 10, 200, 201 - alloc := NewAllocatorFactory(1) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(19530), nil) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) rc := &RootCoordFactory{ pkType: schemapb.DataType_Int64, } @@ -838,7 +843,6 @@ func TestCompactorInterfaceMethods(t *testing.T) { Channel: "channelname", } - alloc.random = false // generated ID = 19530 task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan, nil) result, err := task.compact() assert.NoError(t, err) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 73568b15fc..7315aab96a 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -35,12 +35,13 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" - "github.com/milvus-io/milvus-proto/go-api/commonpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - allocator2 "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus-proto/go-api/commonpb" + + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" @@ -119,12 +120,12 @@ type DataNode struct { dataCoord types.DataCoord //call once - initOnce sync.Once - sessionMu sync.Mutex // to fix data race - session *sessionutil.Session - watchKv kv.MetaKv - chunkManager storage.ChunkManager - rowIDAllocator *allocator2.IDAllocator + initOnce sync.Once + sessionMu sync.Mutex // to fix data race + session *sessionutil.Session + watchKv kv.MetaKv + chunkManager storage.ChunkManager + allocator allocator.Allocator closer io.Closer @@ -254,7 +255,7 @@ func (node *DataNode) Init() error { node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, paramtable.GetNodeID()) log.Info("DataNode server init dispatcher client done", zap.Int64("node ID", paramtable.GetNodeID())) - idAllocator, err := allocator2.NewIDAllocator(node.ctx, node.rootCoord, paramtable.GetNodeID()) + alloc, err := allocator.New(context.Background(), node.rootCoord, paramtable.GetNodeID()) if err != nil { log.Error("failed to create id allocator", zap.Error(err), @@ -262,7 +263,7 @@ func (node *DataNode) Init() error { initError = err return } - node.rowIDAllocator = idAllocator + node.allocator = alloc node.factory.Init(Params) log.Info("DataNode server init succeeded", @@ -489,7 +490,7 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) { // Start will update DataNode state to HEALTHY func (node *DataNode) Start() error { - if err := node.rowIDAllocator.Start(); err != nil { + if err := node.allocator.Start(); err != nil { log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole)) return err } @@ -565,14 +566,13 @@ func (node *DataNode) ReadyToFlush() error { func (node *DataNode) Stop() error { // https://github.com/milvus-io/milvus/issues/12282 node.UpdateStateCode(commonpb.StateCode_Abnormal) - - node.cancel() node.flowgraphManager.dropAll() node.flowgraphManager.stop() - if node.rowIDAllocator != nil { + node.cancel() + if node.allocator != nil { log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole)) - node.rowIDAllocator.Close() + node.allocator.Close() } if node.closer != nil { diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 78c9133048..ab08505a63 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -242,7 +242,6 @@ func TestWatchChannel(t *testing.T) { defer cancel() t.Run("test watch channel", func(t *testing.T) { - // GOOSE TODO kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" path := fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), oldInvalidCh) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 3398069bcc..3268c2fe26 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgdispatcher" @@ -48,9 +49,9 @@ type dataSyncService struct { cancelFn context.CancelFunc fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages flushCh chan flushMsg - resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message. - channel Channel // channel stores meta of channel - idAllocator allocatorInterface // id/timestamp allocator + resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message. + channel Channel // channel stores meta of channel + idAllocator allocator.Allocator // id/timestamp allocator dispClient msgdispatcher.Client msFactory msgstream.Factory collectionID UniqueID // collection id of vchan for which this data sync service serves @@ -73,7 +74,7 @@ func newDataSyncService(ctx context.Context, flushCh chan flushMsg, resendTTCh chan resendTTMsg, channel Channel, - alloc allocatorInterface, + alloc allocator.Allocator, dispClient msgdispatcher.Client, factory msgstream.Factory, vchan *datapb.VchannelInfo, @@ -135,7 +136,7 @@ type nodeConfig struct { collectionID UniqueID vChannelName string channel Channel // Channel info - allocator allocatorInterface + allocator allocator.Allocator serverID int64 // defaults parallelConfig diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 3d2c2692fa..c8fdb3d6cf 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -33,6 +34,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgdispatcher" "github.com/milvus-io/milvus/internal/mq/msgstream" @@ -113,7 +115,7 @@ type testInfo struct { description string } -func TestDataSyncService_newDataSyncService(te *testing.T) { +func TestDataSyncService_newDataSyncService(t *testing.T) { ctx := context.Background() @@ -158,7 +160,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { defer cm.RemoveWithPrefix(ctx, cm.RootPath()) for _, test := range tests { - te.Run(test.description, func(t *testing.T) { + t.Run(test.description, func(t *testing.T) { df := &DataCoordFactory{} rc := &RootCoordFactory{pkType: schemapb.DataType_Int64} @@ -172,7 +174,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { make(chan flushMsg), make(chan resendTTMsg), channel, - NewAllocatorFactory(), + allocator.NewMockAllocator(t), dispClient, test.inMsgFactory, getVchanInfo(test), @@ -224,7 +226,11 @@ func TestDataSyncService_Start(t *testing.T) { defer cm.RemoveWithPrefix(ctx, cm.RootPath()) channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) - allocFactory := NewAllocatorFactory(1) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), + func(count uint32) int64 { + return int64(22222 + count) + }, nil) factory := dependency.NewDefaultFactory(true) dispClient := msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID()) defer os.RemoveAll("/tmp/milvus") @@ -280,7 +286,7 @@ func TestDataSyncService_Start(t *testing.T) { }, } - sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, dispClient, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0) + sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0) assert.Nil(t, err) sync.flushListener = make(chan *segmentFlushPack) @@ -401,13 +407,18 @@ func TestDataSyncService_Close(t *testing.T) { UnflushedSegmentIds: ufsIds, FlushedSegmentIds: fsIds, } + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) + alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), + func(count uint32) int64 { + return int64(22222 + count) + }, nil) var ( flushChan = make(chan flushMsg, 100) resendTTChan = make(chan resendTTMsg, 100) signalCh = make(chan string, 100) - allocFactory = NewAllocatorFactory(1) factory = dependency.NewDefaultFactory(true) dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID()) mockDataCoord = &DataCoordFactory{} @@ -432,7 +443,7 @@ func TestDataSyncService_Close(t *testing.T) { paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key) channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) - sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0) + sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0) assert.Nil(t, err) sync.flushListener = make(chan *segmentFlushPack, 10) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 270b9b5ccf..46bf0a218e 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/msgpb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/storage" @@ -39,7 +40,7 @@ type deleteNode struct { channelName string delBufferManager *DelBufferManager // manager of delete msg channel Channel - idAllocator allocatorInterface + idAllocator allocator.Allocator flushManager flushManager clearSignal chan<- string diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index d446b54a63..afcbdf3ef5 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" @@ -181,10 +182,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { t.Run("Test get segment by varChar primary keys", func(te *testing.T) { channel := genMockChannel(segIDs, varCharPks, chanName) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + alloc := allocator.NewMockAllocator(t) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) c := &nodeConfig{ channel: channel, - allocator: &allocator{}, + allocator: alloc, vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -214,11 +216,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { }) channel := genMockChannel(segIDs, int64Pks, chanName) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + alloc := allocator.NewMockAllocator(t) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) t.Run("Test get segment by int64 primary keys", func(te *testing.T) { c := &nodeConfig{ channel: channel, - allocator: &allocator{}, + allocator: alloc, vChannelName: chanName, } @@ -260,7 +263,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { c := &nodeConfig{ channel: channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -288,7 +291,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { c := &nodeConfig{ channel: channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -322,7 +325,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { c := &nodeConfig{ channel: channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -364,7 +367,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { c := &nodeConfig{ channel: channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -392,7 +395,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { &DelDataBuf{delData: &DeleteData{}, item: bufItem}) heap.Push(delNode.delBufferManager.delBufHeap, bufItem) - delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, channel, + delNode.flushManager = NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) var fgMsg flowgraph.Msg = &msg @@ -419,7 +422,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { c := &nodeConfig{ channel: channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -513,7 +516,7 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + fm := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) chanName := "datanode-test-FlowGraphDeletenode-showDelBuf" testPath := "/test/datanode/root/meta" @@ -525,7 +528,7 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) { } c := &nodeConfig{ channel: channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ @@ -562,7 +565,7 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + fm := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) chanName := "datanode-test-FlowGraphDeletenode-showDelBuf" testPath := "/test/datanode/root/meta" @@ -575,7 +578,7 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) { c := &nodeConfig{ channel: &channel, - allocator: NewAllocatorFactory(), + allocator: allocator.NewMockAllocator(t), vChannelName: chanName, } delBufManager := &DelBufferManager{ diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 26b3ee3428..1c28955bc1 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/msgpb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" @@ -50,7 +51,7 @@ type insertBufferNode struct { channelName string delBufferManager *DelBufferManager // manager of delete msg channel Channel - idAllocator allocatorInterface + idAllocator allocator.Allocator flushMap sync.Map flushChan <-chan flushMsg diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 6cea63e275..b3be1732ec 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -34,6 +35,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -97,7 +99,8 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { factory := dependency.NewDefaultFactory(true) - fm := NewRendezvousFlushManager(&allocator{}, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + alloc := allocator.NewMockAllocator(t) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) @@ -105,7 +108,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { c := &nodeConfig{ channel: channel, msFactory: factory, - allocator: NewAllocatorFactory(), + allocator: alloc, vChannelName: "string", } delBufManager := &DelBufferManager{ @@ -201,14 +204,19 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { factory := dependency.NewDefaultFactory(true) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), + func(count uint32) int64 { + return int64(22222 + count) + }, nil) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) c := &nodeConfig{ channel: channel, msFactory: factory, - allocator: NewAllocatorFactory(), + allocator: alloc, vChannelName: "string", } delBufManager := &DelBufferManager{ @@ -363,7 +371,12 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), + func(count uint32) int64 { + return int64(22222 + count) + }, nil) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(pack *segmentFlushPack) { fpMut.Lock() flushPacks = append(flushPacks, pack) fpMut.Unlock() @@ -382,7 +395,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { c := &nodeConfig{ channel: channel, msFactory: factory, - allocator: NewAllocatorFactory(), + allocator: alloc, vChannelName: "string", } delBufManager := &DelBufferManager{ @@ -571,7 +584,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { }) } -func TestRollBF(t *testing.T) { +func TestInsertBufferNodeRollBF(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -604,7 +617,12 @@ func TestRollBF(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), + func(count uint32) int64 { + return int64(22222 + count) + }, nil) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(pack *segmentFlushPack) { fpMut.Lock() flushPacks = append(flushPacks, pack) fpMut.Unlock() @@ -623,7 +641,7 @@ func TestRollBF(t *testing.T) { c := &nodeConfig{ channel: channel, msFactory: factory, - allocator: NewAllocatorFactory(), + allocator: alloc, vChannelName: "string", } delBufManager := &DelBufferManager{ @@ -992,14 +1010,15 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { factory := dependency.NewDefaultFactory(true) - fm := NewRendezvousFlushManager(&allocator{}, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + alloc := allocator.NewMockAllocator(t) + fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) c := &nodeConfig{ channel: channel, msFactory: factory, - allocator: NewAllocatorFactory(), + allocator: alloc, vChannelName: "string", } delBufManager := &DelBufferManager{ diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 7f7d51d0e4..7c01e9ac20 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -102,19 +102,18 @@ func (fm *flowgraphManager) execute(totalMemory uint64) { } func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *tickler) error { + log := log.With(zap.String("channel", vchan.GetChannelName())) if _, ok := fm.flowgraphs.Load(vchan.GetChannelName()); ok { - log.Warn("try to add an existed DataSyncService", zap.String("vChannelName", vchan.GetChannelName())) + log.Warn("try to add an existed DataSyncService") return nil } channel := newChannel(vchan.GetChannelName(), vchan.GetCollectionID(), schema, dn.rootCoord, dn.chunkManager) - var alloc allocatorInterface = newAllocator(dn.rootCoord) - dataSyncService, err := newDataSyncService(dn.ctx, make(chan flushMsg, 100), make(chan resendTTMsg, 100), channel, - alloc, dn.dispClient, dn.factory, vchan, dn.clearSignal, dn.dataCoord, dn.segmentCache, dn.chunkManager, dn.compactionExecutor, tickler, dn.GetSession().ServerID) + dn.allocator, dn.dispClient, dn.factory, vchan, dn.clearSignal, dn.dataCoord, dn.segmentCache, dn.chunkManager, dn.compactionExecutor, tickler, dn.GetSession().ServerID) if err != nil { - log.Warn("new data sync service fail", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err)) + log.Warn("fail to create new datasyncservice", zap.Error(err)) return err } dataSyncService.start() diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index c0bb1c6db5..ffa15868a0 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -262,7 +263,7 @@ type dropHandler struct { // rendezvousFlushManager makes sure insert & del buf all flushed type rendezvousFlushManager struct { - allocatorInterface + allocator.Allocator storage.ChunkManager Channel @@ -371,7 +372,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni } // binlogs - start, _, err := m.allocIDBatch(uint32(len(binLogs) + len(statsBinlogs))) + start, _, err := m.Alloc(uint32(len(binLogs) + len(statsBinlogs))) if err != nil { return nil, err } @@ -387,7 +388,6 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni logidx := start + int64(idx) - // no error raise if alloc=false k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx) // [rootPath]/[insert_log]/key @@ -413,7 +413,6 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni logidx := start + UniqueID(len(binLogs)+idx) - // no error raise if alloc=false k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx) key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k) @@ -458,7 +457,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique return err } - logID, err := m.allocID() + logID, err := m.AllocOne() if err != nil { log.Error("failed to alloc ID", zap.Error(err)) return err @@ -618,12 +617,12 @@ func (t *flushBufferDeleteTask) flushDeleteData() error { } // NewRendezvousFlushManager create rendezvousFlushManager with provided allocator and kv -func NewRendezvousFlushManager(allocator allocatorInterface, cm storage.ChunkManager, channel Channel, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager { +func NewRendezvousFlushManager(allocator allocator.Allocator, cm storage.ChunkManager, channel Channel, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager { fm := &rendezvousFlushManager{ - allocatorInterface: allocator, - ChunkManager: cm, - notifyFunc: f, - Channel: channel, + Allocator: allocator, + ChunkManager: cm, + notifyFunc: f, + Channel: channel, dropHandler: dropHandler{ flushAndDrop: drop, }, diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 69f7711cba..15195c3c7c 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/retry" @@ -158,7 +159,8 @@ func TestRendezvousFlushManager(t *testing.T) { var counter atomic.Int64 finish := sync.WaitGroup{} finish.Add(size) - m := NewRendezvousFlushManager(&allocator{}, cm, newTestChannel(), func(pack *segmentFlushPack) { + alloc := allocator.NewMockAllocator(t) + m := NewRendezvousFlushManager(alloc, cm, newTestChannel(), func(pack *segmentFlushPack) { counter.Inc() finish.Done() }, emptyFlushAndDropFunc) @@ -199,7 +201,8 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { finish.Add(size) var packMut sync.Mutex packs := make([]*segmentFlushPack, 0, size+3) - m := NewRendezvousFlushManager(&allocator{}, cm, newTestChannel(), func(pack *segmentFlushPack) { + alloc := allocator.NewMockAllocator(t) + m := NewRendezvousFlushManager(alloc, cm, newTestChannel(), func(pack *segmentFlushPack) { packMut.Lock() packs = append(packs, pack) packMut.Unlock() @@ -297,7 +300,7 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) { channel := newTestChannel() channel.collSchema = &schemapb.CollectionSchema{} - fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) { + fm := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, channel, func(*segmentFlushPack) { }, emptyFlushAndDropFunc) // non exists segment @@ -330,7 +333,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { var counter atomic.Int64 var finish sync.WaitGroup finish.Add(size) - m := NewRendezvousFlushManager(&allocator{}, cm, newTestChannel(), func(pack *segmentFlushPack) { + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { counter.Inc() finish.Done() }, emptyFlushAndDropFunc) @@ -402,7 +405,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { var result []*segmentFlushPack signal := make(chan struct{}) - m := NewRendezvousFlushManager(&allocator{}, cm, newTestChannel(), func(pack *segmentFlushPack) { + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { }, func(packs []*segmentFlushPack) { mut.Lock() result = packs @@ -456,7 +459,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) { var result []*segmentFlushPack signal := make(chan struct{}) - m := NewRendezvousFlushManager(&allocator{}, cm, newTestChannel(), func(pack *segmentFlushPack) { + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { }, func(packs []*segmentFlushPack) { mut.Lock() result = packs @@ -518,7 +521,7 @@ func TestRendezvousFlushManager_close(t *testing.T) { var counter atomic.Int64 finish := sync.WaitGroup{} finish.Add(size) - m := NewRendezvousFlushManager(&allocator{}, cm, newTestChannel(), func(pack *segmentFlushPack) { + m := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, newTestChannel(), func(pack *segmentFlushPack) { counter.Inc() finish.Done() }, emptyFlushAndDropFunc) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 4207aedff4..ec4e98d7a5 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -22,8 +22,6 @@ import ( "encoding/binary" "fmt" "math" - "math/rand" - "sync" "time" "github.com/cockroachdb/errors" @@ -46,7 +44,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/metautil" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -967,57 +964,6 @@ func genFlowGraphDeleteMsg(pks []primaryKey, chanName string) flowGraphMsg { return *fgMsg } -type AllocatorFactory struct { - sync.Mutex - r *rand.Rand - isvalid bool - random bool - errAllocBatch bool -} - -var _ allocatorInterface = &AllocatorFactory{} - -func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory { - f := &AllocatorFactory{ - r: rand.New(rand.NewSource(time.Now().UnixNano())), - isvalid: len(id) == 0 || (len(id) > 0 && id[0] > 0), - } - return f -} - -func (alloc *AllocatorFactory) allocID() (UniqueID, error) { - alloc.Lock() - defer alloc.Unlock() - - if !alloc.isvalid { - return -1, errors.New("allocID error") - } - - if alloc.random { - return alloc.r.Int63n(10000), nil - } - - return 19530, nil -} - -func (alloc *AllocatorFactory) allocIDBatch(count uint32) (UniqueID, uint32, error) { - if count == 0 || alloc.errAllocBatch { - return 0, 0, errors.New("count should be greater than zero") - } - - start, err := alloc.allocID() - return start, count, err -} - -func (alloc *AllocatorFactory) genKey(ids ...UniqueID) (string, error) { - idx, err := alloc.allocID() - if err != nil { - return "", err - } - ids = append(ids, idx) - return metautil.JoinIDPath(ids...), nil -} - // If id == 0, AllocID will return not successful status // If id == -1, AllocID will return err func (m *RootCoordFactory) setID(id UniqueID) { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 8011c555c8..838d4515a8 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -567,7 +567,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) // parse files and generate segments segmentSize := Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 - importWrapper := importutil.NewImportWrapper(newCtx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, + importWrapper := importutil.NewImportWrapper(newCtx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.allocator.GetIDAlloactor(), node.chunkManager, importResult, reportFunc) importWrapper.SetCallbackFunctions(assignSegmentFunc(node, req), createBinLogsFunc(node, req, colInfo.GetSchema(), ts), @@ -922,8 +922,7 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp, return nil, nil, err } - var alloc allocatorInterface = newAllocator(node.rootCoord) - start, _, err := alloc.allocIDBatch(uint32(len(binLogs))) + start, _, err := node.allocator.Alloc(uint32(len(binLogs))) if err != nil { return nil, nil, err } @@ -940,7 +939,6 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp, logidx := start + int64(idx) - // no error raise if alloc=false k := metautil.JoinIDPath(colID, partID, segmentID, fieldID, logidx) key := path.Join(node.chunkManager.RootPath(), common.SegmentInsertLogPath, k) diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 4f60851eaf..5e47771b28 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -24,6 +24,7 @@ import ( "sync" "testing" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -32,7 +33,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + allocator2 "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -82,6 +85,16 @@ func (s *DataNodeServicesSuite) SetupTest() { err := s.node.Init() s.Require().NoError(err) + alloc := &allocator.MockAllocator{} + alloc.EXPECT().Start().Return(nil).Maybe() + alloc.EXPECT().Close().Maybe() + alloc.EXPECT().GetIDAlloactor().Return(&allocator2.IDAllocator{}).Maybe() + alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222), + func(count uint32) int64 { + return int64(22222 + count) + }, nil).Maybe() + s.node.allocator = alloc + err = s.node.Start() s.Require().NoError(err) diff --git a/internal/querycoordv2/balance/mock_balancer.go b/internal/querycoordv2/balance/mock_balancer.go index 8a91249fbb..552345f0fe 100644 --- a/internal/querycoordv2/balance/mock_balancer.go +++ b/internal/querycoordv2/balance/mock_balancer.go @@ -42,8 +42,8 @@ type MockBalancer_AssignChannel_Call struct { } // AssignChannel is a helper method to define mock.On call -// - channels []*meta.DmChannel -// - nodes []int64 +// - channels []*meta.DmChannel +// - nodes []int64 func (_e *MockBalancer_Expecter) AssignChannel(channels interface{}, nodes interface{}) *MockBalancer_AssignChannel_Call { return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", channels, nodes)} } @@ -82,8 +82,8 @@ type MockBalancer_AssignSegment_Call struct { } // AssignSegment is a helper method to define mock.On call -// - segments []*meta.Segment -// - nodes []int64 +// - segments []*meta.Segment +// - nodes []int64 func (_e *MockBalancer_Expecter) AssignSegment(segments interface{}, nodes interface{}) *MockBalancer_AssignSegment_Call { return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", segments, nodes)} } diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index 03f8180b9b..3d5bb2d036 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -56,8 +56,8 @@ type MockBroker_GetCollectionSchema_Call struct { } // GetCollectionSchema is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 +// - ctx context.Context +// - collectionID int64 func (_e *MockBroker_Expecter) GetCollectionSchema(ctx interface{}, collectionID interface{}) *MockBroker_GetCollectionSchema_Call { return &MockBroker_GetCollectionSchema_Call{Call: _e.mock.On("GetCollectionSchema", ctx, collectionID)} } @@ -103,9 +103,9 @@ type MockBroker_GetIndexInfo_Call struct { } // GetIndexInfo is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - segmentID int64 +// - ctx context.Context +// - collectionID int64 +// - segmentID int64 func (_e *MockBroker_Expecter) GetIndexInfo(ctx interface{}, collectionID interface{}, segmentID interface{}) *MockBroker_GetIndexInfo_Call { return &MockBroker_GetIndexInfo_Call{Call: _e.mock.On("GetIndexInfo", ctx, collectionID, segmentID)} } @@ -151,8 +151,8 @@ type MockBroker_GetPartitions_Call struct { } // GetPartitions is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 +// - ctx context.Context +// - collectionID int64 func (_e *MockBroker_Expecter) GetPartitions(ctx interface{}, collectionID interface{}) *MockBroker_GetPartitions_Call { return &MockBroker_GetPartitions_Call{Call: _e.mock.On("GetPartitions", ctx, collectionID)} } @@ -207,9 +207,9 @@ type MockBroker_GetRecoveryInfo_Call struct { } // GetRecoveryInfo is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 +// - ctx context.Context +// - collectionID int64 +// - partitionID int64 func (_e *MockBroker_Expecter) GetRecoveryInfo(ctx interface{}, collectionID interface{}, partitionID interface{}) *MockBroker_GetRecoveryInfo_Call { return &MockBroker_GetRecoveryInfo_Call{Call: _e.mock.On("GetRecoveryInfo", ctx, collectionID, partitionID)} } @@ -262,8 +262,8 @@ type MockBroker_GetSegmentInfo_Call struct { } // GetSegmentInfo is a helper method to define mock.On call -// - ctx context.Context -// - segmentID ...int64 +// - ctx context.Context +// - segmentID ...int64 func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentID ...interface{}) *MockBroker_GetSegmentInfo_Call { return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", append([]interface{}{ctx}, segmentID...)...)} diff --git a/internal/querycoordv2/meta/mock_store.go b/internal/querycoordv2/meta/mock_store.go index 31f2ed1c04..a35e8a6c3b 100644 --- a/internal/querycoordv2/meta/mock_store.go +++ b/internal/querycoordv2/meta/mock_store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package meta diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index 1139810331..d37f5ad96f 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package mocks diff --git a/internal/querycoordv2/session/mock_cluster.go b/internal/querycoordv2/session/mock_cluster.go index b541b188c9..acc7d3223b 100644 --- a/internal/querycoordv2/session/mock_cluster.go +++ b/internal/querycoordv2/session/mock_cluster.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package session diff --git a/internal/querycoordv2/task/mock_scheduler.go b/internal/querycoordv2/task/mock_scheduler.go index 25c694fb76..ae70e7b551 100644 --- a/internal/querycoordv2/task/mock_scheduler.go +++ b/internal/querycoordv2/task/mock_scheduler.go @@ -41,7 +41,7 @@ type MockScheduler_Add_Call struct { } // Add is a helper method to define mock.On call -// - task Task +// - task Task func (_e *MockScheduler_Expecter) Add(task interface{}) *MockScheduler_Add_Call { return &MockScheduler_Add_Call{Call: _e.mock.On("Add", task)} } @@ -69,7 +69,7 @@ type MockScheduler_AddExecutor_Call struct { } // AddExecutor is a helper method to define mock.On call -// - nodeID int64 +// - nodeID int64 func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call { return &MockScheduler_AddExecutor_Call{Call: _e.mock.On("AddExecutor", nodeID)} } @@ -97,7 +97,7 @@ type MockScheduler_Dispatch_Call struct { } // Dispatch is a helper method to define mock.On call -// - node int64 +// - node int64 func (_e *MockScheduler_Expecter) Dispatch(node interface{}) *MockScheduler_Dispatch_Call { return &MockScheduler_Dispatch_Call{Call: _e.mock.On("Dispatch", node)} } @@ -134,7 +134,7 @@ type MockScheduler_GetNodeChannelDelta_Call struct { } // GetNodeChannelDelta is a helper method to define mock.On call -// - nodeID int64 +// - nodeID int64 func (_e *MockScheduler_Expecter) GetNodeChannelDelta(nodeID interface{}) *MockScheduler_GetNodeChannelDelta_Call { return &MockScheduler_GetNodeChannelDelta_Call{Call: _e.mock.On("GetNodeChannelDelta", nodeID)} } @@ -171,7 +171,7 @@ type MockScheduler_GetNodeSegmentDelta_Call struct { } // GetNodeSegmentDelta is a helper method to define mock.On call -// - nodeID int64 +// - nodeID int64 func (_e *MockScheduler_Expecter) GetNodeSegmentDelta(nodeID interface{}) *MockScheduler_GetNodeSegmentDelta_Call { return &MockScheduler_GetNodeSegmentDelta_Call{Call: _e.mock.On("GetNodeSegmentDelta", nodeID)} } @@ -199,7 +199,7 @@ type MockScheduler_RemoveByNode_Call struct { } // RemoveByNode is a helper method to define mock.On call -// - node int64 +// - node int64 func (_e *MockScheduler_Expecter) RemoveByNode(node interface{}) *MockScheduler_RemoveByNode_Call { return &MockScheduler_RemoveByNode_Call{Call: _e.mock.On("RemoveByNode", node)} } @@ -227,7 +227,7 @@ type MockScheduler_RemoveExecutor_Call struct { } // RemoveExecutor is a helper method to define mock.On call -// - nodeID int64 +// - nodeID int64 func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call { return &MockScheduler_RemoveExecutor_Call{Call: _e.mock.On("RemoveExecutor", nodeID)} } @@ -255,7 +255,7 @@ type MockScheduler_Start_Call struct { } // Start is a helper method to define mock.On call -// - ctx context.Context +// - ctx context.Context func (_e *MockScheduler_Expecter) Start(ctx interface{}) *MockScheduler_Start_Call { return &MockScheduler_Start_Call{Call: _e.mock.On("Start", ctx)} } diff --git a/internal/querynode/mock_tsafe_replica_test.go b/internal/querynode/mock_tsafe_replica_test.go index a0b22fe84d..2658167a84 100644 --- a/internal/querynode/mock_tsafe_replica_test.go +++ b/internal/querynode/mock_tsafe_replica_test.go @@ -77,7 +77,7 @@ type MockTSafeReplicaInterface_WatchChannel_Call struct { } // WatchChannel is a helper method to define mock.On call -// - channel string +// - channel string func (_e *MockTSafeReplicaInterface_Expecter) WatchChannel(channel interface{}) *MockTSafeReplicaInterface_WatchChannel_Call { return &MockTSafeReplicaInterface_WatchChannel_Call{Call: _e.mock.On("WatchChannel", channel)} } @@ -105,7 +105,7 @@ type MockTSafeReplicaInterface_addTSafe_Call struct { } // addTSafe is a helper method to define mock.On call -// - vChannel string +// - vChannel string func (_e *MockTSafeReplicaInterface_Expecter) addTSafe(vChannel interface{}) *MockTSafeReplicaInterface_addTSafe_Call { return &MockTSafeReplicaInterface_addTSafe_Call{Call: _e.mock.On("addTSafe", vChannel)} } @@ -149,7 +149,7 @@ type MockTSafeReplicaInterface_getTSafe_Call struct { } // getTSafe is a helper method to define mock.On call -// - vChannel string +// - vChannel string func (_e *MockTSafeReplicaInterface_Expecter) getTSafe(vChannel interface{}) *MockTSafeReplicaInterface_getTSafe_Call { return &MockTSafeReplicaInterface_getTSafe_Call{Call: _e.mock.On("getTSafe", vChannel)} } @@ -177,7 +177,7 @@ type MockTSafeReplicaInterface_removeTSafe_Call struct { } // removeTSafe is a helper method to define mock.On call -// - vChannel string +// - vChannel string func (_e *MockTSafeReplicaInterface_Expecter) removeTSafe(vChannel interface{}) *MockTSafeReplicaInterface_removeTSafe_Call { return &MockTSafeReplicaInterface_removeTSafe_Call{Call: _e.mock.On("removeTSafe", vChannel)} } @@ -214,8 +214,8 @@ type MockTSafeReplicaInterface_setTSafe_Call struct { } // setTSafe is a helper method to define mock.On call -// - vChannel string -// - timestamp uint64 +// - vChannel string +// - timestamp uint64 func (_e *MockTSafeReplicaInterface_Expecter) setTSafe(vChannel interface{}, timestamp interface{}) *MockTSafeReplicaInterface_setTSafe_Call { return &MockTSafeReplicaInterface_setTSafe_Call{Call: _e.mock.On("setTSafe", vChannel, timestamp)} } diff --git a/internal/rootcoord/mocks/garbage_collector.go b/internal/rootcoord/mocks/garbage_collector.go index 2f39157e5d..3905b5a032 100644 --- a/internal/rootcoord/mocks/garbage_collector.go +++ b/internal/rootcoord/mocks/garbage_collector.go @@ -49,8 +49,8 @@ type GarbageCollector_GcCollectionData_Call struct { } // GcCollectionData is a helper method to define mock.On call -// - ctx context.Context -// - coll *model.Collection +// - ctx context.Context +// - coll *model.Collection func (_e *GarbageCollector_Expecter) GcCollectionData(ctx interface{}, coll interface{}) *GarbageCollector_GcCollectionData_Call { return &GarbageCollector_GcCollectionData_Call{Call: _e.mock.On("GcCollectionData", ctx, coll)} } @@ -94,9 +94,9 @@ type GarbageCollector_GcPartitionData_Call struct { } // GcPartitionData is a helper method to define mock.On call -// - ctx context.Context -// - pChannels []string -// - partition *model.Partition +// - ctx context.Context +// - pChannels []string +// - partition *model.Partition func (_e *GarbageCollector_Expecter) GcPartitionData(ctx interface{}, pChannels interface{}, partition interface{}) *GarbageCollector_GcPartitionData_Call { return &GarbageCollector_GcPartitionData_Call{Call: _e.mock.On("GcPartitionData", ctx, pChannels, partition)} } @@ -124,8 +124,8 @@ type GarbageCollector_ReDropCollection_Call struct { } // ReDropCollection is a helper method to define mock.On call -// - collMeta *model.Collection -// - ts uint64 +// - collMeta *model.Collection +// - ts uint64 func (_e *GarbageCollector_Expecter) ReDropCollection(collMeta interface{}, ts interface{}) *GarbageCollector_ReDropCollection_Call { return &GarbageCollector_ReDropCollection_Call{Call: _e.mock.On("ReDropCollection", collMeta, ts)} } @@ -153,9 +153,9 @@ type GarbageCollector_ReDropPartition_Call struct { } // ReDropPartition is a helper method to define mock.On call -// - pChannels []string -// - partition *model.Partition -// - ts uint64 +// - pChannels []string +// - partition *model.Partition +// - ts uint64 func (_e *GarbageCollector_Expecter) ReDropPartition(pChannels interface{}, partition interface{}, ts interface{}) *GarbageCollector_ReDropPartition_Call { return &GarbageCollector_ReDropPartition_Call{Call: _e.mock.On("ReDropPartition", pChannels, partition, ts)} } @@ -183,7 +183,7 @@ type GarbageCollector_RemoveCreatingCollection_Call struct { } // RemoveCreatingCollection is a helper method to define mock.On call -// - collMeta *model.Collection +// - collMeta *model.Collection func (_e *GarbageCollector_Expecter) RemoveCreatingCollection(collMeta interface{}) *GarbageCollector_RemoveCreatingCollection_Call { return &GarbageCollector_RemoveCreatingCollection_Call{Call: _e.mock.On("RemoveCreatingCollection", collMeta)} } diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index e443e47f6f..92b45933b9 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -48,8 +48,8 @@ type IMetaTable_AddCollection_Call struct { } // AddCollection is a helper method to define mock.On call -// - ctx context.Context -// - coll *model.Collection +// - ctx context.Context +// - coll *model.Collection func (_e *IMetaTable_Expecter) AddCollection(ctx interface{}, coll interface{}) *IMetaTable_AddCollection_Call { return &IMetaTable_AddCollection_Call{Call: _e.mock.On("AddCollection", ctx, coll)} } @@ -86,7 +86,7 @@ type IMetaTable_AddCredential_Call struct { } // AddCredential is a helper method to define mock.On call -// - credInfo *internalpb.CredentialInfo +// - credInfo *internalpb.CredentialInfo func (_e *IMetaTable_Expecter) AddCredential(credInfo interface{}) *IMetaTable_AddCredential_Call { return &IMetaTable_AddCredential_Call{Call: _e.mock.On("AddCredential", credInfo)} } @@ -123,8 +123,8 @@ type IMetaTable_AddPartition_Call struct { } // AddPartition is a helper method to define mock.On call -// - ctx context.Context -// - partition *model.Partition +// - ctx context.Context +// - partition *model.Partition func (_e *IMetaTable_Expecter) AddPartition(ctx interface{}, partition interface{}) *IMetaTable_AddPartition_Call { return &IMetaTable_AddPartition_Call{Call: _e.mock.On("AddPartition", ctx, partition)} } @@ -161,10 +161,10 @@ type IMetaTable_AlterAlias_Call struct { } // AlterAlias is a helper method to define mock.On call -// - ctx context.Context -// - alias string -// - collectionName string -// - ts uint64 +// - ctx context.Context +// - alias string +// - collectionName string +// - ts uint64 func (_e *IMetaTable_Expecter) AlterAlias(ctx interface{}, alias interface{}, collectionName interface{}, ts interface{}) *IMetaTable_AlterAlias_Call { return &IMetaTable_AlterAlias_Call{Call: _e.mock.On("AlterAlias", ctx, alias, collectionName, ts)} } @@ -201,10 +201,10 @@ type IMetaTable_AlterCollection_Call struct { } // AlterCollection is a helper method to define mock.On call -// - ctx context.Context -// - oldColl *model.Collection -// - newColl *model.Collection -// - ts uint64 +// - ctx context.Context +// - oldColl *model.Collection +// - newColl *model.Collection +// - ts uint64 func (_e *IMetaTable_Expecter) AlterCollection(ctx interface{}, oldColl interface{}, newColl interface{}, ts interface{}) *IMetaTable_AlterCollection_Call { return &IMetaTable_AlterCollection_Call{Call: _e.mock.On("AlterCollection", ctx, oldColl, newColl, ts)} } @@ -241,7 +241,7 @@ type IMetaTable_AlterCredential_Call struct { } // AlterCredential is a helper method to define mock.On call -// - credInfo *internalpb.CredentialInfo +// - credInfo *internalpb.CredentialInfo func (_e *IMetaTable_Expecter) AlterCredential(credInfo interface{}) *IMetaTable_AlterCredential_Call { return &IMetaTable_AlterCredential_Call{Call: _e.mock.On("AlterCredential", credInfo)} } @@ -278,10 +278,10 @@ type IMetaTable_ChangeCollectionState_Call struct { } // ChangeCollectionState is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - state etcdpb.CollectionState -// - ts uint64 +// - ctx context.Context +// - collectionID int64 +// - state etcdpb.CollectionState +// - ts uint64 func (_e *IMetaTable_Expecter) ChangeCollectionState(ctx interface{}, collectionID interface{}, state interface{}, ts interface{}) *IMetaTable_ChangeCollectionState_Call { return &IMetaTable_ChangeCollectionState_Call{Call: _e.mock.On("ChangeCollectionState", ctx, collectionID, state, ts)} } @@ -318,11 +318,11 @@ type IMetaTable_ChangePartitionState_Call struct { } // ChangePartitionState is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 -// - state etcdpb.PartitionState -// - ts uint64 +// - ctx context.Context +// - collectionID int64 +// - partitionID int64 +// - state etcdpb.PartitionState +// - ts uint64 func (_e *IMetaTable_Expecter) ChangePartitionState(ctx interface{}, collectionID interface{}, partitionID interface{}, state interface{}, ts interface{}) *IMetaTable_ChangePartitionState_Call { return &IMetaTable_ChangePartitionState_Call{Call: _e.mock.On("ChangePartitionState", ctx, collectionID, partitionID, state, ts)} } @@ -359,10 +359,10 @@ type IMetaTable_CreateAlias_Call struct { } // CreateAlias is a helper method to define mock.On call -// - ctx context.Context -// - alias string -// - collectionName string -// - ts uint64 +// - ctx context.Context +// - alias string +// - collectionName string +// - ts uint64 func (_e *IMetaTable_Expecter) CreateAlias(ctx interface{}, alias interface{}, collectionName interface{}, ts interface{}) *IMetaTable_CreateAlias_Call { return &IMetaTable_CreateAlias_Call{Call: _e.mock.On("CreateAlias", ctx, alias, collectionName, ts)} } @@ -399,8 +399,8 @@ type IMetaTable_CreateRole_Call struct { } // CreateRole is a helper method to define mock.On call -// - tenant string -// - entity *milvuspb.RoleEntity +// - tenant string +// - entity *milvuspb.RoleEntity func (_e *IMetaTable_Expecter) CreateRole(tenant interface{}, entity interface{}) *IMetaTable_CreateRole_Call { return &IMetaTable_CreateRole_Call{Call: _e.mock.On("CreateRole", tenant, entity)} } @@ -437,7 +437,7 @@ type IMetaTable_DeleteCredential_Call struct { } // DeleteCredential is a helper method to define mock.On call -// - username string +// - username string func (_e *IMetaTable_Expecter) DeleteCredential(username interface{}) *IMetaTable_DeleteCredential_Call { return &IMetaTable_DeleteCredential_Call{Call: _e.mock.On("DeleteCredential", username)} } @@ -474,9 +474,9 @@ type IMetaTable_DropAlias_Call struct { } // DropAlias is a helper method to define mock.On call -// - ctx context.Context -// - alias string -// - ts uint64 +// - ctx context.Context +// - alias string +// - ts uint64 func (_e *IMetaTable_Expecter) DropAlias(ctx interface{}, alias interface{}, ts interface{}) *IMetaTable_DropAlias_Call { return &IMetaTable_DropAlias_Call{Call: _e.mock.On("DropAlias", ctx, alias, ts)} } @@ -513,8 +513,8 @@ type IMetaTable_DropGrant_Call struct { } // DropGrant is a helper method to define mock.On call -// - tenant string -// - role *milvuspb.RoleEntity +// - tenant string +// - role *milvuspb.RoleEntity func (_e *IMetaTable_Expecter) DropGrant(tenant interface{}, role interface{}) *IMetaTable_DropGrant_Call { return &IMetaTable_DropGrant_Call{Call: _e.mock.On("DropGrant", tenant, role)} } @@ -551,8 +551,8 @@ type IMetaTable_DropRole_Call struct { } // DropRole is a helper method to define mock.On call -// - tenant string -// - roleName string +// - tenant string +// - roleName string func (_e *IMetaTable_Expecter) DropRole(tenant interface{}, roleName interface{}) *IMetaTable_DropRole_Call { return &IMetaTable_DropRole_Call{Call: _e.mock.On("DropRole", tenant, roleName)} } @@ -598,10 +598,10 @@ type IMetaTable_GetCollectionByID_Call struct { } // GetCollectionByID is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - ts uint64 -// - allowUnavailable bool +// - ctx context.Context +// - collectionID int64 +// - ts uint64 +// - allowUnavailable bool func (_e *IMetaTable_Expecter) GetCollectionByID(ctx interface{}, collectionID interface{}, ts interface{}, allowUnavailable interface{}) *IMetaTable_GetCollectionByID_Call { return &IMetaTable_GetCollectionByID_Call{Call: _e.mock.On("GetCollectionByID", ctx, collectionID, ts, allowUnavailable)} } @@ -647,9 +647,9 @@ type IMetaTable_GetCollectionByName_Call struct { } // GetCollectionByName is a helper method to define mock.On call -// - ctx context.Context -// - collectionName string -// - ts uint64 +// - ctx context.Context +// - collectionName string +// - ts uint64 func (_e *IMetaTable_Expecter) GetCollectionByName(ctx interface{}, collectionName interface{}, ts interface{}) *IMetaTable_GetCollectionByName_Call { return &IMetaTable_GetCollectionByName_Call{Call: _e.mock.On("GetCollectionByName", ctx, collectionName, ts)} } @@ -688,7 +688,7 @@ type IMetaTable_GetCollectionVirtualChannels_Call struct { } // GetCollectionVirtualChannels is a helper method to define mock.On call -// - colID int64 +// - colID int64 func (_e *IMetaTable_Expecter) GetCollectionVirtualChannels(colID interface{}) *IMetaTable_GetCollectionVirtualChannels_Call { return &IMetaTable_GetCollectionVirtualChannels_Call{Call: _e.mock.On("GetCollectionVirtualChannels", colID)} } @@ -734,7 +734,7 @@ type IMetaTable_GetCredential_Call struct { } // GetCredential is a helper method to define mock.On call -// - username string +// - username string func (_e *IMetaTable_Expecter) GetCredential(username interface{}) *IMetaTable_GetCredential_Call { return &IMetaTable_GetCredential_Call{Call: _e.mock.On("GetCredential", username)} } @@ -778,9 +778,9 @@ type IMetaTable_GetPartitionByName_Call struct { } // GetPartitionByName is a helper method to define mock.On call -// - collID int64 -// - partitionName string -// - ts uint64 +// - collID int64 +// - partitionName string +// - ts uint64 func (_e *IMetaTable_Expecter) GetPartitionByName(collID interface{}, partitionName interface{}, ts interface{}) *IMetaTable_GetPartitionByName_Call { return &IMetaTable_GetPartitionByName_Call{Call: _e.mock.On("GetPartitionByName", collID, partitionName, ts)} } @@ -824,9 +824,9 @@ type IMetaTable_GetPartitionNameByID_Call struct { } // GetPartitionNameByID is a helper method to define mock.On call -// - collID int64 -// - partitionID int64 -// - ts uint64 +// - collID int64 +// - partitionID int64 +// - ts uint64 func (_e *IMetaTable_Expecter) GetPartitionNameByID(collID interface{}, partitionID interface{}, ts interface{}) *IMetaTable_GetPartitionNameByID_Call { return &IMetaTable_GetPartitionNameByID_Call{Call: _e.mock.On("GetPartitionNameByID", collID, partitionID, ts)} } @@ -863,7 +863,7 @@ type IMetaTable_IsAlias_Call struct { } // IsAlias is a helper method to define mock.On call -// - name string +// - name string func (_e *IMetaTable_Expecter) IsAlias(name interface{}) *IMetaTable_IsAlias_Call { return &IMetaTable_IsAlias_Call{Call: _e.mock.On("IsAlias", name)} } @@ -909,8 +909,8 @@ type IMetaTable_ListAbnormalCollections_Call struct { } // ListAbnormalCollections is a helper method to define mock.On call -// - ctx context.Context -// - ts uint64 +// - ctx context.Context +// - ts uint64 func (_e *IMetaTable_Expecter) ListAbnormalCollections(ctx interface{}, ts interface{}) *IMetaTable_ListAbnormalCollections_Call { return &IMetaTable_ListAbnormalCollections_Call{Call: _e.mock.On("ListAbnormalCollections", ctx, ts)} } @@ -949,7 +949,7 @@ type IMetaTable_ListAliasesByID_Call struct { } // ListAliasesByID is a helper method to define mock.On call -// - collID int64 +// - collID int64 func (_e *IMetaTable_Expecter) ListAliasesByID(collID interface{}) *IMetaTable_ListAliasesByID_Call { return &IMetaTable_ListAliasesByID_Call{Call: _e.mock.On("ListAliasesByID", collID)} } @@ -1033,8 +1033,8 @@ type IMetaTable_ListCollections_Call struct { } // ListCollections is a helper method to define mock.On call -// - ctx context.Context -// - ts uint64 +// - ctx context.Context +// - ts uint64 func (_e *IMetaTable_Expecter) ListCollections(ctx interface{}, ts interface{}) *IMetaTable_ListCollections_Call { return &IMetaTable_ListCollections_Call{Call: _e.mock.On("ListCollections", ctx, ts)} } @@ -1125,7 +1125,7 @@ type IMetaTable_ListPolicy_Call struct { } // ListPolicy is a helper method to define mock.On call -// - tenant string +// - tenant string func (_e *IMetaTable_Expecter) ListPolicy(tenant interface{}) *IMetaTable_ListPolicy_Call { return &IMetaTable_ListPolicy_Call{Call: _e.mock.On("ListPolicy", tenant)} } @@ -1171,7 +1171,7 @@ type IMetaTable_ListUserRole_Call struct { } // ListUserRole is a helper method to define mock.On call -// - tenant string +// - tenant string func (_e *IMetaTable_Expecter) ListUserRole(tenant interface{}) *IMetaTable_ListUserRole_Call { return &IMetaTable_ListUserRole_Call{Call: _e.mock.On("ListUserRole", tenant)} } @@ -1208,9 +1208,9 @@ type IMetaTable_OperatePrivilege_Call struct { } // OperatePrivilege is a helper method to define mock.On call -// - tenant string -// - entity *milvuspb.GrantEntity -// - operateType milvuspb.OperatePrivilegeType +// - tenant string +// - entity *milvuspb.GrantEntity +// - operateType milvuspb.OperatePrivilegeType func (_e *IMetaTable_Expecter) OperatePrivilege(tenant interface{}, entity interface{}, operateType interface{}) *IMetaTable_OperatePrivilege_Call { return &IMetaTable_OperatePrivilege_Call{Call: _e.mock.On("OperatePrivilege", tenant, entity, operateType)} } @@ -1247,10 +1247,10 @@ type IMetaTable_OperateUserRole_Call struct { } // OperateUserRole is a helper method to define mock.On call -// - tenant string -// - userEntity *milvuspb.UserEntity -// - roleEntity *milvuspb.RoleEntity -// - operateType milvuspb.OperateUserRoleType +// - tenant string +// - userEntity *milvuspb.UserEntity +// - roleEntity *milvuspb.RoleEntity +// - operateType milvuspb.OperateUserRoleType func (_e *IMetaTable_Expecter) OperateUserRole(tenant interface{}, userEntity interface{}, roleEntity interface{}, operateType interface{}) *IMetaTable_OperateUserRole_Call { return &IMetaTable_OperateUserRole_Call{Call: _e.mock.On("OperateUserRole", tenant, userEntity, roleEntity, operateType)} } @@ -1287,9 +1287,9 @@ type IMetaTable_RemoveCollection_Call struct { } // RemoveCollection is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - ts uint64 +// - ctx context.Context +// - collectionID int64 +// - ts uint64 func (_e *IMetaTable_Expecter) RemoveCollection(ctx interface{}, collectionID interface{}, ts interface{}) *IMetaTable_RemoveCollection_Call { return &IMetaTable_RemoveCollection_Call{Call: _e.mock.On("RemoveCollection", ctx, collectionID, ts)} } @@ -1326,10 +1326,10 @@ type IMetaTable_RemovePartition_Call struct { } // RemovePartition is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 -// - ts uint64 +// - ctx context.Context +// - collectionID int64 +// - partitionID int64 +// - ts uint64 func (_e *IMetaTable_Expecter) RemovePartition(ctx interface{}, collectionID interface{}, partitionID interface{}, ts interface{}) *IMetaTable_RemovePartition_Call { return &IMetaTable_RemovePartition_Call{Call: _e.mock.On("RemovePartition", ctx, collectionID, partitionID, ts)} } @@ -1366,10 +1366,10 @@ type IMetaTable_RenameCollection_Call struct { } // RenameCollection is a helper method to define mock.On call -// - ctx context.Context -// - oldName string -// - newName string -// - ts uint64 +// - ctx context.Context +// - oldName string +// - newName string +// - ts uint64 func (_e *IMetaTable_Expecter) RenameCollection(ctx interface{}, oldName interface{}, newName interface{}, ts interface{}) *IMetaTable_RenameCollection_Call { return &IMetaTable_RenameCollection_Call{Call: _e.mock.On("RenameCollection", ctx, oldName, newName, ts)} } @@ -1415,8 +1415,8 @@ type IMetaTable_SelectGrant_Call struct { } // SelectGrant is a helper method to define mock.On call -// - tenant string -// - entity *milvuspb.GrantEntity +// - tenant string +// - entity *milvuspb.GrantEntity func (_e *IMetaTable_Expecter) SelectGrant(tenant interface{}, entity interface{}) *IMetaTable_SelectGrant_Call { return &IMetaTable_SelectGrant_Call{Call: _e.mock.On("SelectGrant", tenant, entity)} } @@ -1462,9 +1462,9 @@ type IMetaTable_SelectRole_Call struct { } // SelectRole is a helper method to define mock.On call -// - tenant string -// - entity *milvuspb.RoleEntity -// - includeUserInfo bool +// - tenant string +// - entity *milvuspb.RoleEntity +// - includeUserInfo bool func (_e *IMetaTable_Expecter) SelectRole(tenant interface{}, entity interface{}, includeUserInfo interface{}) *IMetaTable_SelectRole_Call { return &IMetaTable_SelectRole_Call{Call: _e.mock.On("SelectRole", tenant, entity, includeUserInfo)} } @@ -1510,9 +1510,9 @@ type IMetaTable_SelectUser_Call struct { } // SelectUser is a helper method to define mock.On call -// - tenant string -// - entity *milvuspb.UserEntity -// - includeRoleInfo bool +// - tenant string +// - entity *milvuspb.UserEntity +// - includeRoleInfo bool func (_e *IMetaTable_Expecter) SelectUser(tenant interface{}, entity interface{}, includeRoleInfo interface{}) *IMetaTable_SelectUser_Call { return &IMetaTable_SelectUser_Call{Call: _e.mock.On("SelectUser", tenant, entity, includeRoleInfo)} } diff --git a/internal/types/mock_querycoord.go b/internal/types/mock_querycoord.go index 8d062958ab..d2cc34095a 100644 --- a/internal/types/mock_querycoord.go +++ b/internal/types/mock_querycoord.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package types diff --git a/internal/types/mock_querynode.go b/internal/types/mock_querynode.go index 937fd52365..144446db02 100644 --- a/internal/types/mock_querynode.go +++ b/internal/types/mock_querynode.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package types