From c992a61a23cdd49506390ab1baf9be9cb0b30f0b Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 22 Aug 2024 10:06:56 +0800 Subject: [PATCH] enhance: Separate allocator pkg in datacoord (#35622) Related to #28861 Move allocator interface and implementation into separate package. Also update some unittest logic. Signed-off-by: Congqi Xia --- .../datacoord/{ => allocator}/allocator.go | 44 ++-- .../datacoord/allocator/allocator_test.go | 136 ++++++++++++ .../datacoord/allocator/mock_allocator.go | 199 ++++++++++++++++++ internal/datacoord/allocator_test.go | 55 ----- internal/datacoord/channel_manager.go | 7 +- internal/datacoord/channel_manager_test.go | 11 +- internal/datacoord/compaction.go | 5 +- .../datacoord/compaction_policy_clustering.go | 7 +- .../compaction_policy_clustering_test.go | 6 +- .../datacoord/compaction_policy_l0_test.go | 3 +- .../datacoord/compaction_policy_single.go | 7 +- .../compaction_policy_single_test.go | 8 +- .../datacoord/compaction_task_clustering.go | 7 +- .../compaction_task_clustering_test.go | 9 +- internal/datacoord/compaction_task_l0.go | 5 +- internal/datacoord/compaction_task_l0_test.go | 27 +-- internal/datacoord/compaction_task_mix.go | 5 +- .../datacoord/compaction_task_mix_test.go | 9 +- internal/datacoord/compaction_test.go | 5 +- internal/datacoord/compaction_trigger.go | 11 +- internal/datacoord/compaction_trigger_test.go | 97 +++++---- internal/datacoord/compaction_trigger_v2.go | 13 +- .../datacoord/compaction_trigger_v2_test.go | 13 +- internal/datacoord/import_checker.go | 5 +- internal/datacoord/import_checker_test.go | 22 +- internal/datacoord/import_scheduler.go | 5 +- internal/datacoord/import_scheduler_test.go | 9 +- internal/datacoord/import_util.go | 15 +- internal/datacoord/import_util_test.go | 15 +- internal/datacoord/index_service.go | 4 +- internal/datacoord/index_service_test.go | 48 +++-- internal/datacoord/meta_test.go | 30 ++- internal/datacoord/mock_allocator_test.go | 199 ------------------ internal/datacoord/mock_test.go | 96 +++------ internal/datacoord/segment_manager.go | 17 +- internal/datacoord/segment_manager_test.go | 115 +++++----- internal/datacoord/server.go | 5 +- internal/datacoord/server_test.go | 10 +- internal/datacoord/services.go | 4 +- internal/datacoord/services_test.go | 9 +- internal/datacoord/util_test.go | 6 +- 41 files changed, 706 insertions(+), 597 deletions(-) rename internal/datacoord/{ => allocator}/allocator.go (60%) create mode 100644 internal/datacoord/allocator/allocator_test.go create mode 100644 internal/datacoord/allocator/mock_allocator.go delete mode 100644 internal/datacoord/allocator_test.go delete mode 100644 internal/datacoord/mock_allocator_test.go diff --git a/internal/datacoord/allocator.go b/internal/datacoord/allocator/allocator.go similarity index 60% rename from internal/datacoord/allocator.go rename to internal/datacoord/allocator/allocator.go index 57f22bea3c..d2c8cbe4bf 100644 --- a/internal/datacoord/allocator.go +++ b/internal/datacoord/allocator/allocator.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package datacoord +package allocator import ( "context" @@ -24,50 +24,52 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// allocator is the interface that allocating `UniqueID` or `Timestamp` -type allocator interface { - allocTimestamp(context.Context) (Timestamp, error) - allocID(context.Context) (UniqueID, error) - allocN(n int64) (UniqueID, UniqueID, error) +// Allocator is the interface that allocating `UniqueID` or `Timestamp` +type Allocator interface { + AllocTimestamp(context.Context) (typeutil.Timestamp, error) + AllocID(context.Context) (typeutil.UniqueID, error) + AllocN(n int64) (typeutil.UniqueID, typeutil.UniqueID, error) } // make sure rootCoordAllocator implements allocator interface -var _ allocator = (*rootCoordAllocator)(nil) +var _ Allocator = (*rootCoordAllocator)(nil) // rootCoordAllocator use RootCoord as allocator type rootCoordAllocator struct { types.RootCoordClient } -// newRootCoordAllocator gets an allocator from RootCoord -func newRootCoordAllocator(rootCoordClient types.RootCoordClient) allocator { +// NewRootCoordAllocator gets an allocator from RootCoord +func NewRootCoordAllocator(rootCoordClient types.RootCoordClient) Allocator { return &rootCoordAllocator{ RootCoordClient: rootCoordClient, } } -// allocTimestamp allocates a Timestamp +// AllocTimestamp allocates a Timestamp // invoking RootCoord `AllocTimestamp` -func (alloc *rootCoordAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) { - resp, err := alloc.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{ +func (alloc *rootCoordAllocator) AllocTimestamp(ctx context.Context) (typeutil.Timestamp, error) { + resp, err := alloc.RootCoordClient.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), Count: 1, }) - if err = VerifyResponse(resp, err); err != nil { + if err = merr.CheckRPCCall(resp, err); err != nil { return 0, err } return resp.Timestamp, nil } -// allocID allocates an `UniqueID` from RootCoord, invoking AllocID grpc -func (alloc *rootCoordAllocator) allocID(ctx context.Context) (UniqueID, error) { - resp, err := alloc.AllocID(ctx, &rootcoordpb.AllocIDRequest{ +// AllocID allocates an `UniqueID` from RootCoord, invoking AllocID grpc +func (alloc *rootCoordAllocator) AllocID(ctx context.Context) (typeutil.UniqueID, error) { + resp, err := alloc.RootCoordClient.AllocID(ctx, &rootcoordpb.AllocIDRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestID), commonpbutil.WithSourceID(paramtable.GetNodeID()), @@ -75,21 +77,21 @@ func (alloc *rootCoordAllocator) allocID(ctx context.Context) (UniqueID, error) Count: 1, }) - if err = VerifyResponse(resp, err); err != nil { + if err = merr.CheckRPCCall(resp, err); err != nil { return 0, err } return resp.ID, nil } -// allocID allocates an `UniqueID` from RootCoord, invoking AllocID grpc -func (alloc *rootCoordAllocator) allocN(n int64) (UniqueID, UniqueID, error) { +// AllocID allocates an `UniqueID` from RootCoord, invoking AllocID grpc +func (alloc *rootCoordAllocator) AllocN(n int64) (typeutil.UniqueID, typeutil.UniqueID, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if n <= 0 { n = 1 } - resp, err := alloc.AllocID(ctx, &rootcoordpb.AllocIDRequest{ + resp, err := alloc.RootCoordClient.AllocID(ctx, &rootcoordpb.AllocIDRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestID), commonpbutil.WithSourceID(paramtable.GetNodeID()), @@ -97,7 +99,7 @@ func (alloc *rootCoordAllocator) allocN(n int64) (UniqueID, UniqueID, error) { Count: uint32(n), }) - if err = VerifyResponse(resp, err); err != nil { + if err = merr.CheckRPCCall(resp, err); err != nil { return 0, 0, err } start, count := resp.GetID(), resp.GetCount() diff --git a/internal/datacoord/allocator/allocator_test.go b/internal/datacoord/allocator/allocator_test.go new file mode 100644 index 0000000000..2de4e742f4 --- /dev/null +++ b/internal/datacoord/allocator/allocator_test.go @@ -0,0 +1,136 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package allocator + +import ( + "context" + "math/rand" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + + "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +type RootCoordAllocatorSuite struct { + suite.Suite + ms *mocks.MockRootCoordClient + allocator Allocator +} + +func (s *RootCoordAllocatorSuite) SetupTest() { + s.ms = mocks.NewMockRootCoordClient(s.T()) + s.allocator = NewRootCoordAllocator(s.ms) +} + +func (s *RootCoordAllocatorSuite) TestAllocTimestamp() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("normal", func() { + ts := rand.Uint64() + s.ms.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, atr *rootcoordpb.AllocTimestampRequest, co ...grpc.CallOption) (*rootcoordpb.AllocTimestampResponse, error) { + s.EqualValues(1, atr.GetCount()) + return &rootcoordpb.AllocTimestampResponse{ + Status: merr.Success(), + Timestamp: ts, + }, nil + }).Once() + result, err := s.allocator.AllocTimestamp(ctx) + s.NoError(err) + s.EqualValues(ts, result) + }) + + s.Run("error", func() { + s.ms.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Once() + _, err := s.allocator.AllocTimestamp(ctx) + s.Error(err) + }) +} + +func (s *RootCoordAllocatorSuite) TestAllocID() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("normal", func() { + id := rand.Int63n(1000000) + s.ms.EXPECT().AllocID(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ai *rootcoordpb.AllocIDRequest, co ...grpc.CallOption) (*rootcoordpb.AllocIDResponse, error) { + s.EqualValues(1, ai.GetCount()) + return &rootcoordpb.AllocIDResponse{ + Status: merr.Success(), + ID: id, + }, nil + }).Once() + result, err := s.allocator.AllocID(ctx) + s.NoError(err) + s.EqualValues(id, result) + }) + + s.Run("error", func() { + s.ms.EXPECT().AllocID(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Once() + _, err := s.allocator.AllocID(ctx) + s.Error(err) + }) +} + +func (s *RootCoordAllocatorSuite) TestAllocN() { + s.Run("normal", func() { + n := rand.Int63n(100) + 1 + id := rand.Int63n(1000000) + s.ms.EXPECT().AllocID(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ai *rootcoordpb.AllocIDRequest, co ...grpc.CallOption) (*rootcoordpb.AllocIDResponse, error) { + s.EqualValues(n, ai.GetCount()) + return &rootcoordpb.AllocIDResponse{ + Status: merr.Success(), + ID: id, + Count: uint32(n), + }, nil + }).Once() + start, end, err := s.allocator.AllocN(n) + s.NoError(err) + s.EqualValues(id, start) + s.EqualValues(id+n, end) + }) + + s.Run("zero_n", func() { + id := rand.Int63n(1000000) + s.ms.EXPECT().AllocID(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ai *rootcoordpb.AllocIDRequest, co ...grpc.CallOption) (*rootcoordpb.AllocIDResponse, error) { + s.EqualValues(1, ai.GetCount()) + return &rootcoordpb.AllocIDResponse{ + Status: merr.Success(), + ID: id, + Count: uint32(1), + }, nil + }).Once() + start, end, err := s.allocator.AllocN(0) + s.NoError(err) + s.EqualValues(id, start) + s.EqualValues(id+1, end) + }) + + s.Run("error", func() { + s.ms.EXPECT().AllocID(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Once() + _, _, err := s.allocator.AllocN(10) + s.Error(err) + }) +} + +func TestRootCoordAllocator(t *testing.T) { + suite.Run(t, new(RootCoordAllocatorSuite)) +} diff --git a/internal/datacoord/allocator/mock_allocator.go b/internal/datacoord/allocator/mock_allocator.go new file mode 100644 index 0000000000..0f5fd4bf61 --- /dev/null +++ b/internal/datacoord/allocator/mock_allocator.go @@ -0,0 +1,199 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package allocator + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockAllocator is an autogenerated mock type for the Allocator type +type MockAllocator struct { + mock.Mock +} + +type MockAllocator_Expecter struct { + mock *mock.Mock +} + +func (_m *MockAllocator) EXPECT() *MockAllocator_Expecter { + return &MockAllocator_Expecter{mock: &_m.Mock} +} + +// AllocID provides a mock function with given fields: _a0 +func (_m *MockAllocator) AllocID(_a0 context.Context) (int64, error) { + ret := _m.Called(_a0) + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(context.Context) int64); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockAllocator_AllocID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocID' +type MockAllocator_AllocID_Call struct { + *mock.Call +} + +// AllocID is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MockAllocator_Expecter) AllocID(_a0 interface{}) *MockAllocator_AllocID_Call { + return &MockAllocator_AllocID_Call{Call: _e.mock.On("AllocID", _a0)} +} + +func (_c *MockAllocator_AllocID_Call) Run(run func(_a0 context.Context)) *MockAllocator_AllocID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockAllocator_AllocID_Call) Return(_a0 int64, _a1 error) *MockAllocator_AllocID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockAllocator_AllocID_Call) RunAndReturn(run func(context.Context) (int64, error)) *MockAllocator_AllocID_Call { + _c.Call.Return(run) + return _c +} + +// AllocN provides a mock function with given fields: n +func (_m *MockAllocator) AllocN(n int64) (int64, int64, error) { + ret := _m.Called(n) + + var r0 int64 + var r1 int64 + var r2 error + if rf, ok := ret.Get(0).(func(int64) (int64, int64, error)); ok { + return rf(n) + } + if rf, ok := ret.Get(0).(func(int64) int64); ok { + r0 = rf(n) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(int64) int64); ok { + r1 = rf(n) + } else { + r1 = ret.Get(1).(int64) + } + + if rf, ok := ret.Get(2).(func(int64) error); ok { + r2 = rf(n) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockAllocator_AllocN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocN' +type MockAllocator_AllocN_Call struct { + *mock.Call +} + +// AllocN is a helper method to define mock.On call +// - n int64 +func (_e *MockAllocator_Expecter) AllocN(n interface{}) *MockAllocator_AllocN_Call { + return &MockAllocator_AllocN_Call{Call: _e.mock.On("AllocN", n)} +} + +func (_c *MockAllocator_AllocN_Call) Run(run func(n int64)) *MockAllocator_AllocN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockAllocator_AllocN_Call) Return(_a0 int64, _a1 int64, _a2 error) *MockAllocator_AllocN_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockAllocator_AllocN_Call) RunAndReturn(run func(int64) (int64, int64, error)) *MockAllocator_AllocN_Call { + _c.Call.Return(run) + return _c +} + +// AllocTimestamp provides a mock function with given fields: _a0 +func (_m *MockAllocator) AllocTimestamp(_a0 context.Context) (uint64, error) { + ret := _m.Called(_a0) + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockAllocator_AllocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocTimestamp' +type MockAllocator_AllocTimestamp_Call struct { + *mock.Call +} + +// AllocTimestamp is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MockAllocator_Expecter) AllocTimestamp(_a0 interface{}) *MockAllocator_AllocTimestamp_Call { + return &MockAllocator_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", _a0)} +} + +func (_c *MockAllocator_AllocTimestamp_Call) Run(run func(_a0 context.Context)) *MockAllocator_AllocTimestamp_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockAllocator_AllocTimestamp_Call) Return(_a0 uint64, _a1 error) *MockAllocator_AllocTimestamp_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockAllocator_AllocTimestamp_Call) RunAndReturn(run func(context.Context) (uint64, error)) *MockAllocator_AllocTimestamp_Call { + _c.Call.Return(run) + return _c +} + +// NewMockAllocator creates a new instance of MockAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockAllocator(t interface { + mock.TestingT + Cleanup(func()) +}) *MockAllocator { + mock := &MockAllocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/allocator_test.go b/internal/datacoord/allocator_test.go deleted file mode 100644 index 923c244928..0000000000 --- a/internal/datacoord/allocator_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datacoord - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -func TestAllocator_Basic(t *testing.T) { - paramtable.Init() - ms := newMockRootCoordClient() - allocator := newRootCoordAllocator(ms) - ctx := context.Background() - - t.Run("Test allocTimestamp", func(t *testing.T) { - _, err := allocator.allocTimestamp(ctx) - assert.NoError(t, err) - }) - - t.Run("Test allocID", func(t *testing.T) { - _, err := allocator.allocID(ctx) - assert.NoError(t, err) - }) - - t.Run("Test Unhealthy Root", func(t *testing.T) { - ms := newMockRootCoordClient() - allocator := newRootCoordAllocator(ms) - err := ms.Stop() - assert.NoError(t, err) - - _, err = allocator.allocTimestamp(ctx) - assert.Error(t, err) - _, err = allocator.allocID(ctx) - assert.Error(t, err) - }) -} diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 239044bbf2..c8b3a03e07 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -26,6 +26,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" @@ -67,7 +68,7 @@ type ChannelManagerImpl struct { h Handler store RWChannelStore subCluster SubCluster // sessionManager - allocator allocator + allocator allocator.Allocator factory ChannelPolicyFactory balancePolicy BalanceChannelPolicy @@ -98,7 +99,7 @@ func NewChannelManager( kv kv.TxnKV, h Handler, subCluster SubCluster, // sessionManager - alloc allocator, + alloc allocator.Allocator, options ...ChannelmanagerOpt, ) (*ChannelManagerImpl, error) { m := &ChannelManagerImpl{ @@ -694,7 +695,7 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error { startTs := time.Now().Unix() for _, ch := range op.Channels { vcInfo := m.h.GetDataVChanPositions(ch, allPartitionID) - opID, err := m.allocator.allocID(context.Background()) + opID, err := m.allocator.AllocID(context.Background()) if err != nil { return err } diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 2902309437..97f874466b 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/milvus-io/milvus/internal/datacoord/allocator" kvmock "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/kv/predicates" @@ -45,7 +46,7 @@ type ChannelManagerSuite struct { mockKv *kvmock.MetaKv mockCluster *MockSubCluster - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator mockHandler *NMockHandler } @@ -94,7 +95,7 @@ func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImpl, nodeID in func (s *ChannelManagerSuite) SetupTest() { s.mockKv = kvmock.NewMetaKv(s.T()) s.mockCluster = NewMockSubCluster(s.T()) - s.mockAlloc = NewNMockAllocator(s.T()) + s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockHandler = NewNMockHandler(s.T()) s.mockHandler.EXPECT().GetDataVChanPositions(mock.Anything, mock.Anything). RunAndReturn(func(ch RWChannel, partitionID UniqueID) *datapb.VchannelInfo { @@ -103,7 +104,7 @@ func (s *ChannelManagerSuite) SetupTest() { ChannelName: ch.GetName(), } }).Maybe() - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe() + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() s.mockKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).RunAndReturn( func(save map[string]string, removals []string, preds ...predicates.Predicate) error { log.Info("test save and remove", zap.Any("save", save), zap.Any("removals", removals)) @@ -715,8 +716,8 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() { } s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) - s.mockAlloc = NewNMockAllocator(s.T()) - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(0, errors.New("mock rootcoord failure")) + s.mockAlloc = allocator.NewMockAllocator(s.T()) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(0, errors.New("mock rootcoord failure")) m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc) s.Require().NoError(err) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 2350387fc5..05245e436d 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -79,7 +80,7 @@ type compactionPlanHandler struct { executingTasks map[int64]CompactionTask // planID -> task meta CompactionMeta - allocator allocator + allocator allocator.Allocator chManager ChannelManager sessions SessionManager cluster Cluster @@ -176,7 +177,7 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) return cnt } -func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator, analyzeScheduler *taskScheduler, handler Handler, +func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler, ) *compactionPlanHandler { return &compactionPlanHandler{ queueTasks: make(map[int64]CompactionTask), diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go index f61b58cee2..093e0caa18 100644 --- a/internal/datacoord/compaction_policy_clustering.go +++ b/internal/datacoord/compaction_policy_clustering.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/clustering" "github.com/milvus-io/milvus/pkg/log" @@ -34,11 +35,11 @@ import ( type clusteringCompactionPolicy struct { meta *meta - allocator allocator + allocator allocator.Allocator handler Handler } -func newClusteringCompactionPolicy(meta *meta, allocator allocator, handler Handler) *clusteringCompactionPolicy { +func newClusteringCompactionPolicy(meta *meta, allocator allocator.Allocator, handler Handler) *clusteringCompactionPolicy { return &clusteringCompactionPolicy{meta: meta, allocator: allocator, handler: handler} } @@ -113,7 +114,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte return nil, triggerID, nil } - newTriggerID, err := policy.allocator.allocID(ctx) + newTriggerID, err := policy.allocator.AllocID(ctx) if err != nil { log.Warn("fail to allocate triggerID", zap.Error(err)) return nil, 0, err diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index b8a25e710e..5e8ce0dab5 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" @@ -40,7 +41,7 @@ func TestClusteringCompactionPolicySuite(t *testing.T) { type ClusteringCompactionPolicySuite struct { suite.Suite - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator mockTriggerManager *MockTriggerManager handler *NMockHandler mockPlanContext *MockCompactionPlanContext @@ -73,7 +74,8 @@ func (s *ClusteringCompactionPolicySuite) SetupTest() { } s.meta = meta - mockAllocator := newMockAllocator() + mockAllocator := allocator.NewMockAllocator(s.T()) + mockAllocator.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() mockHandler := NewNMockHandler(s.T()) s.handler = mockHandler s.clusteringCompactionPolicy = newClusteringCompactionPolicy(s.meta, mockAllocator, mockHandler) diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index cf8e0c20ae..3760442bd1 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" ) @@ -34,7 +35,7 @@ func TestL0CompactionPolicySuite(t *testing.T) { type L0CompactionPolicySuite struct { suite.Suite - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator mockTriggerManager *MockTriggerManager testLabel *CompactionGroupLabel handler Handler diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go index 0340bbb110..68f52853b9 100644 --- a/internal/datacoord/compaction_policy_single.go +++ b/internal/datacoord/compaction_policy_single.go @@ -24,6 +24,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" ) @@ -33,11 +34,11 @@ import ( // todo: move l1 single compaction here type singleCompactionPolicy struct { meta *meta - allocator allocator + allocator allocator.Allocator handler Handler } -func newSingleCompactionPolicy(meta *meta, allocator allocator, handler Handler) *singleCompactionPolicy { +func newSingleCompactionPolicy(meta *meta, allocator allocator.Allocator, handler Handler) *singleCompactionPolicy { return &singleCompactionPolicy{meta: meta, allocator: allocator, handler: handler} } @@ -81,7 +82,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context, return nil, 0, nil } - newTriggerID, err := policy.allocator.allocID(ctx) + newTriggerID, err := policy.allocator.AllocID(ctx) if err != nil { log.Warn("fail to allocate triggerID", zap.Error(err)) return nil, 0, err diff --git a/internal/datacoord/compaction_policy_single_test.go b/internal/datacoord/compaction_policy_single_test.go index 7f6b998264..d25fa62294 100644 --- a/internal/datacoord/compaction_policy_single_test.go +++ b/internal/datacoord/compaction_policy_single_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -35,7 +36,7 @@ func TestSingleCompactionPolicySuite(t *testing.T) { type SingleCompactionPolicySuite struct { suite.Suite - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator mockTriggerManager *MockTriggerManager testLabel *CompactionGroupLabel handler *NMockHandler @@ -56,10 +57,11 @@ func (s *SingleCompactionPolicySuite) SetupTest() { for id, segment := range segments { meta.segments.SetSegment(id, segment) } - mockAllocator := newMockAllocator() + + s.mockAlloc = newMockAllocator(s.T()) mockHandler := NewNMockHandler(s.T()) s.handler = mockHandler - s.singlePolicy = newSingleCompactionPolicy(meta, mockAllocator, mockHandler) + s.singlePolicy = newSingleCompactionPolicy(meta, s.mockAlloc, mockHandler) } func (s *SingleCompactionPolicySuite) TestTrigger() { diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 8ec052ccfe..65efe4c1a8 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/common" @@ -46,7 +47,7 @@ type clusteringCompactionTask struct { result *datapb.CompactionPlanResult span trace.Span - allocator allocator + allocator allocator.Allocator meta CompactionMeta sessions SessionManager handler Handler @@ -55,7 +56,7 @@ type clusteringCompactionTask struct { maxRetryTimes int32 } -func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator, meta CompactionMeta, session SessionManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask { +func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session SessionManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask { return &clusteringCompactionTask{ CompactionTask: t, allocator: allocator, @@ -151,7 +152,7 @@ func (t *clusteringCompactionTask) retryableProcess() error { } func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { - beginLogID, _, err := t.allocator.allocN(1) + beginLogID, _, err := t.allocator.AllocN(1) if err != nil { return nil, err } diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 6f46e2b282..1456923b50 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -45,7 +46,7 @@ type ClusteringCompactionTaskSuite struct { suite.Suite mockID atomic.Int64 - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator meta *meta mockSessMgr *MockSessionManager handler *NMockHandler @@ -64,13 +65,13 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() { s.mockSessMgr = NewMockSessionManager(s.T()) s.mockID.Store(time.Now().UnixMilli()) - s.mockAlloc = NewNMockAllocator(s.T()) - s.mockAlloc.EXPECT().allocN(mock.Anything).RunAndReturn(func(x int64) (int64, int64, error) { + s.mockAlloc = allocator.NewMockAllocator(s.T()) + s.mockAlloc.EXPECT().AllocN(mock.Anything).RunAndReturn(func(x int64) (int64, int64, error) { start := s.mockID.Load() end := s.mockID.Add(int64(x)) return start, end, nil }).Maybe() - s.mockAlloc.EXPECT().allocID(mock.Anything).RunAndReturn(func(ctx context.Context) (int64, error) { + s.mockAlloc.EXPECT().AllocID(mock.Anything).RunAndReturn(func(ctx context.Context) (int64, error) { end := s.mockID.Add(1) return end, nil }).Maybe() diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index f29f057b10..ba74db096a 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -27,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -41,7 +42,7 @@ type l0CompactionTask struct { result *datapb.CompactionPlanResult span trace.Span - allocator allocator + allocator allocator.Allocator sessions SessionManager meta CompactionMeta } @@ -279,7 +280,7 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac } func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { - beginLogID, _, err := t.allocator.allocN(1) + beginLogID, _, err := t.allocator.AllocN(1) if err != nil { return nil, err } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 3a994b6136..7ffd011bc8 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -39,7 +40,7 @@ func TestL0CompactionTaskSuite(t *testing.T) { type L0CompactionTaskSuite struct { suite.Suite - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator mockMeta *MockCompactionMeta mockSessMgr *MockSessionManager } @@ -47,7 +48,7 @@ type L0CompactionTaskSuite struct { func (s *L0CompactionTaskSuite) SetupTest() { s.mockMeta = NewMockCompactionMeta(s.T()) s.mockSessMgr = NewMockSessionManager(s.T()) - s.mockAlloc = NewNMockAllocator(s.T()) + s.mockAlloc = allocator.NewMockAllocator(s.T()) } func (s *L0CompactionTaskSuite) SetupSubTest() { @@ -100,8 +101,8 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() { }, meta: s.mockMeta, } - alloc := NewNMockAllocator(s.T()) - alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + alloc := allocator.NewMockAllocator(s.T()) + alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) task.allocator = alloc plan, err := task.BuildCompactionRequest() s.Require().NoError(err) @@ -133,8 +134,8 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() { }, meta: s.mockMeta, } - alloc := NewNMockAllocator(s.T()) - alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + alloc := allocator.NewMockAllocator(s.T()) + alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) task.allocator = alloc _, err := task.BuildCompactionRequest() @@ -169,8 +170,8 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() { }, meta: s.mockMeta, } - alloc := NewNMockAllocator(s.T()) - alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + alloc := allocator.NewMockAllocator(s.T()) + alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) task.allocator = alloc _, err := task.BuildCompactionRequest() s.Error(err) @@ -179,7 +180,7 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() { func (s *L0CompactionTaskSuite) TestBuildCompactionRequestFailed_AllocFailed() { var task CompactionTask - s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, errors.New("mock alloc err")) + s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, errors.New("mock alloc err")) task = &l0CompactionTask{ allocator: s.mockAlloc, @@ -233,7 +234,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { }) s.Run("test pipelining BuildCompactionRequest failed", func() { - s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 channel := "ch-1" @@ -269,7 +270,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { }) s.Run("test pipelining saveTaskMeta failed", func() { t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) - s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) t.NodeID = 100 channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} @@ -300,7 +301,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { }) s.Run("test pipelining Compaction failed", func() { - s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 channel := "ch-1" @@ -339,7 +340,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { }) s.Run("test pipelining success", func() { - s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 channel := "ch-1" diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 154d45a16b..bfd951b027 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -22,7 +23,7 @@ type mixCompactionTask struct { result *datapb.CompactionPlanResult span trace.Span - allocator allocator + allocator allocator.Allocator sessions SessionManager meta CompactionMeta newSegment *SegmentInfo @@ -330,7 +331,7 @@ func (t *mixCompactionTask) CleanLogPath() { } func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { - beginLogID, _, err := t.allocator.allocN(1) + beginLogID, _, err := t.allocator.AllocN(1) if err != nil { return nil, err } diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go index 50eb094f8a..a5f8149f6b 100644 --- a/internal/datacoord/compaction_task_mix_test.go +++ b/internal/datacoord/compaction_task_mix_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -36,8 +37,8 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() { // plan: plan, meta: s.mockMeta, } - alloc := NewNMockAllocator(s.T()) - alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + alloc := allocator.NewMockAllocator(s.T()) + alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) task.allocator = alloc plan, err := task.BuildCompactionRequest() s.Require().NoError(err) @@ -70,8 +71,8 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() { }, meta: s.mockMeta, } - alloc := NewNMockAllocator(s.T()) - alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + alloc := allocator.NewMockAllocator(s.T()) + alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) task.allocator = alloc _, err := task.BuildCompactionRequest() s.Error(err) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index f9f534de43..0140786a15 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -41,7 +42,7 @@ type CompactionPlanHandlerSuite struct { suite.Suite mockMeta *MockCompactionMeta - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator mockCm *MockChannelManager mockSessMgr *MockSessionManager handler *compactionPlanHandler @@ -50,7 +51,7 @@ type CompactionPlanHandlerSuite struct { func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockMeta = NewMockCompactionMeta(s.T()) - s.mockAlloc = NewNMockAllocator(s.T()) + s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockCm = NewMockChannelManager(s.T()) s.mockSessMgr = NewMockSessionManager(s.T()) s.cluster = NewMockCluster(s.T()) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 7b9f6c4cff..3b1808b46a 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/lifetime" @@ -70,7 +71,7 @@ var _ trigger = (*compactionTrigger)(nil) type compactionTrigger struct { handler Handler meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -90,7 +91,7 @@ type compactionTrigger struct { func newCompactionTrigger( meta *meta, compactionHandler compactionPlanContext, - allocator allocator, + allocator allocator.Allocator, handler Handler, indexVersionManager IndexEngineVersionManager, ) *compactionTrigger { @@ -281,7 +282,7 @@ func (t *compactionTrigger) triggerManualCompaction(collectionID int64) (UniqueI func (t *compactionTrigger) allocSignalID() (UniqueID, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return t.allocator.allocID(ctx) + return t.allocator.AllocID(ctx) } func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 { @@ -355,7 +356,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { } plans := t.generatePlans(group.segments, signal, ct) - currentID, _, err := t.allocator.allocN(int64(len(plans) * 2)) + currentID, _, err := t.allocator.AllocN(int64(len(plans) * 2)) if err != nil { return err } @@ -457,7 +458,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { } plans := t.generatePlans(segments, signal, ct) - currentID, _, err := t.allocator.allocN(int64(len(plans) * 2)) + currentID, _, err := t.allocator.AllocN(int64(len(plans) * 2)) if err != nil { log.Warn("fail to allocate id", zap.Error(err)) return diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 01ba21d23b..3549b14602 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -44,6 +45,7 @@ import ( ) type spyCompactionHandler struct { + t *testing.T spyChan chan *datapb.CompactionPlan meta *meta } @@ -66,7 +68,7 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er CompactionTask: task, meta: h.meta, } - alloc := &MockAllocator0{} + alloc := newMock0Allocator(h.t) t.allocator = alloc t.ResultSegments = []int64{100} plan, err := t.BuildCompactionRequest() @@ -98,7 +100,7 @@ func Test_compactionTrigger_force(t *testing.T) { paramtable.Init() type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -125,6 +127,8 @@ func Test_compactionTrigger_force(t *testing.T) { }, } + mock0Allocator := newMock0Allocator(t) + tests := []struct { name string fields fields @@ -419,9 +423,9 @@ func Test_compactionTrigger_force(t *testing.T) { }, }, }, - &MockAllocator0{}, + mock0Allocator, nil, - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, nil, }, 2, @@ -604,7 +608,7 @@ func Test_compactionTrigger_force(t *testing.T) { func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -685,6 +689,8 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { segmentInfos.segments[i] = info } + mock0Allocator := newMockAllocator(t) + tests := []struct { name string fields fields @@ -719,9 +725,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { }, indexMeta: indexMeta, }, - newMockAllocator(), + mock0Allocator, nil, - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 2)}, nil, }, args{ @@ -821,7 +827,7 @@ func sortPlanCompactionBinlogs(plan *datapb.CompactionPlan) { func Test_compactionTrigger_noplan(t *testing.T) { type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -832,6 +838,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { } Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4" vecFieldID := int64(201) + mock0Allocator := newMockAllocator(t) tests := []struct { name string fields fields @@ -918,9 +925,9 @@ func Test_compactionTrigger_noplan(t *testing.T) { }, }, }, - newMockAllocator(), + mock0Allocator, make(chan *compactionSignal, 1), - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, nil, }, args{ @@ -965,7 +972,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -998,6 +1005,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { }, } } + mock0Allocator := newMockAllocator(t) genSegIndex := func(segID, indexID UniqueID, numRows int64) map[UniqueID]*model.SegmentIndex { return map[UniqueID]*model.SegmentIndex{ @@ -1107,9 +1115,9 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { }, }, }, - newMockAllocator(), + mock0Allocator, make(chan *compactionSignal, 1), - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, nil, }, false, @@ -1155,7 +1163,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { func Test_compactionTrigger_SmallCandi(t *testing.T) { type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -1165,6 +1173,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { compactTime *compactTime } vecFieldID := int64(201) + mock0Allocator := newMockAllocator(t) genSeg := func(segID, numRows int64) *datapb.SegmentInfo { return &datapb.SegmentInfo{ @@ -1294,9 +1303,9 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { }, }, }, - newMockAllocator(), + mock0Allocator, make(chan *compactionSignal, 1), - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, nil, }, args{ @@ -1350,7 +1359,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -1396,6 +1405,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { }, } } + mock0Allocator := newMockAllocator(t) tests := []struct { name string fields fields @@ -1484,9 +1494,9 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { }, }, }, - newMockAllocator(), + mock0Allocator, make(chan *compactionSignal, 1), - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, nil, }, args{ @@ -1541,7 +1551,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { func Test_compactionTrigger_noplan_random_size(t *testing.T) { type fields struct { meta *meta - allocator allocator + allocator allocator.Allocator signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker @@ -1626,6 +1636,8 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { segmentInfos.segments[i] = info } + mock0Allocator := newMockAllocator(t) + tests := []struct { name string fields fields @@ -1661,9 +1673,9 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { }, indexMeta: indexMeta, }, - newMockAllocator(), + mock0Allocator, make(chan *compactionSignal, 1), - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 10)}, + &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 10)}, nil, }, args{ @@ -1728,10 +1740,11 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { // Test shouldDoSingleCompaction func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { indexMeta := newSegmentIndexMeta(nil) + mock0Allocator := newMockAllocator(t) trigger := newCompactionTrigger(&meta{ indexMeta: indexMeta, channelCPs: newChannelCps(), - }, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newIndexEngineVersionManager()) + }, &compactionPlanHandler{}, mock0Allocator, newMockHandler(), newIndexEngineVersionManager()) // Test too many deltalogs. var binlogs []*datapb.FieldBinlog @@ -1946,7 +1959,7 @@ func Test_compactionTrigger_new(t *testing.T) { type args struct { meta *meta compactionHandler compactionPlanContext - allocator allocator + allocator allocator.Allocator } tests := []struct { name string @@ -1957,7 +1970,7 @@ func Test_compactionTrigger_new(t *testing.T) { args{ &meta{}, &compactionPlanHandler{}, - newMockAllocator(), + allocator.NewMockAllocator(t), }, }, } @@ -1996,7 +2009,7 @@ func Test_triggerSingleCompaction(t *testing.T) { channelCPs: newChannelCps(), segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo), } - got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(), + got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(t), &ServerHandler{ &Server{ meta: m, @@ -2073,7 +2086,7 @@ type CompactionTriggerSuite struct { meta *meta tr *compactionTrigger - allocator *NMockAllocator + allocator *allocator.MockAllocator handler *NMockHandler compactionHandler *MockCompactionPlanContext versionManager *MockVersionManager @@ -2239,7 +2252,7 @@ func (s *CompactionTriggerSuite) SetupTest() { Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0), MsgID: []byte{1, 2, 3, 4}, }) - s.allocator = NewNMockAllocator(s.T()) + s.allocator = allocator.NewMockAllocator(s.T()) s.compactionHandler = NewMockCompactionPlanContext(s.T()) s.handler = NewNMockHandler(s.T()) s.versionManager = NewMockVersionManager(s.T()) @@ -2258,7 +2271,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) tr.handleSignal(&compactionSignal{ segmentID: 1, @@ -2275,7 +2288,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Properties: map[string]string{ common.CollectionAutoCompactionKey: "bad_value", @@ -2304,7 +2317,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Properties: map[string]string{ common.CollectionAutoCompactionKey: "false", @@ -2334,13 +2347,13 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - // s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocID(mock.Anything).Return(20000, nil) start := int64(20000) - s.allocator.EXPECT().allocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) { + s.allocator.EXPECT().AllocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) { return start, start + i, nil }) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Properties: map[string]string{ common.CollectionAutoCompactionKey: "false", @@ -2394,7 +2407,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) tr.handleGlobalSignal(&compactionSignal{ segmentID: 1, @@ -2411,7 +2424,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Schema: schema, Properties: map[string]string{ @@ -2433,7 +2446,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Schema: schema, Properties: map[string]string{ @@ -2455,13 +2468,13 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { defer s.SetupTest() tr := s.tr // s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - // s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil).Maybe() + // s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().AllocID(mock.Anything).Return(20000, nil).Maybe() start := int64(20000) - s.allocator.EXPECT().allocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) { + s.allocator.EXPECT().AllocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) { return start, start + i, nil }).Maybe() - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Schema: schema, Properties: map[string]string{ diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 2cb80a7143..add3ccdb31 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" @@ -65,7 +66,7 @@ var _ TriggerManager = (*CompactionTriggerManager)(nil) type CompactionTriggerManager struct { compactionHandler compactionPlanContext handler Handler - allocator allocator + allocator allocator.Allocator view *FullViews // todo handle this lock @@ -80,7 +81,7 @@ type CompactionTriggerManager struct { closeWg sync.WaitGroup } -func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta) *CompactionTriggerManager { +func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta) *CompactionTriggerManager { m := &CompactionTriggerManager{ allocator: alloc, handler: handler, @@ -250,7 +251,7 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, view CompactionView) { log := log.With(zap.String("view", view.String())) - taskID, err := m.allocator.allocID(ctx) + taskID, err := m.allocator.AllocID(ctx) if err != nil { log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) return @@ -300,7 +301,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) { log := log.With(zap.String("view", view.String())) - taskID, _, err := m.allocator.allocN(2) + taskID, _, err := m.allocator.AllocN(2) if err != nil { log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) return @@ -319,7 +320,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C } resultSegmentNum := totalRows / preferSegmentRows * 2 - start, end, err := m.allocator.allocN(resultSegmentNum) + start, end, err := m.allocator.AllocN(resultSegmentNum) if err != nil { log.Warn("pre-allocate result segments failed", zap.String("view", view.String()), zap.Error(err)) return @@ -362,7 +363,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Context, view CompactionView) { log := log.With(zap.String("view", view.String())) - taskID, _, err := m.allocator.allocN(2) + taskID, _, err := m.allocator.AllocN(2) if err != nil { log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) return diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index ac853e505e..854d560703 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" @@ -26,7 +27,7 @@ func TestCompactionTriggerManagerSuite(t *testing.T) { type CompactionTriggerManagerSuite struct { suite.Suite - mockAlloc *NMockAllocator + mockAlloc *allocator.MockAllocator handler Handler mockPlanContext *MockCompactionPlanContext testLabel *CompactionGroupLabel @@ -36,7 +37,7 @@ type CompactionTriggerManagerSuite struct { } func (s *CompactionTriggerManagerSuite) SetupTest() { - s.mockAlloc = NewNMockAllocator(s.T()) + s.mockAlloc = allocator.NewMockAllocator(s.T()) s.handler = NewNMockHandler(s.T()) s.mockPlanContext = NewMockCompactionPlanContext(s.T()) @@ -83,7 +84,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID()) @@ -100,7 +101,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { s.ElementsMatch(expectedSegs, task.GetInputSegments()) return nil }).Return(nil).Once() - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe() + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroView) } @@ -127,7 +128,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID()) @@ -143,7 +144,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.ElementsMatch(expectedSegs, task.GetInputSegments()) return nil }).Return(nil).Once() - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe() + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView) } diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 1213d0a0c2..7f48e680bb 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -43,7 +44,7 @@ type importChecker struct { meta *meta broker broker.Broker cluster Cluster - alloc allocator + alloc allocator.Allocator sm Manager imeta ImportMeta @@ -54,7 +55,7 @@ type importChecker struct { func NewImportChecker(meta *meta, broker broker.Broker, cluster Cluster, - alloc allocator, + alloc allocator.Allocator, sm Manager, imeta ImportMeta, ) ImportChecker { diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 152c5e730e..2d3888fe87 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" broker2 "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -41,6 +42,7 @@ type ImportCheckerSuite struct { jobID int64 imeta ImportMeta checker *importChecker + alloc *allocator.MockAllocator } func (s *ImportCheckerSuite) SetupTest() { @@ -57,7 +59,7 @@ func (s *ImportCheckerSuite) SetupTest() { catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) cluster := NewMockCluster(s.T()) - alloc := NewNMockAllocator(s.T()) + s.alloc = allocator.NewMockAllocator(s.T()) imeta, err := NewImportMeta(catalog) s.NoError(err) @@ -69,7 +71,7 @@ func (s *ImportCheckerSuite) SetupTest() { broker := broker2.NewMockBroker(s.T()) sm := NewMockManager(s.T()) - checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta).(*importChecker) + checker := NewImportChecker(meta, broker, cluster, s.alloc, sm, imeta).(*importChecker) s.checker = checker job := &importJob{ @@ -137,8 +139,8 @@ func (s *ImportCheckerSuite) TestCheckJob() { job := s.imeta.GetJob(s.jobID) // test checkPendingJob - alloc := s.checker.alloc.(*NMockAllocator) - alloc.EXPECT().allocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { + alloc := s.alloc + alloc.EXPECT().AllocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { id := rand.Int63() return id, id + n, nil }) @@ -216,8 +218,8 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { job := s.imeta.GetJob(s.jobID) // test checkPendingJob - alloc := s.checker.alloc.(*NMockAllocator) - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, nil) + alloc := s.alloc + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SavePreImportTask(mock.Anything).Return(mockErr) @@ -227,14 +229,14 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(job.GetJobID()).GetState()) alloc.ExpectedCalls = nil - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr) + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr) s.checker.checkPendingJob(job) preimportTasks = s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(PreImportTaskType)) s.Equal(0, len(preimportTasks)) s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(job.GetJobID()).GetState()) alloc.ExpectedCalls = nil - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, nil) + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil) catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil) @@ -257,7 +259,7 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(job.GetJobID()).GetState()) alloc.ExpectedCalls = nil - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr) + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr) importTasks = s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(0, len(importTasks)) s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(job.GetJobID()).GetState()) @@ -266,7 +268,7 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil) catalog.EXPECT().SaveImportTask(mock.Anything).Return(nil) alloc.ExpectedCalls = nil - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, nil) + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) s.checker.checkPreImportingJob(job) importTasks = s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(1, len(importTasks)) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 453c4bd761..967d71aceb 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -46,7 +47,7 @@ type ImportScheduler interface { type importScheduler struct { meta *meta cluster Cluster - alloc allocator + alloc allocator.Allocator imeta ImportMeta buildIndexCh chan UniqueID @@ -57,7 +58,7 @@ type importScheduler struct { func NewImportScheduler(meta *meta, cluster Cluster, - alloc allocator, + alloc allocator.Allocator, imeta ImportMeta, buildIndexCh chan UniqueID, ) ImportScheduler { diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index a8f51d28ae..fbdc27c286 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" ) @@ -37,7 +38,7 @@ type ImportSchedulerSuite struct { collectionID int64 catalog *mocks.DataCoordCatalog - alloc *NMockAllocator + alloc *allocator.MockAllocator cluster *MockCluster meta *meta imeta ImportMeta @@ -62,7 +63,7 @@ func (s *ImportSchedulerSuite) SetupTest() { s.catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) s.cluster = NewMockCluster(s.T()) - s.alloc = NewNMockAllocator(s.T()) + s.alloc = allocator.NewMockAllocator(s.T()) s.meta, err = newMeta(context.TODO(), s.catalog, nil) s.NoError(err) s.meta.AddCollection(&collectionInfo{ @@ -174,8 +175,8 @@ func (s *ImportSchedulerSuite) TestProcessImport() { // pending -> inProgress const nodeID = 10 - s.alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) - s.alloc.EXPECT().allocTimestamp(mock.Anything).Return(300, nil) + s.alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) + s.alloc.EXPECT().AllocTimestamp(mock.Anything).Return(300, nil) s.cluster.EXPECT().QueryImport(mock.Anything, mock.Anything).Return(&datapb.QueryImportResponse{ Slots: 1, }, nil) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 1775c16e50..d72ae49110 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -27,6 +27,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" @@ -49,9 +50,9 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field { func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, job ImportJob, - alloc allocator, + alloc allocator.Allocator, ) ([]ImportTask, error) { - idStart, _, err := alloc.allocN(int64(len(fileGroups))) + idStart, _, err := alloc.AllocN(int64(len(fileGroups))) if err != nil { return nil, err } @@ -79,9 +80,9 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, job ImportJob, manager Manager, - alloc allocator, + alloc allocator.Allocator, ) ([]ImportTask, error) { - idBegin, _, err := alloc.allocN(int64(len(fileGroups))) + idBegin, _, err := alloc.AllocN(int64(len(fileGroups))) if err != nil { return nil, err } @@ -176,7 +177,7 @@ func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportR } } -func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc allocator) (*datapb.ImportRequest, error) { +func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc allocator.Allocator) (*datapb.ImportRequest, error) { requestSegments := make([]*datapb.ImportRequestSegment, 0) for _, segmentID := range task.(*importTask).GetSegmentIDs() { segment := meta.GetSegment(segmentID) @@ -191,7 +192,7 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - ts, err := alloc.allocTimestamp(ctx) + ts, err := alloc.AllocTimestamp(ctx) if err != nil { return nil, err } @@ -203,7 +204,7 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all // Allocated IDs are used for rowID and the BEGINNING of the logID. allocNum := totalRows + 1 - idBegin, idEnd, err := alloc.allocN(allocNum) + idBegin, idEnd, err := alloc.AllocN(allocNum) if err != nil { return nil, err } diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 411bfb51ff..6c19cee153 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -30,6 +30,7 @@ import ( "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/mocks" mocks2 "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -54,8 +55,8 @@ func TestImportUtil_NewPreImportTasks(t *testing.T) { job := &importJob{ ImportJob: &datapb.ImportJob{JobID: 1, CollectionID: 2}, } - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { id := rand.Int63() return id, id + n, nil }) @@ -91,8 +92,8 @@ func TestImportUtil_NewImportTasks(t *testing.T) { job := &importJob{ ImportJob: &datapb.ImportJob{JobID: 1, CollectionID: 2}, } - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { id := rand.Int63() return id, id + n, nil }) @@ -158,12 +159,12 @@ func TestImportUtil_AssembleRequest(t *testing.T) { catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocN(mock.Anything).RunAndReturn(func(n int64) (int64, int64, error) { id := rand.Int63() return id, id + n, nil }) - alloc.EXPECT().allocTimestamp(mock.Anything).Return(800, nil) + alloc.EXPECT().AllocTimestamp(mock.Anything).Return(800, nil) meta, err := newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 3db33a4c39..3b981df605 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -53,7 +53,7 @@ func (s *Server) startIndexService(ctx context.Context) { func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) error { log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID)) - buildID, err := s.allocator.allocID(context.Background()) + buildID, err := s.allocator.AllocID(context.Background()) if err != nil { return err } @@ -216,7 +216,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques } if indexID == 0 { - indexID, err = s.allocator.allocID(ctx) + indexID, err = s.allocator.AllocID(ctx) if err != nil { log.Warn("failed to alloc indexID", zap.Error(err)) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index dba63d3348..b44761547c 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" @@ -83,6 +84,8 @@ func TestServer_CreateIndex(t *testing.T) { catalog := catalogmocks.NewDataCoordCatalog(t) catalog.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(nil).Maybe() + mock0Allocator := newMockAllocator(t) + indexMeta := newSegmentIndexMeta(catalog) s := &Server{ meta: &meta{ @@ -99,7 +102,7 @@ func TestServer_CreateIndex(t *testing.T) { }, indexMeta: indexMeta, }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } @@ -193,14 +196,17 @@ func TestServer_CreateIndex(t *testing.T) { t.Run("alloc ID fail", func(t *testing.T) { req.FieldID = fieldID - s.allocator = &FailsAllocator{allocIDSucceed: false} + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocID(mock.Anything).Return(0, errors.New("mock")).Maybe() + alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil).Maybe() + s.allocator = alloc s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{} resp, err := s.CreateIndex(ctx, req) assert.Error(t, merr.CheckRPCCall(resp, err)) }) t.Run("not support disk index", func(t *testing.T) { - s.allocator = newMockAllocator() + s.allocator = mock0Allocator s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{} req.IndexParams = []*commonpb.KeyValuePair{ { @@ -214,7 +220,7 @@ func TestServer_CreateIndex(t *testing.T) { }) t.Run("disk index with mmap", func(t *testing.T) { - s.allocator = newMockAllocator() + s.allocator = mock0Allocator s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{} req.IndexParams = []*commonpb.KeyValuePair{ { @@ -299,6 +305,8 @@ func TestServer_AlterIndex(t *testing.T) { mock.Anything, ).Return(nil) + mock0Allocator := newMockAllocator(t) + indexMeta := &indexMeta{ catalog: catalog, indexes: map[UniqueID]map[UniqueID]*model.Index{ @@ -598,7 +606,7 @@ func TestServer_AlterIndex(t *testing.T) { }, }, }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } @@ -672,12 +680,13 @@ func TestServer_GetIndexState(t *testing.T) { IndexName: "", } ) + mock0Allocator := newMockAllocator(t) s := &Server{ meta: &meta{ catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}, indexMeta: newSegmentIndexMeta(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } @@ -880,6 +889,7 @@ func TestServer_GetSegmentIndexState(t *testing.T) { } ) + mock0Allocator := newMockAllocator(t) indexMeta := newSegmentIndexMeta(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}) s := &Server{ @@ -888,7 +898,7 @@ func TestServer_GetSegmentIndexState(t *testing.T) { indexMeta: indexMeta, segments: NewSegmentsInfo(), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } @@ -1010,13 +1020,15 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { } ) + mock0Allocator := newMockAllocator(t) + s := &Server{ meta: &meta{ catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}, indexMeta: newSegmentIndexMeta(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}), segments: NewSegmentsInfo(), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } t.Run("server not available", func(t *testing.T) { @@ -1197,6 +1209,8 @@ func TestServer_DescribeIndex(t *testing.T) { mock.Anything, ).Return(nil) + mock0Allocator := newMockAllocator(t) + segments := map[UniqueID]*SegmentInfo{ invalidSegID: { SegmentInfo: &datapb.SegmentInfo{ @@ -1493,7 +1507,7 @@ func TestServer_DescribeIndex(t *testing.T) { segments: NewSegmentsInfo(), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } for id, segment := range segments { @@ -1557,6 +1571,8 @@ func TestServer_ListIndexes(t *testing.T) { } ) + mock0Allocator := newMockAllocator(t) + catalog := catalogmocks.NewDataCoordCatalog(t) s := &Server{ meta: &meta{ @@ -1656,7 +1672,7 @@ func TestServer_ListIndexes(t *testing.T) { segments: NewSegmentsInfo(), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } @@ -1714,6 +1730,8 @@ func TestServer_GetIndexStatistics(t *testing.T) { mock.Anything, ).Return(nil) + mock0Allocator := newMockAllocator(t) + segments := map[UniqueID]*SegmentInfo{ invalidSegID: { SegmentInfo: &datapb.SegmentInfo{ @@ -1931,7 +1949,7 @@ func TestServer_GetIndexStatistics(t *testing.T) { segments: NewSegmentsInfo(), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } for id, segment := range segments { @@ -2004,6 +2022,8 @@ func TestServer_DropIndex(t *testing.T) { mock.Anything, ).Return(nil) + mock0Allocator := newMockAllocator(t) + s := &Server{ meta: &meta{ catalog: catalog, @@ -2088,7 +2108,7 @@ func TestServer_DropIndex(t *testing.T) { segments: NewSegmentsInfo(), }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } @@ -2203,6 +2223,8 @@ func TestServer_GetIndexInfos(t *testing.T) { cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) + mock0Allocator := newMockAllocator(t) + s := &Server{ meta: &meta{ catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}, @@ -2252,7 +2274,7 @@ func TestServer_GetIndexInfos(t *testing.T) { segments: NewSegmentsInfo(), chunkManager: cli, }, - allocator: newMockAllocator(), + allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), } s.meta.segments.SetSegment(segID, &SegmentInfo{ diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index fbf33a791f..9da41a094c 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "sync/atomic" "testing" "github.com/cockroachdb/errors" @@ -374,9 +375,8 @@ func TestMeta_Basic(t *testing.T) { const partID0 = UniqueID(100) const partID1 = UniqueID(101) const channelName = "c1" - ctx := context.Background() - mockAllocator := newMockAllocator() + // mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) @@ -395,17 +395,19 @@ func TestMeta_Basic(t *testing.T) { Partitions: []UniqueID{}, } + count := atomic.Int64{} + AllocID := func() int64 { + return count.Add(1) + } + t.Run("Test Segment", func(t *testing.T) { meta.AddCollection(collInfoWoPartition) // create seg0 for partition0, seg0/seg1 for partition1 - segID0_0, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID0_0 := AllocID() segInfo0_0 := buildSegment(collID, partID0, segID0_0, channelName) - segID1_0, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID1_0 := AllocID() segInfo1_0 := buildSegment(collID, partID1, segID1_0, channelName) - segID1_1, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID1_1 := AllocID() segInfo1_1 := buildSegment(collID, partID1, segID1_1, channelName) // check AddSegment @@ -507,16 +509,14 @@ func TestMeta_Basic(t *testing.T) { assert.EqualValues(t, 0, nums) // add seg1 with 100 rows - segID0, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID0 := AllocID() segInfo0 := buildSegment(collID, partID0, segID0, channelName) segInfo0.NumOfRows = rowCount0 err = meta.AddSegment(context.TODO(), segInfo0) assert.NoError(t, err) // add seg2 with 300 rows - segID1, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID1 := AllocID() segInfo1 := buildSegment(collID, partID0, segID1, channelName) segInfo1.NumOfRows = rowCount1 err = meta.AddSegment(context.TODO(), segInfo1) @@ -573,16 +573,14 @@ func TestMeta_Basic(t *testing.T) { const size1 = 2048 // add seg0 with size0 - segID0, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID0 := AllocID() segInfo0 := buildSegment(collID, partID0, segID0, channelName) segInfo0.size.Store(size0) err = meta.AddSegment(context.TODO(), segInfo0) assert.NoError(t, err) // add seg1 with size1 - segID1, err := mockAllocator.allocID(ctx) - assert.NoError(t, err) + segID1 := AllocID() segInfo1 := buildSegment(collID, partID0, segID1, channelName) segInfo1.size.Store(size1) err = meta.AddSegment(context.TODO(), segInfo1) diff --git a/internal/datacoord/mock_allocator_test.go b/internal/datacoord/mock_allocator_test.go deleted file mode 100644 index a18c3f426f..0000000000 --- a/internal/datacoord/mock_allocator_test.go +++ /dev/null @@ -1,199 +0,0 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. - -package datacoord - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" -) - -// NMockAllocator is an autogenerated mock type for the allocator type -type NMockAllocator struct { - mock.Mock -} - -type NMockAllocator_Expecter struct { - mock *mock.Mock -} - -func (_m *NMockAllocator) EXPECT() *NMockAllocator_Expecter { - return &NMockAllocator_Expecter{mock: &_m.Mock} -} - -// allocID provides a mock function with given fields: _a0 -func (_m *NMockAllocator) allocID(_a0 context.Context) (int64, error) { - ret := _m.Called(_a0) - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { - return rf(_a0) - } - if rf, ok := ret.Get(0).(func(context.Context) int64); ok { - r0 = rf(_a0) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(_a0) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NMockAllocator_allocID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocID' -type NMockAllocator_allocID_Call struct { - *mock.Call -} - -// allocID is a helper method to define mock.On call -// - _a0 context.Context -func (_e *NMockAllocator_Expecter) allocID(_a0 interface{}) *NMockAllocator_allocID_Call { - return &NMockAllocator_allocID_Call{Call: _e.mock.On("allocID", _a0)} -} - -func (_c *NMockAllocator_allocID_Call) Run(run func(_a0 context.Context)) *NMockAllocator_allocID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *NMockAllocator_allocID_Call) Return(_a0 int64, _a1 error) *NMockAllocator_allocID_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *NMockAllocator_allocID_Call) RunAndReturn(run func(context.Context) (int64, error)) *NMockAllocator_allocID_Call { - _c.Call.Return(run) - return _c -} - -// allocN provides a mock function with given fields: n -func (_m *NMockAllocator) allocN(n int64) (int64, int64, error) { - ret := _m.Called(n) - - var r0 int64 - var r1 int64 - var r2 error - if rf, ok := ret.Get(0).(func(int64) (int64, int64, error)); ok { - return rf(n) - } - if rf, ok := ret.Get(0).(func(int64) int64); ok { - r0 = rf(n) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(int64) int64); ok { - r1 = rf(n) - } else { - r1 = ret.Get(1).(int64) - } - - if rf, ok := ret.Get(2).(func(int64) error); ok { - r2 = rf(n) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// NMockAllocator_allocN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocN' -type NMockAllocator_allocN_Call struct { - *mock.Call -} - -// allocN is a helper method to define mock.On call -// - n int64 -func (_e *NMockAllocator_Expecter) allocN(n interface{}) *NMockAllocator_allocN_Call { - return &NMockAllocator_allocN_Call{Call: _e.mock.On("allocN", n)} -} - -func (_c *NMockAllocator_allocN_Call) Run(run func(n int64)) *NMockAllocator_allocN_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *NMockAllocator_allocN_Call) Return(_a0 int64, _a1 int64, _a2 error) *NMockAllocator_allocN_Call { - _c.Call.Return(_a0, _a1, _a2) - return _c -} - -func (_c *NMockAllocator_allocN_Call) RunAndReturn(run func(int64) (int64, int64, error)) *NMockAllocator_allocN_Call { - _c.Call.Return(run) - return _c -} - -// allocTimestamp provides a mock function with given fields: _a0 -func (_m *NMockAllocator) allocTimestamp(_a0 context.Context) (uint64, error) { - ret := _m.Called(_a0) - - var r0 uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { - return rf(_a0) - } - if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { - r0 = rf(_a0) - } else { - r0 = ret.Get(0).(uint64) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(_a0) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NMockAllocator_allocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocTimestamp' -type NMockAllocator_allocTimestamp_Call struct { - *mock.Call -} - -// allocTimestamp is a helper method to define mock.On call -// - _a0 context.Context -func (_e *NMockAllocator_Expecter) allocTimestamp(_a0 interface{}) *NMockAllocator_allocTimestamp_Call { - return &NMockAllocator_allocTimestamp_Call{Call: _e.mock.On("allocTimestamp", _a0)} -} - -func (_c *NMockAllocator_allocTimestamp_Call) Run(run func(_a0 context.Context)) *NMockAllocator_allocTimestamp_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *NMockAllocator_allocTimestamp_Call) Return(_a0 uint64, _a1 error) *NMockAllocator_allocTimestamp_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *NMockAllocator_allocTimestamp_Call) RunAndReturn(run func(context.Context) (uint64, error)) *NMockAllocator_allocTimestamp_Call { - _c.Call.Return(run) - return _c -} - -// NewNMockAllocator creates a new instance of NMockAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewNMockAllocator(t interface { - mock.TestingT - Cleanup(func()) -}) *NMockAllocator { - mock := &NMockAllocator{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index e2b04e9db2..1a2b22f24f 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -18,16 +18,18 @@ package datacoord import ( "context" - "sync/atomic" + "testing" "time" - "github.com/cockroachdb/errors" + "github.com/stretchr/testify/mock" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -88,72 +90,28 @@ func newMemoryMeta() (*meta, error) { return newMeta(context.TODO(), catalog, nil) } -var _ allocator = (*MockAllocator)(nil) - -type MockAllocator struct { - cnt int64 +func newMockAllocator(t *testing.T) *allocator.MockAllocator { + counter := atomic.NewInt64(0) + mockAllocator := allocator.NewMockAllocator(t) + mockAllocator.EXPECT().AllocID(mock.Anything).RunAndReturn(func(ctx context.Context) (int64, error) { + return counter.Inc(), nil + }).Maybe() + mockAllocator.EXPECT().AllocTimestamp(mock.Anything).RunAndReturn(func(ctx context.Context) (uint64, error) { + return uint64(counter.Inc()), nil + }).Maybe() + mockAllocator.EXPECT().AllocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) { + v := counter.Add(i) + return v, v + i, nil + }).Maybe() + return mockAllocator } -func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) { - val := atomic.AddInt64(&m.cnt, 1) - return Timestamp(val), nil -} - -func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) { - val := atomic.AddInt64(&m.cnt, 1) - return val, nil -} - -func (m *MockAllocator) allocN(n int64) (UniqueID, UniqueID, error) { - val := atomic.AddInt64(&m.cnt, n) - return val, val + n, nil -} - -type MockAllocator0 struct{} - -func (m *MockAllocator0) allocTimestamp(ctx context.Context) (Timestamp, error) { - return Timestamp(0), nil -} - -func (m *MockAllocator0) allocID(ctx context.Context) (UniqueID, error) { - return 0, nil -} - -func (m *MockAllocator0) allocN(n int64) (UniqueID, UniqueID, error) { - return 0, n, nil -} - -var _ allocator = (*FailsAllocator)(nil) - -// FailsAllocator allocator that fails -type FailsAllocator struct { - allocTsSucceed bool - allocIDSucceed bool -} - -func (a *FailsAllocator) allocTimestamp(_ context.Context) (Timestamp, error) { - if a.allocTsSucceed { - return 0, nil - } - return 0, errors.New("always fail") -} - -func (a *FailsAllocator) allocID(_ context.Context) (UniqueID, error) { - if a.allocIDSucceed { - return 0, nil - } - return 0, errors.New("always fail") -} - -func (a *FailsAllocator) allocN(_ int64) (UniqueID, UniqueID, error) { - if a.allocIDSucceed { - return 0, 0, nil - } - return 0, 0, errors.New("always fail") -} - -func newMockAllocator() *MockAllocator { - return &MockAllocator{} +func newMock0Allocator(t *testing.T) *allocator.MockAllocator { + mock0Allocator := allocator.NewMockAllocator(t) + mock0Allocator.EXPECT().AllocID(mock.Anything).Return(0, nil).Maybe() + mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil).Maybe() + mock0Allocator.EXPECT().AllocN(mock.Anything).Return(0, 0, nil).Maybe() + return mock0Allocator } func newTestSchema() *schemapb.CollectionSchema { @@ -345,7 +303,7 @@ func (c *mockDataNodeClient) Stop() error { type mockRootCoordClient struct { state commonpb.StateCode - cnt int64 + cnt atomic.Int64 } func (m *mockRootCoordClient) DescribeDatabase(ctx context.Context, in *rootcoordpb.DescribeDatabaseRequest, opts ...grpc.CallOption) (*rootcoordpb.DescribeDatabaseResponse, error) { @@ -519,7 +477,7 @@ func (m *mockRootCoordClient) AllocTimestamp(ctx context.Context, req *rootcoord return &rootcoordpb.AllocTimestampResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil } - val := atomic.AddInt64(&m.cnt, int64(req.Count)) + val := m.cnt.Add(int64(req.Count)) phy := time.Now().UnixNano() / int64(time.Millisecond) ts := tsoutil.ComposeTS(phy, val) return &rootcoordpb.AllocTimestampResponse{ @@ -533,7 +491,7 @@ func (m *mockRootCoordClient) AllocID(ctx context.Context, req *rootcoordpb.Allo if m.state != commonpb.StateCode_Healthy { return &rootcoordpb.AllocIDResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil } - val := atomic.AddInt64(&m.cnt, int64(req.Count)) + val := m.cnt.Add(int64(req.Count)) return &rootcoordpb.AllocIDResponse{ Status: merr.Success(), ID: val, diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index ad186de4b5..7fc3ed8353 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/lock" @@ -115,7 +116,7 @@ var _ Manager = (*SegmentManager)(nil) type SegmentManager struct { meta *meta mu lock.RWMutex - allocator allocator + allocator allocator.Allocator helper allocHelper segments []UniqueID estimatePolicy calUpperLimitPolicy @@ -213,7 +214,7 @@ func defaultFlushPolicy() flushPolicy { } // newSegmentManager should be the only way to retrieve SegmentManager. -func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*SegmentManager, error) { +func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocOption) (*SegmentManager, error) { manager := &SegmentManager{ meta: meta, allocator: allocator, @@ -358,7 +359,7 @@ func isGrowing(segment *SegmentInfo) bool { } func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) { - ts, err := s.allocator.allocTimestamp(ctx) + ts, err := s.allocator.AllocTimestamp(ctx) if err != nil { return 0, err } @@ -374,12 +375,12 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c log := log.Ctx(ctx) ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment") defer sp.End() - id, err := s.allocator.allocID(ctx) + id, err := s.allocator.AllocID(ctx) if err != nil { - log.Error("failed to open new segment while allocID", zap.Error(err)) + log.Error("failed to open new segment while AllocID", zap.Error(err)) return nil, err } - ts, err := s.allocator.allocTimestamp(ctx) + ts, err := s.allocator.AllocTimestamp(ctx) if err != nil { return nil, err } @@ -430,9 +431,9 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique log := log.Ctx(ctx) ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment") defer sp.End() - id, err := s.allocator.allocID(ctx) + id, err := s.allocator.AllocID(ctx) if err != nil { - log.Error("failed to open new segment while allocID", zap.Error(err)) + log.Error("failed to open new segment while AllocID", zap.Error(err)) return nil, err } return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, id, channelName) diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 6e81419252..5fc1a8267c 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" @@ -41,7 +42,7 @@ import ( func TestManagerOptions(t *testing.T) { // ctx := context.Background() paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -102,13 +103,13 @@ func TestAllocSegment(t *testing.T) { ctx := context.Background() paramtable.Init() Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1") - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) segmentManager, _ := newSegmentManager(meta, mockAllocator) schema := newTestSchema() - collID, err := mockAllocator.allocID(ctx) + collID, err := mockAllocator.AllocID(ctx) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) @@ -122,10 +123,13 @@ func TestAllocSegment(t *testing.T) { }) t.Run("allocation fails 1", func(t *testing.T) { - failsAllocator := &FailsAllocator{ - allocTsSucceed: true, - allocIDSucceed: false, - } + failsAllocator := allocator.NewMockAllocator(t) + failsAllocator.EXPECT().AllocID(mock.Anything).Return(0, errors.New("mock")).Maybe() + failsAllocator.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil).Maybe() + // failsAllocator := &FailsAllocator{ + // allocTsSucceed: true, + // AllocIDSucceed: false, + // } segmentManager, err := newSegmentManager(meta, failsAllocator) assert.NoError(t, err) _, err = segmentManager.AllocSegment(ctx, collID, 100, "c2", 100) @@ -133,10 +137,9 @@ func TestAllocSegment(t *testing.T) { }) t.Run("allocation fails 2", func(t *testing.T) { - failsAllocator := &FailsAllocator{ - allocTsSucceed: false, - allocIDSucceed: true, - } + failsAllocator := allocator.NewMockAllocator(t) + failsAllocator.EXPECT().AllocID(mock.Anything).Return(0, nil).Maybe() + failsAllocator.EXPECT().AllocTimestamp(mock.Anything).Return(0, errors.New("mock")).Maybe() segmentManager, err := newSegmentManager(meta, failsAllocator) assert.Error(t, err) assert.Nil(t, segmentManager) @@ -170,7 +173,7 @@ func TestLastExpireReset(t *testing.T) { Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "200") Params.Save(Params.DataCoordCfg.SegmentMaxSize.Key, "1024") }() - mockAllocator := newRootCoordAllocator(newMockRootCoordClient()) + mockAllocator := allocator.NewRootCoordAllocator(newMockRootCoordClient()) etcdCli, _ := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(), @@ -188,7 +191,7 @@ func TestLastExpireReset(t *testing.T) { // add collection channelName := "c1" schema := newTestSchema() - collID, err := mockAllocator.allocID(ctx) + collID, err := mockAllocator.AllocID(ctx) assert.Nil(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) initSegment := &SegmentInfo{ @@ -264,9 +267,9 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { mockErr := errors.New("mock error") t.Run("normal case", func(t *testing.T) { - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocID(mock.Anything).Return(0, nil) - alloc.EXPECT().allocTimestamp(mock.Anything).Return(0, nil) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocID(mock.Anything).Return(0, nil) + alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil) meta, err := newMemoryMeta() assert.NoError(t, err) sm, err := newSegmentManager(meta, alloc) @@ -280,8 +283,8 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { }) t.Run("alloc id failed", func(t *testing.T) { - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocID(mock.Anything).Return(0, mockErr) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocID(mock.Anything).Return(0, mockErr) meta, err := newMemoryMeta() assert.NoError(t, err) sm, err := newSegmentManager(meta, alloc) @@ -291,9 +294,9 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { }) t.Run("alloc ts failed", func(t *testing.T) { - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocID(mock.Anything).Return(0, nil) - alloc.EXPECT().allocTimestamp(mock.Anything).Return(0, mockErr) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocID(mock.Anything).Return(0, nil) + alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, mockErr) meta, err := newMemoryMeta() assert.NoError(t, err) sm, err := newSegmentManager(meta, alloc) @@ -303,9 +306,9 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { }) t.Run("add segment failed", func(t *testing.T) { - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocID(mock.Anything).Return(0, nil) - alloc.EXPECT().allocTimestamp(mock.Anything).Return(0, nil) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocID(mock.Anything).Return(0, nil) + alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil) meta, err := newMemoryMeta() assert.NoError(t, err) sm, _ := newSegmentManager(meta, alloc) @@ -320,12 +323,12 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { func TestLoadSegmentsFromMeta(t *testing.T) { ctx := context.Background() paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(ctx) + collID, err := mockAllocator.AllocID(ctx) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) @@ -370,12 +373,12 @@ func TestLoadSegmentsFromMeta(t *testing.T) { func TestSaveSegmentsToMeta(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -392,12 +395,12 @@ func TestSaveSegmentsToMeta(t *testing.T) { func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -414,12 +417,12 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { func TestDropSegment(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -437,12 +440,12 @@ func TestDropSegment(t *testing.T) { func TestAllocRowsLargerThanOneSegment(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) @@ -459,12 +462,12 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) { func TestExpireAllocation(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) @@ -502,12 +505,12 @@ func TestExpireAllocation(t *testing.T) { func TestGetFlushableSegments(t *testing.T) { t.Run("get flushable segments between small interval", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -548,12 +551,12 @@ func TestGetFlushableSegments(t *testing.T) { func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with segment policies", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealL1SegmentByLifetime(math.MinInt64))) // always seal @@ -561,7 +564,7 @@ func TestTryToSealSegment(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) - ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + ts, err := segmentManager.allocator.AllocTimestamp(context.Background()) assert.NoError(t, err) err = segmentManager.tryToSealSegment(ts, "c1") assert.NoError(t, err) @@ -573,12 +576,12 @@ func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with channel seal policies", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) // always seal @@ -586,7 +589,7 @@ func TestTryToSealSegment(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) - ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + ts, err := segmentManager.allocator.AllocTimestamp(context.Background()) assert.NoError(t, err) err = segmentManager.tryToSealSegment(ts, "c1") assert.NoError(t, err) @@ -598,12 +601,12 @@ func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator, @@ -613,7 +616,7 @@ func TestTryToSealSegment(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) - ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + ts, err := segmentManager.allocator.AllocTimestamp(context.Background()) assert.NoError(t, err) err = segmentManager.tryToSealSegment(ts, "c1") assert.NoError(t, err) @@ -625,12 +628,12 @@ func TestTryToSealSegment(t *testing.T) { t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) meta, err := newMemoryMeta() assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -638,7 +641,7 @@ func TestTryToSealSegment(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) - ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + ts, err := segmentManager.allocator.AllocTimestamp(context.Background()) assert.NoError(t, err) // No seal polices @@ -707,14 +710,14 @@ func TestTryToSealSegment(t *testing.T) { t.Run("seal with segment policy with kv fails", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) memoryKV := NewMetaMemoryKV() catalog := datacoord.NewCatalog(memoryKV, "", "") meta, err := newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealL1SegmentByLifetime(math.MinInt64))) // always seal @@ -728,7 +731,7 @@ func TestTryToSealSegment(t *testing.T) { metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() segmentManager.meta.catalog = &datacoord.Catalog{MetaKv: metakv} - ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + ts, err := segmentManager.allocator.AllocTimestamp(context.Background()) assert.NoError(t, err) err = segmentManager.tryToSealSegment(ts, "c1") assert.Error(t, err) @@ -736,14 +739,14 @@ func TestTryToSealSegment(t *testing.T) { t.Run("seal with channel policy with kv fails", func(t *testing.T) { paramtable.Init() - mockAllocator := newMockAllocator() + mockAllocator := newMockAllocator(t) memoryKV := NewMetaMemoryKV() catalog := datacoord.NewCatalog(memoryKV, "", "") meta, err := newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) schema := newTestSchema() - collID, err := mockAllocator.allocID(context.Background()) + collID, err := mockAllocator.AllocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) segmentManager, _ := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) // always seal @@ -757,7 +760,7 @@ func TestTryToSealSegment(t *testing.T) { metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() segmentManager.meta.catalog = &datacoord.Catalog{MetaKv: metakv} - ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + ts, err := segmentManager.allocator.AllocTimestamp(context.Background()) assert.NoError(t, err) err = segmentManager.tryToSealSegment(ts, "c1") assert.Error(t, err) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index f461c4c009..ada9dab222 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" @@ -111,7 +112,7 @@ type Server struct { kv kv.MetaKv meta *meta segmentManager Manager - allocator allocator + allocator allocator.Allocator cluster Cluster sessionManager SessionManager channelManager ChannelManager @@ -337,7 +338,7 @@ func (s *Server) initDataCoord() error { log.Info("init rootcoord client done") s.broker = broker.NewCoordinatorBroker(s.rootCoordClient) - s.allocator = newRootCoordAllocator(s.rootCoordClient) + s.allocator = allocator.NewRootCoordAllocator(s.rootCoordClient) storageCli, err := s.newChunkManagerFactory() if err != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 64848e960e..b01e78b94f 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/model" @@ -2344,7 +2345,7 @@ func TestManualCompaction(t *testing.T) { paramtable.Get().Save(Params.DataCoordCfg.EnableCompaction.Key, "true") defer paramtable.Get().Reset(Params.DataCoordCfg.EnableCompaction.Key) t.Run("test manual compaction successfully", func(t *testing.T) { - svr := &Server{allocator: &MockAllocator{}} + svr := &Server{allocator: allocator.NewMockAllocator(t)} svr.stateCode.Store(commonpb.StateCode_Healthy) svr.compactionTrigger = &mockCompactionTrigger{ methods: map[string]interface{}{ @@ -2366,7 +2367,7 @@ func TestManualCompaction(t *testing.T) { }) t.Run("test manual compaction failure", func(t *testing.T) { - svr := &Server{allocator: &MockAllocator{}} + svr := &Server{allocator: allocator.NewMockAllocator(t)} svr.stateCode.Store(commonpb.StateCode_Healthy) svr.compactionTrigger = &mockCompactionTrigger{ methods: map[string]interface{}{ @@ -2463,7 +2464,7 @@ func TestOptions(t *testing.T) { defer kv.RemoveWithPrefix("") sessionManager := NewSessionManagerImpl() - channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, newMockAllocator()) + channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, allocator.NewMockAllocator(t)) assert.NoError(t, err) cluster := NewClusterImpl(sessionManager, channelManager) @@ -2502,9 +2503,10 @@ func TestHandleSessionEvent(t *testing.T) { }() ctx, cancel := context.WithCancel(context.TODO()) defer cancel() + alloc := allocator.NewMockAllocator(t) sessionManager := NewSessionManagerImpl() - channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, newMockAllocator()) + channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, alloc) assert.NoError(t, err) cluster := NewClusterImpl(sessionManager, channelManager) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 288761abab..a63f243906 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -102,7 +102,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F } // generate a timestamp timeOfSeal, all data before timeOfSeal is guaranteed to be sealed or flushed - ts, err := s.allocator.allocTimestamp(ctx) + ts, err := s.allocator.AllocTimestamp(ctx) if err != nil { log.Warn("unable to alloc timestamp", zap.Error(err)) return &datapb.FlushResponse{ @@ -1699,7 +1699,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files)) } - idStart, _, err := s.allocator.allocN(int64(len(files)) + 1) + idStart, _, err := s.allocator.AllocN(int64(len(files)) + 1) if err != nil { resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("alloc id failed, err=%w", err))) return resp, nil diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 6a6081b5b3..8484ffca59 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -19,6 +19,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" mocks2 "github.com/milvus-io/milvus/internal/mocks" @@ -1317,14 +1318,14 @@ func TestImportV2(t *testing.T) { assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed)) // alloc failed - alloc := NewNMockAllocator(t) - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr) + alloc := allocator.NewMockAllocator(t) + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr) s.allocator = alloc resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{}) assert.NoError(t, err) assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed)) - alloc = NewNMockAllocator(t) - alloc.EXPECT().allocN(mock.Anything).Return(0, 0, nil) + alloc = allocator.NewMockAllocator(t) + alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) s.allocator = alloc // add job failed diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index c8a48c0cfa..52a0ccf04e 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -126,15 +126,15 @@ type fixedTSOAllocator struct { fixedTime time.Time } -func (f *fixedTSOAllocator) allocTimestamp(_ context.Context) (Timestamp, error) { +func (f *fixedTSOAllocator) AllocTimestamp(_ context.Context) (Timestamp, error) { return tsoutil.ComposeTS(f.fixedTime.UnixNano()/int64(time.Millisecond), 0), nil } -func (f *fixedTSOAllocator) allocID(_ context.Context) (UniqueID, error) { +func (f *fixedTSOAllocator) AllocID(_ context.Context) (UniqueID, error) { panic("not implemented") // TODO: Implement } -func (f *fixedTSOAllocator) allocN(_ context.Context, _ int64) (UniqueID, UniqueID, error) { +func (f *fixedTSOAllocator) AllocN(_ context.Context, _ int64) (UniqueID, UniqueID, error) { panic("not implemented") // TODO: Implement }