From 836a45ec26d5469b890bd8d3bc723fe89aded441 Mon Sep 17 00:00:00 2001 From: sunby Date: Tue, 6 Jul 2021 09:24:05 +0800 Subject: [PATCH] Remove sending newSegmentMsg to rootcoord (#6301) Signed-off-by: sunby --- internal/datacoord/server.go | 35 ++++++++--------------------------- 1 file changed, 8 insertions(+), 27 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 9bf6ddad50..2328880c11 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -71,14 +71,13 @@ type Server struct { serverLoopWg sync.WaitGroup isServing ServerState - kvClient *etcdkv.EtcdKV - meta *meta - segmentInfoStream msgstream.MsgStream - segmentManager Manager - allocator allocator - cluster *cluster - rootCoordClient types.RootCoord - ddChannelName string + kvClient *etcdkv.EtcdKV + meta *meta + segmentManager Manager + allocator allocator + cluster *cluster + rootCoordClient types.RootCoord + ddChannelName string flushCh chan UniqueID msFactory msgstream.Factory @@ -155,10 +154,6 @@ func (s *Server) Start() error { return err } - if err = s.initSegmentInfoChannel(); err != nil { - return err - } - s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient) s.startSegmentManager() @@ -231,20 +226,7 @@ func (s *Server) loadDataNodes() []*datapb.DataNodeInfo { } func (s *Server) startSegmentManager() { - helper := createNewSegmentHelper(s.segmentInfoStream) - s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper)) -} - -func (s *Server) initSegmentInfoChannel() error { - var err error - s.segmentInfoStream, err = s.msFactory.NewMsgStream(s.ctx) - if err != nil { - return err - } - s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName}) - log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName) - s.segmentInfoStream.Start() - return nil + s.segmentManager = newSegmentManager(s.meta, s.allocator) } func (s *Server) initMeta() error { @@ -501,7 +483,6 @@ func (s *Server) Stop() error { log.Debug("DataCoord server shutdown") atomic.StoreInt64(&s.isServing, ServerStateStopped) s.cluster.releaseSessions() - s.segmentInfoStream.Close() s.stopServerLoop() return nil }