diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 950f5377b2..4bda7b7e92 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -64,9 +64,9 @@ func TestGrpcService(t *testing.T) { err = svr.startGrpc() assert.Nil(t, err) - svr.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + svr.masterService.UpdateStateCode(internalpb2.StateCode_INITIALIZING) - core := svr.core + core := svr.masterService err = core.Init() assert.Nil(t, err) @@ -144,7 +144,7 @@ func TestGrpcService(t *testing.T) { err = svr.start() assert.Nil(t, err) - svr.core.UpdateStateCode(internalpb2.StateCode_HEALTHY) + svr.masterService.UpdateStateCode(internalpb2.StateCode_HEALTHY) cli, err := grpcmasterserviceclient.NewClient(Params.Address, 3*time.Second) assert.Nil(t, err) diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 2eba5e33db..6fc3af69ef 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -22,6 +22,8 @@ import ( "github.com/zilliztech/milvus-distributed/internal/log" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/types" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -31,19 +33,19 @@ import ( // grpc wrapper type Server struct { - core *cms.Core - grpcServer *grpc.Server - grpcErrChan chan error + masterService *cms.Core + grpcServer *grpc.Server + grpcErrChan chan error wg sync.WaitGroup ctx context.Context cancel context.CancelFunc - proxyService *psc.Client - dataService *dsc.Client - indexService *isc.Client - queryService *qsc.Client + proxyService types.ProxyService + dataService types.DataService + indexService types.IndexService + queryService types.QueryService connectProxyService bool connectDataService bool @@ -82,7 +84,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) opentracing.SetGlobalTracer(tracer) s.closer = closer - s.core, err = cms.NewCore(s.ctx, factory) + s.masterService, err = cms.NewCore(s.ctx, factory) if err != nil { return nil, err } @@ -111,7 +113,7 @@ func (s *Server) init() error { return err } - s.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + s.masterService.UpdateStateCode(internalpb2.StateCode_INITIALIZING) if s.connectProxyService { log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress)) @@ -125,7 +127,7 @@ func (s *Server) init() error { panic(err) } - if err = s.core.SetProxyService(ctx, proxyService); err != nil { + if err = s.masterService.SetProxyService(ctx, proxyService); err != nil { panic(err) } } @@ -143,7 +145,7 @@ func (s *Server) init() error { panic(err) } - if err = s.core.SetDataService(ctx, dataService); err != nil { + if err = s.masterService.SetDataService(ctx, dataService); err != nil { panic(err) } } @@ -154,7 +156,7 @@ func (s *Server) init() error { panic(err) } - if err := s.core.SetIndexService(ctx, indexService); err != nil { + if err := s.masterService.SetIndexService(ctx, indexService); err != nil { panic(err) } @@ -170,14 +172,14 @@ func (s *Server) init() error { if err = queryService.Start(); err != nil { panic(err) } - if err = s.core.SetQueryService(ctx, queryService); err != nil { + if err = s.masterService.SetQueryService(ctx, queryService); err != nil { panic(err) } } cms.Params.Init() log.Debug("grpc init done ...") - if err := s.core.Init(); err != nil { + if err := s.masterService.Init(); err != nil { return err } return nil @@ -222,7 +224,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { func (s *Server) start() error { log.Debug("Master Core start ...") - if err := s.core.Start(); err != nil { + if err := s.masterService.Start(); err != nil { return err } return nil @@ -244,8 +246,8 @@ func (s *Server) Stop() error { if s.queryService != nil { _ = s.queryService.Stop() } - if s.core != nil { - return s.core.Stop() + if s.masterService != nil { + return s.masterService.Stop() } s.cancel() if s.grpcServer != nil { @@ -256,87 +258,87 @@ func (s *Server) Stop() error { } func (s *Server) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { - return s.core.GetComponentStates(ctx) + return s.masterService.GetComponentStates(ctx) } //DDL request func (s *Server) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - return s.core.CreateCollection(ctx, in) + return s.masterService.CreateCollection(ctx, in) } func (s *Server) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - return s.core.DropCollection(ctx, in) + return s.masterService.DropCollection(ctx, in) } func (s *Server) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - return s.core.HasCollection(ctx, in) + return s.masterService.HasCollection(ctx, in) } func (s *Server) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - return s.core.DescribeCollection(ctx, in) + return s.masterService.DescribeCollection(ctx, in) } func (s *Server) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - return s.core.ShowCollections(ctx, in) + return s.masterService.ShowCollections(ctx, in) } func (s *Server) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - return s.core.CreatePartition(ctx, in) + return s.masterService.CreatePartition(ctx, in) } func (s *Server) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - return s.core.DropPartition(ctx, in) + return s.masterService.DropPartition(ctx, in) } func (s *Server) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - return s.core.HasPartition(ctx, in) + return s.masterService.HasPartition(ctx, in) } func (s *Server) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - return s.core.ShowPartitions(ctx, in) + return s.masterService.ShowPartitions(ctx, in) } //index builder service func (s *Server) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - return s.core.CreateIndex(ctx, in) + return s.masterService.CreateIndex(ctx, in) } func (s *Server) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - return s.core.DropIndex(ctx, in) + return s.masterService.DropIndex(ctx, in) } func (s *Server) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - return s.core.DescribeIndex(ctx, in) + return s.masterService.DescribeIndex(ctx, in) } //global timestamp allocator func (s *Server) AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { - return s.core.AllocTimestamp(ctx, in) + return s.masterService.AllocTimestamp(ctx, in) } func (s *Server) AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - return s.core.AllocID(ctx, in) + return s.masterService.AllocID(ctx, in) } //receiver time tick from proxy service, and put it into this channel func (s *Server) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.core.GetTimeTickChannel(ctx) + return s.masterService.GetTimeTickChannel(ctx) } //receive ddl from rpc and time tick from proxy service, and put them into this channel func (s *Server) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.core.GetDdChannel(ctx) + return s.masterService.GetDdChannel(ctx) } //just define a channel, not used currently func (s *Server) GetStatisticsChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.core.GetStatisticsChannel(ctx) + return s.masterService.GetStatisticsChannel(ctx) } func (s *Server) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - return s.core.DescribeSegment(ctx, in) + return s.masterService.DescribeSegment(ctx, in) } func (s *Server) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - return s.core.ShowSegments(ctx, in) + return s.masterService.ShowSegments(ctx, in) } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 909ec854a3..921c49e78a 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -2,6 +2,7 @@ package masterservice import ( "context" + "errors" "fmt" "math/rand" "sync" @@ -11,12 +12,16 @@ import ( "go.etcd.io/etcd/clientv3" "go.uber.org/zap" - "errors" - "github.com/zilliztech/milvus-distributed/internal/allocator" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/log" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/tso" + "github.com/zilliztech/milvus-distributed/internal/types" + "github.com/zilliztech/milvus-distributed/internal/util/retry" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" @@ -25,10 +30,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" - "github.com/zilliztech/milvus-distributed/internal/tso" - "github.com/zilliztech/milvus-distributed/internal/util/retry" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) // internalpb2 -> internalpb @@ -39,65 +40,6 @@ import ( // milvuspb -> milvuspb // masterpb2 -> masterpb (master_service) -type ProxyServiceInterface interface { - GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) - InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) -} - -type DataServiceInterface interface { - GetInsertBinlogPaths(ctx context.Context, req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) - GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) -} - -type IndexServiceInterface interface { - BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) - DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) -} - -type QueryServiceInterface interface { - ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) -} - -type Interface interface { - //service - typeutil.Component - - //DDL request - CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) - DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) - HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) - DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) - ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) - CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) - DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) - HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) - ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) - - //index builder service - CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) - DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) - DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) - - //global timestamp allocator - AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) - AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error) - - //TODO, master load these channel form config file ? - - //receiver time tick from proxy service, and put it into this channel - GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) - - //receive ddl from rpc and time tick from proxy service, and put them into this channel - GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) - - //just define a channel, not used currently - GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) - - //segment - DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) - ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) -} - // ------------------ struct ----------------------- // master core @@ -642,7 +584,7 @@ func (c *Core) setMsgStreams() error { return nil } -func (c *Core) SetProxyService(ctx context.Context, s ProxyServiceInterface) error { +func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error { rsp, err := s.GetTimeTickChannel(ctx) if err != nil { return err @@ -672,7 +614,7 @@ func (c *Core) SetProxyService(ctx context.Context, s ProxyServiceInterface) err return nil } -func (c *Core) SetDataService(ctx context.Context, s DataServiceInterface) error { +func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { rsp, err := s.GetSegmentInfoChannel(ctx) if err != nil { return err @@ -710,7 +652,7 @@ func (c *Core) SetDataService(ctx context.Context, s DataServiceInterface) error return nil } -func (c *Core) SetIndexService(ctx context.Context, s IndexServiceInterface) error { +func (c *Core) SetIndexService(ctx context.Context, s types.IndexService) error { c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{ DataPaths: binlog, @@ -744,7 +686,7 @@ func (c *Core) SetIndexService(ctx context.Context, s IndexServiceInterface) err return nil } -func (c *Core) SetQueryService(ctx context.Context, s QueryServiceInterface) error { +func (c *Core) SetQueryService(ctx context.Context, s types.QueryService) error { c.ReleaseCollection = func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { req := &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 14af5d4dcb..382ba29d51 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -20,10 +20,12 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type proxyMock struct { + types.ProxyService randVal int collArray []string mutex sync.Mutex @@ -54,6 +56,7 @@ func (p *proxyMock) GetCollArray() []string { } type dataMock struct { + types.DataService randVal int } @@ -81,6 +84,7 @@ func (d *dataMock) GetInsertBinlogPaths(ctx context.Context, req *datapb.InsertB } type queryMock struct { + types.QueryService collID []typeutil.UniqueID mutex sync.Mutex } @@ -105,6 +109,7 @@ func (d *dataMock) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringR } type indexMock struct { + types.IndexService fileArray []string idxBuildID []int64 idxID []int64 diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index d14b0b056a..5023a80e48 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -13,11 +13,12 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/log" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) const ( diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index bcd59c9a52..271b02f3cf 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -164,8 +164,7 @@ func TestMetaTable(t *testing.T) { IndexParams: params, } - ids, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo) - assert.Nil(t, ids) + _, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo) assert.NotNil(t, err) seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo) assert.Nil(t, err) diff --git a/tests/python_test/test_partition.py b/tests/python_test/test_partition.py index d9cd34eeea..ba7bb4f3c4 100644 --- a/tests/python_test/test_partition.py +++ b/tests/python_test/test_partition.py @@ -28,6 +28,7 @@ class TestCreateBase: connect.create_partition(collection, default_tag) # TODO: enable + @pytest.mark.tags("0331") @pytest.mark.level(2) @pytest.mark.timeout(600) def test_create_partition_limit(self, connect, collection, args):