diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go
index dd1ed6d33f..9c555cc525 100644
--- a/internal/querycoord/impl.go
+++ b/internal/querycoord/impl.go
@@ -228,54 +228,6 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 		return status, nil
 	}
 
-	if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
-		// if collection has been loaded by load collection request, return success
-		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
-			if collectionInfo.ReplicaNumber != req.ReplicaNumber {
-				msg := fmt.Sprintf("collection has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
-					collectionInfo.ReplicaNumber,
-					req.ReplicaNumber)
-				log.Warn(msg,
-					zap.String("role", typeutil.QueryCoordRole),
-					zap.Int64("collectionID", collectionID),
-					zap.Int64("msgID", req.Base.MsgID),
-					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
-					zap.Int32("requestReplicaNumber", req.ReplicaNumber))
-
-				status.ErrorCode = commonpb.ErrorCode_IllegalArgument
-				status.Reason = msg
-
-				metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-				return status, nil
-			}
-
-			log.Info("collection has already been loaded, return load success directly",
-				zap.String("role", typeutil.QueryCoordRole),
-				zap.Int64("collectionID", collectionID),
-				zap.Int64("msgID", req.Base.MsgID))
-
-			metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
-			return status, nil
-		}
-		// if some partitions of the collection have been loaded by load partitions request, return error
-		// should release partitions first, then load collection again
-		if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
-			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
-				collectionInfo.PartitionIDs, collectionID)
-			status.Reason = err.Error()
-			log.Warn("loadCollectionRequest failed",
-				zap.String("role", typeutil.QueryCoordRole),
-				zap.Int64("collectionID", collectionID),
-				zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
-				zap.Int64("msgID", req.Base.MsgID),
-				zap.Error(err))
-
-			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-			return status, nil
-		}
-	}
-
 	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
 	loadCollectionTask := &loadCollectionTask{
 		baseTask: baseTask,
@@ -300,16 +252,32 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 
 	err = loadCollectionTask.waitToFinish()
 	if err != nil {
-		log.Error("load collection to query nodes failed",
-			zap.String("role", typeutil.QueryCoordRole),
-			zap.Int64("collectionID", collectionID),
-			zap.Int64("msgID", req.Base.MsgID),
-			zap.Error(err))
-		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		status.Reason = err.Error()
+		if errors.Is(err, ErrCollectionLoaded) {
+			log.Info("collection has already been loaded, return load success directly",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64("msgID", req.Base.MsgID))
 
-		metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-		return status, nil
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
+			return status, nil
+		} else if errors.Is(err, ErrLoadParametersMismatch) {
+			status.ErrorCode = commonpb.ErrorCode_IllegalArgument
+			status.Reason = err.Error()
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		} else {
+			log.Error("load collection to query nodes failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64("msgID", req.Base.MsgID),
+				zap.Error(err))
+			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+			status.Reason = err.Error()
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		}
 	}
 
 	log.Info("loadCollectionRequest completed",
@@ -533,75 +501,6 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 		return status, nil
 	}
 
-	if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
-		// if the collection has been loaded into memory by load collection request, return error
-		// should release collection first, then load partitions again
-		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
-			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			err = fmt.Errorf("collection %d has been loaded into QueryNode, please release collection firstly", collectionID)
-			status.Reason = err.Error()
-		}
-
-		if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
-			if collectionInfo.ReplicaNumber != req.ReplicaNumber {
-				msg := fmt.Sprintf("partitions has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
-					collectionInfo.ReplicaNumber,
-					req.ReplicaNumber)
-				log.Warn(msg,
-					zap.String("role", typeutil.QueryCoordRole),
-					zap.Int64("collectionID", collectionID),
-					zap.Int64("msgID", req.Base.MsgID),
-					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
-					zap.Int32("requestReplicaNumber", req.ReplicaNumber))
-
-				status.ErrorCode = commonpb.ErrorCode_IllegalArgument
-				status.Reason = msg
-
-				metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-				return status, nil
-			}
-
-			for _, toLoadPartitionID := range partitionIDs {
-				needLoad := true
-				for _, loadedPartitionID := range collectionInfo.PartitionIDs {
-					if toLoadPartitionID == loadedPartitionID {
-						needLoad = false
-						break
-					}
-				}
-				if needLoad {
-					// if new partitions need to be loaded, return error
-					// should release partitions first, then load partitions again
-					status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-					err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
-						collectionInfo.PartitionIDs, collectionID)
-					status.Reason = err.Error()
-				}
-			}
-		}
-
-		if status.ErrorCode != commonpb.ErrorCode_Success {
-			log.Warn("loadPartitionRequest failed",
-				zap.String("role", typeutil.QueryCoordRole),
-				zap.Int64("collectionID", collectionID),
-				zap.Int64s("partitionIDs", partitionIDs),
-				zap.Int64("msgID", req.Base.MsgID),
-				zap.Error(err))
-
-			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-			return status, nil
-		}
-
-		log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
-			zap.String("role", typeutil.QueryCoordRole),
-			zap.Int64("collectionID", req.CollectionID),
-			zap.Int64s("partitionIDs", partitionIDs),
-			zap.Int64("msgID", req.Base.MsgID))
-
-		metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
-		return status, nil
-	}
-
 	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
 	loadPartitionTask := &loadPartitionTask{
 		baseTask: baseTask,
@@ -627,17 +526,34 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 
 	err = loadPartitionTask.waitToFinish()
 	if err != nil {
-		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		status.Reason = err.Error()
-		log.Error("loadPartitionRequest failed",
-			zap.String("role", typeutil.QueryCoordRole),
-			zap.Int64("collectionID", req.CollectionID),
-			zap.Int64s("partitionIDs", partitionIDs),
-			zap.Int64("msgID", req.Base.MsgID),
-			zap.Error(err))
+		if errors.Is(err, ErrCollectionLoaded) {
+			log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", req.CollectionID),
+				zap.Int64s("partitionIDs", req.PartitionIDs),
+				zap.Int64("msgID", req.Base.MsgID))
 
-		metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-		return status, nil
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
+			return status, nil
+		} else if errors.Is(err, ErrLoadParametersMismatch) {
+			status.ErrorCode = commonpb.ErrorCode_IllegalArgument
+			status.Reason = err.Error()
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		} else {
+			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+			status.Reason = err.Error()
+			log.Error("loadPartitionRequest failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", req.CollectionID),
+				zap.Int64s("partitionIDs", partitionIDs),
+				zap.Int64("msgID", req.Base.MsgID),
+				zap.Error(err))
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		}
 	}
 
 	log.Info("loadPartitionRequest completed",
diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go
index 00a3ed45b2..daf559445a 100644
--- a/internal/querycoord/impl_test.go
+++ b/internal/querycoord/impl_test.go
@@ -885,7 +885,7 @@ func Test_LoadCollectionAndLoadPartitions(t *testing.T) {
 
 	// second load defaultPartitionID
 	status, err = queryCoord.LoadPartitions(ctx, loadPartitionReq)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+	assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
 	assert.Nil(t, err)
 
 	node.stop()
@@ -1141,7 +1141,7 @@ func Test_RepeatedLoadDifferentPartitions(t *testing.T) {
 		ReplicaNumber: 1,
 	}
 	status, err = queryCoord.LoadPartitions(ctx, failLoadRequest)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+	assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
 	assert.Nil(t, err)
 
 	node.stop()
@@ -1187,7 +1187,7 @@ func Test_LoadPartitionsAndLoadCollection(t *testing.T) {
 
 	// second load defaultCollectionID
 	status, err = queryCoord.LoadCollection(ctx, loadCollectionReq)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+	assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
 	assert.Nil(t, err)
 
 	node.stop()
diff --git a/internal/querycoord/mock_querynode_server_test.go b/internal/querycoord/mock_querynode_server_test.go
index be5fc406f1..ba547d9c0c 100644
--- a/internal/querycoord/mock_querynode_server_test.go
+++ b/internal/querycoord/mock_querynode_server_test.go
@@ -275,6 +275,12 @@ func (qs *queryNodeServerMock) GetMetrics(ctx context.Context, req *milvuspb.Get
 	if err != nil {
 		return nil, err
 	}
+
+	// check whether the memory usage has been set
+	if len(response.Response) > 0 {
+		return response, nil
+	}
+
 	if response.Status.ErrorCode != commonpb.ErrorCode_Success {
 		return nil, errors.New("query node do task failed")
 	}
diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go
index 8be0e9ebe7..0c7c87b6a6 100644
--- a/internal/querycoord/query_coord_test.go
+++ b/internal/querycoord/query_coord_test.go
@@ -38,10 +38,12 @@ import (
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/dependency"
 	"github.com/milvus-io/milvus/internal/util/etcd"
+	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 )
 
@@ -353,11 +355,12 @@ func TestHandoffSegmentLoop(t *testing.T) {
 		waitTaskFinalState(handoffTask, taskExpired)
 	})
 
+	// genReleaseCollectionTask(baseCtx, queryCoord)
+	queryCoord.meta.releaseCollection(defaultCollectionID)
 	loadCollectionTask := genLoadCollectionTask(baseCtx, queryCoord)
 	err = queryCoord.scheduler.Enqueue(loadCollectionTask)
 	assert.Nil(t, err)
 	waitTaskFinalState(loadCollectionTask, taskExpired)
-	queryCoord.meta.setLoadType(defaultCollectionID, querypb.LoadType_LoadCollection)
 
 	t.Run("Test handoffGrowingSegment", func(t *testing.T) {
 		infos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
@@ -578,34 +581,35 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
 	assert.Nil(t, err)
 	waitTaskFinalState(loadCollectionTask, taskExpired)
 
-	partitionID := defaultPartitionID
-	for {
-		req := &querypb.LoadPartitionsRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_LoadPartitions,
+	memoryStress := uint64(1 << 10)
+	queryNode1.getMetrics = func() (*milvuspb.GetMetricsResponse, error) {
+		nodeInfo := metricsinfo.QueryNodeInfos{
+			BaseComponentInfos: metricsinfo.BaseComponentInfos{
+				HardwareInfos: metricsinfo.HardwareMetrics{
+					Memory:      1 << 30,
+					MemoryUsage: memoryStress,
+				},
 			},
-			CollectionID:  defaultCollectionID,
-			PartitionIDs:  []UniqueID{partitionID},
-			Schema:        genDefaultCollectionSchema(false),
-			ReplicaNumber: 1,
 		}
-		baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_GrpcRequest)
-		loadPartitionTask := &loadPartitionTask{
-			baseTask:              baseTask,
-			LoadPartitionsRequest: req,
-			broker:                queryCoord.broker,
-			cluster:               queryCoord.cluster,
-			meta:                  queryCoord.meta,
-		}
-		err = queryCoord.scheduler.Enqueue(loadPartitionTask)
-		assert.Nil(t, err)
-		waitTaskFinalState(loadPartitionTask, taskExpired)
+
+		nodeInfoResp, err := metricsinfo.MarshalComponentInfos(nodeInfo)
+		return &milvuspb.GetMetricsResponse{
+			Status:   &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Response: nodeInfoResp,
+		}, err
+	}
+	for {
 		nodeInfo, err := queryCoord.cluster.GetNodeInfoByID(queryNode1.queryNodeID)
 		assert.Nil(t, err)
 		if nodeInfo.(*queryNode).memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
+			log.Debug("memory stress hits the threshold")
 			break
 		}
-		partitionID++
+
+		log.Debug("memory stress...",
+			zap.Uint64("stress", memoryStress),
+			zap.Float64("memoryUsage", nodeInfo.(*queryNode).memUsageRate))
+		memoryStress *= 2
 	}
 
 	queryNode2, err := startQueryNodeServer(baseCtx)
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index c59ff1990f..4b27a55381 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -57,6 +57,11 @@ const (
 	MaxSendSizeToEtcd = 200000
 )
 
+var (
+	ErrCollectionLoaded       = errors.New("CollectionLoaded")
+	ErrLoadParametersMismatch = errors.New("LoadParametersMismatch")
+)
+
 type taskState int
 
 const (
@@ -349,7 +354,51 @@ func (lct *loadCollectionTask) preExecute(ctx context.Context) error {
 	collectionID := lct.CollectionID
 	schema := lct.Schema
+	lct.setResultInfo(nil)
+
+	collectionInfo, err := lct.meta.getCollectionInfoByID(collectionID)
+	if err == nil {
+		// if collection has been loaded by load collection request, return success
+		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
+			if collectionInfo.ReplicaNumber != lct.ReplicaNumber {
+				msg := fmt.Sprintf("collection has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
+					collectionInfo.ReplicaNumber,
+					lct.ReplicaNumber)
+				log.Warn(msg,
+					zap.String("role", typeutil.QueryCoordRole),
+					zap.Int64("collectionID", collectionID),
+					zap.Int64("msgID", lct.Base.MsgID),
+					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
+					zap.Int32("requestReplicaNumber", lct.ReplicaNumber))
+
+				lct.result = &commonpb.Status{
+					ErrorCode: commonpb.ErrorCode_IllegalArgument,
+					Reason:    msg,
+				}
+
+				return fmt.Errorf(msg+" [%w]", ErrLoadParametersMismatch)
+			}
+
+			return ErrCollectionLoaded
+		} else if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
+			// if some partitions of the collection have been loaded by load partitions request, return error
+			// should release partitions first, then load collection again
+			err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
+				collectionInfo.PartitionIDs, collectionID)
+			log.Warn("loadCollectionRequest failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
+				zap.Int64("msgID", lct.Base.MsgID),
+				zap.Error(err))
+
+			lct.setResultInfo(err)
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return fmt.Errorf(err.Error()+" [%w]", ErrLoadParametersMismatch)
+		}
+	}
+
 	log.Info("start do loadCollectionTask",
 		zap.Int64("msgID", lct.getTaskID()),
 		zap.Int64("collectionID", collectionID),
@@ -555,11 +604,6 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
 	collectionID := lct.CollectionID
 	if lct.getResultInfo().ErrorCode != commonpb.ErrorCode_Success {
 		lct.clearChildTasks()
-		err := lct.meta.releaseCollection(collectionID)
-		if err != nil {
-			log.Error("loadCollectionTask: occur error when release collection info from meta", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err))
-			panic(err)
-		}
 	}
 
 	log.Info("loadCollectionTask postExecute done",
@@ -793,8 +837,74 @@ func (lpt *loadPartitionTask) preExecute(context.Context) error {
 		lpt.ReplicaNumber = 1
 	}
 
-	collectionID := lpt.CollectionID
 	lpt.setResultInfo(nil)
+
+	collectionID := lpt.CollectionID
+	collectionInfo, err := lpt.meta.getCollectionInfoByID(collectionID)
+	if err == nil {
+		// if the collection has been loaded into memory by load collection request, return error
+		// should release collection first, then load partitions again
+		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
+			err = fmt.Errorf("collection %d has been loaded into QueryNode, please release collection firstly", collectionID)
+			lpt.setResultInfo(err)
+		} else if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
+			if collectionInfo.ReplicaNumber != lpt.ReplicaNumber {
+				msg := fmt.Sprintf("partitions has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
+					collectionInfo.ReplicaNumber,
+					lpt.ReplicaNumber)
+				log.Warn(msg,
+					zap.String("role", typeutil.QueryCoordRole),
+					zap.Int64("collectionID", collectionID),
+					zap.Int64("msgID", lpt.Base.MsgID),
+					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
+					zap.Int32("requestReplicaNumber", lpt.ReplicaNumber))
+
+				lpt.result = &commonpb.Status{
+					ErrorCode: commonpb.ErrorCode_IllegalArgument,
+					Reason:    msg,
+				}
+
+				return fmt.Errorf(msg+" [%w]", ErrLoadParametersMismatch)
+			}
+
+			for _, toLoadPartitionID := range lpt.PartitionIDs {
+				needLoad := true
+				for _, loadedPartitionID := range collectionInfo.PartitionIDs {
+					if toLoadPartitionID == loadedPartitionID {
+						needLoad = false
+						break
+					}
+				}
+				if needLoad {
+					// if new partitions need to be loaded, return error
+					// should release partitions first, then load partitions again
+					err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
+						collectionInfo.PartitionIDs, collectionID)
+					lpt.setResultInfo(err)
+				}
+			}
+		}
+
+		if lpt.result.ErrorCode != commonpb.ErrorCode_Success {
+			log.Warn("loadPartitionRequest failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64s("partitionIDs", lpt.PartitionIDs),
+				zap.Int64("msgID", lpt.Base.MsgID),
+				zap.Error(err))
+
+			return fmt.Errorf(err.Error()+" [%w]", ErrLoadParametersMismatch)
+		}
+
+		log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
+			zap.String("role", typeutil.QueryCoordRole),
+			zap.Int64("collectionID", lpt.CollectionID),
+			zap.Int64s("partitionIDs", lpt.PartitionIDs),
+			zap.Int64("msgID", lpt.Base.MsgID))
+
+		return ErrCollectionLoaded
+	}
+
 	log.Info("start do loadPartitionTask",
 		zap.Int64("msgID", lpt.getTaskID()),
 		zap.Int64("collectionID", collectionID))
@@ -991,11 +1101,6 @@ func (lpt *loadPartitionTask) postExecute(ctx context.Context) error {
 	partitionIDs := lpt.PartitionIDs
 	if lpt.getResultInfo().ErrorCode != commonpb.ErrorCode_Success {
 		lpt.clearChildTasks()
-		err := lpt.meta.releaseCollection(collectionID)
-		if err != nil {
-			log.Error("loadPartitionTask: occur error when release collection info from meta", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID), zap.Error(err))
-			panic(err)
-		}
 	}
 
 	log.Info("loadPartitionTask postExecute done",
diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go
index f4b1290df0..450aa18fbe 100644
--- a/internal/querycoord/task_scheduler.go
+++ b/internal/querycoord/task_scheduler.go
@@ -537,7 +537,9 @@ func (scheduler *TaskScheduler) processTask(t task) error {
 	span.LogFields(oplog.Int64("processTask: scheduler process PreExecute", t.getTaskID()))
 	err = t.preExecute(ctx)
 	if err != nil {
-		log.Warn("failed to preExecute task", zap.Error(err))
+		log.Error("failed to preExecute task",
+			zap.Error(err))
+		t.setResultInfo(err)
 		return err
 	}
 	taskInfoKey = fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
@@ -638,8 +640,35 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 		if triggerTask.getState() == taskUndo || triggerTask.getState() == taskDoing {
 			err = scheduler.processTask(triggerTask)
 			if err != nil {
-				log.Warn("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
-				alreadyNotify = false
+				// Checks in preExecute did not pass; this happens
+				// only for LoadCollection & LoadPartitions
+				if errors.Is(err, ErrCollectionLoaded) ||
+					errors.Is(err, ErrLoadParametersMismatch) {
+					log.Warn("scheduleLoop: preExecute failed",
+						zap.Int64("triggerTaskID", triggerTask.getTaskID()),
+						zap.Error(err))
+
+					triggerTask.setState(taskExpired)
+					if errors.Is(err, ErrLoadParametersMismatch) {
+						triggerTask.setState(taskFailed)
+					}
+
+					triggerTask.notify(err)
+
+					err = removeTaskFromKVFn(triggerTask)
+					if err != nil {
+						log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
+						triggerTask.setResultInfo(err)
+					} else {
+						log.Info("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()))
+					}
+
+					triggerTask.finishContext()
+					continue
+				} else {
+					log.Warn("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
+					alreadyNotify = false
+				}
 			}
 		}
 		if triggerTask.msgType() != commonpb.MsgType_LoadCollection && triggerTask.msgType() != commonpb.MsgType_LoadPartitions {
diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go
index 5f956701e9..01a2ff1c2e 100644
--- a/internal/querycoord/task_test.go
+++ b/internal/querycoord/task_test.go
@@ -275,9 +275,14 @@ func genWatchDeltaChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeI
 
 func waitTaskFinalState(t task, state taskState) {
 	for {
-		if t.getState() == state {
+		currentState := t.getState()
+		if currentState == state {
 			break
 		}
+
+		log.Debug("task state does not match",
+			zap.Int("actual", int(currentState)),
+			zap.Int("expected", int(state)))
 	}
 }
 
@@ -287,9 +292,6 @@ func TestTriggerTask(t *testing.T) {
 	queryCoord, err := startQueryCoord(ctx)
 	assert.Nil(t, err)
 
-	err = queryCoord.meta.addCollection(defaultCollectionID, querypb.LoadType_LoadCollection, genDefaultCollectionSchema(false))
-	assert.NoError(t, err)
-
 	node1, err := startQueryNodeServer(ctx)
 	assert.Nil(t, err)
 	node2, err := startQueryNodeServer(ctx)
@@ -565,6 +567,7 @@ func Test_LoadPartitionExecuteFailAfterLoadCollection(t *testing.T) {
 
 	waitTaskFinalState(loadPartitionTask, taskFailed)
 
+	log.Debug("test done")
 	node.stop()
 	queryCoord.Stop()
 	err = removeAllSession()
@@ -585,14 +588,18 @@ func Test_ReleaseCollectionExecuteFail(t *testing.T) {
 	assert.NoError(t, err)
 	waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
 
+	releaseCollectionTask := genReleaseCollectionTask(ctx, queryCoord)
+	notify := make(chan struct{})
+	go func() {
+		waitTaskFinalState(releaseCollectionTask, taskDone)
+		node.setRPCInterface(&node.releaseCollection, returnSuccessResult)
+		waitTaskFinalState(releaseCollectionTask, taskExpired)
+		close(notify)
+	}()
 	err = queryCoord.scheduler.Enqueue(releaseCollectionTask)
 	assert.Nil(t, err)
-
-	waitTaskFinalState(releaseCollectionTask, taskDone)
-
-	node.setRPCInterface(&node.releaseCollection, returnSuccessResult)
-	waitTaskFinalState(releaseCollectionTask, taskExpired)
+	<-notify
 
 	node.stop()
 	queryCoord.Stop()