diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index dc21b6e617..125b1556cd 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -133,10 +133,13 @@ func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
 }
 
 // paraRun parallel run, with max Parallel limit
-func parraRun(works []func(), maxRunner int) {
+func paraRun(works []func(), maxRunner int) {
     wg := sync.WaitGroup{}
     ch := make(chan func())
     wg.Add(len(works))
+    if maxRunner > len(works) {
+        maxRunner = len(works)
+    }
     for i := 0; i < maxRunner; i++ {
         go func() {
@@ -210,6 +213,7 @@ func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, e
             mut.Lock()
             errs = append(errs, err)
             mut.Unlock()
+            return
         }
         if resp.ErrorCode != commonpb.ErrorCode_Success {
             log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
@@ -225,8 +229,11 @@ func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, e
             }
         })
     }
-    parraRun(works, 3)
-    return nodes, retry.ErrorList(errs)
+    paraRun(works, 20)
+    if len(errs) > 0 {
+        return nodes, retry.ErrorList(errs)
+    }
+    return nodes, nil
 }
 
 func (c *cluster) register(n *datapb.DataNodeInfo) {
diff --git a/internal/datacoord/grpc_services.go b/internal/datacoord/grpc_services.go
index 012b3a696a..c98d21dbba 100644
--- a/internal/datacoord/grpc_services.go
+++ b/internal/datacoord/grpc_services.go
@@ -19,7 +19,7 @@ import (
 const serverNotServingErrMsg = "server is not serving"
 
 func (s *Server) isClosed() bool {
-    return atomic.LoadInt64(&s.isServing) != 2
+    return atomic.LoadInt64(&s.isServing) != ServerStateHealthy
 }
 
 func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
@@ -354,9 +354,9 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS
     }
     state := atomic.LoadInt64(&s.isServing)
     switch state {
-    case 1:
+    case ServerStateInitializing:
         resp.State.StateCode = internalpb.StateCode_Initializing
-    case 2:
+    case ServerStateHealthy:
         resp.State.StateCode = internalpb.StateCode_Healthy
     default:
         resp.State.StateCode = internalpb.StateCode_Abnormal
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 61c3f344df..49944fc5e1 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -48,15 +48,29 @@ type (
     Timestamp = typeutil.Timestamp
 )
 
+// ServerState type alias
+type ServerState = int64
+
+const (
+    // ServerStateStopped state stands for a just created or stopped `Server` instance
+    ServerStateStopped ServerState = 0
+    // ServerStateInitializing state stands for an initializing `Server` instance
+    ServerStateInitializing ServerState = 1
+    // ServerStateHealthy state stands for a healthy `Server` instance
+    ServerStateHealthy ServerState = 2
+)
+
 type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
 type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
 
+// Server implements `types.Datacoord`
+// handles Data Coordinator related jobs
 type Server struct {
     ctx              context.Context
     serverLoopCtx    context.Context
     serverLoopCancel context.CancelFunc
     serverLoopWg     sync.WaitGroup
-    isServing        int64
+    isServing        ServerState
 
     kvClient *etcdkv.EtcdKV
     meta     *meta
@@ -79,6 +93,7 @@ type Server struct {
     rootCoordClientCreator rootCoordCreatorFunc
 }
 
+// CreateServer creates a `Server` instance
 func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
     rand.Seed(time.Now().UnixNano())
     s := &Server{
@@ -107,11 +122,19 @@ func (s *Server) Register() error {
     return nil
 }
 
+// Init changes server state to Initializing
 func (s *Server) Init() error {
-    atomic.StoreInt64(&s.isServing, 1)
+    atomic.StoreInt64(&s.isServing, ServerStateInitializing)
     return nil
 }
 
+// Start initializes `Server` members and starts loops; the following steps are taken:
+// 1. initialize message factory parameters
+// 2. initialize root coord client, meta, datanode cluster, segment info channel,
+//    allocator, segment manager
+// 3. start service discovery and server loops, which include message stream handlers (segment statistics, datanode tt),
+//    datanode etcd watch, etcd alive check and flush completed status check
+// 4. set server state to Healthy
 func (s *Server) Start() error {
     var err error
     m := map[string]interface{}{
@@ -151,7 +174,7 @@ func (s *Server) Start() error {
 
     s.startServerLoop()
 
-    atomic.StoreInt64(&s.isServing, 2)
+    atomic.StoreInt64(&s.isServing, ServerStateHealthy)
     log.Debug("DataCoordinator startup success")
     return nil
 }
@@ -482,12 +505,16 @@ func (s *Server) initRootCoordClient() error {
     return s.rootCoordClient.Start()
 }
 
+// Stop does the Server finalize processes
+// it checks whether the server status is healthy; if not, it just quits
+// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
+// stops the message stream client and stops the server loops
 func (s *Server) Stop() error {
-    if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) {
+    if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
         return nil
     }
     log.Debug("DataCoord server shutdown")
-    atomic.StoreInt64(&s.isServing, 0)
     s.cluster.releaseSessions()
     s.segmentInfoStream.Close()
     s.flushMsgStream.Close()
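
Two notes on the `paraRun` change above. Renaming `parraRun` fixes the typo, and the new clamp keeps the pool from spawning more worker goroutines than there are jobs, which matters now that the call site passes a `maxRunner` of 20 for potentially short work lists. The diff only shows the top half of the function, so the sketch below fills in the send-and-close half under common worker-pool assumptions (workers range over the unbuffered channel until it is closed); it is an illustration of the pattern, not the exact Milvus implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// paraRun runs works in parallel with at most maxRunner concurrent goroutines.
func paraRun(works []func(), maxRunner int) {
	wg := sync.WaitGroup{}
	ch := make(chan func()) // unbuffered: idle workers block until a job arrives
	wg.Add(len(works))
	// The clamp from the diff: never spawn more workers than jobs.
	if maxRunner > len(works) {
		maxRunner = len(works)
	}
	for i := 0; i < maxRunner; i++ {
		go func() {
			for work := range ch { // drain jobs until the channel closes
				work()
				wg.Done()
			}
		}()
	}
	for _, work := range works {
		ch <- work
	}
	close(ch) // lets the workers exit their range loop
	wg.Wait()
}

func main() {
	works := make([]func(), 5)
	for i := range works {
		i := i // capture the loop variable (pre-Go 1.22 semantics)
		works[i] = func() { fmt.Println("work", i) }
	}
	paraRun(works, 20) // the clamp reduces this to 5 workers
}
```

Under these assumptions the surplus workers would not deadlock even without the clamp, since they exit once the channel closes; the clamp simply avoids launching goroutines that never receive a job.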
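
The added `return` in the `(*cluster).watch` worker is a correctness fix, not style: the closure used to record the RPC error and then fall through to `resp.ErrorCode`, reading a response that is in all likelihood nil when `err != nil`. Here is a condensed sketch of the before-and-after behavior, with stand-in types (`resp`, `call`) in place of the real datapb/commonpb ones:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// resp stands in for the real RPC response type.
type resp struct{ ErrorCode int }

// call stands in for the watch RPC; like most RPCs it returns
// a nil response alongside a non-nil error.
func call(ok bool) (*resp, error) {
	if !ok {
		return nil, errors.New("rpc failed")
	}
	return &resp{}, nil
}

func main() {
	var (
		mut  sync.Mutex
		errs []error
	)
	work := func(ok bool) {
		r, err := call(ok)
		if err != nil {
			mut.Lock()
			errs = append(errs, err)
			mut.Unlock()
			return // without this, r.ErrorCode below panics on a nil r
		}
		fmt.Println("error code:", r.ErrorCode)
	}
	work(false) // error path: records err and bails out early
	work(true)  // success path: inspects the response
	fmt.Println("collected errors:", errs)
}
```

The related change at the bottom of `watch` guards the `retry.ErrorList(errs)` return with `len(errs) > 0`, presumably because `retry.ErrorList` does not return nil for an empty list, so callers can now rely on a plain `err == nil` check for the success path.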
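
The `ServerState` constants replace the bare 1/2 magic numbers previously scattered across `grpc_services.go` and `server.go`. Note the shape of `Stop`: a single `CompareAndSwapInt64` both tests for the Healthy state and performs the transition to Stopped, which makes shutdown idempotent and is presumably why the now redundant `StoreInt64(&s.isServing, 0)` is deleted in the same hunk. A minimal sketch of the pattern follows; everything other than the `ServerState` alias and its constants is illustrative:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ServerState is an alias, not a defined type, so the atomic
// int64 helpers work on fields of this type without conversion.
type ServerState = int64

const (
	ServerStateStopped      ServerState = 0
	ServerStateInitializing ServerState = 1
	ServerStateHealthy      ServerState = 2
)

type server struct {
	isServing ServerState
}

func (s *server) Init()  { atomic.StoreInt64(&s.isServing, ServerStateInitializing) }
func (s *server) Start() { atomic.StoreInt64(&s.isServing, ServerStateHealthy) }

// Stop reports whether this call performed the shutdown. The CAS both
// checks the state and transitions it in one step, so concurrent or
// repeated calls run the shutdown body at most once.
func (s *server) Stop() bool {
	return atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped)
}

func main() {
	s := &server{}
	s.Init()
	s.Start()
	fmt.Println(s.Stop()) // true: this call shut the server down
	fmt.Println(s.Stop()) // false: already stopped, caller returns early
}
```

Had `ServerState` been a defined type (`type ServerState int64`), calls like `atomic.LoadInt64(&s.isServing)` would no longer compile, because `&s.isServing` would be a `*ServerState` rather than a `*int64`; the alias keeps the states readable without touching every atomic call site.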