From 1a4732bb190aebd6889b42ffb204559ee637d5d1 Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 10 Mar 2023 17:15:54 +0800 Subject: [PATCH] Use new errors to handle load failures cache (#22672) Signed-off-by: yah01 --- internal/querycoordv2/handlers.go | 8 +-- .../querycoordv2/meta/coordinator_broker.go | 8 +-- .../querycoordv2/meta/failed_load_cache.go | 63 ++++++++++------- .../meta/failed_load_cache_test.go | 29 ++++---- internal/querycoordv2/server_test.go | 9 +-- internal/querycoordv2/services.go | 58 +++++++-------- internal/querycoordv2/services_test.go | 70 +++++++++---------- internal/querycoordv2/task/executor.go | 9 +-- internal/querycoordv2/task/scheduler.go | 9 +-- internal/querynode/load_segment_task_test.go | 4 +- internal/querynode/segment_loader.go | 19 ++--- 11 files changed, 147 insertions(+), 139 deletions(-) diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 18bf8bc3da..d62585ea28 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -238,14 +238,14 @@ func (s *Server) fillMetricsWithNodes(topo *metricsinfo.QueryClusterTopology, no continue } - if metric.resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if metric.resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("invalid metrics of query node was found", - zap.Any("error_code", metric.resp.Status.ErrorCode), - zap.Any("error_reason", metric.resp.Status.Reason)) + zap.Any("error_code", metric.resp.GetStatus().GetErrorCode()), + zap.Any("error_reason", metric.resp.GetStatus().GetReason())) topo.ConnectedNodes = append(topo.ConnectedNodes, metricsinfo.QueryNodeInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ HasError: true, - ErrorReason: metric.resp.Status.Reason, + ErrorReason: metric.resp.GetStatus().GetReason(), Name: metric.resp.ComponentName, ID: int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), }, diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index d0236eed23..fe9e6af24c 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -103,8 +103,8 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID return nil, err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(resp.Status.Reason) + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + err = errors.New(resp.GetStatus().GetReason()) log.Warn("showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err)) return nil, err } @@ -129,8 +129,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection return nil, nil, err } - if recoveryInfo.Status.ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(recoveryInfo.Status.Reason) + if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + err = errors.New(recoveryInfo.GetStatus().GetReason()) log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) return nil, nil, err } diff --git a/internal/querycoordv2/meta/failed_load_cache.go b/internal/querycoordv2/meta/failed_load_cache.go index 1954e8a565..4acb2cd9ec 100644 --- a/internal/querycoordv2/meta/failed_load_cache.go +++ b/internal/querycoordv2/meta/failed_load_cache.go @@ -22,9 +22,8 @@ import ( "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/log" - . "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/milvus-io/milvus/internal/util/merr" ) const expireTime = 24 * time.Hour @@ -38,60 +37,70 @@ type failInfo struct { } type FailedLoadCache struct { - mu sync.RWMutex - records map[UniqueID]map[commonpb.ErrorCode]*failInfo + mu sync.RWMutex + // CollectionID, ErrorCode -> error + records map[int64]map[int32]*failInfo } func NewFailedLoadCache() *FailedLoadCache { return &FailedLoadCache{ - records: make(map[UniqueID]map[commonpb.ErrorCode]*failInfo), + records: make(map[int64]map[int32]*failInfo), } } -func (l *FailedLoadCache) Get(collectionID UniqueID) *commonpb.Status { +func (l *FailedLoadCache) Get(collectionID int64) error { l.mu.RLock() defer l.mu.RUnlock() - status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success} + if _, ok := l.records[collectionID]; !ok { - return status + return nil } if len(l.records[collectionID]) == 0 { - return status + return nil } - var max = 0 - for code, info := range l.records[collectionID] { + + var ( + max = 0 + err error + ) + for _, info := range l.records[collectionID] { if info.count > max { max = info.count - status.ErrorCode = code - status.Reason = info.err.Error() + err = info.err } } - log.Warn("FailedLoadCache hits failed record", zap.Int64("collectionID", collectionID), - zap.String("errCode", status.GetErrorCode().String()), zap.String("reason", status.GetReason())) - return status + log.Warn("FailedLoadCache hits failed record", + zap.Int64("collectionID", collectionID), + zap.Error(err), + ) + return err } -func (l *FailedLoadCache) Put(collectionID UniqueID, errCode commonpb.ErrorCode, err error) { - if errCode == commonpb.ErrorCode_Success { +func (l *FailedLoadCache) Put(collectionID int64, err error) { + if err == nil { return } + code := merr.Code(err) + l.mu.Lock() defer l.mu.Unlock() if _, ok := l.records[collectionID]; !ok { - l.records[collectionID] = make(map[commonpb.ErrorCode]*failInfo) + l.records[collectionID] = make(map[int32]*failInfo) } - if _, ok := l.records[collectionID][errCode]; !ok { - l.records[collectionID][errCode] = &failInfo{} + if _, ok := l.records[collectionID][code]; !ok { + l.records[collectionID][code] = &failInfo{} } - l.records[collectionID][errCode].count++ - l.records[collectionID][errCode].err = err - l.records[collectionID][errCode].lastTime = time.Now() - log.Warn("FailedLoadCache put failed record", zap.Int64("collectionID", collectionID), - zap.String("errCode", errCode.String()), zap.Error(err)) + l.records[collectionID][code].count++ + l.records[collectionID][code].err = err + l.records[collectionID][code].lastTime = time.Now() + log.Warn("FailedLoadCache put failed record", + zap.Int64("collectionID", collectionID), + zap.Error(err), + ) } -func (l *FailedLoadCache) Remove(collectionID UniqueID) { +func (l *FailedLoadCache) Remove(collectionID int64) { l.mu.Lock() defer l.mu.Unlock() delete(l.records, collectionID) diff --git a/internal/querycoordv2/meta/failed_load_cache_test.go b/internal/querycoordv2/meta/failed_load_cache_test.go index af6a7819da..15eb4145a5 100644 --- a/internal/querycoordv2/meta/failed_load_cache_test.go +++ b/internal/querycoordv2/meta/failed_load_cache_test.go @@ -17,39 +17,38 @@ package meta import ( - "fmt" "testing" "time" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus/internal/util/merr" ) func TestFailedLoadCache(t *testing.T) { GlobalFailedLoadCache = NewFailedLoadCache() colID := int64(0) - errCode := commonpb.ErrorCode_InsufficientMemoryToLoad - mockErr := fmt.Errorf("mock insufficient memory reason") + mockErr := merr.WrapErrServiceMemoryLimitExceeded(0, 0) - GlobalFailedLoadCache.Put(colID, commonpb.ErrorCode_Success, nil) - res := GlobalFailedLoadCache.Get(colID) - assert.Equal(t, commonpb.ErrorCode_Success, res.GetErrorCode()) + GlobalFailedLoadCache.Put(colID, nil) + err := GlobalFailedLoadCache.Get(colID) + assert.NoError(t, err) - GlobalFailedLoadCache.Put(colID, errCode, mockErr) - res = GlobalFailedLoadCache.Get(colID) - assert.Equal(t, errCode, res.GetErrorCode()) + GlobalFailedLoadCache.Put(colID, mockErr) + err = GlobalFailedLoadCache.Get(colID) + assert.Equal(t, merr.Code(merr.ErrServiceMemoryLimitExceeded), merr.Code(err)) GlobalFailedLoadCache.Remove(colID) - res = GlobalFailedLoadCache.Get(colID) - assert.Equal(t, commonpb.ErrorCode_Success, res.GetErrorCode()) + err = GlobalFailedLoadCache.Get(colID) + assert.Equal(t, commonpb.ErrorCode_Success, merr.Status(err).ErrorCode) - GlobalFailedLoadCache.Put(colID, errCode, mockErr) + GlobalFailedLoadCache.Put(colID, mockErr) GlobalFailedLoadCache.mu.Lock() - GlobalFailedLoadCache.records[colID][errCode].lastTime = time.Now().Add(-expireTime * 2) + GlobalFailedLoadCache.records[colID][merr.Code(mockErr)].lastTime = time.Now().Add(-expireTime * 2) GlobalFailedLoadCache.mu.Unlock() GlobalFailedLoadCache.TryExpire() - res = GlobalFailedLoadCache.Get(colID) - assert.Equal(t, commonpb.ErrorCode_Success, res.GetErrorCode()) + err = GlobalFailedLoadCache.Get(colID) + assert.Equal(t, commonpb.ErrorCode_Success, merr.Status(err).ErrorCode) } diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 4c3ce0b546..d458a1421d 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -171,7 +172,7 @@ func (suite *ServerSuite) TestRecoverFailed() { func (suite *ServerSuite) TestNodeUp() { newNode := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 100) - newNode.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{}, nil) + newNode.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Status(nil)}, nil) err := newNode.Start() suite.NoError(err) defer newNode.Stop() @@ -254,7 +255,7 @@ func (suite *ServerSuite) TestEnableActiveStandby() { mockDataCoord := coordMocks.NewDataCoord(suite.T()) mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ - Status: successStatus, + Status: merr.Status(nil), Schema: &schemapb.CollectionSchema{}, }, nil).Maybe() for _, collection := range suite.collections { @@ -266,7 +267,7 @@ func (suite *ServerSuite) TestEnableActiveStandby() { CollectionID: collection, } mockRootCoord.EXPECT().ShowPartitionsInternal(mock.Anything, req).Return(&milvuspb.ShowPartitionsResponse{ - Status: successStatus, + Status: merr.Status(nil), PartitionIDs: suite.partitions[collection], }, nil).Maybe() } @@ -414,7 +415,7 @@ func (suite *ServerSuite) expectGetRecoverInfoByMockDataCoord(collection int64, }) } dataCoord.EXPECT().GetRecoveryInfo(mock.Anything, getRecoveryInfoRequest).Maybe().Return(&datapb.GetRecoveryInfoResponse{ - Status: successStatus, + Status: merr.Status(nil), Channels: vChannels, Binlogs: segmentBinlogs, }, nil) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index acafe264d8..f463ed6605 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -46,8 +46,6 @@ import ( ) var ( - successStatus = utils.WrapStatus(commonpb.ErrorCode_Success, "") - ErrCreateResourceGroupFailed = errors.New("failed to create resource group") ErrDropResourceGroupFailed = errors.New("failed to drop resource group") ErrAddNodeToRGFailed = errors.New("failed to add node to resource group") @@ -86,7 +84,7 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio collections := collectionSet.Collect() resp := &querypb.ShowCollectionsResponse{ - Status: successStatus, + Status: &commonpb.Status{}, CollectionIDs: make([]int64, 0, len(collectionSet)), InMemoryPercentages: make([]int64, 0, len(collectionSet)), QueryServiceAvailable: make([]bool, 0, len(collectionSet)), @@ -101,17 +99,20 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio // ignore it continue } - status := meta.GlobalFailedLoadCache.Get(collectionID) - if status.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("show collection failed", zap.String("errCode", status.GetErrorCode().String()), zap.String("reason", status.GetReason())) + err := meta.GlobalFailedLoadCache.Get(collectionID) + if err != nil { + log.Warn("show collection failed", zap.Error(err)) + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad return &querypb.ShowCollectionsResponse{ Status: status, }, nil } - err := fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID) + + err = fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID) log.Warn("show collection failed", zap.Error(err)) return &querypb.ShowCollectionsResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), + Status: merr.Status(err), }, nil } resp.CollectionIDs = append(resp.CollectionIDs, collectionID) @@ -186,9 +187,10 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions } if isReleased { - status := meta.GlobalFailedLoadCache.Get(req.GetCollectionID()) - if status.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("show collection failed", zap.String("errCode", status.GetErrorCode().String()), zap.String("reason", status.GetReason())) + err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID()) + if err != nil { + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad return &querypb.ShowPartitionsResponse{ Status: status, }, nil @@ -201,7 +203,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions } return &querypb.ShowPartitionsResponse{ - Status: successStatus, + Status: merr.Status(nil), PartitionIDs: partitions, InMemoryPercentages: percentages, }, nil @@ -258,7 +260,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection } metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc() - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { @@ -298,7 +300,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(tr.ElapseSpan().Milliseconds())) meta.GlobalFailedLoadCache.Remove(req.GetCollectionID()) - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) { @@ -352,7 +354,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions } metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc() - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string) error { @@ -415,7 +417,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(tr.ElapseSpan().Milliseconds())) meta.GlobalFailedLoadCache.Remove(req.GetCollectionID()) - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) { @@ -481,7 +483,7 @@ func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti } return &querypb.GetPartitionStatesResponse{ - Status: successStatus, + Status: merr.Status(nil), PartitionDescriptions: states, }, nil } @@ -521,7 +523,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo } return &querypb.GetSegmentInfoResponse{ - Status: successStatus, + Status: merr.Status(nil), Infos: infos, }, nil } @@ -703,7 +705,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques log.Warn(msg, zap.Error(err)) return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil } - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { @@ -751,7 +753,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest } resp := &milvuspb.GetMetricsResponse{ - Status: successStatus, + Status: merr.Status(nil), ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, paramtable.GetNodeID()), } @@ -799,7 +801,7 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque } resp := &milvuspb.GetReplicasResponse{ - Status: successStatus, + Status: merr.Status(nil), Replicas: make([]*milvuspb.ReplicaInfo, 0), } @@ -840,7 +842,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade } resp := &querypb.GetShardLeadersResponse{ - Status: successStatus, + Status: merr.Status(nil), } if s.meta.CollectionManager.GetLoadPercentage(req.GetCollectionID()) < 100 { @@ -991,7 +993,7 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe log.Warn(ErrCreateResourceGroupFailed.Error(), zap.Error(err)) return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrCreateResourceGroupFailed.Error(), err), nil } - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) { @@ -1016,7 +1018,7 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(err)) return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), err), nil } - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeRequest) (*commonpb.Status, error) { @@ -1071,7 +1073,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...) - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) (*commonpb.Status, error) { @@ -1128,7 +1130,7 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, ErrTransferReplicaFailed.Error(), err), nil } - return successStatus, nil + return merr.Status(nil), nil } func (s *Server) transferReplica(targetRG string, replicas []*meta.Replica) error { @@ -1152,7 +1154,7 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou log.Info("list resource group request received") resp := &milvuspb.ListResourceGroupsResponse{ - Status: successStatus, + Status: merr.Status(nil), } if s.status.Load() != commonpb.StateCode_Healthy { log.Warn(ErrListResourceGroupsFailed.Error(), zap.Error(ErrNotHealthy)) @@ -1171,7 +1173,7 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ log.Info("describe resource group request received") resp := &querypb.DescribeResourceGroupResponse{ - Status: successStatus, + Status: merr.Status(nil), } if s.status.Load() != commonpb.StateCode_Healthy { log.Warn(ErrDescribeResourceGroupFailed.Error(), zap.Error(ErrNotHealthy)) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 9da268d9e0..d2154bd2cc 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -19,7 +19,6 @@ package querycoordv2 import ( "context" "encoding/json" - "fmt" "testing" "time" @@ -41,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -199,7 +199,7 @@ func (suite *ServiceSuite) TestShowCollections() { req := &querypb.ShowCollectionsRequest{} resp, err := server.ShowCollections(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.CollectionIDs, collectionNum) for _, collection := range suite.collections { suite.Contains(resp.CollectionIDs, collection) @@ -210,7 +210,7 @@ func (suite *ServiceSuite) TestShowCollections() { req.CollectionIDs = []int64{collection} resp, err = server.ShowCollections(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.CollectionIDs, 1) suite.Equal(collection, resp.CollectionIDs[0]) @@ -218,7 +218,7 @@ func (suite *ServiceSuite) TestShowCollections() { colBak := suite.meta.CollectionManager.GetCollection(collection) err = suite.meta.CollectionManager.RemoveCollection(collection) suite.NoError(err) - meta.GlobalFailedLoadCache.Put(collection, commonpb.ErrorCode_InsufficientMemoryToLoad, fmt.Errorf("mock insufficient memory reason")) + meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10)) resp, err = server.ShowCollections(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode()) @@ -230,7 +230,7 @@ func (suite *ServiceSuite) TestShowCollections() { server.UpdateStateCode(commonpb.StateCode_Initializing) resp, err = server.ShowCollections(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestShowPartitions() { @@ -248,7 +248,7 @@ func (suite *ServiceSuite) TestShowPartitions() { } resp, err := server.ShowPartitions(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.PartitionIDs, partitionNum) for _, partition := range partitions { suite.Contains(resp.PartitionIDs, partition) @@ -261,7 +261,7 @@ func (suite *ServiceSuite) TestShowPartitions() { } resp, err = server.ShowPartitions(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.PartitionIDs, 1) for _, partition := range partitions[0:1] { suite.Contains(resp.PartitionIDs, partition) @@ -272,7 +272,7 @@ func (suite *ServiceSuite) TestShowPartitions() { colBak := suite.meta.CollectionManager.GetCollection(collection) err = suite.meta.CollectionManager.RemoveCollection(collection) suite.NoError(err) - meta.GlobalFailedLoadCache.Put(collection, commonpb.ErrorCode_InsufficientMemoryToLoad, fmt.Errorf("mock insufficient memory reason")) + meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10)) resp, err = server.ShowPartitions(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode()) @@ -284,7 +284,7 @@ func (suite *ServiceSuite) TestShowPartitions() { parBak := suite.meta.CollectionManager.GetPartition(partitionID) err = suite.meta.CollectionManager.RemovePartition(partitionID) suite.NoError(err) - meta.GlobalFailedLoadCache.Put(collection, commonpb.ErrorCode_InsufficientMemoryToLoad, fmt.Errorf("mock insufficient memory reason")) + meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10)) resp, err = server.ShowPartitions(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode()) @@ -301,7 +301,7 @@ func (suite *ServiceSuite) TestShowPartitions() { server.UpdateStateCode(commonpb.StateCode_Initializing) resp, err := server.ShowPartitions(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestLoadCollection() { @@ -363,7 +363,7 @@ func (suite *ServiceSuite) TestResourceGroup() { listRG := &milvuspb.ListResourceGroupsRequest{} resp1, err := server.ListResourceGroups(ctx, listRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp1.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp1.GetStatus().GetErrorCode()) suite.Len(resp1.ResourceGroups, 2) server.nodeMgr.Add(session.NewNodeInfo(1011, "localhost")) @@ -398,7 +398,7 @@ func (suite *ServiceSuite) TestResourceGroup() { } resp2, err := server.DescribeResourceGroup(ctx, describeRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp2.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp2.GetStatus().GetErrorCode()) suite.Equal("rg11", resp2.GetResourceGroup().GetName()) suite.Equal(int32(2), resp2.GetResourceGroup().GetCapacity()) suite.Equal(int32(2), resp2.GetResourceGroup().GetNumAvailableNode()) @@ -416,7 +416,7 @@ func (suite *ServiceSuite) TestResourceGroup() { resp4, err := server.ListResourceGroups(ctx, listRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp4.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp4.GetStatus().GetErrorCode()) suite.Len(resp4.GetResourceGroups(), 3) } @@ -430,7 +430,7 @@ func (suite *ServiceSuite) TestResourceGroupFailed() { } resp, err := server.DescribeResourceGroup(ctx, describeRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.GetStatus().GetErrorCode()) // server unhealthy server.status.Store(commonpb.StateCode_Abnormal) @@ -446,14 +446,14 @@ func (suite *ServiceSuite) TestResourceGroupFailed() { listRG := &milvuspb.ListResourceGroupsRequest{} resp2, err := server.ListResourceGroups(ctx, listRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp2.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_UnexpectedError, resp2.GetStatus().GetErrorCode()) describeRG = &querypb.DescribeResourceGroupRequest{ ResourceGroup: "rg1", } resp3, err := server.DescribeResourceGroup(ctx, describeRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp3.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_UnexpectedError, resp3.GetStatus().GetErrorCode()) dropRG := &milvuspb.DropResourceGroupRequest{ ResourceGroup: "rg1", @@ -464,7 +464,7 @@ func (suite *ServiceSuite) TestResourceGroupFailed() { resp5, err := server.ListResourceGroups(ctx, listRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp5.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_UnexpectedError, resp5.GetStatus().GetErrorCode()) } func (suite *ServiceSuite) TestTransferNode() { @@ -1106,7 +1106,7 @@ func (suite *ServiceSuite) TestGetPartitionStates() { } resp, err := server.GetPartitionStates(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.PartitionDescriptions, len(suite.partitions[collection])) } @@ -1117,7 +1117,7 @@ func (suite *ServiceSuite) TestGetPartitionStates() { } resp, err := server.GetPartitionStates(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestGetSegmentInfo() { @@ -1133,7 +1133,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() { } resp, err := server.GetSegmentInfo(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.assertSegments(collection, resp.GetInfos()) } @@ -1145,7 +1145,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() { } resp, err := server.GetSegmentInfo(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.assertSegments(collection, resp.GetInfos()) } @@ -1156,7 +1156,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() { } resp, err := server.GetSegmentInfo(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestLoadBalance() { @@ -1392,7 +1392,7 @@ func (suite *ServiceSuite) TestShowConfigurations() { } resp, err := server.ShowConfigurations(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.Configuations, 1) suite.Equal("querycoord.port", resp.Configuations[0].Key) @@ -1403,7 +1403,7 @@ func (suite *ServiceSuite) TestShowConfigurations() { } resp, err = server.ShowConfigurations(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestGetMetrics() { @@ -1412,7 +1412,7 @@ func (suite *ServiceSuite) TestGetMetrics() { for _, node := range suite.nodes { suite.cluster.EXPECT().GetMetrics(ctx, node, mock.Anything).Return(&milvuspb.GetMetricsResponse{ - Status: successStatus, + Status: merr.Status(nil), ComponentName: "QueryNode", }, nil) } @@ -1426,7 +1426,7 @@ func (suite *ServiceSuite) TestGetMetrics() { Request: string(req), }) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) // Test when server is not healthy server.UpdateStateCode(commonpb.StateCode_Initializing) @@ -1435,7 +1435,7 @@ func (suite *ServiceSuite) TestGetMetrics() { Request: string(req), }) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestGetReplicas() { @@ -1450,7 +1450,7 @@ func (suite *ServiceSuite) TestGetReplicas() { } resp, err := server.GetReplicas(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.EqualValues(suite.replicaNumber[collection], len(resp.Replicas)) } @@ -1467,7 +1467,7 @@ func (suite *ServiceSuite) TestGetReplicas() { } resp, err := server.GetReplicas(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.EqualValues(suite.replicaNumber[collection], len(resp.Replicas)) } @@ -1478,7 +1478,7 @@ func (suite *ServiceSuite) TestGetReplicas() { } resp, err := server.GetReplicas(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestCheckHealth() { @@ -1537,7 +1537,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() { suite.fetchHeartbeats(time.Now()) resp, err := server.GetShardLeaders(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) suite.Len(resp.Shards, len(suite.channels[collection])) for _, shard := range resp.Shards { suite.Len(shard.NodeIds, int(suite.replicaNumber[collection])) @@ -1551,7 +1551,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() { } resp, err := server.GetShardLeaders(ctx, req) suite.NoError(err) - suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) + suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) } func (suite *ServiceSuite) TestGetShardLeadersFailed() { @@ -1573,7 +1573,7 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() { } resp, err := server.GetShardLeaders(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode()) for _, node := range suite.nodes { suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost")) } @@ -1582,7 +1582,7 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() { suite.fetchHeartbeats(time.Now().Add(-Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) - 1)) resp, err = server.GetShardLeaders(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode()) // Segment not fully loaded for _, node := range suite.nodes { @@ -1594,7 +1594,7 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() { suite.fetchHeartbeats(time.Now()) resp, err = server.GetShardLeaders(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode()) } } diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 95bc20ef86..f754b02935 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -18,7 +18,6 @@ package task import ( "context" - "fmt" "sync" "time" @@ -31,6 +30,7 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/tsoutil" "go.uber.org/atomic" "go.uber.org/zap" @@ -190,9 +190,10 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { log.Warn("failed to load segment, it may be a false failure", zap.Error(err)) return } - if status.ErrorCode == commonpb.ErrorCode_InsufficientMemoryToLoad { - log.Warn("insufficient memory to load segment", zap.String("err", status.GetReason())) - task.SetErr(fmt.Errorf("%w, err:%s", ErrInsufficientMemory, status.GetReason())) + err = merr.Error(status) + if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) { + log.Warn("insufficient memory to load segment", zap.Error(err)) + task.SetErr(err) task.Cancel() return } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index e4be9ed3e1..b25d617cfc 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/errors" - "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -661,13 +660,7 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) { } func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) { - var errCode commonpb.ErrorCode - if errors.Is(task.Err(), ErrInsufficientMemory) { - errCode = commonpb.ErrorCode_InsufficientMemoryToLoad - } else { - errCode = commonpb.ErrorCode_UnexpectedError - } - meta.GlobalFailedLoadCache.Put(task.collectionID, errCode, task.Err()) + meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err()) } func (scheduler *taskScheduler) remove(task Task) { diff --git a/internal/querynode/load_segment_task_test.go b/internal/querynode/load_segment_task_test.go index 20ec41dab5..87a73fa34d 100644 --- a/internal/querynode/load_segment_task_test.go +++ b/internal/querynode/load_segment_task_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/hardware" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -189,8 +190,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { task.req.Infos[0].SegmentSize *= 2 } err = task.Execute(ctx) - assert.Error(t, err) - assert.Contains(t, err.Error(), "OOM") + assert.ErrorIs(t, err, merr.ErrServiceMemoryLimitExceeded) }) factory := node.loader.factory diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index c606eacfdf..0cd90429d8 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -49,6 +49,7 @@ import ( "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/hardware" "github.com/milvus-io/milvus/internal/util/indexparamcheck" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -947,14 +948,16 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad zap.Uint64("diskUsageAfterLoad", toMB(usedLocalSizeAfterLoad))) if memLoadingUsage > uint64(float64(totalMem)*Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) { - return fmt.Errorf("%w, load segment failed, OOM if load, collectionID = %d, maxSegmentSize = %v MB, concurrency = %d, usedMemAfterLoad = %v MB, totalMem = %v MB, thresholdFactor = %f", - ErrInsufficientMemory, - collectionID, - toMB(maxSegmentSize), - concurrency, - toMB(usedMemAfterLoad), - toMB(totalMem), - Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) + err := merr.WrapErrServiceMemoryLimitExceeded(float32(usedMemAfterLoad), float32(totalMem), "failed to load segment, no enough memory") + log.Warn("load segment failed, OOM if load", + zap.Int64("collectionID", collectionID), + zap.Uint64("maxSegmentSize", toMB(maxSegmentSize)), + zap.Int("concurrency", concurrency), + zap.Uint64("usedMemAfterLoad", toMB(usedMemAfterLoad)), + zap.Uint64("totalMem", toMB(totalMem)), + zap.Float64("thresholdFactor", Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()), + ) + return err } if usedLocalSizeAfterLoad > uint64(Params.QueryNodeCfg.DiskCapacityLimit.GetAsFloat()*Params.QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) {