diff --git a/internal/distributed/datanode/param_table_test.go b/internal/distributed/datanode/param_table_test.go index cc8486902d..11c1ea5ffc 100644 --- a/internal/distributed/datanode/param_table_test.go +++ b/internal/distributed/datanode/param_table_test.go @@ -20,16 +20,19 @@ import ( func TestParamTable(t *testing.T) { Params.Init() + Params.LoadFromEnv() + assert.NotEqual(t, Params.IP, "") + t.Logf("DataNode IP:%s", Params.IP) + assert.NotEqual(t, Params.Port, 0) t.Logf("DataNode Port:%d", Params.Port) assert.NotNil(t, Params.listener) - t.Logf("DataNode Port:%d", Params.Port) + t.Logf("DataNode listener:%d", Params.listener) assert.NotEqual(t, Params.DataServiceAddress, "") t.Logf("DataServiceAddress:%s", Params.DataServiceAddress) assert.NotEqual(t, Params.MasterAddress, "") t.Logf("MasterAddress:%s", Params.MasterAddress) - } diff --git a/internal/distributed/dataservice/dataservice_test.go b/internal/distributed/dataservice/dataservice_test.go new file mode 100644 index 0000000000..49ad701098 --- /dev/null +++ b/internal/distributed/dataservice/dataservice_test.go @@ -0,0 +1,102 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcdataserviceclient + +import ( + "context" + "fmt" + "math/rand" + "testing" + + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" + "github.com/stretchr/testify/assert" +) + +type mockMaster struct { + types.MasterService +} + +func (m *mockMaster) Init() error { + return nil +} + +func (m *mockMaster) Start() error { + return nil +} + +func (m *mockMaster) Stop() error { + return fmt.Errorf("stop error") +} + +func (m *mockMaster) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + StateCode: internalpb.StateCode_Healthy, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + SubcomponentStates: []*internalpb.ComponentInfo{ + { + StateCode: internalpb.StateCode_Healthy, + }, + }, + }, nil +} + +func (m *mockMaster) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Value: "c1", + }, nil +} + +func (m *mockMaster) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { + return &milvuspb.ShowCollectionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + CollectionNames: []string{}, + }, nil +} + +func TestRun(t *testing.T) { + ctx := context.Background() + msFactory := msgstream.NewPmsFactory() + svr, err := NewServer(ctx, msFactory) + assert.Nil(t, err) + + Params.Init() + Params.Port = 1000000 + err = svr.Run() + assert.NotNil(t, err) + assert.EqualError(t, err, "listen tcp: address 1000000: invalid port") + + svr.newMasterServiceClient = func(s string) (types.MasterService, error) { + return &mockMaster{}, nil + } + + Params.Port = rand.Int()%100 + 10000 + err = svr.Run() + assert.Nil(t, err) + + err = svr.Stop() + assert.Nil(t, err) +} diff --git a/internal/distributed/dataservice/paramtable_test.go b/internal/distributed/dataservice/paramtable_test.go new file mode 100644 index 0000000000..269bc70552 --- /dev/null +++ b/internal/distributed/dataservice/paramtable_test.go @@ -0,0 +1,28 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcdataserviceclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.Port, 0) + t.Logf("DataService Port:%d", Params.Port) + + assert.NotEqual(t, Params.MasterAddress, "") + t.Logf("MasterAddress:%s", Params.MasterAddress) +} diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index 8cc4bc2a0d..630f948292 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -53,6 +53,8 @@ type Server struct { grpcServer *grpc.Server masterService types.MasterService + newMasterServiceClient func(string) (types.MasterService, error) + closer io.Closer } @@ -64,8 +66,10 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) ctx: ctx1, cancel: cancel, grpcErrChan: make(chan error), + newMasterServiceClient: func(s string) (types.MasterService, error) { + return msc.NewClient(s, 10*time.Second) + }, } - s.dataService, err = dataservice.CreateServer(s.ctx, factory) if err != nil { return nil, err @@ -77,37 +81,37 @@ func (s *Server) init() error { Params.Init() Params.LoadFromEnv() + ctx := context.Background() + closer := trace.InitTracing("data_service") s.closer = closer - s.wg.Add(1) - go s.startGrpcLoop(Params.Port) - // wait for grpc server loop start - if err := <-s.grpcErrChan; err != nil { + err := s.startGrpc() + if err != nil { return err } - log.Debug("master address", zap.String("address", Params.MasterAddress)) - client, err := msc.NewClient(Params.MasterAddress, 10*time.Second) - if err != nil { - panic(err) - } - log.Debug("master client create complete") - if err = client.Init(); err != nil { - panic(err) - } - if err = client.Start(); err != nil { - panic(err) - } s.dataService.UpdateStateCode(internalpb.StateCode_Initializing) - ctx := context.Background() - err = funcutil.WaitForComponentInitOrHealthy(ctx, client, "MasterService", 1000000, time.Millisecond*200) + if s.newMasterServiceClient != nil { + log.Debug("master service", zap.String("address", Params.MasterAddress)) + masterServiceClient, err := s.newMasterServiceClient(Params.MasterAddress) + if err != nil { + panic(err) + } + log.Debug("master service client created") - if err != nil { - panic(err) + if err = masterServiceClient.Init(); err != nil { + panic(err) + } + if err = masterServiceClient.Start(); err != nil { + panic(err) + } + if err = funcutil.WaitForComponentInitOrHealthy(ctx, masterServiceClient, "MasterService", 1000000, 200*time.Millisecond); err != nil { + panic(err) + } + s.dataService.SetMasterClient(masterServiceClient) } - s.dataService.SetMasterClient(client) dataservice.Params.Init() if err := s.dataService.Init(); err != nil { @@ -117,6 +121,14 @@ func (s *Server) init() error { return nil } +func (s *Server) startGrpc() error { + s.wg.Add(1) + go s.startGrpcLoop(Params.Port) + // wait for grpc server loop start + err := <-s.grpcErrChan + return err +} + func (s *Server) startGrpcLoop(grpcPort int) { defer logutil.LogPanic() defer s.wg.Done() diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index bda0666670..5fecf61b4b 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -835,6 +835,7 @@ func TestRun(t *testing.T) { connectIndexService: true, connectQueryService: true, } + Params.Init() Params.Port = 1000000 err := svr.Run() assert.NotNil(t, err)