mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: drop partition can not be successful if load failed (#38793)
fix #38649 when partition load failed, the partition drop will also fail due to the wrong error message Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
ba3c2e6fb1
commit
cb6eca8e91
@ -1462,7 +1462,7 @@ func (t *dropPartitionTask) PreExecute(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if collLoaded {
|
if collLoaded {
|
||||||
loaded, err := isPartitionLoaded(ctx, t.queryCoord, collID, []int64{partID})
|
loaded, err := isPartitionLoaded(ctx, t.queryCoord, collID, partID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,7 +49,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/contextutil"
|
"github.com/milvus-io/milvus/pkg/util/contextutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/crypto"
|
"github.com/milvus-io/milvus/pkg/util/crypto"
|
||||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metric"
|
"github.com/milvus-io/milvus/pkg/util/metric"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
@ -1476,11 +1475,11 @@ func isCollectionLoaded(ctx context.Context, qc types.QueryCoordClient, collID i
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID int64, partIDs []int64) (bool, error) {
|
func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID int64, partID int64) (bool, error) {
|
||||||
// get all loading collections
|
// get all loading collections
|
||||||
resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
|
resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
|
||||||
CollectionID: collID,
|
CollectionID: collID,
|
||||||
PartitionIDs: partIDs,
|
PartitionIDs: []int64{partID},
|
||||||
})
|
})
|
||||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||||
// qc returns error if partition not loaded
|
// qc returns error if partition not loaded
|
||||||
@ -1490,7 +1489,7 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID in
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return funcutil.SliceSetEqual(partIDs, resp.GetPartitionIDs()), nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg, inInsert bool) error {
|
func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg, inInsert bool) error {
|
||||||
|
|||||||
@ -1063,7 +1063,7 @@ func Test_isPartitionIsLoaded(t *testing.T) {
|
|||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
PartitionIDs: []int64{partID},
|
PartitionIDs: []int64{partID},
|
||||||
}, nil)
|
}, nil)
|
||||||
loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID})
|
loaded, err := isPartitionLoaded(ctx, qc, collID, partID)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, loaded)
|
assert.True(t, loaded)
|
||||||
})
|
})
|
||||||
@ -1088,7 +1088,7 @@ func Test_isPartitionIsLoaded(t *testing.T) {
|
|||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
PartitionIDs: []int64{partID},
|
PartitionIDs: []int64{partID},
|
||||||
}, errors.New("error"))
|
}, errors.New("error"))
|
||||||
loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID})
|
loaded, err := isPartitionLoaded(ctx, qc, collID, partID)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.False(t, loaded)
|
assert.False(t, loaded)
|
||||||
})
|
})
|
||||||
@ -1116,7 +1116,7 @@ func Test_isPartitionIsLoaded(t *testing.T) {
|
|||||||
},
|
},
|
||||||
PartitionIDs: []int64{partID},
|
PartitionIDs: []int64{partID},
|
||||||
}, nil)
|
}, nil)
|
||||||
loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID})
|
loaded, err := isPartitionLoaded(ctx, qc, collID, partID)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.False(t, loaded)
|
assert.False(t, loaded)
|
||||||
})
|
})
|
||||||
|
|||||||
@ -159,15 +159,16 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
|
|||||||
if percentage < 0 {
|
if percentage < 0 {
|
||||||
err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID())
|
err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status := merr.Status(err)
|
partitionErr := merr.WrapErrPartitionNotLoaded(partitionID, err.Error())
|
||||||
log.Warn("show partition failed", zap.Error(err))
|
status := merr.Status(partitionErr)
|
||||||
|
log.Warn("show partition failed", zap.Error(partitionErr))
|
||||||
return &querypb.ShowPartitionsResponse{
|
return &querypb.ShowPartitionsResponse{
|
||||||
Status: status,
|
Status: status,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = merr.WrapErrPartitionNotLoaded(partitionID)
|
err = merr.WrapErrPartitionNotLoaded(partitionID)
|
||||||
log.Warn("show partitions failed", zap.Error(err))
|
log.Warn("show partition failed", zap.Error(err))
|
||||||
return &querypb.ShowPartitionsResponse{
|
return &querypb.ShowPartitionsResponse{
|
||||||
Status: merr.Status(err),
|
Status: merr.Status(err),
|
||||||
}, nil
|
}, nil
|
||||||
|
|||||||
@ -24,6 +24,7 @@ import (
|
|||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
@ -310,7 +311,8 @@ func (suite *ServiceSuite) TestShowPartitions() {
|
|||||||
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
|
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
|
||||||
resp, err = server.ShowPartitions(ctx, req)
|
resp, err = server.ShowPartitions(ctx, req)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode())
|
err := merr.CheckRPCCall(resp, err)
|
||||||
|
assert.True(suite.T(), errors.Is(err, merr.ErrPartitionNotLoaded))
|
||||||
meta.GlobalFailedLoadCache.Remove(collection)
|
meta.GlobalFailedLoadCache.Remove(collection)
|
||||||
err = suite.meta.CollectionManager.PutCollection(ctx, colBak)
|
err = suite.meta.CollectionManager.PutCollection(ctx, colBak)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
@ -322,7 +324,8 @@ func (suite *ServiceSuite) TestShowPartitions() {
|
|||||||
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
|
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
|
||||||
resp, err = server.ShowPartitions(ctx, req)
|
resp, err = server.ShowPartitions(ctx, req)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode())
|
err := merr.CheckRPCCall(resp, err)
|
||||||
|
assert.True(suite.T(), errors.Is(err, merr.ErrPartitionNotLoaded))
|
||||||
meta.GlobalFailedLoadCache.Remove(collection)
|
meta.GlobalFailedLoadCache.Remove(collection)
|
||||||
err = suite.meta.CollectionManager.PutPartition(ctx, parBak)
|
err = suite.meta.CollectionManager.PutPartition(ctx, parBak)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
|
|||||||
@ -40,14 +40,14 @@ func Code(err error) int32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cause := errors.Cause(err)
|
cause := errors.Cause(err)
|
||||||
switch cause := cause.(type) {
|
switch specificErr := cause.(type) {
|
||||||
case milvusError:
|
case milvusError:
|
||||||
return cause.code()
|
return specificErr.code()
|
||||||
|
|
||||||
default:
|
default:
|
||||||
if errors.Is(cause, context.Canceled) {
|
if errors.Is(specificErr, context.Canceled) {
|
||||||
return CanceledCode
|
return CanceledCode
|
||||||
} else if errors.Is(cause, context.DeadlineExceeded) {
|
} else if errors.Is(specificErr, context.DeadlineExceeded) {
|
||||||
return TimeoutCode
|
return TimeoutCode
|
||||||
} else {
|
} else {
|
||||||
return errUnexpected.code()
|
return errUnexpected.code()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user