From 0b1c4f0420dfd54cbeec4caa00b0c493a2e4e3e3 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 26 Jan 2021 17:40:26 +0800 Subject: [PATCH] Add building datanode cmd in Makefile Signed-off-by: XuanYang-cn --- Makefile | 2 + internal/dataservice/dd_handler.go | 89 ------------------- internal/dataservice/server.go | 81 +++++------------ .../distributed/dataservice/grpc_service.go | 16 ++-- 4 files changed, 34 insertions(+), 154 deletions(-) delete mode 100644 internal/dataservice/dd_handler.go diff --git a/Makefile b/Makefile index cc1df294cb..bf36806194 100644 --- a/Makefile +++ b/Makefile @@ -139,6 +139,8 @@ build-go: build-cpp @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null @echo "Building distributed indexnode ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null + @echo "Building data node ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/datanode $(PWD)/cmd/datanode/main.go 1>/dev/null @echo "Building dataservice ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/dataservice $(PWD)/cmd/dataservice/main.go 1>/dev/null diff --git a/internal/dataservice/dd_handler.go b/internal/dataservice/dd_handler.go deleted file mode 100644 index c3f001067d..0000000000 --- a/internal/dataservice/dd_handler.go +++ /dev/null @@ -1,89 +0,0 @@ -package dataservice - -import ( - "fmt" - - "github.com/golang/protobuf/proto" - - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -type ddHandler struct { - meta *meta - segmentAllocator segmentAllocator -} - -func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler { - return &ddHandler{ - meta: meta, - segmentAllocator: allocator, - } -} - -func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error { - switch msg.Type() { - case commonpb.MsgType_kCreateCollection: - realMsg := msg.(*msgstream.CreateCollectionMsg) - return handler.handleCreateCollection(realMsg) - case commonpb.MsgType_kDropCollection: - realMsg := msg.(*msgstream.DropCollectionMsg) - return handler.handleDropCollection(realMsg) - case commonpb.MsgType_kCreatePartition: - realMsg := msg.(*msgstream.CreatePartitionMsg) - return handler.handleCreatePartition(realMsg) - case commonpb.MsgType_kDropPartition: - realMsg := msg.(*msgstream.DropPartitionMsg) - return handler.handleDropPartition(realMsg) - default: - return fmt.Errorf("unknown msg type: %v", msg.Type()) - } -} - -func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollectionMsg) error { - schema := &schemapb.CollectionSchema{} - if err := proto.Unmarshal(msg.Schema, schema); err != nil { - return err - } - err := handler.meta.AddCollection(&collectionInfo{ - ID: msg.CollectionID, - Schema: schema, - }) - if err != nil { - return err - } - return nil -} - -func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error { - ids := handler.meta.GetSegmentsByCollectionID(msg.CollectionID) - for _, id := range ids { - if err := handler.meta.DropSegment(id); err != nil { - continue - } - handler.segmentAllocator.DropSegment(id) - } - if err := handler.meta.DropCollection(msg.CollectionID); err != nil { - return err - } - return nil -} - -func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error { - ids := handler.meta.GetSegmentsByCollectionAndPartitionID(msg.CollectionID, msg.PartitionID) - for _, id := range ids { - if err := handler.meta.DropSegment(id); err != nil { - return err - } - handler.segmentAllocator.DropSegment(id) - } - if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil { - return err - } - return nil -} - -func (handler *ddHandler) handleCreatePartition(msg *msgstream.CreatePartitionMsg) error { - return handler.meta.AddPartition(msg.CollectionID, msg.PartitionID) -} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 4844b84110..a478bbc367 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -65,26 +65,26 @@ type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp Server struct { - ctx context.Context - serverLoopCtx context.Context - serverLoopCancel context.CancelFunc - serverLoopWg sync.WaitGroup - state atomic.Value - client *etcdkv.EtcdKV - meta *meta - segAllocator segmentAllocator - statsHandler *statsHandler - ddHandler *ddHandler - insertChannelMgr *insertChannelManager - allocator allocator - cluster *dataNodeCluster - msgProducer *timesync.MsgProducer - registerFinishCh chan struct{} - masterClient MasterClient - ttMsgStream msgstream.MsgStream - k2sMsgStream msgstream.MsgStream - ddChannelName string - segmentInfoStream msgstream.MsgStream + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel context.CancelFunc + serverLoopWg sync.WaitGroup + state atomic.Value + client *etcdkv.EtcdKV + meta *meta + segAllocator segmentAllocator + statsHandler *statsHandler + insertChannelMgr *insertChannelManager + allocator allocator + cluster *dataNodeCluster + msgProducer *timesync.MsgProducer + registerFinishCh chan struct{} + masterClient MasterClient + ttMsgStream msgstream.MsgStream + k2sMsgStream msgstream.MsgStream + ddChannelName string + segmentInfoStream msgstream.MsgStream + segmentFlushStream msgstream.MsgStream } ) @@ -97,6 +97,7 @@ func CreateServer(ctx context.Context) (*Server, error) { registerFinishCh: ch, cluster: newDataNodeCluster(ch), } + s.state.Store(internalpb2.StateCode_INITIALIZING) return s, nil } @@ -105,7 +106,6 @@ func (s *Server) SetMasterClient(masterClient MasterClient) { } func (s *Server) Init() error { - s.state.Store(internalpb2.StateCode_INITIALIZING) return nil } @@ -120,7 +120,6 @@ func (s *Server) Start() error { if err != nil { return err } - s.ddHandler = newDDHandler(s.meta, s.segAllocator) s.initSegmentInfoChannel() if err = s.initMsgProducer(); err != nil { return err @@ -188,13 +187,6 @@ func (s *Server) loadMetaFromMaster() error { if err := s.checkMasterIsHealthy(); err != nil { return err } - if s.ddChannelName == "" { - channel, err := s.masterClient.GetDdChannel() - if err != nil { - return err - } - s.ddChannelName = channel - } collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, @@ -282,10 +274,9 @@ func (s *Server) checkMasterIsHealthy() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(3) + s.serverLoopWg.Add(2) go s.startStatsChannel(s.serverLoopCtx) go s.startSegmentFlushChannel(s.serverLoopCtx) - go s.startDDChannel(s.serverLoopCtx) } func (s *Server) startStatsChannel(ctx context.Context) { @@ -349,30 +340,6 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { } } -func (s *Server) startDDChannel(ctx context.Context) { - defer s.serverLoopWg.Done() - ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024) - ddStream.SetPulsarClient(Params.PulsarAddress) - ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) - ddStream.Start() - defer ddStream.Close() - for { - select { - case <-ctx.Done(): - log.Println("dd channel shut down") - return - default: - } - msgPack := ddStream.Consume() - for _, msg := range msgPack.Msgs { - if err := s.ddHandler.HandleDDMsg(msg); err != nil { - log.Println(err.Error()) - continue - } - } - } -} - func (s *Server) waitDataNodeRegister() { log.Println("waiting data node to register") <-s.registerFinishCh @@ -545,8 +512,8 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kSegmentInfo, MsgID: 0, - Timestamp: 0, - SourceID: Params.NodeID, + Timestamp: 0, // todo + SourceID: 0, }, Segment: segmentInfo, }, diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index cf3019dffb..67c20bb458 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -35,14 +35,6 @@ func NewGrpcService(ctx context.Context) *Service { log.Fatalf("create server error: %s", err.Error()) return nil } - return s -} - -func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) { - s.server.SetMasterClient(masterClient) -} - -func (s *Service) Init() error { s.grpcServer = grpc.NewServer() datapb.RegisterDataServiceServer(s.grpcServer, s) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port)) @@ -54,6 +46,14 @@ func (s *Service) Init() error { log.Fatal(err.Error()) return nil } + return s +} + +func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) { + s.server.SetMasterClient(masterClient) +} + +func (s *Service) Init() error { return s.server.Init() }