diff --git a/internal/distributed/indexcoord/client/client_test.go b/internal/distributed/indexcoord/client/client_test.go new file mode 100644 index 0000000000..80e1137f98 --- /dev/null +++ b/internal/distributed/indexcoord/client/client_test.go @@ -0,0 +1,122 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexcoordclient + +import ( + "context" + "testing" + + grpcindexcoord "github.com/milvus-io/milvus/internal/distributed/indexcoord" + "github.com/milvus-io/milvus/internal/indexcoord" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/stretchr/testify/assert" +) + +func TestIndexCoordClient(t *testing.T) { + ctx := context.Background() + indexCoordServer, err := grpcindexcoord.NewServer(ctx) + assert.Nil(t, err) + icm := &indexcoord.Mock{} + err = indexCoordServer.SetClient(icm) + assert.Nil(t, err) + + err = indexCoordServer.Run() + assert.Nil(t, err) + + icc, err := NewClient(ctx, indexcoord.Params.MetaRootPath, indexcoord.Params.EtcdEndpoints) + assert.Nil(t, err) + assert.NotNil(t, icc) + + err = icc.Init() + assert.Nil(t, err) + + err = icc.Register() + assert.Nil(t, err) + + err = icc.Start() + assert.Nil(t, err) + + t.Run("GetComponentStates", func(t *testing.T) { + states, err := icc.GetComponentStates(ctx) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + assert.Equal(t, commonpb.ErrorCode_Success, states.Status.ErrorCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + resp, err := icc.GetTimeTickChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + resp, err := icc.GetStatisticsChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("BuildIndex", func(t *testing.T) { + req := &indexpb.BuildIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + } + resp, err := icc.BuildIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("DropIndex", func(t *testing.T) { + req := &indexpb.DropIndexRequest{ + IndexID: 0, + } + resp, err := icc.DropIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetIndexStates", func(t *testing.T) { + req := &indexpb.GetIndexStatesRequest{ + IndexBuildIDs: []int64{0}, + } + resp, err := icc.GetIndexStates(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, len(req.IndexBuildIDs), len(resp.States)) + assert.Equal(t, commonpb.IndexState_Finished, resp.States[0].State) + }) + + t.Run("GetIndexFilePaths", func(t *testing.T) { + req := &indexpb.GetIndexFilePathsRequest{ + IndexBuildIDs: []int64{0}, + } + resp, err := icc.GetIndexFilePaths(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, len(req.IndexBuildIDs), len(resp.FilePaths)) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + resp, err := icc.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + err = indexCoordServer.Stop() + assert.Nil(t, err) + + err = icc.Stop() + assert.Nil(t, err) +} diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index 7fe7a5b1ca..73dc8f44c2 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -18,6 +18,8 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/internal/types" + "go.uber.org/zap" "google.golang.org/grpc" @@ -37,7 +39,7 @@ type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp type Server struct { - indexcoord *indexcoord.IndexCoord + indexcoord types.IndexCoord grpcServer *grpc.Server grpcErrChan chan error @@ -82,8 +84,6 @@ func (s *Server) init() error { log.Error("IndexCoord", zap.Any("init error", err)) return err } - s.indexcoord.UpdateStateCode(internalpb.StateCode_Initializing) - if err := s.indexcoord.Init(); err != nil { log.Error("IndexCoord", zap.Any("init error", err)) return err @@ -118,6 +118,11 @@ func (s *Server) Stop() error { return nil } +func (s *Server) SetClient(indexCoordClient types.IndexCoord) error { + s.indexcoord = indexCoordClient + return nil +} + func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { return s.indexcoord.GetComponentStates(ctx) } diff --git a/internal/distributed/indexcoord/service_test.go b/internal/distributed/indexcoord/service_test.go new file mode 100644 index 0000000000..4a4f868279 --- /dev/null +++ b/internal/distributed/indexcoord/service_test.go @@ -0,0 +1,110 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexcoord + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus/internal/indexcoord" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/stretchr/testify/assert" +) + +func TestIndexCoordinateServer(t *testing.T) { + ctx := context.Background() + indexCoord, err := NewServer(ctx) + assert.Nil(t, err) + assert.NotNil(t, indexCoord) + indexCoordClient := &indexcoord.Mock{} + err = indexCoord.SetClient(indexCoordClient) + assert.Nil(t, err) + err = indexCoord.Run() + assert.Nil(t, err) + + t.Run("GetComponentStates", func(t *testing.T) { + req := &internalpb.GetComponentStatesRequest{} + states, err := indexCoord.GetComponentStates(ctx, req) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + req := &internalpb.GetTimeTickChannelRequest{} + resp, err := indexCoord.GetTimeTickChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + req := &internalpb.GetStatisticsChannelRequest{} + resp, err := indexCoord.GetStatisticsChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("BuildIndex", func(t *testing.T) { + req := &indexpb.BuildIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + DataPaths: []string{}, + } + resp, err := indexCoord.BuildIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetIndexStates", func(t *testing.T) { + req := &indexpb.GetIndexStatesRequest{ + IndexBuildIDs: []UniqueID{0}, + } + resp, err := indexCoord.GetIndexStates(ctx, req) + assert.Nil(t, err) + assert.Equal(t, len(req.IndexBuildIDs), len(resp.States)) + assert.Equal(t, commonpb.IndexState_Finished, resp.States[0].State) + }) + + t.Run("DropIndex", func(t *testing.T) { + req := &indexpb.DropIndexRequest{ + IndexID: 0, + } + resp, err := indexCoord.DropIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetIndexFilePaths", func(t *testing.T) { + req := &indexpb.GetIndexFilePathsRequest{ + IndexBuildIDs: []UniqueID{0, 1}, + } + resp, err := indexCoord.GetIndexFilePaths(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, len(req.IndexBuildIDs), len(resp.FilePaths)) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{ + Request: "", + } + resp, err := indexCoord.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, "IndexCoord", resp.ComponentName) + }) + + err = indexCoord.Stop() + assert.Nil(t, err) +} diff --git a/internal/distributed/indexnode/client/client_test.go b/internal/distributed/indexnode/client/client_test.go new file mode 100644 index 0000000000..28a1b088fd --- /dev/null +++ b/internal/distributed/indexnode/client/client_test.go @@ -0,0 +1,92 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexnodeclient + +import ( + "context" + "testing" + + grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" + "github.com/milvus-io/milvus/internal/indexnode" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/stretchr/testify/assert" +) + +func TestIndexNodeClient(t *testing.T) { + ctx := context.Background() + + ins, err := grpcindexnode.NewServer(ctx) + assert.Nil(t, err) + assert.NotNil(t, ins) + + inm := &indexnode.Mock{} + err = ins.SetClient(inm) + assert.Nil(t, err) + + err = ins.Run() + assert.Nil(t, err) + + inc, err := NewClient(ctx, "localhost:21121") + assert.Nil(t, err) + assert.NotNil(t, inc) + + err = inc.Init() + assert.Nil(t, err) + + err = inc.Start() + assert.Nil(t, err) + + t.Run("GetComponentStates", func(t *testing.T) { + states, err := inc.GetComponentStates(ctx) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + assert.Equal(t, commonpb.ErrorCode_Success, states.Status.ErrorCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + resp, err := inc.GetTimeTickChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + resp, err := inc.GetStatisticsChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("CreateIndex", func(t *testing.T) { + req := &indexpb.CreateIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + } + resp, err := inc.CreateIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + resp, err := inc.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + err = ins.Stop() + assert.Nil(t, err) + + err = inc.Stop() + assert.Nil(t, err) +} diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index dd0a4c5c4e..60fa71b167 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -19,6 +19,8 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/internal/types" + "go.uber.org/zap" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" @@ -34,7 +36,7 @@ import ( ) type Server struct { - indexnode *indexnode.IndexNode + indexnode types.IndexNode grpcServer *grpc.Server grpcErrChan chan error @@ -132,8 +134,6 @@ func (s *Server) init() error { return err } - s.indexnode.UpdateStateCode(internalpb.StateCode_Initializing) - log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing)) err = s.indexnode.Init() if err != nil { log.Error("IndexNode Init failed", zap.Error(err)) @@ -168,6 +168,11 @@ func (s *Server) Stop() error { return nil } +func (s *Server) SetClient(indexNodeClient types.IndexNode) error { + s.indexnode = indexNodeClient + return nil +} + func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { return s.indexnode.GetComponentStates(ctx) } diff --git a/internal/distributed/indexnode/service_test.go b/internal/distributed/indexnode/service_test.go new file mode 100644 index 0000000000..4dc07a3c1b --- /dev/null +++ b/internal/distributed/indexnode/service_test.go @@ -0,0 +1,83 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexnode + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus/internal/indexnode" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/stretchr/testify/assert" +) + +func TestIndexNodeServer(t *testing.T) { + ctx := context.Background() + ins, err := NewServer(ctx) + assert.Nil(t, err) + assert.NotNil(t, ins) + + inm := &indexnode.Mock{} + err = ins.SetClient(inm) + assert.Nil(t, err) + + err = ins.Run() + assert.Nil(t, err) + + t.Run("GetComponentStates", func(t *testing.T) { + req := &internalpb.GetComponentStatesRequest{} + states, err := ins.GetComponentStates(ctx, req) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + req := &internalpb.GetTimeTickChannelRequest{} + resp, err := ins.GetTimeTickChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + req := &internalpb.GetStatisticsChannelRequest{} + resp, err := ins.GetStatisticsChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("CreateIndex", func(t *testing.T) { + req := &indexpb.CreateIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + DataPaths: []string{}, + } + resp, err := ins.CreateIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{ + Request: "", + } + resp, err := ins.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + err = ins.Stop() + assert.Nil(t, err) +} diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c0868958ec..54e797690e 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -104,6 +104,7 @@ func (i *IndexCoord) Register() error { func (i *IndexCoord) Init() error { log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints)) + i.UpdateStateCode(internalpb.StateCode_Initializing) connectEtcdFn := func() error { etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) @@ -191,9 +192,6 @@ func (i *IndexCoord) Init() error { i.metricsCacheManager = metricsinfo.NewMetricsCacheManager() - i.UpdateStateCode(internalpb.StateCode_Healthy) - log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load())) - log.Debug("IndexCoord assign tasks server success", zap.Error(err)) return nil } @@ -219,7 +217,9 @@ func (i *IndexCoord) Start() error { for _, cb := range i.startCallbacks { cb() } - log.Debug("IndexCoord start") + i.UpdateStateCode(internalpb.StateCode_Healthy) + log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load())) + log.Debug("IndexCoord start successfully") return nil } @@ -227,6 +227,7 @@ func (i *IndexCoord) Start() error { func (i *IndexCoord) Stop() error { i.loopCancel() i.sched.Close() + i.loopWg.Wait() for _, cb := range i.closeCallbacks { cb() } @@ -699,7 +700,7 @@ func (i *IndexCoord) assignTaskLoop() { if err != nil { log.Debug("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err)) } - if len(i.nodeManager.nodeClients) <= 0 { + if len(sessions) <= 0 { log.Debug("There is no IndexNode available as this time.") break } diff --git a/internal/indexcoord/index_coord_mock.go b/internal/indexcoord/index_coord_mock.go new file mode 100644 index 0000000000..0992fc9d7c --- /dev/null +++ b/internal/indexcoord/index_coord_mock.go @@ -0,0 +1,213 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "context" + "errors" + "strconv" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type Mock struct { + etcdKV *etcdkv.EtcdKV + + Failure bool +} + +func (icm *Mock) Init() error { + if icm.Failure { + return errors.New("IndexCoordinate init failed") + } + return nil +} + +func (icm *Mock) Start() error { + if icm.Failure { + return errors.New("IndexCoordinate start failed") + } + return nil +} + +func (icm *Mock) Stop() error { + if icm.Failure { + return errors.New("IndexCoordinate stop failed") + } + err := icm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexCoordRole) + return err +} + +func (icm *Mock) Register() error { + if icm.Failure { + return errors.New("IndexCoordinate register failed") + } + icm.etcdKV, _ = etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) + err := icm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexCoordRole) + session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints) + session.Init(typeutil.IndexCoordRole, Params.Address, true) + return err +} + +func (icm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + if icm.Failure { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + StateCode: internalpb.StateCode_Abnormal, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexCoordinate GetComponentStates failed") + } + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + StateCode: internalpb.StateCode_Healthy, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil +} + +func (icm *Mock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + if icm.Failure { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexCoordinate GetStatisticsChannel failed") + } + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Value: "", + }, nil +} + +func (icm *Mock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + if icm.Failure { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexCoordinate GetTimeTickChannel failed") + } + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Value: "", + }, nil +} + +func (icm *Mock) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { + if icm.Failure { + return &indexpb.BuildIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + IndexBuildID: 0, + }, errors.New("IndexCoordinate BuildIndex error") + } + return &indexpb.BuildIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + IndexBuildID: 0, + }, nil +} + +func (icm *Mock) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { + if icm.Failure { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, errors.New("IndexCoordinate DropIndex failed") + } + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil +} + +func (icm *Mock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) { + if icm.Failure { + return &indexpb.GetIndexStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexCoordinate GetIndexStates failed") + } + states := make([]*indexpb.IndexInfo, len(req.IndexBuildIDs)) + for i := range states { + states[i] = &indexpb.IndexInfo{ + IndexBuildID: req.IndexBuildIDs[i], + State: commonpb.IndexState_Finished, + IndexID: 0, + } + } + return &indexpb.GetIndexStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + States: states, + }, nil +} + +func (icm *Mock) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { + if icm.Failure { + return &indexpb.GetIndexFilePathsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexCoordinate GetIndexFilePaths failed") + } + filePaths := make([]*indexpb.IndexFilePathInfo, len(req.IndexBuildIDs)) + for i := range filePaths { + filePaths[i] = &indexpb.IndexFilePathInfo{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + IndexBuildID: req.IndexBuildIDs[i], + IndexFilePaths: []string{strconv.FormatInt(req.IndexBuildIDs[i], 10)}, + } + } + return &indexpb.GetIndexFilePathsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + FilePaths: filePaths, + }, nil +} + +func (icm *Mock) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + if icm.Failure { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexCoordinate GetMetrics failed") + } + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Response: "", + ComponentName: "IndexCoord", + }, nil +} diff --git a/internal/indexcoord/index_coord_mock_test.go b/internal/indexcoord/index_coord_mock_test.go new file mode 100644 index 0000000000..3facce1ae6 --- /dev/null +++ b/internal/indexcoord/index_coord_mock_test.go @@ -0,0 +1,193 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + + "github.com/stretchr/testify/assert" +) + +func TestIndexCoordMock(t *testing.T) { + Params.Init() + icm := Mock{} + err := icm.Register() + assert.Nil(t, err) + err = icm.Init() + assert.Nil(t, err) + err = icm.Start() + assert.Nil(t, err) + ctx := context.Background() + + t.Run("Register", func(t *testing.T) { + + }) + t.Run("GetComponentStates", func(t *testing.T) { + states, err := icm.GetComponentStates(ctx) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + resp, err := icm.GetTimeTickChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + resp, err := icm.GetStatisticsChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("BuildIndex", func(t *testing.T) { + req := &indexpb.BuildIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + DataPaths: []string{}, + } + resp, err := icm.BuildIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetIndexStates", func(t *testing.T) { + req := &indexpb.GetIndexStatesRequest{ + IndexBuildIDs: []UniqueID{0}, + } + resp, err := icm.GetIndexStates(ctx, req) + assert.Nil(t, err) + assert.Equal(t, len(req.IndexBuildIDs), len(resp.States)) + assert.Equal(t, commonpb.IndexState_Finished, resp.States[0].State) + }) + + t.Run("DropIndex", func(t *testing.T) { + req := &indexpb.DropIndexRequest{ + IndexID: 0, + } + resp, err := icm.DropIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetIndexFilePaths", func(t *testing.T) { + req := &indexpb.GetIndexFilePathsRequest{ + IndexBuildIDs: []UniqueID{0, 1}, + } + resp, err := icm.GetIndexFilePaths(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, len(req.IndexBuildIDs), len(resp.FilePaths)) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{ + Request: "", + } + resp, err := icm.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, "IndexCoord", resp.ComponentName) + }) + + err = icm.Stop() + assert.Nil(t, err) +} + +func TestIndexCoordMockError(t *testing.T) { + icm := Mock{ + Failure: true, + } + err := icm.Init() + assert.NotNil(t, err) + err = icm.Start() + assert.NotNil(t, err) + ctx := context.Background() + + t.Run("Register", func(t *testing.T) { + err = icm.Register() + assert.NotNil(t, err) + }) + t.Run("GetComponentStates", func(t *testing.T) { + states, err := icm.GetComponentStates(ctx) + assert.NotNil(t, err) + assert.Equal(t, internalpb.StateCode_Abnormal, states.State.StateCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + resp, err := icm.GetTimeTickChannel(ctx) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + resp, err := icm.GetStatisticsChannel(ctx) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + t.Run("BuildIndex", func(t *testing.T) { + req := &indexpb.BuildIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + DataPaths: []string{}, + } + resp, err := icm.BuildIndex(ctx, req) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + t.Run("GetIndexStates", func(t *testing.T) { + req := &indexpb.GetIndexStatesRequest{ + IndexBuildIDs: []UniqueID{0}, + } + resp, err := icm.GetIndexStates(ctx, req) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + t.Run("DropIndex", func(t *testing.T) { + req := &indexpb.DropIndexRequest{ + IndexID: 0, + } + resp, err := icm.DropIndex(ctx, req) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) + }) + + t.Run("GetIndexFilePaths", func(t *testing.T) { + req := &indexpb.GetIndexFilePathsRequest{ + IndexBuildIDs: []UniqueID{0, 1}, + } + resp, err := icm.GetIndexFilePaths(ctx, req) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{ + Request: "", + } + resp, err := icm.GetMetrics(ctx, req) + assert.NotNil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + err = icm.Stop() + assert.NotNil(t, err) +} diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 617a471385..84af43844c 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -17,122 +17,25 @@ import ( "testing" "time" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/milvus-io/milvus/internal/proto/milvuspb" + grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" + + "github.com/milvus-io/milvus/internal/indexnode" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/metricsinfo" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" ) -type indexNodeMock struct { - types.IndexNode -} - -func (in *indexNodeMock) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { - indexMeta := indexpb.IndexMeta{} - etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, err - } - _, values, versions, err := etcdKV.LoadWithPrefix2(req.MetaPath) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, err - } - err = proto.UnmarshalText(values[0], &indexMeta) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, err - } - indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"} - indexMeta.State = commonpb.IndexState_Finished - _ = etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], - proto.MarshalTextString(&indexMeta)) - - time.Sleep(10 * time.Second) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil -} - -func getSystemInfoMetricsByIndexNodeMock( - ctx context.Context, - req *milvuspb.GetMetricsRequest, - in *indexNodeMock, -) (*milvuspb.GetMetricsResponse, error) { - - id := UniqueID(16384) - - nodeInfos := metricsinfo.IndexNodeInfos{ - BaseComponentInfos: metricsinfo.BaseComponentInfos{ - Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), - }, - } - resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) - if err != nil { - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - Response: "", - ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), - }, nil - } - - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), - }, nil -} - -func (in *indexNodeMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - metricType, err := metricsinfo.ParseMetricType(req.Request) - if err != nil { - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - Response: "", - }, nil - } - - if metricType == metricsinfo.SystemInfoMetrics { - return getSystemInfoMetricsByIndexNodeMock(ctx, req, in) - } - - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, - Response: "", - }, nil -} - func TestIndexCoord(t *testing.T) { ctx := context.Background() ic, err := NewIndexCoord(ctx) @@ -140,14 +43,25 @@ func TestIndexCoord(t *testing.T) { Params.Init() err = ic.Register() assert.Nil(t, err) - // TODO: add indexNodeMock to etcd + err = ic.Init() assert.Nil(t, err) - indexNodeID := UniqueID(100) - ic.nodeManager.setClient(indexNodeID, &indexNodeMock{}) err = ic.Start() assert.Nil(t, err) + in, err := grpcindexnode.NewServer(ctx) + assert.Nil(t, err) + assert.NotNil(t, in) + inm := &indexnode.Mock{ + Build: true, + Failure: false, + } + err = in.SetClient(inm) + assert.Nil(t, err) + + err = in.Run() + assert.Nil(t, err) + state, err := ic.GetComponentStates(ctx) assert.Nil(t, err) assert.Equal(t, internalpb.StateCode_Healthy, state.State.StateCode) @@ -183,7 +97,7 @@ func TestIndexCoord(t *testing.T) { if resp.States[0].State == commonpb.IndexState_Finished { break } - time.Sleep(3 * time.Second) + time.Sleep(1 * time.Second) } }) @@ -221,8 +135,38 @@ func TestIndexCoord(t *testing.T) { zap.String("resp", resp.Response)) }) + t.Run("GetTimeTickChannel", func(t *testing.T) { + resp, err := ic.GetTimeTickChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + resp, err := ic.GetStatisticsChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetMetrics when indexcoord is not healthy", func(t *testing.T) { + ic.UpdateStateCode(internalpb.StateCode_Abnormal) + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + assert.Nil(t, err) + resp, err := ic.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + ic.UpdateStateCode(internalpb.StateCode_Healthy) + }) + + t.Run("GetMetrics when request is illegal", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + resp, err := ic.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + err = in.Stop() + assert.Nil(t, err) time.Sleep(11 * time.Second) - ic.nodeManager.RemoveNode(indexNodeID) err = ic.Stop() assert.Nil(t, err) diff --git a/internal/indexcoord/task_scheduler.go b/internal/indexcoord/task_scheduler.go index 0233a6d89b..a51be079c6 100644 --- a/internal/indexcoord/task_scheduler.go +++ b/internal/indexcoord/task_scheduler.go @@ -32,7 +32,7 @@ type TaskQueue interface { utEmpty() bool utFull() bool addUnissuedTask(t task) error - FrontUnissuedTask() task + //FrontUnissuedTask() task PopUnissuedTask() task AddActiveTask(t task) PopActiveTask(tID UniqueID) task @@ -78,17 +78,17 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error { return nil } -func (queue *BaseTaskQueue) FrontUnissuedTask() task { - queue.utLock.Lock() - defer queue.utLock.Unlock() - - if queue.unissuedTasks.Len() <= 0 { - log.Warn("sorry, but the unissued task list is empty!") - return nil - } - - return queue.unissuedTasks.Front().Value.(task) -} +//func (queue *BaseTaskQueue) FrontUnissuedTask() task { +// queue.utLock.Lock() +// defer queue.utLock.Unlock() +// +// if queue.unissuedTasks.Len() <= 0 { +// log.Warn("sorry, but the unissued task list is empty!") +// return nil +// } +// +// return queue.unissuedTasks.Front().Value.(task) +//} func (queue *BaseTaskQueue) PopUnissuedTask() task { queue.utLock.Lock() diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 8a9b073b0a..ba9dcafd4a 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -87,6 +87,8 @@ func (i *IndexNode) Register() error { } func (i *IndexNode) Init() error { + i.UpdateStateCode(internalpb.StateCode_Initializing) + log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing)) connectEtcdFn := func() error { etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) i.etcdKV = etcdKV @@ -114,15 +116,14 @@ func (i *IndexNode) Init() error { } log.Debug("IndexNode NewMinIOKV success") i.closer = trace.InitTracing("index_node") - - i.UpdateStateCode(internalpb.StateCode_Healthy) - log.Debug("IndexNode", zap.Any("State", i.stateCode.Load())) return nil } func (i *IndexNode) Start() error { i.sched.Start() + i.UpdateStateCode(internalpb.StateCode_Healthy) + log.Debug("IndexNode", zap.Any("State", i.stateCode.Load())) // Start callbacks for _, cb := range i.startCallbacks { cb() @@ -199,14 +200,14 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde } // AddStartCallback adds a callback in the startServer phase. -func (i *IndexNode) AddStartCallback(callbacks ...func()) { - i.startCallbacks = append(i.startCallbacks, callbacks...) -} +//func (i *IndexNode) AddStartCallback(callbacks ...func()) { +// i.startCallbacks = append(i.startCallbacks, callbacks...) +//} // AddCloseCallback adds a callback in the Close phase. -func (i *IndexNode) AddCloseCallback(callbacks ...func()) { - i.closeCallbacks = append(i.closeCallbacks, callbacks...) -} +//func (i *IndexNode) AddCloseCallback(callbacks ...func()) { +// i.closeCallbacks = append(i.closeCallbacks, callbacks...) +//} func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { log.Debug("get IndexNode components states ...") diff --git a/internal/indexnode/indexnode_mock.go b/internal/indexnode/indexnode_mock.go new file mode 100644 index 0000000000..0137b397aa --- /dev/null +++ b/internal/indexnode/indexnode_mock.go @@ -0,0 +1,247 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexnode + +import ( + "context" + "errors" + "strconv" + "sync" + "time" + + "github.com/milvus-io/milvus/internal/log" + + "github.com/golang/protobuf/proto" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type Mock struct { + Build bool + Failure bool + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + etcdKV *etcdkv.EtcdKV + + buildIndex chan *indexpb.CreateIndexRequest +} + +func (inm *Mock) Init() error { + if inm.Failure { + return errors.New("IndexNode init failed") + } + inm.ctx, inm.cancel = context.WithCancel(context.Background()) + inm.buildIndex = make(chan *indexpb.CreateIndexRequest) + return nil +} + +func (inm *Mock) buildIndexTask() { + log.Debug("IndexNodeMock wait for building index") + defer inm.wg.Done() + for { + select { + case <-inm.ctx.Done(): + return + case req := <-inm.buildIndex: + if inm.Failure && inm.Build { + indexMeta := indexpb.IndexMeta{} + + _, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath) + _ = proto.UnmarshalText(values[0], &indexMeta) + indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"} + indexMeta.State = commonpb.IndexState_Failed + time.Sleep(time.Second) + _ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], + proto.MarshalTextString(&indexMeta)) + } + if inm.Build { + indexMeta := indexpb.IndexMeta{} + _, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath) + _ = proto.UnmarshalText(values[0], &indexMeta) + indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"} + indexMeta.State = commonpb.IndexState_Failed + time.Sleep(time.Second) + _ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], + proto.MarshalTextString(&indexMeta)) + indexMeta.Version = indexMeta.Version + 1 + indexMeta.State = commonpb.IndexState_Finished + _ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0]+1, + proto.MarshalTextString(&indexMeta)) + } + } + } +} + +func (inm *Mock) Start() error { + inm.wg.Add(1) + go inm.buildIndexTask() + if inm.Failure { + return errors.New("IndexNode start failed") + } + return nil +} + +func (inm *Mock) Stop() error { + inm.cancel() + inm.wg.Wait() + inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole) + if inm.Failure { + return errors.New("IndexNode stop failed") + } + return nil +} + +func (inm *Mock) Register() error { + if inm.Failure { + return errors.New("IndexNode register failed") + } + inm.etcdKV, _ = etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) + inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole) + session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints) + session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) + return nil +} + +func (inm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + if inm.Failure { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + StateCode: internalpb.StateCode_Abnormal, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, nil + } + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + StateCode: internalpb.StateCode_Healthy, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil +} + +func (inm *Mock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + if inm.Failure { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexNode GetStatisticsChannel failed") + } + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Value: "", + }, nil +} + +func (inm *Mock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + if inm.Failure { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, errors.New("IndexNode GetTimeTickChannel failed") + } + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Value: "", + }, nil +} + +func (inm *Mock) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { + if inm.Build { + inm.buildIndex <- req + } + + if inm.Failure { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, errors.New("IndexNode CreateIndex failed") + } + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil +} + +func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + if inm.Failure { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: metricsinfo.MsgUnimplementedMetric, + }, + Response: "", + }, errors.New("IndexNode GetMetrics failed") + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: "", + ComponentName: "IndexNode", + }, nil +} + +//func getSystemInfoMetricsByIndexNodeMock( +// ctx context.Context, +// req *milvuspb.GetMetricsRequest, +// in *IndexNodeMock, +//) (*milvuspb.GetMetricsResponse, error) { +// +// id := UniqueID(16384) +// +// nodeInfos := metricsinfo.IndexNodeInfos{ +// BaseComponentInfos: metricsinfo.BaseComponentInfos{ +// Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), +// }, +// } +// resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) +// if err != nil { +// return &milvuspb.GetMetricsResponse{ +// Status: &commonpb.Status{ +// ErrorCode: commonpb.ErrorCode_UnexpectedError, +// Reason: err.Error(), +// }, +// Response: "", +// ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), +// }, nil +// } +// +// return &milvuspb.GetMetricsResponse{ +// Status: &commonpb.Status{ +// ErrorCode: commonpb.ErrorCode_Success, +// Reason: "", +// }, +// Response: resp, +// ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), +// }, nil +//} diff --git a/internal/indexnode/indexnode_mock_test.go b/internal/indexnode/indexnode_mock_test.go new file mode 100644 index 0000000000..d4b88c22ff --- /dev/null +++ b/internal/indexnode/indexnode_mock_test.go @@ -0,0 +1,77 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexnode + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/stretchr/testify/assert" +) + +func TestIndexNodeMock(t *testing.T) { + Params.Init() + inm := Mock{} + err := inm.Register() + assert.Nil(t, err) + err = inm.Init() + assert.Nil(t, err) + err = inm.Start() + assert.Nil(t, err) + ctx := context.Background() + + t.Run("GetComponentStates", func(t *testing.T) { + states, err := inm.GetComponentStates(ctx) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + resp, err := inm.GetTimeTickChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + resp, err := inm.GetStatisticsChannel(ctx) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("CreateIndex", func(t *testing.T) { + req := &indexpb.CreateIndexRequest{ + IndexBuildID: 0, + IndexID: 0, + DataPaths: []string{}, + } + resp, err := inm.CreateIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{ + Request: "", + } + resp, err := inm.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, "IndexNode", resp.ComponentName) + }) + + err = inm.Stop() + assert.Nil(t, err) +} diff --git a/internal/types/types.go b/internal/types/types.go index 121ee603bd..c16afb79ea 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -83,7 +83,6 @@ type IndexCoord interface { DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) - GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) }