From 0f5776e5fbce7dabc468823b7f6da0f24a35bb98 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Sat, 4 Dec 2021 11:39:34 +0800 Subject: [PATCH] Index coord should not get all node from etcd each time (#12668) Signed-off-by: xiaofan-luan --- internal/indexcoord/index_coord.go | 20 ++++---------------- internal/indexcoord/meta_table.go | 1 + internal/indexcoord/node_manager.go | 13 +++++++++++-- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index a5d407064c..c663488262 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -800,18 +800,7 @@ func (i *IndexCoord) assignTaskLoop() { log.Debug("IndexCoord assignTaskLoop ctx Done") return case <-timeTicker.C: - sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole) - if err != nil { - log.Error("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err)) - } - if len(sessions) <= 0 { - log.Warn("There is no IndexNode available as this time.") - break - } - var serverIDs []int64 - for _, session := range sessions { - serverIDs = append(serverIDs, session.ServerID) - } + serverIDs := i.nodeManager.ListNode() metas := i.metaTable.GetUnassignedTasks(serverIDs) sort.Slice(metas, func(i, j int) bool { return metas[i].indexMeta.Version <= metas[j].indexMeta.Version @@ -822,7 +811,7 @@ func (i *IndexCoord) assignTaskLoop() { } for index, meta := range metas { indexBuildID := meta.indexMeta.IndexBuildID - if err = i.metaTable.UpdateVersion(indexBuildID); err != nil { + if err := i.metaTable.UpdateVersion(indexBuildID); err != nil { log.Warn("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err)) continue } @@ -847,12 +836,11 @@ func (i *IndexCoord) assignTaskLoop() { log.Warn("IndexCoord assignTask assign task to IndexNode failed") continue } - if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { + if err := i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { log.Error("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err)) break } - log.Debug("This task has been assigned", zap.Int64("indexBuildID", indexBuildID), - zap.Int64("The IndexNode execute this task", nodeID)) + log.Debug("This task has been assigned successfully", zap.Int64("indexBuildID", indexBuildID), zap.Int64("nodeID", nodeID)) i.nodeManager.pq.IncPriority(nodeID, 1) if index > i.taskLimit { break diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index 47e73ab890..96324e9748 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -397,6 +397,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { } } if !alive { + log.Info("Reassign because node no longer alive", zap.Any("onlineID", onlineNodeIDs), zap.Int64("nodeID", meta.indexMeta.NodeID)) metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) } } diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index 1c00f65659..83dd659eb8 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -97,17 +97,26 @@ func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) { nm.lock.Lock() defer nm.lock.Unlock() - log.Debug("IndexCoord NodeManager PeekClient") - nodeID := nm.pq.Peek() client, ok := nm.nodeClients[nodeID] if !ok { log.Error("IndexCoord NodeManager PeekClient", zap.Any("There is no IndexNode client corresponding to NodeID", nodeID)) return nodeID, nil } + log.Debug("IndexCoord NodeManager PeekClient ", zap.Int64("node", nodeID)) return nodeID, client } +func (nm *NodeManager) ListNode() []UniqueID { + nm.lock.Lock() + defer nm.lock.Unlock() + clients := []UniqueID{} + for id := range nm.nodeClients { + clients = append(clients, id) + } + return clients +} + type indexNodeGetMetricsResponse struct { resp *milvuspb.GetMetricsResponse err error