From 41a6f9b5f78bf313d20518bb7f7ff9e8ee7f8fa1 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 22 Sep 2021 19:31:54 +0800 Subject: [PATCH] Init once for IndexCoord (#8028) Signed-off-by: cai.zhang --- internal/indexcoord/index_coord.go | 184 ++++++++++++++++------------- 1 file changed, 99 insertions(+), 85 deletions(-) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index ab83a5a944..8cac8a5175 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -69,7 +69,8 @@ type IndexCoord struct { nodeLock sync.RWMutex - once sync.Once + initOnce sync.Once + startOnce sync.Once reqTimeoutInterval time.Duration durationInterval time.Duration @@ -110,103 +111,116 @@ func (i *IndexCoord) Register() error { } func (i *IndexCoord) Init() error { - Params.Init() - log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints)) - i.UpdateStateCode(internalpb.StateCode_Initializing) + var initErr error = nil + i.initOnce.Do(func() { + Params.Init() + log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints)) + i.UpdateStateCode(internalpb.StateCode_Initializing) - connectEtcdFn := func() error { - etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) - if err != nil { - return err - } - metakv, err := NewMetaTable(etcdKV) - if err != nil { - return err - } - i.metaTable = metakv - return err - } - log.Debug("IndexCoord try to connect etcd") - err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300)) - if err != nil { - log.Debug("IndexCoord try to connect etcd failed", zap.Error(err)) - return err - } - log.Debug("IndexCoord try to connect etcd success") - i.nodeManager = NewNodeManager() - - sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole) - log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision)) - if err != nil { - log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err)) - } - for _, session := range sessions { - session := session - go func() { - if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil { - log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID), - zap.Any("Add IndexNode error", err)) + connectEtcdFn := func() error { + etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) + if err != nil { + return err } - }() + metakv, err := NewMetaTable(etcdKV) + if err != nil { + return err + } + i.metaTable = metakv + return err + } + log.Debug("IndexCoord try to connect etcd") + err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300)) + if err != nil { + log.Debug("IndexCoord try to connect etcd failed", zap.Error(err)) + initErr = err + return + } + log.Debug("IndexCoord try to connect etcd success") + i.nodeManager = NewNodeManager() - } - log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients))) - i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1) - nodeTasks := i.metaTable.GetNodeTaskStats() - for nodeID, taskNum := range nodeTasks { - i.nodeManager.pq.UpdatePriority(nodeID, taskNum) - } + sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole) + log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision)) + if err != nil { + log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err)) + initErr = err + return + } + for _, session := range sessions { + session := session + go func() { + if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil { + log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID), + zap.Any("Add IndexNode error", err)) + } + }() - //init idAllocator - kvRootPath := Params.KvRootPath - etcdKV, err := tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid") - if err != nil { - log.Debug("IndexCoord TSOKVBase initialize failed", zap.Error(err)) - } + } + log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients))) + i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1) + nodeTasks := i.metaTable.GetNodeTaskStats() + for nodeID, taskNum := range nodeTasks { + i.nodeManager.pq.UpdatePriority(nodeID, taskNum) + } - i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", etcdKV) - if err := i.idAllocator.Initialize(); err != nil { - log.Debug("IndexCoord idAllocator initialize failed", zap.Error(err)) - return err - } + //init idAllocator + kvRootPath := Params.KvRootPath + etcdKV, err := tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid") + if err != nil { + log.Debug("IndexCoord TSOKVBase initialize failed", zap.Error(err)) + initErr = err + return + } - i.ID, err = i.idAllocator.AllocOne() - if err != nil { - return err - } + i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", etcdKV) + if err := i.idAllocator.Initialize(); err != nil { + log.Debug("IndexCoord idAllocator initialize failed", zap.Error(err)) + return + } - option := &miniokv.Option{ - Address: Params.MinIOAddress, - AccessKeyID: Params.MinIOAccessKeyID, - SecretAccessKeyID: Params.MinIOSecretAccessKey, - UseSSL: Params.MinIOUseSSL, - BucketName: Params.MinioBucketName, - CreateBucket: true, - } + i.ID, err = i.idAllocator.AllocOne() + if err != nil { + log.Debug("IndexCoord idAllocator allocOne failed", zap.Error(err)) + initErr = err + return + } - i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option) - if err != nil { - log.Debug("IndexCoord new minio kv failed", zap.Error(err)) - return err - } - log.Debug("IndexCoord new minio kv success") + option := &miniokv.Option{ + Address: Params.MinIOAddress, + AccessKeyID: Params.MinIOAccessKeyID, + SecretAccessKeyID: Params.MinIOSecretAccessKey, + UseSSL: Params.MinIOUseSSL, + BucketName: Params.MinioBucketName, + CreateBucket: true, + } - i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable) - if err != nil { - log.Debug("IndexCoord new task scheduler failed", zap.Error(err)) - return err - } - log.Debug("IndexCoord new task scheduler success") + i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option) + if err != nil { + log.Debug("IndexCoord new minio kv failed", zap.Error(err)) + initErr = err + return + } + log.Debug("IndexCoord new minio kv success") - i.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable) + if err != nil { + log.Debug("IndexCoord new task scheduler failed", zap.Error(err)) + initErr = err + return + } + log.Debug("IndexCoord new task scheduler success") - log.Debug("IndexCoord assign tasks server success", zap.Error(err)) - return nil + i.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + }) + + log.Debug("IndexCoord init finished", zap.Error(initErr)) + + return initErr } func (i *IndexCoord) Start() error { var startErr error = nil - i.once.Do(func() { + i.startOnce.Do(func() { i.loopWg.Add(1) go i.tsLoop() @@ -231,8 +245,8 @@ func (i *IndexCoord) Start() error { cb() } - log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load())) - log.Debug("IndexCoord start successfully", zap.Error(startErr)) + i.UpdateStateCode(internalpb.StateCode_Healthy) + log.Debug("IndexCoord start successfully", zap.Any("State", i.stateCode.Load())) return startErr }