From d666c2a190560cfd725cf79fac37c5c21b32018f Mon Sep 17 00:00:00 2001
From: xige-16
Date: Thu, 18 Nov 2021 19:35:15 +0800
Subject: [PATCH] Fix txn limit error when querycoord remove task from etcd (#12084)

Signed-off-by: xige-16
---
 internal/querycoord/cluster.go           |  1 +
 internal/querycoord/querynode.go         |  5 ++++-
 internal/querycoord/segment_allocator.go |  6 +++--
 internal/querycoord/task.go              |  2 ++
 internal/querycoord/task_scheduler.go    | 28 ++++++++++++++----------
 5 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go
index 9e2bfe76ae..f34b74d353 100644
--- a/internal/querycoord/cluster.go
+++ b/internal/querycoord/cluster.go
@@ -222,6 +222,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
                 log.Debug("ReloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
                 return err
             }
+            log.Debug("ReloadFromKV: reload collection info from etcd", zap.Any("info", collectionInfo))
         }
     }
     return nil
diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go
index 92ba6c4017..063b059f71 100644
--- a/internal/querycoord/querynode.go
+++ b/internal/querycoord/querynode.go
@@ -171,6 +171,7 @@ func (qn *queryNode) addCollection(collectionID UniqueID, schema *schemapb.Colle
             log.Error("AddCollection: save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
             return err
         }
+        log.Debug("queryNode addCollection", zap.Int64("nodeID", qn.id), zap.Any("collectionInfo", newCollection))
     }
 
     return nil
@@ -215,6 +216,7 @@ func (qn *queryNode) addPartition(collectionID UniqueID, partitionID UniqueID) e
         if err != nil {
             log.Error("AddPartition: save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
         }
+        log.Debug("queryNode add partition", zap.Int64("nodeID", qn.id), zap.Any("collectionInfo", col))
         return nil
     }
     return errors.New("AddPartition: can't find collection when add partition")
@@ -255,11 +257,12 @@ func (qn *queryNode) releasePartitionsInfo(collectionID UniqueID, partitionIDs [
             }
         }
         info.PartitionIDs = newPartitionIDs
-        err := removeNodeCollectionInfo(collectionID, qn.id, qn.kvClient)
+        err := saveNodeCollectionInfo(collectionID, info, qn.id, qn.kvClient)
         if err != nil {
             log.Error("ReleasePartitionsInfo: remove collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
             return err
         }
+        log.Debug("queryNode release partition info", zap.Int64("nodeID", qn.id), zap.Any("info", info))
     }
 
     return nil
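Note on the querynode.go hunks above: the releasePartitionsInfo change is a behaviour fix, not just extra logging. When only some partitions are released, the node's collection record in etcd was previously removed outright; the patch now writes back the CollectionInfo with the remaining partition IDs, so the node's metadata survives a partial release. Below is a minimal, self-contained sketch of that pattern; the store interface, key layout, and encoding are illustrative assumptions rather than querycoord's actual helpers (the real code calls saveNodeCollectionInfo with the node's kv client).

package sketch

import "fmt"

// metaStore stands in for the etcd-backed kv client used by querycoord;
// only the calls the sketch needs are modelled here.
type metaStore interface {
	Save(key, value string) error
	Remove(key string) error
}

// collectionInfo is a trimmed-down stand-in for querypb.CollectionInfo.
type collectionInfo struct {
	CollectionID int64
	PartitionIDs []int64
}

// releasePartitions mirrors the patched behaviour: drop the released
// partition IDs from the in-memory info, then save the updated record back
// under the node's key instead of removing the whole record.
func releasePartitions(store metaStore, nodeID int64, info *collectionInfo, released []int64) error {
	releasedSet := make(map[int64]struct{}, len(released))
	for _, id := range released {
		releasedSet[id] = struct{}{}
	}
	remaining := make([]int64, 0, len(info.PartitionIDs))
	for _, id := range info.PartitionIDs {
		if _, ok := releasedSet[id]; !ok {
			remaining = append(remaining, id)
		}
	}
	info.PartitionIDs = remaining

	// Hypothetical key layout; the real prefix lives in querycoord's meta code.
	key := fmt.Sprintf("queryNode/%d/collectionInfo/%d", nodeID, info.CollectionID)
	value := fmt.Sprintf("%d:%v", info.CollectionID, info.PartitionIDs) // placeholder encoding
	return store.Save(key, value) // the old code path effectively called store.Remove(key) here
}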
diff --git a/internal/querycoord/segment_allocator.go b/internal/querycoord/segment_allocator.go
index 564d857839..a089347e65 100644
--- a/internal/querycoord/segment_allocator.go
+++ b/internal/querycoord/segment_allocator.go
@@ -98,7 +98,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
     if len(reqs) == 0 {
         return nil
     }
-
+    log.Debug("shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs")
     dataSizePerReq := make([]int64, 0)
     for _, req := range reqs {
         sizeOfReq, err := cluster.estimateSegmentsSize(req)
@@ -107,7 +107,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
         }
         dataSizePerReq = append(dataSizePerReq, sizeOfReq)
     }
-
+    log.Debug("shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end")
     for {
         // online nodes map and totalMem, usedMem, memUsage of every node
         totalMem := make(map[int64]uint64)
@@ -143,6 +143,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
             // update totalMem, memUsage, memUsageRate
             totalMem[nodeID], memUsage[nodeID], memUsageRate[nodeID] = queryNodeInfo.totalMem, queryNodeInfo.memUsage, queryNodeInfo.memUsageRate
         }
+        log.Debug("shuffleSegmentsToQueryNodeV2: num of availableNodes", zap.Int("size", len(availableNodes)))
         if len(availableNodes) > 0 {
             nodeIDSlice := make([]int64, 0, len(availableNodes))
             for nodeID := range availableNodes {
@@ -176,6 +177,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
         }
 
         if allocateSegmentsDone {
+            log.Debug("shuffleSegmentsToQueryNodeV2: shuffle segment to query node success")
             return nil
         }
     }
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index 7e1386a0b2..342d9a57e2 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -1718,6 +1718,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
             return err
         }
 
+        log.Debug("loadBalanceTask: partitions to recover", zap.Int64s("partitionIDs", partitionIDs))
         for _, partitionID := range partitionIDs {
             getRecoveryInfo := &datapb.GetRecoveryInfoRequest{
                 Base: &commonpb.MsgBase{
@@ -2079,6 +2080,7 @@ func assignInternalTask(ctx context.Context, wait bool,
     excludeNodeIDs []int64, includeNodeIDs []int64) ([]task, error) {
     sp, _ := trace.StartSpanFromContext(ctx)
     defer sp.Finish()
+    log.Debug("assignInternalTask: start assign task to query node")
     internalTasks := make([]task, 0)
     err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs)
     if err != nil {
diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go
index da6e755471..444ac61f0f 100644
--- a/internal/querycoord/task_scheduler.go
+++ b/internal/querycoord/task_scheduler.go
@@ -631,21 +631,27 @@ func (scheduler *TaskScheduler) scheduleLoop() {
     }
 
     removeTaskFromKVFn := func(triggerTask task) error {
-        keys := make([]string, 0)
-        taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, triggerTask.getTaskID())
-        stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, triggerTask.getTaskID())
-        keys = append(keys, taskKey)
-        keys = append(keys, stateKey)
         childTasks := triggerTask.getChildTask()
         for _, t := range childTasks {
-            taskKey = fmt.Sprintf("%s/%d", activeTaskPrefix, t.getTaskID())
-            stateKey = fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
-            keys = append(keys, taskKey)
-            keys = append(keys, stateKey)
+            childTaskKeys := make([]string, 0)
+            taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.getTaskID())
+            stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
+            childTaskKeys = append(childTaskKeys, taskKey)
+            childTaskKeys = append(childTaskKeys, stateKey)
+            err := scheduler.client.MultiRemove(childTaskKeys)
+            // after recover, child Task's state will be TaskDone, will not be repeat executed
+            if err != nil {
+                panic(err)
+            }
         }
-        err := scheduler.client.MultiRemove(keys)
+        triggerTaskKeys := make([]string, 0)
+        taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, triggerTask.getTaskID())
+        stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, triggerTask.getTaskID())
+        triggerTaskKeys = append(triggerTaskKeys, taskKey)
+        triggerTaskKeys = append(triggerTaskKeys, stateKey)
+        err := scheduler.client.MultiRemove(triggerTaskKeys)
         if err != nil {
-            return err
+            panic(err)
         }
         return nil
     }
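Why the task_scheduler.go change fixes the error in the title: etcd caps the number of operations allowed in a single transaction (--max-txn-ops, 128 by default), and the old removeTaskFromKVFn collected two keys per child task plus two trigger-task keys into one MultiRemove call, so a trigger task with enough child tasks exceeded that cap and the removal failed. The patch instead issues one small MultiRemove per child task and a final one for the trigger task; per the in-diff comment, a child task recovered from etcd after a crash is already in the TaskDone state, so it will not be executed again. The sketch below shows the same idea in a more generic form, batching an arbitrary key list; the remover interface and the fixed-size chunking are illustrative assumptions rather than querycoord's actual code, which batches per child task.

package sketch

// remover models the one etcd-backed call the sketch needs; querycoord's kv
// client exposes a MultiRemove with this shape, but the interface itself is
// an assumption for illustration.
type remover interface {
	MultiRemove(keys []string) error
}

// removeKeysInBatches deletes keys in chunks no larger than maxTxnOps so that
// no single etcd transaction exceeds the server's --max-txn-ops limit
// (128 by default).
func removeKeysInBatches(client remover, keys []string, maxTxnOps int) error {
	if maxTxnOps <= 0 {
		maxTxnOps = 128 // etcd's default transaction-size limit
	}
	for start := 0; start < len(keys); start += maxTxnOps {
		end := start + maxTxnOps
		if end > len(keys) {
			end = len(keys)
		}
		if err := client.MultiRemove(keys[start:end]); err != nil {
			return err
		}
	}
	return nil
}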