Init once for IndexCoord (#8028)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2021-09-22 19:31:54 +08:00 committed by GitHub
parent 68f00fd68a
commit 41a6f9b5f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -69,7 +69,8 @@ type IndexCoord struct {
nodeLock sync.RWMutex
once sync.Once
initOnce sync.Once
startOnce sync.Once
reqTimeoutInterval time.Duration
durationInterval time.Duration
@ -110,103 +111,116 @@ func (i *IndexCoord) Register() error {
}
func (i *IndexCoord) Init() error {
Params.Init()
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
i.UpdateStateCode(internalpb.StateCode_Initializing)
var initErr error = nil
i.initOnce.Do(func() {
Params.Init()
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
i.UpdateStateCode(internalpb.StateCode_Initializing)
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil {
return err
}
metakv, err := NewMetaTable(etcdKV)
if err != nil {
return err
}
i.metaTable = metakv
return err
}
log.Debug("IndexCoord try to connect etcd")
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("IndexCoord try to connect etcd failed", zap.Error(err))
return err
}
log.Debug("IndexCoord try to connect etcd success")
i.nodeManager = NewNodeManager()
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision))
if err != nil {
log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err))
}
for _, session := range sessions {
session := session
go func() {
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID),
zap.Any("Add IndexNode error", err))
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil {
return err
}
}()
metakv, err := NewMetaTable(etcdKV)
if err != nil {
return err
}
i.metaTable = metakv
return err
}
log.Debug("IndexCoord try to connect etcd")
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("IndexCoord try to connect etcd failed", zap.Error(err))
initErr = err
return
}
log.Debug("IndexCoord try to connect etcd success")
i.nodeManager = NewNodeManager()
}
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1)
nodeTasks := i.metaTable.GetNodeTaskStats()
for nodeID, taskNum := range nodeTasks {
i.nodeManager.pq.UpdatePriority(nodeID, taskNum)
}
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision))
if err != nil {
log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err))
initErr = err
return
}
for _, session := range sessions {
session := session
go func() {
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID),
zap.Any("Add IndexNode error", err))
}
}()
//init idAllocator
kvRootPath := Params.KvRootPath
etcdKV, err := tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid")
if err != nil {
log.Debug("IndexCoord TSOKVBase initialize failed", zap.Error(err))
}
}
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1)
nodeTasks := i.metaTable.GetNodeTaskStats()
for nodeID, taskNum := range nodeTasks {
i.nodeManager.pq.UpdatePriority(nodeID, taskNum)
}
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", etcdKV)
if err := i.idAllocator.Initialize(); err != nil {
log.Debug("IndexCoord idAllocator initialize failed", zap.Error(err))
return err
}
//init idAllocator
kvRootPath := Params.KvRootPath
etcdKV, err := tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid")
if err != nil {
log.Debug("IndexCoord TSOKVBase initialize failed", zap.Error(err))
initErr = err
return
}
i.ID, err = i.idAllocator.AllocOne()
if err != nil {
return err
}
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", etcdKV)
if err := i.idAllocator.Initialize(); err != nil {
log.Debug("IndexCoord idAllocator initialize failed", zap.Error(err))
return
}
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
i.ID, err = i.idAllocator.AllocOne()
if err != nil {
log.Debug("IndexCoord idAllocator allocOne failed", zap.Error(err))
initErr = err
return
}
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Debug("IndexCoord new minio kv failed", zap.Error(err))
return err
}
log.Debug("IndexCoord new minio kv success")
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
SecretAccessKeyID: Params.MinIOSecretAccessKey,
UseSSL: Params.MinIOUseSSL,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable)
if err != nil {
log.Debug("IndexCoord new task scheduler failed", zap.Error(err))
return err
}
log.Debug("IndexCoord new task scheduler success")
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Debug("IndexCoord new minio kv failed", zap.Error(err))
initErr = err
return
}
log.Debug("IndexCoord new minio kv success")
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable)
if err != nil {
log.Debug("IndexCoord new task scheduler failed", zap.Error(err))
initErr = err
return
}
log.Debug("IndexCoord new task scheduler success")
log.Debug("IndexCoord assign tasks server success", zap.Error(err))
return nil
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
})
log.Debug("IndexCoord init finished", zap.Error(initErr))
return initErr
}
func (i *IndexCoord) Start() error {
var startErr error = nil
i.once.Do(func() {
i.startOnce.Do(func() {
i.loopWg.Add(1)
go i.tsLoop()
@ -231,8 +245,8 @@ func (i *IndexCoord) Start() error {
cb()
}
log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load()))
log.Debug("IndexCoord start successfully", zap.Error(startErr))
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexCoord start successfully", zap.Any("State", i.stateCode.Load()))
return startErr
}