From e8f53af74928ba51b60d945a0191b14ee9d4fb25 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 23 Jun 2022 19:22:15 +0800 Subject: [PATCH] When the task is canceled, reduce the load of node (#17735) Signed-off-by: Cai.Zhang --- internal/indexcoord/index_coord.go | 15 +++++++++++++-- internal/indexcoord/meta_table.go | 17 +++++++++++------ internal/indexcoord/meta_table_test.go | 4 ++-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index df66906e2f..4a81d869b5 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -546,7 +546,12 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } - err := i.metaTable.MarkIndexAsDeleted(req.IndexID) + nodeTasks, err := i.metaTable.MarkIndexAsDeleted(req.IndexID) + defer func() { + for nodeID, taskNum := range nodeTasks { + i.nodeManager.pq.IncPriority(nodeID, taskNum*-1) + } + }() if err != nil { ret.ErrorCode = commonpb.ErrorCode_UnexpectedError ret.Reason = err.Error() @@ -584,7 +589,13 @@ func (i *IndexCoord) RemoveIndex(ctx context.Context, req *indexpb.RemoveIndexRe ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } - err := i.metaTable.MarkIndexAsDeletedByBuildIDs(req.GetBuildIDs()) + + nodeTasks, err := i.metaTable.MarkIndexAsDeletedByBuildIDs(req.GetBuildIDs()) + defer func() { + for nodeID, taskNum := range nodeTasks { + i.nodeManager.pq.IncPriority(nodeID, -1*taskNum) + } + }() if err != nil { log.Error("IndexCoord MarkIndexAsDeletedByBuildIDs failed", zap.Int64s("buildIDs", req.GetBuildIDs()), zap.Error(err)) diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index 44d269aecf..24c4aeb73c 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -249,12 +249,13 @@ func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error { } // MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks. -func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { +func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) (map[int64]int, error) { mt.lock.Lock() defer mt.lock.Unlock() log.Debug("IndexCoord metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID)) + node2TaskNum := make(map[int64]int) for _, meta := range mt.indexBuildID2Meta { if meta.indexMeta.Req.IndexID == indexID && !meta.indexMeta.MarkDeleted { meta.indexMeta.MarkDeleted = true @@ -273,21 +274,24 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { } err2 := retry.Do(context.TODO(), fn, retry.Attempts(5)) if err2 != nil { - return err2 + return node2TaskNum, err2 } + } else { + node2TaskNum[meta.indexMeta.NodeID]++ } } } - return nil + return node2TaskNum, nil } -func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) error { +func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) (map[int64]int, error) { mt.lock.Lock() defer mt.lock.Unlock() log.Debug("IndexCoord metaTable MarkIndexAsDeletedByBuildIDs ", zap.Int64s("buildIDs", buildIDs)) + node2TaskNum := make(map[int64]int) for _, buildID := range buildIDs { if meta, ok := mt.indexBuildID2Meta[buildID]; ok { clonedMeta := &Meta{ @@ -310,12 +314,13 @@ func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) error { } err2 := retry.Do(context.TODO(), fn, retry.Attempts(5)) if err2 != nil { - return err2 + return node2TaskNum, err2 } } + node2TaskNum[meta.indexMeta.NodeID]++ } } - return nil + return node2TaskNum, nil } // GetIndexStates gets the index states from meta table. diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index 2d35229dbc..526ed03aeb 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -135,7 +135,7 @@ func TestMetaTable(t *testing.T) { key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10) err = etcdKV.Save(key, string(value)) assert.Nil(t, err) - err = metaTable.MarkIndexAsDeleted(indexMeta1.Req.IndexID) + _, err = metaTable.MarkIndexAsDeleted(indexMeta1.Req.IndexID) assert.Nil(t, err) }) @@ -153,7 +153,7 @@ func TestMetaTable(t *testing.T) { key = path.Join(indexFilePrefix, strconv.FormatInt(indexMeta1.IndexBuildID, 10)) err = etcdKV.Save(key, string(value)) assert.Nil(t, err) - err = metaTable.MarkIndexAsDeletedByBuildIDs([]UniqueID{indexMeta1.IndexBuildID}) + _, err = metaTable.MarkIndexAsDeletedByBuildIDs([]UniqueID{indexMeta1.IndexBuildID}) assert.Nil(t, err) })