From 1fbdafc943f15247ebb7c0ea14248ac32bd8feb0 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 23 Jun 2022 10:40:13 +0800 Subject: [PATCH] IndexCoord does not assign task to IndexNode prematurely (#17717) Signed-off-by: Cai.Zhang --- internal/indexcoord/index_coord.go | 29 ++++---- internal/indexcoord/meta_table.go | 16 ++++- internal/indexcoord/meta_table_test.go | 81 ++++++++++++++++++++++ internal/indexcoord/peek_client_policy.go | 2 +- internal/indexcoord/priority_queue_test.go | 4 +- 5 files changed, 113 insertions(+), 19 deletions(-) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 8a97139e33..df66906e2f 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -22,7 +22,6 @@ import ( "math/rand" "os" "path" - "sort" "strconv" "sync" "sync/atomic" @@ -118,7 +117,7 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord loopCancel: cancel, reqTimeoutInterval: time.Second * 10, durationInterval: time.Second * 10, - assignTaskInterval: time.Second * 3, + assignTaskInterval: time.Second * 1, taskLimit: 20, factory: factory, } @@ -986,9 +985,7 @@ func (i *IndexCoord) assignTaskLoop() { continue } metas := i.metaTable.GetUnassignedTasks(serverIDs) - sort.Slice(metas, func(i, j int) bool { - return metas[i].indexMeta.Version <= metas[j].indexMeta.Version - }) + // only log if we find unassigned tasks if len(metas) != 0 { log.Debug("IndexCoord find unassigned tasks ", zap.Int("Unassigned tasks number", len(metas)), zap.Int64s("Available IndexNode IDs", serverIDs)) @@ -996,6 +993,18 @@ func (i *IndexCoord) assignTaskLoop() { for index, meta := range metas { indexBuildID := meta.indexMeta.IndexBuildID segID := meta.indexMeta.Req.SegmentID + nodeID, builderClient := i.nodeManager.PeekClient(meta) + if builderClient == nil && nodeID == -1 { + log.Warn("there is no indexnode online") + break + } + + if builderClient == nil && nodeID == 0 { + log.Warn("The memory of all indexnodes does not meet the requirements") + continue + } + log.Debug("IndexCoord PeekClient success", zap.Int64("nodeID", nodeID)) + if err := i.tryAcquireSegmentReferLock(ctx, indexBuildID, []UniqueID{segID}); err != nil { log.Warn("IndexCoord try to acquire segment reference lock failed, maybe this segment has been compacted", zap.Int64("segID", segID), zap.Int64("buildID", indexBuildID), zap.Error(err)) @@ -1007,16 +1016,6 @@ func (i *IndexCoord) assignTaskLoop() { } log.Debug("The version of the task has been updated", zap.Int64("indexBuildID", indexBuildID)) - nodeID, builderClient := i.nodeManager.PeekClient(meta) - if builderClient == nil && nodeID == -1 { - log.Warn("there is no indexnode online") - break - } - if builderClient == nil && nodeID == 0 { - log.Warn("The memory of all indexnodes does not meet the requirements") - continue - } - log.Debug("IndexCoord PeekClient success", zap.Int64("nodeID", nodeID)) req := &indexpb.CreateIndexRequest{ IndexBuildID: indexBuildID, IndexName: meta.indexMeta.Req.IndexName, diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index 1bfa6e1a76..44d269aecf 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "path" + "sort" "strconv" "sync" @@ -439,6 +440,16 @@ func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta { return metas } +func sortMetaPolicy(metas []Meta) []Meta { + // the larger the segment, the higher the priority + sort.Slice(metas, func(i, j int) bool { + return metas[i].indexMeta.Version < metas[j].indexMeta.Version || + (metas[i].indexMeta.Version == metas[j].indexMeta.Version && + metas[i].indexMeta.Req.NumRows > metas[j].indexMeta.Req.NumRows) + }) + return metas +} + // GetUnassignedTasks get the unassigned tasks. func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { mt.lock.RLock() @@ -446,6 +457,9 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { var metas []Meta for _, meta := range mt.indexBuildID2Meta { + if meta.indexMeta.MarkDeleted { + continue + } if meta.indexMeta.State == commonpb.IndexState_Unissued { metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) continue @@ -465,7 +479,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) } } - return metas + return sortMetaPolicy(metas) } // HasSameReq determine whether there are same indexing tasks. diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index 8cee5a7528..2d35229dbc 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -347,3 +347,84 @@ func TestMetaTable_Error(t *testing.T) { err = etcdKV.RemoveWithPrefix("indexes/") assert.Nil(t, err) } + +func TestMetaTable_GetUnassignedTasks(t *testing.T) { + mt := metaTable{ + indexBuildID2Meta: map[UniqueID]Meta{ + 1: { + indexMeta: &indexpb.IndexMeta{ + IndexBuildID: 1, + Req: &indexpb.BuildIndexRequest{ + NumRows: 10, + }, + Version: 1, + NodeID: 1, + State: commonpb.IndexState_Unissued, + }, + }, + 2: { + indexMeta: &indexpb.IndexMeta{ + IndexBuildID: 2, + Req: &indexpb.BuildIndexRequest{ + NumRows: 100, + }, + Version: 1, + NodeID: 1, + State: commonpb.IndexState_Unissued, + }, + }, + 3: { + indexMeta: &indexpb.IndexMeta{ + IndexBuildID: 3, + Req: &indexpb.BuildIndexRequest{ + NumRows: 1000, + }, + Version: 2, + NodeID: 1, + State: commonpb.IndexState_Unissued, + }, + }, + 4: { + indexMeta: &indexpb.IndexMeta{ + IndexBuildID: 4, + Req: &indexpb.BuildIndexRequest{ + NumRows: 1000, + }, + Version: 2, + NodeID: 2, + State: commonpb.IndexState_Finished, + }, + }, + 5: { + indexMeta: &indexpb.IndexMeta{ + IndexBuildID: 5, + Req: &indexpb.BuildIndexRequest{ + NumRows: 1000, + }, + Version: 3, + NodeID: 2, + State: commonpb.IndexState_InProgress, + }, + }, + 6: { + indexMeta: &indexpb.IndexMeta{ + IndexBuildID: 5, + Req: &indexpb.BuildIndexRequest{ + NumRows: 1000, + }, + Version: 1, + NodeID: 1, + State: commonpb.IndexState_InProgress, + MarkDeleted: true, + }, + }, + }, + } + + metas := mt.GetUnassignedTasks([]UniqueID{1, 3, 4}) + assert.Equal(t, 4, len(metas)) + assert.Equal(t, int64(2), metas[0].indexMeta.IndexBuildID) + assert.Equal(t, int64(1), metas[1].indexMeta.IndexBuildID) + assert.Equal(t, int64(3), metas[2].indexMeta.IndexBuildID) + assert.Equal(t, int64(5), metas[3].indexMeta.IndexBuildID) +} diff --git a/internal/indexcoord/peek_client_policy.go b/internal/indexcoord/peek_client_policy.go index a667442bec..0f258d8d9f 100644 --- a/internal/indexcoord/peek_client_policy.go +++ b/internal/indexcoord/peek_client_policy.go @@ -33,7 +33,7 @@ func PeekClientV0(memorySize uint64, indexParams []*commonpb.KeyValuePair, func PeekClientV1(memorySize uint64, indexParams []*commonpb.KeyValuePair, typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID { for i := range pq.items { - if pq.items[i].totalMem > memorySize { + if pq.items[i].totalMem > memorySize && pq.items[i].priority < 2 { return pq.items[i].key } } diff --git a/internal/indexcoord/priority_queue_test.go b/internal/indexcoord/priority_queue_test.go index 3627706e7c..9e1f6a7d53 100644 --- a/internal/indexcoord/priority_queue_test.go +++ b/internal/indexcoord/priority_queue_test.go @@ -33,7 +33,7 @@ func newPriorityQueue() *PriorityQueue { for i := 1; i <= QueueLen; i++ { item := &PQItem{ key: UniqueID(i), - priority: i, + priority: i - 1, index: i - 1, totalMem: 1000, } @@ -99,7 +99,7 @@ func TestPriorityQueue_SetMemory(t *testing.T) { for i := 0; i < QueueLen; i++ { item := &PQItem{ key: UniqueID(i), - priority: i, + priority: 0, index: i, totalMem: 1000, }