From 81d3546b7ce83f62850001795071a1a7a8bbe682 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 11 Jun 2021 16:53:42 +0800 Subject: [PATCH] Remove indexnode in indexservice when indexnode crashed (#5719) * Remove indexnode in indexservice when indexnode crashed Signed-off-by: xiaocai2333 --- internal/indexservice/indexservice.go | 5 +++++ internal/indexservice/meta_table.go | 14 ++++++++++++++ internal/indexservice/node_mgr.go | 1 + 3 files changed, 20 insertions(+) diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index ec305bf474..0e17faf9c2 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -493,6 +493,7 @@ func (i *IndexService) assignmentTasksLoop() { i.assignChan <- []UniqueID{indexBuildID} continue } + i.nodeTasks.assignTask(nodeID, indexBuildID) req := &indexpb.CreateIndexRequest{ IndexBuildID: indexBuildID, IndexName: meta.indexMeta.Req.IndexName, @@ -506,9 +507,11 @@ func (i *IndexService) assignmentTasksLoop() { resp, err := builderClient.CreateIndex(ctx, req) if err != nil { log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) + continue } if resp.ErrorCode != commonpb.ErrorCode_Success { log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason)) + continue } if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { log.Debug("IndexService assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err)) @@ -537,8 +540,10 @@ func (i *IndexService) watchNodeLoop() { log.Debug("IndexService watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID)) case sessionutil.SessionDelEvent: serverID := event.Session.ServerID + i.removeNode(serverID) log.Debug("IndexService watchNodeLoop SessionDelEvent ", zap.Any("serverID", serverID)) indexBuildIDs := i.nodeTasks.getTasksByNodeID(serverID) + log.Debug("IndexNode crashed", zap.Any("IndexNode ID", serverID), zap.Any("task IDs", indexBuildIDs)) i.assignChan <- indexBuildIDs i.nodeTasks.delete(serverID) } diff --git a/internal/indexservice/meta_table.go b/internal/indexservice/meta_table.go index dc8fc75c43..61ca74f46a 100644 --- a/internal/indexservice/meta_table.go +++ b/internal/indexservice/meta_table.go @@ -434,6 +434,8 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool { type nodeTasks struct { nodeID2Tasks map[int64][]UniqueID + + lock sync.RWMutex } func NewNodeTasks() *nodeTasks { @@ -443,6 +445,9 @@ func NewNodeTasks() *nodeTasks { } func (nt *nodeTasks) getTasksByNodeID(nodeID int64) []UniqueID { + nt.lock.Lock() + defer nt.lock.Unlock() + indexBuildIDs, ok := nt.nodeID2Tasks[nodeID] if !ok { return nil @@ -451,6 +456,9 @@ func (nt *nodeTasks) getTasksByNodeID(nodeID int64) []UniqueID { } func (nt *nodeTasks) assignTask(serverID int64, indexBuildID UniqueID) { + nt.lock.Lock() + defer nt.lock.Unlock() + indexBuildIDs, ok := nt.nodeID2Tasks[serverID] if !ok { var IDs []UniqueID @@ -463,6 +471,9 @@ func (nt *nodeTasks) assignTask(serverID int64, indexBuildID UniqueID) { } func (nt *nodeTasks) finishTask(indexBuildID UniqueID) { + nt.lock.Lock() + defer nt.lock.Unlock() + for serverID := range nt.nodeID2Tasks { for i, buildID := range nt.nodeID2Tasks[serverID] { if buildID == indexBuildID { @@ -473,5 +484,8 @@ func (nt *nodeTasks) finishTask(indexBuildID UniqueID) { } func (nt *nodeTasks) delete(serverID int64) { + nt.lock.Lock() + defer nt.lock.Unlock() + delete(nt.nodeID2Tasks, serverID) } diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go index 6d29734ddb..7735f7a11b 100644 --- a/internal/indexservice/node_mgr.go +++ b/internal/indexservice/node_mgr.go @@ -28,6 +28,7 @@ import ( func (i *IndexService) removeNode(nodeID UniqueID) { i.nodeLock.Lock() defer i.nodeLock.Unlock() + log.Debug("IndexService", zap.Any("Remove node with ID", nodeID)) i.nodeClients.Remove(nodeID) }