diff --git a/Makefile b/Makefile index 26fc8de496..cc1df294cb 100644 --- a/Makefile +++ b/Makefile @@ -139,6 +139,8 @@ build-go: build-cpp @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null @echo "Building distributed indexnode ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null + @echo "Building dataservice ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/dataservice $(PWD)/cmd/dataservice/main.go 1>/dev/null build-cpp: diff --git a/cmd/dataservice/main.go b/cmd/dataservice/main.go index ac07eb9ecf..a555504882 100644 --- a/cmd/dataservice/main.go +++ b/cmd/dataservice/main.go @@ -1,4 +1,4 @@ -package dataservice +package main import ( "context" diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index 70b12f7e6d..00dc38aac1 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -31,31 +31,15 @@ func main() { } log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) - cnt := 0 - psc.Params.Init() log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) - proxyService := psc.NewClient(psc.Params.NetworkAddress()) + //proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) - for cnt = 0; cnt < reTryCnt; cnt++ { - pxStates, err := proxyService.GetComponentStates() - if err != nil { - log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error()) - continue - } - if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason) - continue - } - if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } + //TODO, test proxy service GetComponentStates, before set - if err = svr.SetProxyService(proxyService); err != nil { - panic(err) - } + //if err = svr.SetProxyService(proxyService); err != nil { + // panic(err) + //} ds.Params.Init() log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) @@ -66,6 +50,7 @@ func main() { if err = dataService.Start(); err != nil { panic(err) } + cnt := 0 for cnt = 0; cnt < reTryCnt; cnt++ { dsStates, err := dataService.GetComponentStates() if err != nil { @@ -108,7 +93,5 @@ func main() { syscall.SIGQUIT) sig := <-sc log.Printf("Got %s signal to exit", sig.String()) - _ = indexService.Stop() - _ = dataService.Stop() _ = svr.Stop() } diff --git a/internal/distributed/proxyservice/client.go b/internal/distributed/proxyservice/client.go index cd99aaac6f..d53b9c83f2 100644 --- a/internal/distributed/proxyservice/client.go +++ b/internal/distributed/proxyservice/client.go @@ -49,10 +49,6 @@ func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMe } func (c *Client) GetTimeTickChannel() (string, error) { - err := c.tryConnect() - if err != nil { - return "", err - } response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{}) if err != nil { return "", err @@ -61,10 +57,6 @@ func (c *Client) GetTimeTickChannel() (string, error) { } func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { - err := c.tryConnect() - if err != nil { - return nil, err - } return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{}) } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 11841cbb8b..cc4338d4ab 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -37,7 +37,7 @@ import ( type ProxyServiceInterface interface { GetTimeTickChannel() (string, error) - InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error + InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) } type DataServiceInterface interface { @@ -582,7 +582,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { Params.ProxyTimeTickChannel = rsp c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { - err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ + status, err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO,MsgType MsgID: 0, @@ -595,6 +595,9 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { if err != nil { return err } + if status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.Errorf("InvalidateCollectionMetaCache failed, error = %s", status.Reason) + } return nil } return nil diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index f83091c68c..d366b4d424 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -32,11 +32,14 @@ type proxyMock struct { func (p *proxyMock) GetTimeTickChannel() (string, error) { return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil } -func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error { +func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { p.mutex.Lock() defer p.mutex.Unlock() p.collArray = append(p.collArray, request.CollectionName) - return nil + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + Reason: "", + }, nil } func (p *proxyMock) GetCollArray() []string { p.mutex.Lock() diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go index e150e9cf54..e9181a682c 100644 --- a/internal/proxyservice/timetick.go +++ b/internal/proxyservice/timetick.go @@ -35,7 +35,7 @@ func (tt *TimeTickImpl) Start() error { select { case <-tt.ctx.Done(): log.Println("time tick loop was canceled by context!") - return + break default: current, err := tt.ttBarrier.GetTimeTick() if err != nil {