From a984e46a292ee63aeca8f7ad1c6d2f05a175d701 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 14 May 2024 10:03:32 +0800 Subject: [PATCH] enhance: Remove rootcoord from datanode broker (#32818) issue: https://github.com/milvus-io/milvus/issues/32827 Signed-off-by: bigsheeper --- internal/datanode/broker/broker.go | 18 +- internal/datanode/broker/datacoord_test.go | 2 +- internal/datanode/broker/mock_broker.go | 204 ++----------------- internal/datanode/broker/rootcoord.go | 103 ---------- internal/datanode/broker/rootcoord_test.go | 199 ------------------ internal/datanode/compactor_test.go | 8 - internal/datanode/data_node.go | 2 +- internal/datanode/flow_graph_manager_test.go | 10 - internal/datanode/meta_service.go | 80 -------- internal/datanode/meta_service_test.go | 106 ---------- internal/datanode/services_test.go | 11 - 11 files changed, 20 insertions(+), 723 deletions(-) delete mode 100644 internal/datanode/broker/rootcoord.go delete mode 100644 internal/datanode/broker/rootcoord_test.go delete mode 100644 internal/datanode/meta_service.go delete mode 100644 internal/datanode/meta_service_test.go diff --git a/internal/datanode/broker/broker.go b/internal/datanode/broker/broker.go index 59b8684ed3..7e85f7fa76 100644 --- a/internal/datanode/broker/broker.go +++ b/internal/datanode/broker/broker.go @@ -3,7 +3,6 @@ package broker import ( "context" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" @@ -11,22 +10,18 @@ import ( ) // Broker is the interface for datanode to interact with other components. +// +//go:generate mockery --name=Broker --structname=MockBroker --output=./ --filename=mock_broker.go --with-expecter --inpackage type Broker interface { - RootCoord DataCoord } type coordBroker struct { - *rootCoordBroker *dataCoordBroker } -func NewCoordBroker(rc types.RootCoordClient, dc types.DataCoordClient, serverID int64) Broker { +func NewCoordBroker(dc types.DataCoordClient, serverID int64) Broker { return &coordBroker{ - rootCoordBroker: &rootCoordBroker{ - client: rc, - serverID: serverID, - }, dataCoordBroker: &dataCoordBroker{ client: dc, serverID: serverID, @@ -34,13 +29,6 @@ func NewCoordBroker(rc types.RootCoordClient, dc types.DataCoordClient, serverID } } -// RootCoord is the interface wraps `RootCoord` grpc call -type RootCoord interface { - DescribeCollection(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*milvuspb.DescribeCollectionResponse, error) - ShowPartitions(ctx context.Context, dbName, collectionName string) (map[string]int64, error) - AllocTimestamp(ctx context.Context, num uint32) (ts uint64, count uint32, err error) -} - // DataCoord is the interface wraps `DataCoord` grpc call type DataCoord interface { AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]typeutil.UniqueID, error) diff --git a/internal/datanode/broker/datacoord_test.go b/internal/datanode/broker/datacoord_test.go index bd8327a378..ace1c29eb9 100644 --- a/internal/datanode/broker/datacoord_test.go +++ b/internal/datanode/broker/datacoord_test.go @@ -33,7 +33,7 @@ func (s *dataCoordSuite) SetupSuite() { func (s *dataCoordSuite) SetupTest() { s.dc = mocks.NewMockDataCoordClient(s.T()) - s.broker = NewCoordBroker(nil, s.dc, 1) + s.broker = NewCoordBroker(s.dc, 1) } func (s *dataCoordSuite) resetMock() { diff --git a/internal/datanode/broker/mock_broker.go b/internal/datanode/broker/mock_broker.go index 623d1d40cf..f8b731c80e 100644 --- a/internal/datanode/broker/mock_broker.go +++ b/internal/datanode/broker/mock_broker.go @@ -1,13 +1,11 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.30.1. DO NOT EDIT. package broker import ( context "context" - milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" datapb "github.com/milvus-io/milvus/internal/proto/datapb" - mock "github.com/stretchr/testify/mock" msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -26,66 +24,6 @@ func (_m *MockBroker) EXPECT() *MockBroker_Expecter { return &MockBroker_Expecter{mock: &_m.Mock} } -// AllocTimestamp provides a mock function with given fields: ctx, num -func (_m *MockBroker) AllocTimestamp(ctx context.Context, num uint32) (uint64, uint32, error) { - ret := _m.Called(ctx, num) - - var r0 uint64 - var r1 uint32 - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, uint32) (uint64, uint32, error)); ok { - return rf(ctx, num) - } - if rf, ok := ret.Get(0).(func(context.Context, uint32) uint64); ok { - r0 = rf(ctx, num) - } else { - r0 = ret.Get(0).(uint64) - } - - if rf, ok := ret.Get(1).(func(context.Context, uint32) uint32); ok { - r1 = rf(ctx, num) - } else { - r1 = ret.Get(1).(uint32) - } - - if rf, ok := ret.Get(2).(func(context.Context, uint32) error); ok { - r2 = rf(ctx, num) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// MockBroker_AllocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocTimestamp' -type MockBroker_AllocTimestamp_Call struct { - *mock.Call -} - -// AllocTimestamp is a helper method to define mock.On call -// - ctx context.Context -// - num uint32 -func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call { - return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)} -} - -func (_c *MockBroker_AllocTimestamp_Call) Run(run func(ctx context.Context, num uint32)) *MockBroker_AllocTimestamp_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint32)) - }) - return _c -} - -func (_c *MockBroker_AllocTimestamp_Call) Return(ts uint64, count uint32, err error) *MockBroker_AllocTimestamp_Call { - _c.Call.Return(ts, count, err) - return _c -} - -func (_c *MockBroker_AllocTimestamp_Call) RunAndReturn(run func(context.Context, uint32) (uint64, uint32, error)) *MockBroker_AllocTimestamp_Call { - _c.Call.Return(run) - return _c -} - // AssignSegmentID provides a mock function with given fields: ctx, reqs func (_m *MockBroker) AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]int64, error) { _va := make([]interface{}, len(reqs)) @@ -125,8 +63,8 @@ type MockBroker_AssignSegmentID_Call struct { } // AssignSegmentID is a helper method to define mock.On call -// - ctx context.Context -// - reqs ...*datapb.SegmentIDRequest +// - ctx context.Context +// - reqs ...*datapb.SegmentIDRequest func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call { return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID", append([]interface{}{ctx}, reqs...)...)} @@ -155,62 +93,6 @@ func (_c *MockBroker_AssignSegmentID_Call) RunAndReturn(run func(context.Context return _c } -// DescribeCollection provides a mock function with given fields: ctx, collectionID, ts -func (_m *MockBroker) DescribeCollection(ctx context.Context, collectionID int64, ts uint64) (*milvuspb.DescribeCollectionResponse, error) { - ret := _m.Called(ctx, collectionID, ts) - - var r0 *milvuspb.DescribeCollectionResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) (*milvuspb.DescribeCollectionResponse, error)); ok { - return rf(ctx, collectionID, ts) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) *milvuspb.DescribeCollectionResponse); ok { - r0 = rf(ctx, collectionID, ts) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*milvuspb.DescribeCollectionResponse) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, uint64) error); ok { - r1 = rf(ctx, collectionID, ts) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockBroker_DescribeCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeCollection' -type MockBroker_DescribeCollection_Call struct { - *mock.Call -} - -// DescribeCollection is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - ts uint64 -func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call { - return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)} -} - -func (_c *MockBroker_DescribeCollection_Call) Run(run func(ctx context.Context, collectionID int64, ts uint64)) *MockBroker_DescribeCollection_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(uint64)) - }) - return _c -} - -func (_c *MockBroker_DescribeCollection_Call) Return(_a0 *milvuspb.DescribeCollectionResponse, _a1 error) *MockBroker_DescribeCollection_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockBroker_DescribeCollection_Call) RunAndReturn(run func(context.Context, int64, uint64) (*milvuspb.DescribeCollectionResponse, error)) *MockBroker_DescribeCollection_Call { - _c.Call.Return(run) - return _c -} - // DropVirtualChannel provides a mock function with given fields: ctx, req func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { ret := _m.Called(ctx, req) @@ -243,8 +125,8 @@ type MockBroker_DropVirtualChannel_Call struct { } // DropVirtualChannel is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.DropVirtualChannelRequest +// - ctx context.Context +// - req *datapb.DropVirtualChannelRequest func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call { return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)} } @@ -298,8 +180,8 @@ type MockBroker_GetSegmentInfo_Call struct { } // GetSegmentInfo is a helper method to define mock.On call -// - ctx context.Context -// - segmentIDs []int64 +// - ctx context.Context +// - segmentIDs []int64 func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call { return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)} } @@ -341,8 +223,8 @@ type MockBroker_ReportTimeTick_Call struct { } // ReportTimeTick is a helper method to define mock.On call -// - ctx context.Context -// - msgs []*msgpb.DataNodeTtMsg +// - ctx context.Context +// - msgs []*msgpb.DataNodeTtMsg func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call { return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)} } @@ -384,8 +266,8 @@ type MockBroker_SaveBinlogPaths_Call struct { } // SaveBinlogPaths is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.SaveBinlogPathsRequest +// - ctx context.Context +// - req *datapb.SaveBinlogPathsRequest func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call { return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)} } @@ -407,62 +289,6 @@ func (_c *MockBroker_SaveBinlogPaths_Call) RunAndReturn(run func(context.Context return _c } -// ShowPartitions provides a mock function with given fields: ctx, dbName, collectionName -func (_m *MockBroker) ShowPartitions(ctx context.Context, dbName string, collectionName string) (map[string]int64, error) { - ret := _m.Called(ctx, dbName, collectionName) - - var r0 map[string]int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (map[string]int64, error)); ok { - return rf(ctx, dbName, collectionName) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) map[string]int64); ok { - r0 = rf(ctx, dbName, collectionName) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]int64) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, dbName, collectionName) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockBroker_ShowPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShowPartitions' -type MockBroker_ShowPartitions_Call struct { - *mock.Call -} - -// ShowPartitions is a helper method to define mock.On call -// - ctx context.Context -// - dbName string -// - collectionName string -func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call { - return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)} -} - -func (_c *MockBroker_ShowPartitions_Call) Run(run func(ctx context.Context, dbName string, collectionName string)) *MockBroker_ShowPartitions_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *MockBroker_ShowPartitions_Call) Return(_a0 map[string]int64, _a1 error) *MockBroker_ShowPartitions_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockBroker_ShowPartitions_Call) RunAndReturn(run func(context.Context, string, string) (map[string]int64, error)) *MockBroker_ShowPartitions_Call { - _c.Call.Return(run) - return _c -} - // UpdateChannelCheckpoint provides a mock function with given fields: ctx, channelCPs func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error { ret := _m.Called(ctx, channelCPs) @@ -483,8 +309,8 @@ type MockBroker_UpdateChannelCheckpoint_Call struct { } // UpdateChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -// - channelCPs []*msgpb.MsgPosition +// - ctx context.Context +// - channelCPs []*msgpb.MsgPosition func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call { return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)} } @@ -526,8 +352,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct { } // UpdateSegmentStatistics is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.UpdateSegmentStatisticsRequest +// - ctx context.Context +// - req *datapb.UpdateSegmentStatisticsRequest func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call { return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)} } diff --git a/internal/datanode/broker/rootcoord.go b/internal/datanode/broker/rootcoord.go deleted file mode 100644 index 69a6ebb9e1..0000000000 --- a/internal/datanode/broker/rootcoord.go +++ /dev/null @@ -1,103 +0,0 @@ -package broker - -import ( - "context" - "fmt" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" - "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type rootCoordBroker struct { - client types.RootCoordClient - serverID int64 -} - -func (rc *rootCoordBroker) DescribeCollection(ctx context.Context, collectionID typeutil.UniqueID, timestamp typeutil.Timestamp) (*milvuspb.DescribeCollectionResponse, error) { - log := log.Ctx(ctx).With( - zap.Int64("collectionID", collectionID), - zap.Uint64("timestamp", timestamp), - ) - req := &milvuspb.DescribeCollectionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), - commonpbutil.WithSourceID(rc.serverID), - ), - // please do not specify the collection name alone after database feature. - CollectionID: collectionID, - TimeStamp: timestamp, - } - - resp, err := rc.client.DescribeCollectionInternal(ctx, req) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to DescribeCollectionInternal", zap.Error(err)) - return nil, err - } - - return resp, nil -} - -func (rc *rootCoordBroker) ShowPartitions(ctx context.Context, dbName, collectionName string) (map[string]int64, error) { - req := &milvuspb.ShowPartitionsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), - ), - DbName: dbName, - CollectionName: collectionName, - } - - log := log.Ctx(ctx).With( - zap.String("dbName", dbName), - zap.String("collectionName", collectionName), - ) - - resp, err := rc.client.ShowPartitions(ctx, req) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to get partitions of collection", zap.Error(err)) - return nil, err - } - - partitionNames := resp.GetPartitionNames() - partitionIDs := resp.GetPartitionIDs() - if len(partitionNames) != len(partitionIDs) { - log.Warn("partition names and ids are unequal", - zap.Int("partitionNameNumber", len(partitionNames)), - zap.Int("partitionIDNumber", len(partitionIDs))) - return nil, fmt.Errorf("partition names and ids are unequal, number of names: %d, number of ids: %d", - len(partitionNames), len(partitionIDs)) - } - - partitions := make(map[string]int64) - for i := 0; i < len(partitionNames); i++ { - partitions[partitionNames[i]] = partitionIDs[i] - } - - return partitions, nil -} - -func (rc *rootCoordBroker) AllocTimestamp(ctx context.Context, num uint32) (uint64, uint32, error) { - log := log.Ctx(ctx) - - req := &rootcoordpb.AllocTimestampRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), - commonpbutil.WithSourceID(rc.serverID), - ), - Count: num, - } - - resp, err := rc.client.AllocTimestamp(ctx, req) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to AllocTimestamp", zap.Error(err)) - return 0, 0, err - } - return resp.GetTimestamp(), resp.GetCount(), nil -} diff --git a/internal/datanode/broker/rootcoord_test.go b/internal/datanode/broker/rootcoord_test.go deleted file mode 100644 index f8f95b319e..0000000000 --- a/internal/datanode/broker/rootcoord_test.go +++ /dev/null @@ -1,199 +0,0 @@ -package broker - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/cockroachdb/errors" - "github.com/samber/lo" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - "google.golang.org/grpc" - - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/mocks" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" - "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/tsoutil" -) - -type rootCoordSuite struct { - suite.Suite - - rc *mocks.MockRootCoordClient - broker Broker -} - -func (s *rootCoordSuite) SetupSuite() { - paramtable.Init() -} - -func (s *rootCoordSuite) SetupTest() { - s.rc = mocks.NewMockRootCoordClient(s.T()) - s.broker = NewCoordBroker(s.rc, nil, 1) -} - -func (s *rootCoordSuite) resetMock() { - s.rc.AssertExpectations(s.T()) - s.rc.ExpectedCalls = nil -} - -func (s *rootCoordSuite) TestDescribeCollection() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - collectionID := int64(100) - timestamp := tsoutil.ComposeTSByTime(time.Now(), 0) - - s.Run("normal_case", func() { - collName := "test_collection_name" - - s.rc.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). - Run(func(_ context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) { - s.Equal(collectionID, req.GetCollectionID()) - s.Equal(timestamp, req.GetTimeStamp()) - }). - Return(&milvuspb.DescribeCollectionResponse{ - Status: merr.Status(nil), - CollectionID: collectionID, - CollectionName: collName, - }, nil) - - resp, err := s.broker.DescribeCollection(ctx, collectionID, timestamp) - s.NoError(err) - s.Equal(collectionID, resp.GetCollectionID()) - s.Equal(collName, resp.GetCollectionName()) - s.resetMock() - }) - - s.Run("rootcoord_return_error", func() { - s.rc.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). - Return(nil, errors.New("mock")) - - _, err := s.broker.DescribeCollection(ctx, collectionID, timestamp) - s.Error(err) - s.resetMock() - }) - - s.Run("rootcoord_return_failure_status", func() { - s.rc.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). - Return(&milvuspb.DescribeCollectionResponse{ - Status: merr.Status(errors.New("mocked")), - }, nil) - - _, err := s.broker.DescribeCollection(ctx, collectionID, timestamp) - s.Error(err) - s.resetMock() - }) -} - -func (s *rootCoordSuite) TestShowPartitions() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbName := "defaultDB" - collName := "testCollection" - - s.Run("normal_case", func() { - partitions := map[string]int64{ - "part1": 1001, - "part2": 1002, - "part3": 1003, - } - - names := lo.Keys(partitions) - ids := lo.Map(names, func(name string, _ int) int64 { - return partitions[name] - }) - - s.rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything). - Run(func(_ context.Context, req *milvuspb.ShowPartitionsRequest, _ ...grpc.CallOption) { - s.Equal(dbName, req.GetDbName()) - s.Equal(collName, req.GetCollectionName()) - }). - Return(&milvuspb.ShowPartitionsResponse{ - Status: merr.Status(nil), - PartitionIDs: ids, - PartitionNames: names, - }, nil) - partNameIDs, err := s.broker.ShowPartitions(ctx, dbName, collName) - s.NoError(err) - s.Equal(len(partitions), len(partNameIDs)) - for name, id := range partitions { - result, ok := partNameIDs[name] - s.True(ok) - s.Equal(id, result) - } - s.resetMock() - }) - - s.Run("rootcoord_return_error", func() { - s.rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything). - Return(nil, errors.New("mock")) - - _, err := s.broker.ShowPartitions(ctx, dbName, collName) - s.Error(err) - s.resetMock() - }) - - s.Run("partition_id_name_not_match", func() { - s.rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything). - Return(&milvuspb.ShowPartitionsResponse{ - Status: merr.Status(nil), - PartitionIDs: []int64{1, 2}, - PartitionNames: []string{"part1"}, - }, nil) - - _, err := s.broker.ShowPartitions(ctx, dbName, collName) - s.Error(err) - s.resetMock() - }) -} - -func (s *rootCoordSuite) TestAllocTimestamp() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - s.Run("normal_case", func() { - num := rand.Intn(10) + 1 - ts := tsoutil.ComposeTSByTime(time.Now(), 0) - s.rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything). - Run(func(_ context.Context, req *rootcoordpb.AllocTimestampRequest, _ ...grpc.CallOption) { - s.EqualValues(num, req.GetCount()) - }). - Return(&rootcoordpb.AllocTimestampResponse{ - Status: merr.Status(nil), - Timestamp: ts, - Count: uint32(num), - }, nil) - - timestamp, cnt, err := s.broker.AllocTimestamp(ctx, uint32(num)) - s.NoError(err) - s.Equal(ts, timestamp) - s.EqualValues(num, cnt) - s.resetMock() - }) - - s.Run("rootcoord_return_error", func() { - s.rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything). - Return(nil, errors.New("mock")) - _, _, err := s.broker.AllocTimestamp(ctx, 1) - s.Error(err) - s.resetMock() - }) - - s.Run("rootcoord_return_failure_status", func() { - s.rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything). - Return(&rootcoordpb.AllocTimestampResponse{Status: merr.Status(errors.New("mock"))}, nil) - _, _, err := s.broker.AllocTimestamp(ctx, 1) - s.Error(err) - s.resetMock() - }) -} - -func TestRootCoordBroker(t *testing.T) { - suite.Run(t, new(rootCoordSuite)) -} diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index e3b329335c..ccae34beba 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -30,10 +30,8 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/allocator" - "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" @@ -278,12 +276,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) { collectionID := int64(1) meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64) - broker := broker.NewMockBroker(t) - broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). - Return(&milvuspb.DescribeCollectionResponse{ - Schema: meta.GetSchema(), - }, nil).Maybe() - metaCache := metacache.NewMockMetaCache(t) metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe() metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 7ee37c7cae..820fd21e84 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -245,7 +245,7 @@ func (node *DataNode) Init() error { serverID := node.GetNodeID() log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID)) - node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, serverID) + node.broker = broker.NewCoordBroker(node.dataCoord, serverID) err := node.initRateCollector() if err != nil { diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 748d3828e5..6d9158fe42 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -25,13 +25,11 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/merr" ) func TestFlowGraphManager(t *testing.T) { @@ -55,20 +53,12 @@ func TestFlowGraphManager(t *testing.T) { err = node.Init() require.Nil(t, err) - meta := NewMetaFactory().GetCollectionMeta(1, "test_collection", schemapb.DataType_Int64) broker := broker.NewMockBroker(t) broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). - Return(&milvuspb.DescribeCollectionResponse{ - Status: merr.Status(nil), - CollectionID: 1, - CollectionName: "test_collection", - Schema: meta.GetSchema(), - }, nil).Maybe() node.broker = broker diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go deleted file mode 100644 index 32514d404e..0000000000 --- a/internal/datanode/meta_service.go +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -import ( - "context" - "reflect" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/broker" - "github.com/milvus-io/milvus/internal/proto/etcdpb" - "github.com/milvus-io/milvus/pkg/log" -) - -// metaService initialize channel collection in data node from root coord. -// Initializing channel collection happens on data node starting. It depends on -// a healthy root coord and a valid root coord grpc client. -type metaService struct { - collectionID UniqueID - broker broker.Broker -} - -// newMetaService creates a new metaService with provided RootCoord and collectionID. -func newMetaService(broker broker.Broker, collectionID UniqueID) *metaService { - return &metaService{ - broker: broker, - collectionID: collectionID, - } -} - -// getCollectionSchema get collection schema with provided collection id at specified timestamp. -func (mService *metaService) getCollectionSchema(ctx context.Context, collID UniqueID, timestamp Timestamp) (*schemapb.CollectionSchema, error) { - response, err := mService.getCollectionInfo(ctx, collID, timestamp) - if err != nil { - return nil, err - } - return response.GetSchema(), nil -} - -// getCollectionInfo get collection info with provided collection id at specified timestamp. -func (mService *metaService) getCollectionInfo(ctx context.Context, collID UniqueID, timestamp Timestamp) (*milvuspb.DescribeCollectionResponse, error) { - response, err := mService.broker.DescribeCollection(ctx, collID, timestamp) - if err != nil { - log.Error("failed to describe collection from rootcoord", zap.Int64("collectionID", collID), zap.Error(err)) - return nil, err - } - - return response, nil -} - -// printCollectionStruct util function to print schema data, used in tests only. -func printCollectionStruct(obj *etcdpb.CollectionMeta) { - v := reflect.ValueOf(obj) - v = reflect.Indirect(v) - typeOfS := v.Type() - - for i := 0; i < v.NumField()-3; i++ { - if typeOfS.Field(i).Name == "GrpcMarshalString" { - continue - } - log.Info("Collection field", zap.String("field", typeOfS.Field(i).Name), zap.Any("value", v.Field(i).Interface())) - } -} diff --git a/internal/datanode/meta_service_test.go b/internal/datanode/meta_service_test.go deleted file mode 100644 index c1a0c9ed60..0000000000 --- a/internal/datanode/meta_service_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -import ( - "context" - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "google.golang.org/grpc" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/broker" - "github.com/milvus-io/milvus/pkg/util/merr" -) - -const ( - collectionID0 = UniqueID(2) - collectionID1 = UniqueID(1) - collectionName0 = "collection_0" - collectionName1 = "collection_1" -) - -func TestMetaService_All(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - meta := NewMetaFactory().GetCollectionMeta(collectionID0, collectionName0, schemapb.DataType_Int64) - broker := broker.NewMockBroker(t) - broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). - Return(&milvuspb.DescribeCollectionResponse{ - Status: merr.Status(nil), - Schema: meta.GetSchema(), - }, nil).Maybe() - - ms := newMetaService(broker, collectionID0) - - t.Run("Test getCollectionSchema", func(t *testing.T) { - sch, err := ms.getCollectionSchema(ctx, collectionID0, 0) - assert.NoError(t, err) - assert.NotNil(t, sch) - assert.Equal(t, sch.Name, collectionName0) - }) - - t.Run("Test printCollectionStruct", func(t *testing.T) { - mf := &MetaFactory{} - collectionMeta := mf.GetCollectionMeta(collectionID0, collectionName0, schemapb.DataType_Int64) - printCollectionStruct(collectionMeta) - }) -} - -// RootCoordFails1 root coord mock for failure -type RootCoordFails1 struct { - RootCoordFactory -} - -// DescribeCollectionInternal override method that will fails -func (rc *RootCoordFails1) DescribeCollectionInternal(ctx context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) { - return nil, errors.New("always fail") -} - -// RootCoordFails2 root coord mock for failure -type RootCoordFails2 struct { - RootCoordFactory -} - -// DescribeCollectionInternal override method that will fails -func (rc *RootCoordFails2) DescribeCollectionInternal(ctx context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) { - return &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, - }, nil -} - -func TestMetaServiceRootCoodFails(t *testing.T) { - t.Run("Test Describe with error", func(t *testing.T) { - rc := &RootCoordFails1{} - rc.setCollectionID(collectionID0) - rc.setCollectionName(collectionName0) - - broker := broker.NewMockBroker(t) - broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). - Return(nil, errors.New("mock")) - - ms := newMetaService(broker, collectionID0) - _, err := ms.getCollectionSchema(context.Background(), collectionID1, 0) - assert.Error(t, err) - }) -} diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 1393f2a57d..00a803642a 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "sync" "testing" - "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -46,7 +45,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/tsoutil" ) type DataNodeServicesSuite struct { @@ -94,21 +92,12 @@ func (s *DataNodeServicesSuite) SetupTest() { }, nil).Maybe() s.node.allocator = alloc - meta := NewMetaFactory().GetCollectionMeta(1, "collection", schemapb.DataType_Int64) broker := broker.NewMockBroker(s.T()) broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything). Return([]*datapb.SegmentInfo{}, nil).Maybe() - broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). - Return(&milvuspb.DescribeCollectionResponse{ - Status: merr.Status(nil), - Schema: meta.GetSchema(), - ShardsNum: common.DefaultShardsNum, - }, nil).Maybe() broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0), - func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe() s.broker = broker s.node.broker = broker