From 9f948d3095b6ca8a2ec3a151efbe83eb946c0b37 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sat, 14 Nov 2020 16:40:10 +0800 Subject: [PATCH] Remove mockmaster Signed-off-by: zhenshan.cao --- cmd/mock/master/main.go | 54 ------ internal/master/meta_table_test.go | 6 +- internal/master/mock/grpc_service.go | 112 ------------- internal/master/mock/master.go | 239 --------------------------- 4 files changed, 5 insertions(+), 406 deletions(-) delete mode 100644 cmd/mock/master/main.go delete mode 100644 internal/master/mock/grpc_service.go delete mode 100644 internal/master/mock/master.go diff --git a/cmd/mock/master/main.go b/cmd/mock/master/main.go deleted file mode 100644 index a33b83c803..0000000000 --- a/cmd/mock/master/main.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - - "go.uber.org/zap" - - mockmaster "github.com/zilliztech/milvus-distributed/internal/master/mock" -) - -func main() { - // Creates server. - ctx, cancel := context.WithCancel(context.Background()) - svr, err := mockmaster.CreateServer(ctx) - if err != nil { - log.Print("create server failed", zap.Error(err)) - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - if err := svr.Run(); err != nil { - log.Fatal("run server failed", zap.Error(err)) - } - - <-ctx.Done() - log.Print("Got signal to exit", zap.String("signal", sig.String())) - - svr.Close() - switch sig { - case syscall.SIGTERM: - exit(0) - default: - exit(1) - } -} - -func exit(code int) { - os.Exit(code) -} diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 6e8d5c88cb..6c5572e1a9 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -2,6 +2,7 @@ package master import ( "context" + "reflect" "testing" "github.com/stretchr/testify/assert" @@ -105,7 +106,10 @@ func TestMetaTable_Collection(t *testing.T) { collsName, err := meta.ListCollections() assert.Nil(t, err) - assert.Equal(t, collsName, []string{"coll1", "coll2"}) + assert.Equal(t, len(collsName), 2) + e1 := reflect.DeepEqual(collsName, []string{"coll1", "coll2"}) + e2 := reflect.DeepEqual(collsName, []string{"coll2", "coll1"}) + assert.True(t, e1 || e2) hasCollection := meta.HasCollection(colMeta.ID) assert.True(t, hasCollection) diff --git a/internal/master/mock/grpc_service.go b/internal/master/mock/grpc_service.go deleted file mode 100644 index 84e199d3da..0000000000 --- a/internal/master/mock/grpc_service.go +++ /dev/null @@ -1,112 +0,0 @@ -package mockmaster - -import ( - "context" - - "github.com/zilliztech/milvus-distributed/internal/master/tso" - - "github.com/zilliztech/milvus-distributed/internal/master/id" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" -) - -func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, nil -} - -func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, nil -} - -func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) { - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - Value: false, - }, nil -} - -func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) { - return &servicepb.CollectionDescription{}, nil -} - -func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) { - return &servicepb.StringListResponse{}, nil -} - -////////////////////////////////////////////////////////////////////////// -func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, nil -} - -func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, nil -} - -func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) { - - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - }, nil -} - -func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) { - return &servicepb.PartitionDescription{}, nil -} - -func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) { - return &servicepb.StringListResponse{}, nil -} - -//----------------------------------------Internal GRPC Service-------------------------------- - -func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequest) (*internalpb.TsoResponse, error) { - count := request.GetCount() - ts, err := tso.Alloc(count) - - if err != nil { - return &internalpb.TsoResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, - }, err - } - - response := &internalpb.TsoResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, - Timestamp: ts, - Count: count, - } - - return response, nil -} - -func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*internalpb.IDResponse, error) { - count := request.GetCount() - ts, err := id.AllocOne() - - if err != nil { - return &internalpb.IDResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, - }, err - } - - response := &internalpb.IDResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, - ID: ts, - Count: count, - } - - return response, nil -} diff --git a/internal/master/mock/master.go b/internal/master/mock/master.go deleted file mode 100644 index 3166aa6c1e..0000000000 --- a/internal/master/mock/master.go +++ /dev/null @@ -1,239 +0,0 @@ -package mockmaster - -import ( - "context" - "fmt" - "log" - "math/rand" - "net" - "sync" - "sync/atomic" - "time" - - "google.golang.org/grpc" - - "github.com/zilliztech/milvus-distributed/internal/kv/mockkv" - "github.com/zilliztech/milvus-distributed/internal/master/id" - "github.com/zilliztech/milvus-distributed/internal/master/tso" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/util/kvutil" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" -) - -// Server is the pd server. -type Master struct { - isServing int64 - - ctx context.Context - serverLoopCtx context.Context - serverLoopCancel func() - serverLoopWg sync.WaitGroup - - //grpc server - grpcServer *grpc.Server - - // for tso. - tsoAllocator tso.Allocator - - kvBase kvutil.Base - - // error chans - grpcErr chan error - - // Add callback functions at different stages - startCallbacks []func() - closeCallbacks []func() - - grpcAddr net.Addr -} - -func Init() { - rand.Seed(time.Now().UnixNano()) - id.Init() - tso.Init() -} - -// CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context) (*Master, error) { - Init() - m := &Master{ - ctx: ctx, - kvBase: mockkv.NewEtcdKV(), - grpcErr: make(chan error), - } - - m.grpcServer = grpc.NewServer() - masterpb.RegisterMasterServer(m.grpcServer, m) - return m, nil -} - -// AddStartCallback adds a callback in the startServer phase. -func (s *Master) AddStartCallback(callbacks ...func()) { - s.startCallbacks = append(s.startCallbacks, callbacks...) -} - -// for unittest, get the grpc server addr -func (s *Master) GetGRPCAddr() net.Addr { - return s.grpcAddr -} - -func (s *Master) startServer(ctx context.Context) error { - - // Run callbacks - for _, cb := range s.startCallbacks { - cb() - } - return nil -} - -// AddCloseCallback adds a callback in the Close phase. -func (s *Master) AddCloseCallback(callbacks ...func()) { - s.closeCallbacks = append(s.closeCallbacks, callbacks...) -} - -// Close closes the server. -func (s *Master) Close() { - if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) { - // server is already closed - return - } - - log.Print("closing server") - - s.stopServerLoop() - - if s.kvBase != nil { - s.kvBase.Close() - } - - // Run callbacks - for _, cb := range s.closeCallbacks { - cb() - } - - log.Print("close server") -} - -// IsClosed checks whether server is closed or not. -func (s *Master) IsServing() bool { - return !s.IsClosed() -} - -// IsClosed checks whether server is closed or not. -func (s *Master) IsClosed() bool { - return atomic.LoadInt64(&s.isServing) == 0 -} - -// Run runs the pd server. -func (s *Master) Run() error { - - if err := s.startServer(s.ctx); err != nil { - return err - } - - s.startServerLoop(s.ctx) - // Server has started. - - if err := <-s.grpcErr; err != nil { - s.Close() - return err - } - - atomic.StoreInt64(&s.isServing, 1) - return nil -} - -// Context returns the context of server. -func (s *Master) Context() context.Context { - return s.ctx -} - -// LoopContext returns the loop context of server. -func (s *Master) LoopContext() context.Context { - return s.serverLoopCtx -} - -func (s *Master) startServerLoop(ctx context.Context) { - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx) - s.serverLoopWg.Add(1) - go s.grpcLoop() - s.serverLoopWg.Add(1) - go s.pulsarLoop() - s.serverLoopWg.Add(1) - go s.segmentStatisticsLoop() -} - -func (s *Master) stopServerLoop() { - if s.grpcServer != nil { - s.grpcServer.GracefulStop() - } - s.serverLoopCancel() - s.serverLoopWg.Wait() -} - -func (s *Master) checkReady(ctx context.Context, targetCh chan error) { - select { - case <-time.After(100 * time.Millisecond): - targetCh <- nil - case <-ctx.Done(): - return - } -} - -func (s *Master) grpcLoop() { - defer s.serverLoopWg.Done() - masterAddr, err := gparams.GParams.Load("master.address") - if err != nil { - panic(err) - } - masterPort, err := gparams.GParams.Load("master.port") - if err != nil { - panic(err) - } - masterAddr = masterAddr + ":" + masterPort - lis, err := net.Listen("tcp", masterAddr) - if err != nil { - log.Printf("failed to listen: %v", err) - s.grpcErr <- err - return - } - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - go s.checkReady(ctx, s.grpcErr) - s.grpcAddr = lis.Addr() - fmt.Printf("Start MockMaster grpc server , addr:%v\n", s.grpcAddr) - if err := s.grpcServer.Serve(lis); err != nil { - fmt.Println(err) - s.grpcErr <- err - } -} - -// todo use messagestream -func (s *Master) pulsarLoop() { - defer s.serverLoopWg.Done() - - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - - <-ctx.Done() - log.Print("server is closed, exit pulsar loop") -} - -func (s *Master) tasksExecutionLoop() { - defer s.serverLoopWg.Done() - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - - <-ctx.Done() - log.Print("server is closed, exit task execution loop") -} - -func (s *Master) segmentStatisticsLoop() { - defer s.serverLoopWg.Done() - - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - - <-ctx.Done() - log.Print("server is closed, exit segmentStatistics loop") -}