enhance: Remove rootcoord from datanode broker (#32818)

issue: https://github.com/milvus-io/milvus/issues/32827

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-05-14 10:03:32 +08:00 committed by GitHub
parent 09cca1fb51
commit a984e46a29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 20 additions and 723 deletions

View File

@ -3,7 +3,6 @@ package broker
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
@ -11,22 +10,18 @@ import (
)
// Broker is the interface for datanode to interact with other components.
//
//go:generate mockery --name=Broker --structname=MockBroker --output=./ --filename=mock_broker.go --with-expecter --inpackage
type Broker interface {
RootCoord
DataCoord
}
type coordBroker struct {
*rootCoordBroker
*dataCoordBroker
}
func NewCoordBroker(rc types.RootCoordClient, dc types.DataCoordClient, serverID int64) Broker {
func NewCoordBroker(dc types.DataCoordClient, serverID int64) Broker {
return &coordBroker{
rootCoordBroker: &rootCoordBroker{
client: rc,
serverID: serverID,
},
dataCoordBroker: &dataCoordBroker{
client: dc,
serverID: serverID,
@ -34,13 +29,6 @@ func NewCoordBroker(rc types.RootCoordClient, dc types.DataCoordClient, serverID
}
}
// RootCoord is the interface wraps `RootCoord` grpc call
type RootCoord interface {
DescribeCollection(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*milvuspb.DescribeCollectionResponse, error)
ShowPartitions(ctx context.Context, dbName, collectionName string) (map[string]int64, error)
AllocTimestamp(ctx context.Context, num uint32) (ts uint64, count uint32, err error)
}
// DataCoord is the interface wraps `DataCoord` grpc call
type DataCoord interface {
AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]typeutil.UniqueID, error)

View File

@ -33,7 +33,7 @@ func (s *dataCoordSuite) SetupSuite() {
func (s *dataCoordSuite) SetupTest() {
s.dc = mocks.NewMockDataCoordClient(s.T())
s.broker = NewCoordBroker(nil, s.dc, 1)
s.broker = NewCoordBroker(s.dc, 1)
}
func (s *dataCoordSuite) resetMock() {

View File

@ -1,13 +1,11 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.30.1. DO NOT EDIT.
package broker
import (
context "context"
milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -26,66 +24,6 @@ func (_m *MockBroker) EXPECT() *MockBroker_Expecter {
return &MockBroker_Expecter{mock: &_m.Mock}
}
// AllocTimestamp provides a mock function with given fields: ctx, num
func (_m *MockBroker) AllocTimestamp(ctx context.Context, num uint32) (uint64, uint32, error) {
ret := _m.Called(ctx, num)
var r0 uint64
var r1 uint32
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, uint32) (uint64, uint32, error)); ok {
return rf(ctx, num)
}
if rf, ok := ret.Get(0).(func(context.Context, uint32) uint64); ok {
r0 = rf(ctx, num)
} else {
r0 = ret.Get(0).(uint64)
}
if rf, ok := ret.Get(1).(func(context.Context, uint32) uint32); ok {
r1 = rf(ctx, num)
} else {
r1 = ret.Get(1).(uint32)
}
if rf, ok := ret.Get(2).(func(context.Context, uint32) error); ok {
r2 = rf(ctx, num)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockBroker_AllocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocTimestamp'
type MockBroker_AllocTimestamp_Call struct {
*mock.Call
}
// AllocTimestamp is a helper method to define mock.On call
// - ctx context.Context
// - num uint32
func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call {
return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)}
}
func (_c *MockBroker_AllocTimestamp_Call) Run(run func(ctx context.Context, num uint32)) *MockBroker_AllocTimestamp_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(uint32))
})
return _c
}
func (_c *MockBroker_AllocTimestamp_Call) Return(ts uint64, count uint32, err error) *MockBroker_AllocTimestamp_Call {
_c.Call.Return(ts, count, err)
return _c
}
func (_c *MockBroker_AllocTimestamp_Call) RunAndReturn(run func(context.Context, uint32) (uint64, uint32, error)) *MockBroker_AllocTimestamp_Call {
_c.Call.Return(run)
return _c
}
// AssignSegmentID provides a mock function with given fields: ctx, reqs
func (_m *MockBroker) AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]int64, error) {
_va := make([]interface{}, len(reqs))
@ -155,62 +93,6 @@ func (_c *MockBroker_AssignSegmentID_Call) RunAndReturn(run func(context.Context
return _c
}
// DescribeCollection provides a mock function with given fields: ctx, collectionID, ts
func (_m *MockBroker) DescribeCollection(ctx context.Context, collectionID int64, ts uint64) (*milvuspb.DescribeCollectionResponse, error) {
ret := _m.Called(ctx, collectionID, ts)
var r0 *milvuspb.DescribeCollectionResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) (*milvuspb.DescribeCollectionResponse, error)); ok {
return rf(ctx, collectionID, ts)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) *milvuspb.DescribeCollectionResponse); ok {
r0 = rf(ctx, collectionID, ts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.DescribeCollectionResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, uint64) error); ok {
r1 = rf(ctx, collectionID, ts)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockBroker_DescribeCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeCollection'
type MockBroker_DescribeCollection_Call struct {
*mock.Call
}
// DescribeCollection is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - ts uint64
func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call {
return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)}
}
func (_c *MockBroker_DescribeCollection_Call) Run(run func(ctx context.Context, collectionID int64, ts uint64)) *MockBroker_DescribeCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(uint64))
})
return _c
}
func (_c *MockBroker_DescribeCollection_Call) Return(_a0 *milvuspb.DescribeCollectionResponse, _a1 error) *MockBroker_DescribeCollection_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBroker_DescribeCollection_Call) RunAndReturn(run func(context.Context, int64, uint64) (*milvuspb.DescribeCollectionResponse, error)) *MockBroker_DescribeCollection_Call {
_c.Call.Return(run)
return _c
}
// DropVirtualChannel provides a mock function with given fields: ctx, req
func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
ret := _m.Called(ctx, req)
@ -407,62 +289,6 @@ func (_c *MockBroker_SaveBinlogPaths_Call) RunAndReturn(run func(context.Context
return _c
}
// ShowPartitions provides a mock function with given fields: ctx, dbName, collectionName
func (_m *MockBroker) ShowPartitions(ctx context.Context, dbName string, collectionName string) (map[string]int64, error) {
ret := _m.Called(ctx, dbName, collectionName)
var r0 map[string]int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) (map[string]int64, error)); ok {
return rf(ctx, dbName, collectionName)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string) map[string]int64); ok {
r0 = rf(ctx, dbName, collectionName)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
r1 = rf(ctx, dbName, collectionName)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockBroker_ShowPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShowPartitions'
type MockBroker_ShowPartitions_Call struct {
*mock.Call
}
// ShowPartitions is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - collectionName string
func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call {
return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)}
}
func (_c *MockBroker_ShowPartitions_Call) Run(run func(ctx context.Context, dbName string, collectionName string)) *MockBroker_ShowPartitions_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string))
})
return _c
}
func (_c *MockBroker_ShowPartitions_Call) Return(_a0 map[string]int64, _a1 error) *MockBroker_ShowPartitions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBroker_ShowPartitions_Call) RunAndReturn(run func(context.Context, string, string) (map[string]int64, error)) *MockBroker_ShowPartitions_Call {
_c.Call.Return(run)
return _c
}
// UpdateChannelCheckpoint provides a mock function with given fields: ctx, channelCPs
func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error {
ret := _m.Called(ctx, channelCPs)

View File

@ -1,103 +0,0 @@
package broker
import (
"context"
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type rootCoordBroker struct {
client types.RootCoordClient
serverID int64
}
func (rc *rootCoordBroker) DescribeCollection(ctx context.Context, collectionID typeutil.UniqueID, timestamp typeutil.Timestamp) (*milvuspb.DescribeCollectionResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Uint64("timestamp", timestamp),
)
req := &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(rc.serverID),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
TimeStamp: timestamp,
}
resp, err := rc.client.DescribeCollectionInternal(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to DescribeCollectionInternal", zap.Error(err))
return nil, err
}
return resp, nil
}
func (rc *rootCoordBroker) ShowPartitions(ctx context.Context, dbName, collectionName string) (map[string]int64, error) {
req := &milvuspb.ShowPartitionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
),
DbName: dbName,
CollectionName: collectionName,
}
log := log.Ctx(ctx).With(
zap.String("dbName", dbName),
zap.String("collectionName", collectionName),
)
resp, err := rc.client.ShowPartitions(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get partitions of collection", zap.Error(err))
return nil, err
}
partitionNames := resp.GetPartitionNames()
partitionIDs := resp.GetPartitionIDs()
if len(partitionNames) != len(partitionIDs) {
log.Warn("partition names and ids are unequal",
zap.Int("partitionNameNumber", len(partitionNames)),
zap.Int("partitionIDNumber", len(partitionIDs)))
return nil, fmt.Errorf("partition names and ids are unequal, number of names: %d, number of ids: %d",
len(partitionNames), len(partitionIDs))
}
partitions := make(map[string]int64)
for i := 0; i < len(partitionNames); i++ {
partitions[partitionNames[i]] = partitionIDs[i]
}
return partitions, nil
}
func (rc *rootCoordBroker) AllocTimestamp(ctx context.Context, num uint32) (uint64, uint32, error) {
log := log.Ctx(ctx)
req := &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
commonpbutil.WithSourceID(rc.serverID),
),
Count: num,
}
resp, err := rc.client.AllocTimestamp(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to AllocTimestamp", zap.Error(err))
return 0, 0, err
}
return resp.GetTimestamp(), resp.GetCount(), nil
}

View File

@ -1,199 +0,0 @@
package broker
import (
"context"
"math/rand"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type rootCoordSuite struct {
suite.Suite
rc *mocks.MockRootCoordClient
broker Broker
}
func (s *rootCoordSuite) SetupSuite() {
paramtable.Init()
}
func (s *rootCoordSuite) SetupTest() {
s.rc = mocks.NewMockRootCoordClient(s.T())
s.broker = NewCoordBroker(s.rc, nil, 1)
}
func (s *rootCoordSuite) resetMock() {
s.rc.AssertExpectations(s.T())
s.rc.ExpectedCalls = nil
}
func (s *rootCoordSuite) TestDescribeCollection() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
collectionID := int64(100)
timestamp := tsoutil.ComposeTSByTime(time.Now(), 0)
s.Run("normal_case", func() {
collName := "test_collection_name"
s.rc.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
Run(func(_ context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) {
s.Equal(collectionID, req.GetCollectionID())
s.Equal(timestamp, req.GetTimeStamp())
}).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: collectionID,
CollectionName: collName,
}, nil)
resp, err := s.broker.DescribeCollection(ctx, collectionID, timestamp)
s.NoError(err)
s.Equal(collectionID, resp.GetCollectionID())
s.Equal(collName, resp.GetCollectionName())
s.resetMock()
})
s.Run("rootcoord_return_error", func() {
s.rc.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, err := s.broker.DescribeCollection(ctx, collectionID, timestamp)
s.Error(err)
s.resetMock()
})
s.Run("rootcoord_return_failure_status", func() {
s.rc.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(errors.New("mocked")),
}, nil)
_, err := s.broker.DescribeCollection(ctx, collectionID, timestamp)
s.Error(err)
s.resetMock()
})
}
func (s *rootCoordSuite) TestShowPartitions() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dbName := "defaultDB"
collName := "testCollection"
s.Run("normal_case", func() {
partitions := map[string]int64{
"part1": 1001,
"part2": 1002,
"part3": 1003,
}
names := lo.Keys(partitions)
ids := lo.Map(names, func(name string, _ int) int64 {
return partitions[name]
})
s.rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).
Run(func(_ context.Context, req *milvuspb.ShowPartitionsRequest, _ ...grpc.CallOption) {
s.Equal(dbName, req.GetDbName())
s.Equal(collName, req.GetCollectionName())
}).
Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Status(nil),
PartitionIDs: ids,
PartitionNames: names,
}, nil)
partNameIDs, err := s.broker.ShowPartitions(ctx, dbName, collName)
s.NoError(err)
s.Equal(len(partitions), len(partNameIDs))
for name, id := range partitions {
result, ok := partNameIDs[name]
s.True(ok)
s.Equal(id, result)
}
s.resetMock()
})
s.Run("rootcoord_return_error", func() {
s.rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, err := s.broker.ShowPartitions(ctx, dbName, collName)
s.Error(err)
s.resetMock()
})
s.Run("partition_id_name_not_match", func() {
s.rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).
Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Status(nil),
PartitionIDs: []int64{1, 2},
PartitionNames: []string{"part1"},
}, nil)
_, err := s.broker.ShowPartitions(ctx, dbName, collName)
s.Error(err)
s.resetMock()
})
}
func (s *rootCoordSuite) TestAllocTimestamp() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("normal_case", func() {
num := rand.Intn(10) + 1
ts := tsoutil.ComposeTSByTime(time.Now(), 0)
s.rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).
Run(func(_ context.Context, req *rootcoordpb.AllocTimestampRequest, _ ...grpc.CallOption) {
s.EqualValues(num, req.GetCount())
}).
Return(&rootcoordpb.AllocTimestampResponse{
Status: merr.Status(nil),
Timestamp: ts,
Count: uint32(num),
}, nil)
timestamp, cnt, err := s.broker.AllocTimestamp(ctx, uint32(num))
s.NoError(err)
s.Equal(ts, timestamp)
s.EqualValues(num, cnt)
s.resetMock()
})
s.Run("rootcoord_return_error", func() {
s.rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, _, err := s.broker.AllocTimestamp(ctx, 1)
s.Error(err)
s.resetMock()
})
s.Run("rootcoord_return_failure_status", func() {
s.rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).
Return(&rootcoordpb.AllocTimestampResponse{Status: merr.Status(errors.New("mock"))}, nil)
_, _, err := s.broker.AllocTimestamp(ctx, 1)
s.Error(err)
s.resetMock()
})
}
func TestRootCoordBroker(t *testing.T) {
suite.Run(t, new(rootCoordSuite))
}

View File

@ -30,10 +30,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
@ -278,12 +276,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
collectionID := int64(1)
meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Schema: meta.GetSchema(),
}, nil).Maybe()
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe()
metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {

View File

@ -245,7 +245,7 @@ func (node *DataNode) Init() error {
serverID := node.GetNodeID()
log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID))
node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord, serverID)
node.broker = broker.NewCoordBroker(node.dataCoord, serverID)
err := node.initRateCollector()
if err != nil {

View File

@ -25,13 +25,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestFlowGraphManager(t *testing.T) {
@ -55,20 +53,12 @@ func TestFlowGraphManager(t *testing.T) {
err = node.Init()
require.Nil(t, err)
meta := NewMetaFactory().GetCollectionMeta(1, "test_collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: 1,
CollectionName: "test_collection",
Schema: meta.GetSchema(),
}, nil).Maybe()
node.broker = broker

View File

@ -1,80 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"reflect"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/log"
)
// metaService initialize channel collection in data node from root coord.
// Initializing channel collection happens on data node starting. It depends on
// a healthy root coord and a valid root coord grpc client.
type metaService struct {
collectionID UniqueID
broker broker.Broker
}
// newMetaService creates a new metaService with provided RootCoord and collectionID.
func newMetaService(broker broker.Broker, collectionID UniqueID) *metaService {
return &metaService{
broker: broker,
collectionID: collectionID,
}
}
// getCollectionSchema get collection schema with provided collection id at specified timestamp.
func (mService *metaService) getCollectionSchema(ctx context.Context, collID UniqueID, timestamp Timestamp) (*schemapb.CollectionSchema, error) {
response, err := mService.getCollectionInfo(ctx, collID, timestamp)
if err != nil {
return nil, err
}
return response.GetSchema(), nil
}
// getCollectionInfo get collection info with provided collection id at specified timestamp.
func (mService *metaService) getCollectionInfo(ctx context.Context, collID UniqueID, timestamp Timestamp) (*milvuspb.DescribeCollectionResponse, error) {
response, err := mService.broker.DescribeCollection(ctx, collID, timestamp)
if err != nil {
log.Error("failed to describe collection from rootcoord", zap.Int64("collectionID", collID), zap.Error(err))
return nil, err
}
return response, nil
}
// printCollectionStruct util function to print schema data, used in tests only.
func printCollectionStruct(obj *etcdpb.CollectionMeta) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
for i := 0; i < v.NumField()-3; i++ {
if typeOfS.Field(i).Name == "GrpcMarshalString" {
continue
}
log.Info("Collection field", zap.String("field", typeOfS.Field(i).Name), zap.Any("value", v.Field(i).Interface()))
}
}

View File

@ -1,106 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
collectionID0 = UniqueID(2)
collectionID1 = UniqueID(1)
collectionName0 = "collection_0"
collectionName1 = "collection_1"
)
func TestMetaService_All(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
meta := NewMetaFactory().GetCollectionMeta(collectionID0, collectionName0, schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil).Maybe()
ms := newMetaService(broker, collectionID0)
t.Run("Test getCollectionSchema", func(t *testing.T) {
sch, err := ms.getCollectionSchema(ctx, collectionID0, 0)
assert.NoError(t, err)
assert.NotNil(t, sch)
assert.Equal(t, sch.Name, collectionName0)
})
t.Run("Test printCollectionStruct", func(t *testing.T) {
mf := &MetaFactory{}
collectionMeta := mf.GetCollectionMeta(collectionID0, collectionName0, schemapb.DataType_Int64)
printCollectionStruct(collectionMeta)
})
}
// RootCoordFails1 root coord mock for failure
type RootCoordFails1 struct {
RootCoordFactory
}
// DescribeCollectionInternal override method that will fails
func (rc *RootCoordFails1) DescribeCollectionInternal(ctx context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) {
return nil, errors.New("always fail")
}
// RootCoordFails2 root coord mock for failure
type RootCoordFails2 struct {
RootCoordFactory
}
// DescribeCollectionInternal override method that will fails
func (rc *RootCoordFails2) DescribeCollectionInternal(ctx context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
}, nil
}
func TestMetaServiceRootCoodFails(t *testing.T) {
t.Run("Test Describe with error", func(t *testing.T) {
rc := &RootCoordFails1{}
rc.setCollectionID(collectionID0)
rc.setCollectionName(collectionName0)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
ms := newMetaService(broker, collectionID0)
_, err := ms.getCollectionSchema(context.Background(), collectionID1, 0)
assert.Error(t, err)
})
}

View File

@ -21,7 +21,6 @@ import (
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -46,7 +45,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type DataNodeServicesSuite struct {
@ -94,21 +92,12 @@ func (s *DataNodeServicesSuite) SetupTest() {
}, nil).Maybe()
s.node.allocator = alloc
meta := NewMetaFactory().GetCollectionMeta(1, "collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(s.T())
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).
Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
ShardsNum: common.DefaultShardsNum,
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0),
func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe()
s.broker = broker
s.node.broker = broker