Fix the scheduler enqueuing concurrent load tasks (#17950)

Related to #17850

Signed-off-by: yah01 <yang.cen@zilliz.com>

parent 566f5b5766
commit 4def7047b4
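In outline: the pre-checks for "already loaded" and "replica number mismatch" move out of the QueryCoord LoadCollection/LoadPartitions handlers into the load tasks' preExecute, which reports its verdict through two new sentinel errors (ErrCollectionLoaded, ErrLoadParametersMismatch); the gRPC handlers and the scheduler then classify the failure with errors.Is. The following is a minimal standalone sketch of that classification pattern, not the Milvus code itself; this preExecute and its boolean arguments are hypothetical stand-ins:

package main

import (
	"errors"
	"fmt"
)

// The two sentinels added by this commit (names taken from the diff).
var (
	ErrCollectionLoaded       = errors.New("CollectionLoaded")
	ErrLoadParametersMismatch = errors.New("LoadParametersMismatch")
)

// preExecute is a hypothetical stand-in for loadCollectionTask.preExecute:
// it wraps a sentinel with %w so callers can classify the failure.
func preExecute(alreadyLoaded, replicasMatch bool) error {
	if alreadyLoaded && !replicasMatch {
		return fmt.Errorf("replica number differs from the loaded one [%w]", ErrLoadParametersMismatch)
	}
	if alreadyLoaded {
		return ErrCollectionLoaded
	}
	return nil
}

func main() {
	err := preExecute(true, false)
	switch {
	case errors.Is(err, ErrCollectionLoaded):
		fmt.Println("already loaded: report success to the client")
	case errors.Is(err, ErrLoadParametersMismatch):
		fmt.Println("parameter mismatch: report ErrorCode_IllegalArgument")
	case err != nil:
		fmt.Println("unexpected error:", err)
	}
}

Moving the checks into preExecute means they run when a task is actually processed, not only when the request arrives, so two load requests enqueued concurrently are still validated one by one.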
@@ -228,54 +228,6 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 		return status, nil
 	}
 
-	if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
-		// if collection has been loaded by load collection request, return success
-		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
-			if collectionInfo.ReplicaNumber != req.ReplicaNumber {
-				msg := fmt.Sprintf("collection has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
-					collectionInfo.ReplicaNumber,
-					req.ReplicaNumber)
-				log.Warn(msg,
-					zap.String("role", typeutil.QueryCoordRole),
-					zap.Int64("collectionID", collectionID),
-					zap.Int64("msgID", req.Base.MsgID),
-					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
-					zap.Int32("requestReplicaNumber", req.ReplicaNumber))
-
-				status.ErrorCode = commonpb.ErrorCode_IllegalArgument
-				status.Reason = msg
-
-				metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-				return status, nil
-			}
-
-			log.Info("collection has already been loaded, return load success directly",
-				zap.String("role", typeutil.QueryCoordRole),
-				zap.Int64("collectionID", collectionID),
-				zap.Int64("msgID", req.Base.MsgID))
-
-			metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
-			return status, nil
-		}
-		// if some partitions of the collection have been loaded by load partitions request, return error
-		// should release partitions first, then load collection again
-		if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
-			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
-				collectionInfo.PartitionIDs, collectionID)
-			status.Reason = err.Error()
-			log.Warn("loadCollectionRequest failed",
-				zap.String("role", typeutil.QueryCoordRole),
-				zap.Int64("collectionID", collectionID),
-				zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
-				zap.Int64("msgID", req.Base.MsgID),
-				zap.Error(err))
-
-			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-			return status, nil
-		}
-	}
-
 	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
 	loadCollectionTask := &loadCollectionTask{
 		baseTask: baseTask,
@@ -300,16 +252,32 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 
 	err = loadCollectionTask.waitToFinish()
 	if err != nil {
-		log.Error("load collection to query nodes failed",
-			zap.String("role", typeutil.QueryCoordRole),
-			zap.Int64("collectionID", collectionID),
-			zap.Int64("msgID", req.Base.MsgID),
-			zap.Error(err))
-		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		status.Reason = err.Error()
+		if errors.Is(err, ErrCollectionLoaded) {
+			log.Info("collection has already been loaded, return load success directly",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64("msgID", req.Base.MsgID))
 
-		metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-		return status, nil
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
+			return status, nil
+		} else if errors.Is(err, ErrLoadParametersMismatch) {
+			status.ErrorCode = commonpb.ErrorCode_IllegalArgument
+			status.Reason = err.Error()
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		} else {
+			log.Error("load collection to query nodes failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64("msgID", req.Base.MsgID),
+				zap.Error(err))
+			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+			status.Reason = err.Error()
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		}
 	}
 
 	log.Info("loadCollectionRequest completed",
@@ -533,75 +501,6 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 		return status, nil
 	}
 
-	if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
-		// if the collection has been loaded into memory by load collection request, return error
-		// should release collection first, then load partitions again
-		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
-			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			err = fmt.Errorf("collection %d has been loaded into QueryNode, please release collection firstly", collectionID)
-			status.Reason = err.Error()
-		}
-
-		if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
-			if collectionInfo.ReplicaNumber != req.ReplicaNumber {
-				msg := fmt.Sprintf("partitions has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
-					collectionInfo.ReplicaNumber,
-					req.ReplicaNumber)
-				log.Warn(msg,
-					zap.String("role", typeutil.QueryCoordRole),
-					zap.Int64("collectionID", collectionID),
-					zap.Int64("msgID", req.Base.MsgID),
-					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
-					zap.Int32("requestReplicaNumber", req.ReplicaNumber))
-
-				status.ErrorCode = commonpb.ErrorCode_IllegalArgument
-				status.Reason = msg
-
-				metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-				return status, nil
-			}
-
-			for _, toLoadPartitionID := range partitionIDs {
-				needLoad := true
-				for _, loadedPartitionID := range collectionInfo.PartitionIDs {
-					if toLoadPartitionID == loadedPartitionID {
-						needLoad = false
-						break
-					}
-				}
-				if needLoad {
-					// if new partitions need to be loaded, return error
-					// should release partitions first, then load partitions again
-					status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-					err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
-						collectionInfo.PartitionIDs, collectionID)
-					status.Reason = err.Error()
-				}
-			}
-		}
-
-		if status.ErrorCode != commonpb.ErrorCode_Success {
-			log.Warn("loadPartitionRequest failed",
-				zap.String("role", typeutil.QueryCoordRole),
-				zap.Int64("collectionID", collectionID),
-				zap.Int64s("partitionIDs", partitionIDs),
-				zap.Int64("msgID", req.Base.MsgID),
-				zap.Error(err))
-
-			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-			return status, nil
-		}
-
-		log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
-			zap.String("role", typeutil.QueryCoordRole),
-			zap.Int64("collectionID", req.CollectionID),
-			zap.Int64s("partitionIDs", partitionIDs),
-			zap.Int64("msgID", req.Base.MsgID))
-
-		metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
-		return status, nil
-	}
-
 	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
 	loadPartitionTask := &loadPartitionTask{
 		baseTask: baseTask,
@@ -627,17 +526,34 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 
 	err = loadPartitionTask.waitToFinish()
 	if err != nil {
-		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		status.Reason = err.Error()
-		log.Error("loadPartitionRequest failed",
-			zap.String("role", typeutil.QueryCoordRole),
-			zap.Int64("collectionID", req.CollectionID),
-			zap.Int64s("partitionIDs", partitionIDs),
-			zap.Int64("msgID", req.Base.MsgID),
-			zap.Error(err))
+		if errors.Is(err, ErrCollectionLoaded) {
+			log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", req.CollectionID),
+				zap.Int64s("partitionIDs", req.PartitionIDs),
+				zap.Int64("msgID", req.Base.MsgID))
 
-		metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
-		return status, nil
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
+			return status, nil
+		} else if errors.Is(err, ErrLoadParametersMismatch) {
+			status.ErrorCode = commonpb.ErrorCode_IllegalArgument
+			status.Reason = err.Error()
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		} else {
+			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+			status.Reason = err.Error()
+			log.Error("loadPartitionRequest failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", req.CollectionID),
+				zap.Int64s("partitionIDs", partitionIDs),
+				zap.Int64("msgID", req.Base.MsgID),
+				zap.Error(err))
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return status, nil
+		}
 	}
 
 	log.Info("loadPartitionRequest completed",
@@ -885,7 +885,7 @@ func Test_LoadCollectionAndLoadPartitions(t *testing.T) {
 
 	// second load defaultPartitionID
 	status, err = queryCoord.LoadPartitions(ctx, loadPartitionReq)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+	assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
 	assert.Nil(t, err)
 
 	node.stop()
@@ -1141,7 +1141,7 @@ func Test_RepeatedLoadDifferentPartitions(t *testing.T) {
 		ReplicaNumber: 1,
 	}
 	status, err = queryCoord.LoadPartitions(ctx, failLoadRequest)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+	assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
 	assert.Nil(t, err)
 
 	node.stop()
@@ -1187,7 +1187,7 @@ func Test_LoadPartitionsAndLoadCollection(t *testing.T) {
 
 	// second load defaultCollectionID
 	status, err = queryCoord.LoadCollection(ctx, loadCollectionReq)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
+	assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
 	assert.Nil(t, err)
 
 	node.stop()
@@ -275,6 +275,12 @@ func (qs *queryNodeServerMock) GetMetrics(ctx context.Context, req *milvuspb.Get
 	if err != nil {
 		return nil, err
 	}
+
+	// check whether the memory usage has been set
+	if len(response.Response) > 0 {
+		return response, nil
+	}
+
 	if response.Status.ErrorCode != commonpb.ErrorCode_Success {
 		return nil, errors.New("query node do task failed")
 	}
@@ -38,10 +38,12 @@ import (
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/dependency"
 	"github.com/milvus-io/milvus/internal/util/etcd"
+	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 )
@@ -353,11 +355,12 @@ func TestHandoffSegmentLoop(t *testing.T) {
 		waitTaskFinalState(handoffTask, taskExpired)
 	})
 
-	// genReleaseCollectionTask(baseCtx, queryCoord)
+	queryCoord.meta.releaseCollection(defaultCollectionID)
 	loadCollectionTask := genLoadCollectionTask(baseCtx, queryCoord)
 	err = queryCoord.scheduler.Enqueue(loadCollectionTask)
 	assert.Nil(t, err)
 	waitTaskFinalState(loadCollectionTask, taskExpired)
+	queryCoord.meta.setLoadType(defaultCollectionID, querypb.LoadType_LoadCollection)
 
 	t.Run("Test handoffGrowingSegment", func(t *testing.T) {
 		infos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
@@ -578,34 +581,35 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
 	assert.Nil(t, err)
 	waitTaskFinalState(loadCollectionTask, taskExpired)
 
-	partitionID := defaultPartitionID
-	for {
-		req := &querypb.LoadPartitionsRequest{
-			Base: &commonpb.MsgBase{
-				MsgType: commonpb.MsgType_LoadPartitions,
+	memoryStress := uint64(1 << 10)
+	queryNode1.getMetrics = func() (*milvuspb.GetMetricsResponse, error) {
+		nodeInfo := metricsinfo.QueryNodeInfos{
+			BaseComponentInfos: metricsinfo.BaseComponentInfos{
+				HardwareInfos: metricsinfo.HardwareMetrics{
+					Memory: 1 << 30,
+					MemoryUsage: memoryStress,
+				},
 			},
-			CollectionID: defaultCollectionID,
-			PartitionIDs: []UniqueID{partitionID},
-			Schema: genDefaultCollectionSchema(false),
-			ReplicaNumber: 1,
 		}
-		baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_GrpcRequest)
-		loadPartitionTask := &loadPartitionTask{
-			baseTask: baseTask,
-			LoadPartitionsRequest: req,
-			broker: queryCoord.broker,
-			cluster: queryCoord.cluster,
-			meta: queryCoord.meta,
-		}
-		err = queryCoord.scheduler.Enqueue(loadPartitionTask)
-		assert.Nil(t, err)
-		waitTaskFinalState(loadPartitionTask, taskExpired)
+
+		nodeInfoResp, err := metricsinfo.MarshalComponentInfos(nodeInfo)
+		return &milvuspb.GetMetricsResponse{
+			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Response: nodeInfoResp,
+		}, err
+	}
+	for {
 		nodeInfo, err := queryCoord.cluster.GetNodeInfoByID(queryNode1.queryNodeID)
 		assert.Nil(t, err)
 		if nodeInfo.(*queryNode).memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
+			log.Debug("memory stress kicks the threshold")
 			break
 		}
-		partitionID++
+
+		log.Debug("memory stress...",
+			zap.Uint64("stress", memoryStress),
+			zap.Float64("memoryUsage", nodeInfo.(*queryNode).memUsageRate))
+		memoryStress *= 2
 	}
 
 	queryNode2, err := startQueryNodeServer(baseCtx)
@@ -57,6 +57,11 @@ const (
 	MaxSendSizeToEtcd = 200000
 )
 
+var (
+	ErrCollectionLoaded       = errors.New("CollectionLoaded")
+	ErrLoadParametersMismatch = errors.New("LoadParametersMismatch")
+)
+
 type taskState int
 
 const (
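A side note on the two sentinels above: errors.New values compare by identity, not by message text, which is what makes package-level declarations like these reliable targets for errors.Is. A standalone sketch of that property (not Milvus code):

package main

import (
	"errors"
	"fmt"
)

func main() {
	// Two errors with identical text are still distinct values.
	a := errors.New("CollectionLoaded")
	b := errors.New("CollectionLoaded")

	fmt.Println(errors.Is(a, b)) // false: same text, different values
	fmt.Println(errors.Is(a, a)) // true: identity match
}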
@@ -349,7 +354,51 @@ func (lct *loadCollectionTask) preExecute(ctx context.Context) error {
 
 	collectionID := lct.CollectionID
 	schema := lct.Schema
+
 	lct.setResultInfo(nil)
+
+	collectionInfo, err := lct.meta.getCollectionInfoByID(collectionID)
+	if err == nil {
+		// if collection has been loaded by load collection request, return success
+		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
+			if collectionInfo.ReplicaNumber != lct.ReplicaNumber {
+				msg := fmt.Sprintf("collection has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
+					collectionInfo.ReplicaNumber,
+					lct.ReplicaNumber)
+				log.Warn(msg,
+					zap.String("role", typeutil.QueryCoordRole),
+					zap.Int64("collectionID", collectionID),
+					zap.Int64("msgID", lct.Base.MsgID),
+					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
+					zap.Int32("requestReplicaNumber", lct.ReplicaNumber))
+
+				lct.result = &commonpb.Status{
+					ErrorCode: commonpb.ErrorCode_IllegalArgument,
+					Reason: msg,
+				}
+
+				return fmt.Errorf(msg+" [%w]", ErrLoadParametersMismatch)
+			}
+
+			return ErrCollectionLoaded
+		} else if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
+			// if some partitions of the collection have been loaded by load partitions request, return error
+			// should release partitions first, then load collection again
+			err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
+				collectionInfo.PartitionIDs, collectionID)
+			log.Warn("loadCollectionRequest failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
+				zap.Int64("msgID", lct.Base.MsgID),
+				zap.Error(err))
+
+			lct.setResultInfo(err)
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
+			return fmt.Errorf(err.Error()+" [%w]", ErrLoadParametersMismatch)
+		}
+	}
+
 	log.Info("start do loadCollectionTask",
 		zap.Int64("msgID", lct.getTaskID()),
 		zap.Int64("collectionID", collectionID),
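Note how preExecute attaches the sentinel: fmt.Errorf(msg+" [%w]", ErrLoadParametersMismatch) keeps the human-readable reason while %w preserves the error chain for errors.Is. A quick standalone check of the same wrapping (the message is made up; only the pattern mirrors the diff):

package main

import (
	"errors"
	"fmt"
)

var ErrLoadParametersMismatch = errors.New("LoadParametersMismatch")

func main() {
	// Wrap the sentinel the same way preExecute does: message first,
	// sentinel spliced in with %w so the chain stays unwrappable.
	err := fmt.Errorf("replica number 1 is not same as the request's 2 [%w]", ErrLoadParametersMismatch)

	fmt.Println(errors.Is(err, ErrLoadParametersMismatch))      // true
	fmt.Println(errors.Unwrap(err) == ErrLoadParametersMismatch) // true
}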
@@ -555,11 +604,6 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
 	collectionID := lct.CollectionID
 	if lct.getResultInfo().ErrorCode != commonpb.ErrorCode_Success {
 		lct.clearChildTasks()
-		err := lct.meta.releaseCollection(collectionID)
-		if err != nil {
-			log.Error("loadCollectionTask: occur error when release collection info from meta", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err))
-			panic(err)
-		}
 	}
 
 	log.Info("loadCollectionTask postExecute done",
@@ -793,8 +837,74 @@ func (lpt *loadPartitionTask) preExecute(context.Context) error {
 		lpt.ReplicaNumber = 1
 	}
 
-	collectionID := lpt.CollectionID
 	lpt.setResultInfo(nil)
+
+	collectionID := lpt.CollectionID
+	collectionInfo, err := lpt.meta.getCollectionInfoByID(collectionID)
+	if err == nil {
+		// if the collection has been loaded into memory by load collection request, return error
+		// should release collection first, then load partitions again
+		if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
+			err = fmt.Errorf("collection %d has been loaded into QueryNode, please release collection firstly", collectionID)
+			lpt.setResultInfo(err)
+		} else if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
+			if collectionInfo.ReplicaNumber != lpt.ReplicaNumber {
+				msg := fmt.Sprintf("partitions has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
+					collectionInfo.ReplicaNumber,
+					lpt.ReplicaNumber)
+				log.Warn(msg,
+					zap.String("role", typeutil.QueryCoordRole),
+					zap.Int64("collectionID", collectionID),
+					zap.Int64("msgID", lpt.Base.MsgID),
+					zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
+					zap.Int32("requestReplicaNumber", lpt.ReplicaNumber))
+
+				lpt.result = &commonpb.Status{
+					ErrorCode: commonpb.ErrorCode_IllegalArgument,
+					Reason: msg,
+				}
+
+				return fmt.Errorf(msg+" [%w]", ErrLoadParametersMismatch)
+			}
+
+			for _, toLoadPartitionID := range lpt.PartitionIDs {
+				needLoad := true
+				for _, loadedPartitionID := range collectionInfo.PartitionIDs {
+					if toLoadPartitionID == loadedPartitionID {
+						needLoad = false
+						break
+					}
+				}
+				if needLoad {
+					// if new partitions need to be loaded, return error
+					// should release partitions first, then load partitions again
+					err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
+						collectionInfo.PartitionIDs, collectionID)
+					lpt.setResultInfo(err)
+				}
+			}
+		}
+
+		if lpt.result.ErrorCode != commonpb.ErrorCode_Success {
+			log.Warn("loadPartitionRequest failed",
+				zap.String("role", typeutil.QueryCoordRole),
+				zap.Int64("collectionID", collectionID),
+				zap.Int64s("partitionIDs", lpt.PartitionIDs),
+				zap.Int64("msgID", lpt.Base.MsgID),
+				zap.Error(err))
+
+			return fmt.Errorf(err.Error()+" [%w]", ErrLoadParametersMismatch)
+		}
+
+		log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
+			zap.String("role", typeutil.QueryCoordRole),
+			zap.Int64("collectionID", lpt.CollectionID),
+			zap.Int64s("partitionIDs", lpt.PartitionIDs),
+			zap.Int64("msgID", lpt.Base.MsgID))
+
+		return ErrCollectionLoaded
+	}
+
 	log.Info("start do loadPartitionTask",
 		zap.Int64("msgID", lpt.getTaskID()),
 		zap.Int64("collectionID", collectionID))
@@ -991,11 +1101,6 @@ func (lpt *loadPartitionTask) postExecute(ctx context.Context) error {
 	partitionIDs := lpt.PartitionIDs
 	if lpt.getResultInfo().ErrorCode != commonpb.ErrorCode_Success {
 		lpt.clearChildTasks()
-		err := lpt.meta.releaseCollection(collectionID)
-		if err != nil {
-			log.Error("loadPartitionTask: occur error when release collection info from meta", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID), zap.Error(err))
-			panic(err)
-		}
 	}
 
 	log.Info("loadPartitionTask postExecute done",
@@ -537,7 +537,9 @@ func (scheduler *TaskScheduler) processTask(t task) error {
 	span.LogFields(oplog.Int64("processTask: scheduler process PreExecute", t.getTaskID()))
 	err = t.preExecute(ctx)
 	if err != nil {
-		log.Warn("failed to preExecute task", zap.Error(err))
+		log.Error("failed to preExecute task",
+			zap.Error(err))
+		t.setResultInfo(err)
 		return err
 	}
 	taskInfoKey = fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
@@ -638,8 +640,35 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 		if triggerTask.getState() == taskUndo || triggerTask.getState() == taskDoing {
 			err = scheduler.processTask(triggerTask)
 			if err != nil {
-				log.Warn("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
-				alreadyNotify = false
+				// Checks in preExecute not passed,
+				// for only LoadCollection & LoadPartitions
+				if errors.Is(err, ErrCollectionLoaded) ||
+					errors.Is(err, ErrLoadParametersMismatch) {
+					log.Warn("scheduleLoop: preExecute failed",
+						zap.Int64("triggerTaskID", triggerTask.getTaskID()),
+						zap.Error(err))
+
+					triggerTask.setState(taskExpired)
+					if errors.Is(err, ErrLoadParametersMismatch) {
+						triggerTask.setState(taskFailed)
+					}
+
+					triggerTask.notify(err)
+
+					err = removeTaskFromKVFn(triggerTask)
+					if err != nil {
+						log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
+						triggerTask.setResultInfo(err)
+					} else {
+						log.Info("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()))
+					}
+
+					triggerTask.finishContext()
+					continue
+				} else {
+					log.Warn("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
+					alreadyNotify = false
+				}
 			}
 		}
 		if triggerTask.msgType() != commonpb.MsgType_LoadCollection && triggerTask.msgType() != commonpb.MsgType_LoadPartitions {
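In effect, the branch above maps each sentinel to a terminal task state: ErrCollectionLoaded finishes the trigger task as taskExpired (a benign no-op), while ErrLoadParametersMismatch finishes it as taskFailed; both are notified and removed from etcd without entering the normal execution path. The helper below is an illustrative condensation of that mapping, not a function in the Milvus code, and the constant values are stand-ins:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the identifiers in the diff; the values are illustrative.
type taskState int

const (
	taskUndo    taskState = 0
	taskExpired taskState = 4
	taskFailed  taskState = 5
)

var (
	ErrCollectionLoaded       = errors.New("CollectionLoaded")
	ErrLoadParametersMismatch = errors.New("LoadParametersMismatch")
)

// finalStateFor condenses the scheduleLoop branch above.
func finalStateFor(err error) taskState {
	if errors.Is(err, ErrLoadParametersMismatch) {
		return taskFailed // bad parameters: surface as a failed task
	}
	if errors.Is(err, ErrCollectionLoaded) {
		return taskExpired // already loaded: finish as a benign no-op
	}
	return taskUndo // anything else stays on the normal retry path
}

func main() {
	fmt.Println(finalStateFor(ErrCollectionLoaded))                             // 4
	fmt.Println(finalStateFor(fmt.Errorf("x [%w]", ErrLoadParametersMismatch))) // 5
}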
@@ -275,9 +275,14 @@ func genWatchDeltaChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeI
 
 func waitTaskFinalState(t task, state taskState) {
 	for {
-		if t.getState() == state {
+		currentState := t.getState()
+		if currentState == state {
 			break
 		}
+
+		log.Debug("task state not matches",
+			zap.Int("actual", int(currentState)),
+			zap.Int("expected", int(state)))
 	}
 }
@@ -287,9 +292,6 @@ func TestTriggerTask(t *testing.T) {
 	queryCoord, err := startQueryCoord(ctx)
 	assert.Nil(t, err)
 
-	err = queryCoord.meta.addCollection(defaultCollectionID, querypb.LoadType_LoadCollection, genDefaultCollectionSchema(false))
-	assert.NoError(t, err)
-
 	node1, err := startQueryNodeServer(ctx)
 	assert.Nil(t, err)
 	node2, err := startQueryNodeServer(ctx)
@@ -565,6 +567,7 @@ func Test_LoadPartitionExecuteFailAfterLoadCollection(t *testing.T) {
 
 	waitTaskFinalState(loadPartitionTask, taskFailed)
 
+	log.Debug("test done")
 	node.stop()
 	queryCoord.Stop()
 	err = removeAllSession()
@@ -585,14 +588,18 @@ func Test_ReleaseCollectionExecuteFail(t *testing.T) {
 	assert.NoError(t, err)
 
 	waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
 
 	releaseCollectionTask := genReleaseCollectionTask(ctx, queryCoord)
+	notify := make(chan struct{})
+	go func() {
+		waitTaskFinalState(releaseCollectionTask, taskDone)
+		node.setRPCInterface(&node.releaseCollection, returnSuccessResult)
+		waitTaskFinalState(releaseCollectionTask, taskExpired)
+		close(notify)
+	}()
 	err = queryCoord.scheduler.Enqueue(releaseCollectionTask)
 	assert.Nil(t, err)
-
-	waitTaskFinalState(releaseCollectionTask, taskDone)
-
-	node.setRPCInterface(&node.releaseCollection, returnSuccessResult)
-	waitTaskFinalState(releaseCollectionTask, taskExpired)
+	<-notify
+
 	node.stop()
 	queryCoord.Stop()