enhance: Use partitionID when deleting by partitionKey (#38232)

When deleting by partition_key, Milvus generates L0 segments globally.
During L0 compaction, those L0 segments touch all partitions
collection-wide. Due to the false-positive rate of segment bloom
filters, L0 compactions will append spurious deltalogs to completely
irrelevant partitions, which causes partition deletion amplification.
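
As a rough illustration of that amplification, here is a hedged sketch (not Milvus code; the delete count, segment count, and false-positive rate below are invented): the expected number of spurious deltalog entries grows with both the number of irrelevant sealed segments and the bloom-filter false-positive rate.

```go
package main

import "fmt"

// Illustrative estimate only: with collection-wide L0 segments, every delete
// entry is probed against every sealed segment's bloom filter, and each false
// positive appends a spurious delete record to an irrelevant partition.
func main() {
	const (
		deleteEntries      = 100_000 // deletes produced by one partition-key delete (assumed)
		irrelevantSegments = 50      // sealed segments in other partitions (assumed)
		bloomFalsePositive = 0.001   // assumed bloom-filter false-positive rate
	)
	spurious := float64(deleteEntries) * float64(irrelevantSegments) * bloomFalsePositive
	fmt.Printf("expected spurious delete entries in irrelevant partitions: %.0f\n", spurious)
}
```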

This PR uses the partition_key to set the targeted partitionID when
producing deleteMsgs into MsgStreams. This narrows the scope of L0
segments down to the partition level and removes the collection-wide
false-positive influence.

However, due to the DeleteMsg structure, we can only label one partition
per deleteMsg, so this enhancement does not apply when a user deletes by
more than one partition_key in a single deletion.
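
A minimal sketch of that labeling rule, mirroring the logic this PR adds to the proxy (the helper name and the -1 sentinel standing in for common.AllPartitionsID are illustrative, not the exact code):

```go
package main

import "fmt"

// Assumed sentinel mirroring common.AllPartitionsID; the value here is illustrative.
const AllPartitionsID int64 = -1

// msgPartitionID sketches how a delete picks the partitionID stamped onto its
// DeleteMsgs: exactly one resolved partition (from a partition name or a
// partition-key expression) can be labeled; zero or multiple resolved
// partitions fall back to the collection-wide sentinel.
func msgPartitionID(partitionIDs []int64) int64 {
	if len(partitionIDs) == 1 {
		return partitionIDs[0]
	}
	return AllPartitionsID
}

func main() {
	fmt.Println(msgPartitionID([]int64{222}))      // single partition resolved -> 222
	fmt.Println(msgPartitionID([]int64{222, 333})) // multiple partition keys -> -1 (all partitions)
}
```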

pr: #38231 
See also: #34665

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-12-23 13:52:51 +08:00 committed by GitHub
parent 592f34c2d7
commit 21d76ad1ce
9 changed files with 760 additions and 573 deletions


@ -204,7 +204,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Int64("partitionID", r.GetPartitionID()),
zap.String("channelName", r.GetChannelName()),
zap.Uint32("count", r.GetCount()),
zap.String("segment level", r.GetLevel().String()),
)
// Load the collection info from Root Coordinator, if it is not found in server meta.


@ -167,7 +167,7 @@ message SegmentIDRequest {
int64 partitionID = 4;
bool isImport = 5; // deprecated
int64 importTaskID = 6; // deprecated
SegmentLevel level = 7;
SegmentLevel level = 7; // deprecated
}
message AssignSegmentIDRequest {


@ -234,7 +234,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string,
useRegexp := Params.ProxyCfg.PartitionNameRegexp.GetAsBool()
partitionsSet := typeutil.NewSet[int64]()
partitionsSet := typeutil.NewUniqueSet()
for _, partitionName := range partitionNames {
if useRegexp {
// Legacy feature, use partition name as regexp
@ -259,9 +259,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string,
// TODO change after testcase updated: return nil, merr.WrapErrPartitionNotFound(partitionName)
return nil, fmt.Errorf("partition name %s not found", partitionName)
}
if !partitionsSet.Contain(partitionID) {
partitionsSet.Insert(partitionID)
}
partitionsSet.Insert(partitionID)
}
}
return partitionsSet.Collect(), nil


@ -52,11 +52,9 @@ type deleteTask struct {
idAllocator allocator.Interface
// delete info
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
dbID UniqueID
partitionKeyMode bool
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
// set by scheduler
ts Timestamp
@ -132,7 +130,6 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
defer sp.End()
// log := log.Ctx(ctx)
if len(dt.req.GetExpr()) == 0 {
return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")
@ -224,13 +221,13 @@ func repackDeleteMsgByHash(
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionID: collectionID,
PartitionID: partitionID,
ShardName: vchannel,
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
CollectionID: collectionID,
PartitionID: partitionID,
PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel,
},
}
}
@ -288,11 +285,11 @@ type deleteRunner struct {
limiter types.Limiter
// delete info
schema *schemaInfo
dbID UniqueID
collectionID UniqueID
partitionID UniqueID
partitionKeyMode bool
schema *schemaInfo
dbID UniqueID
collectionID UniqueID
partitionIDs []UniqueID
plan *planpb.PlanNode
// for query
msgID int64
@ -331,29 +328,52 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
return ErrWithLog(log, "Failed to get collection schema", err)
}
dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection()
// get partitionIDs of delete
dr.partitionID = common.AllPartitionsID
if len(dr.req.PartitionName) > 0 {
if dr.partitionKeyMode {
dr.plan, err = planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues())
if err != nil {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}
if planparserv2.IsAlwaysTruePlan(dr.plan) {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}
// Set partitionIDs, could be empty if no partition name specified and no partition key
partName := dr.req.GetPartitionName()
if dr.schema.IsPartitionKeyCollection() {
if len(partName) > 0 {
return errors.New("not support manually specifying the partition names if partition key mode is used")
}
partName := dr.req.GetPartitionName()
expr, err := exprutil.ParseExprFromPlan(dr.plan)
if err != nil {
return err
}
partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey)
hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
if err != nil {
return err
}
dr.partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
if err != nil {
return err
}
} else if len(partName) > 0 {
// static validation
if err := validatePartitionTag(partName, true); err != nil {
return ErrWithLog(log, "Invalid partition name", err)
}
// dynamic validation
partID, err := globalMetaCache.GetPartitionID(ctx, dr.req.GetDbName(), collName, partName)
if err != nil {
return ErrWithLog(log, "Failed to get partition id", err)
}
dr.partitionID = partID
dr.partitionIDs = []UniqueID{partID} // only one partID
}
// hash primary keys to channels
// set vchannels
channelNames, err := dr.chMgr.getVChannels(dr.collectionID)
if err != nil {
return ErrWithLog(log, "Failed to get primary keys from expr", err)
return ErrWithLog(log, "Failed to get vchannels from collection", err)
}
dr.vChannels = channelNames
@ -367,16 +387,7 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
}
func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues())
if err != nil {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}
if planparserv2.IsAlwaysTruePlan(plan) {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, dr.plan)
if isSimple {
// if could get delete.primaryKeys from delete expr
err := dr.simpleDelete(ctx, pk, numRow)
@ -386,7 +397,7 @@ func (dr *deleteRunner) Run(ctx context.Context) error {
} else {
// if get complex delete expr
// need query from querynode before delete
err = dr.complexDelete(ctx, plan)
err := dr.complexDelete(ctx, dr.plan)
if err != nil {
log.Warn("complex delete failed,but delete some data", zap.Int64("count", dr.result.DeleteCnt), zap.String("expr", dr.req.GetExpr()))
return err
@ -395,58 +406,35 @@ func (dr *deleteRunner) Run(ctx context.Context) error {
return nil
}
func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) (*deleteTask, error) {
task := &deleteTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: dr.req,
idAllocator: dr.idAllocator,
chMgr: dr.chMgr,
chTicker: dr.chTicker,
collectionID: dr.collectionID,
partitionID: dr.partitionID,
partitionKeyMode: dr.partitionKeyMode,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
dbID: dr.dbID,
func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs, partitionID UniqueID) (*deleteTask, error) {
dt := &deleteTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: dr.req,
idAllocator: dr.idAllocator,
chMgr: dr.chMgr,
chTicker: dr.chTicker,
collectionID: dr.collectionID,
partitionID: partitionID,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
}
if err := dr.queue.Enqueue(task); err != nil {
if err := dr.queue.Enqueue(dt); err != nil {
log.Error("Failed to enqueue delete task: " + err.Error())
return nil, err
}
return task, nil
return dt, nil
}
// getStreamingQueryAndDelteFunc return query function used by LBPolicy
// make sure it concurrent safe
func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) executeFunc {
return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
var partitionIDs []int64
// optimize query when partitionKey on
if dr.partitionKeyMode {
expr, err := exprutil.ParseExprFromPlan(plan)
if err != nil {
return err
}
partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey)
hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
if err != nil {
return err
}
partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
if err != nil {
return err
}
} else if dr.partitionID != common.InvalidFieldID {
partitionIDs = []int64{dr.partitionID}
}
log := log.Ctx(ctx).With(
zap.Int64("collectionID", dr.collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64s("partitionIDs", dr.partitionIDs),
zap.String("channel", channel),
zap.Int64("nodeID", nodeID))
@ -472,7 +460,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
ReqID: paramtable.GetNodeID(),
DbID: 0, // TODO
CollectionID: dr.collectionID,
PartitionIDs: partitionIDs,
PartitionIDs: dr.partitionIDs,
SerializedExprPlan: serializedPlan,
OutputFieldsId: outputFieldIDs,
GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dr.ts, dr.ts, dr.req.GetConsistencyLevel()),
@ -493,7 +481,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
taskCh := make(chan *deleteTask, 256)
var receiveErr error
go func() {
receiveErr = dr.receiveQueryResult(ctx, client, taskCh, partitionIDs)
receiveErr = dr.receiveQueryResult(ctx, client, taskCh)
close(taskCh)
}()
var allQueryCnt int64
@ -516,7 +504,15 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
}
}
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) error {
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error {
// If a complex delete tries to delete multiple partitions in the filter, use AllPartitionsID
// otherwise use the target partitionID, which can come from the partition name (UDF) or a partition key expression
// TODO: Get partitionID from Query results
msgPartitionID := common.AllPartitionsID
if len(dr.partitionIDs) == 1 {
msgPartitionID = dr.partitionIDs[0]
}
for {
result, err := client.Recv()
if err != nil {
@ -534,14 +530,14 @@ func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.Q
}
if dr.limiter != nil {
err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: dr.partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
if err != nil {
log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err))
return err
}
}
task, err := dr.produce(ctx, result.GetIds())
task, err := dr.produce(ctx, result.GetIds(), msgPartitionID)
if err != nil {
log.Warn("produce delete task failed", zap.Error(err))
return err
@ -587,12 +583,16 @@ func (dr *deleteRunner) complexDelete(ctx context.Context, plan *planpb.PlanNode
}
func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64) error {
partitionID := common.AllPartitionsID
if len(dr.partitionIDs) == 1 {
partitionID = dr.partitionIDs[0]
}
log.Debug("get primary keys from expr",
zap.Int64("len of primary keys", numRow),
zap.Int64("collectionID", dr.collectionID),
zap.Int64("partitionID", dr.partitionID))
zap.Int64("partitionID", partitionID))
task, err := dr.produce(ctx, pk)
task, err := dr.produce(ctx, pk, partitionID)
if err != nil {
log.Warn("produce delete task failed")
return err
@ -605,70 +605,71 @@ func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numR
return err
}
func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (bool, *schemapb.IDs, int64) {
// simple delete request need expr with "pk in [a, b]"
func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (isSimpleDelete bool, pks *schemapb.IDs, pkCount int64) {
var err error
// simple delete request with "pk in [a, b]"
termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr)
if ok {
if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() {
return false, nil, 0
}
ids, rowNum, err := getPrimaryKeysFromTermExpr(schema, termExpr)
pks, pkCount, err = getPrimaryKeysFromTermExpr(schema, termExpr)
if err != nil {
return false, nil, 0
}
return true, ids, rowNum
return true, pks, pkCount
}
// simple delete if expr with "pk == a"
// simple delete with "pk == a"
unaryRangeExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr)
if ok {
if unaryRangeExpr.UnaryRangeExpr.GetOp() != planpb.OpType_Equal || !unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetIsPrimaryKey() {
return false, nil, 0
}
ids, err := getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
pks, err = getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
if err != nil {
return false, nil, 0
}
return true, ids, 1
return true, pks, 1
}
return false, nil, 0
}
func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (res *schemapb.IDs, err error) {
res = &schemapb.IDs{}
func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (pks *schemapb.IDs, err error) {
pks = &schemapb.IDs{}
switch unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetDataType() {
case schemapb.DataType_Int64:
res.IdField = &schemapb.IDs_IntId{
pks.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{unaryRangeExpr.UnaryRangeExpr.GetValue().GetInt64Val()},
},
}
case schemapb.DataType_VarChar:
res.IdField = &schemapb.IDs_StrId{
pks.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: []string{unaryRangeExpr.UnaryRangeExpr.GetValue().GetStringVal()},
},
}
default:
return res, fmt.Errorf("invalid field data type specifyed in simple delete expr")
return pks, fmt.Errorf("invalid field data type specifyed in simple delete expr")
}
return res, nil
return pks, nil
}
func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) {
res = &schemapb.IDs{}
rowNum = int64(len(termExpr.TermExpr.Values))
func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (pks *schemapb.IDs, pkCount int64, err error) {
pks = &schemapb.IDs{}
pkCount = int64(len(termExpr.TermExpr.Values))
switch termExpr.TermExpr.ColumnInfo.GetDataType() {
case schemapb.DataType_Int64:
ids := make([]int64, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetInt64Val())
}
res.IdField = &schemapb.IDs_IntId{
pks.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: ids,
},
@ -678,14 +679,14 @@ func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *pla
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetStringVal())
}
res.IdField = &schemapb.IDs_StrId{
pks.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: ids,
},
}
default:
return res, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
return pks, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
}
return res, rowNum, nil
return pks, pkCount, nil
}


@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -232,18 +233,34 @@ func TestDeleteTask_Execute(t *testing.T) {
})
}
func TestDeleteRunner_Init(t *testing.T) {
collectionName := "test_delete"
collectionID := int64(111)
partitionName := "default"
partitionID := int64(222)
// channels := []string{"test_channel"}
dbName := "test_1"
func TestDeleteRunnerSuite(t *testing.T) {
suite.Run(t, new(DeleteRunnerSuite))
}
collSchema := &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
type DeleteRunnerSuite struct {
suite.Suite
collectionName string
collectionID int64
partitionName string
partitionIDs []int64
schema *schemaInfo
mockCache *MockCache
}
func (s *DeleteRunnerSuite) SetupSubTest() {
s.SetupSuite()
}
func (s *DeleteRunnerSuite) SetupSuite() {
s.collectionName = "test_delete"
s.collectionID = int64(111)
s.partitionName = "default"
s.partitionIDs = []int64{222, 333, 444}
schema := &schemapb.CollectionSchema{
Name: s.collectionName,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
@ -252,200 +269,353 @@ func TestDeleteRunner_Init(t *testing.T) {
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
},
},
}
schema := newSchemaInfo(collSchema)
s.schema = newSchemaInfo(schema)
s.mockCache = NewMockCache(s.T())
}
t.Run("empty collection name", func(t *testing.T) {
dr := deleteRunner{}
assert.Error(t, dr.Init(context.Background()))
})
t.Run("fail to get database info", func(t *testing.T) {
func (s *DeleteRunnerSuite) TestInitSuccess() {
s.Run("non_pk == 1", func() {
mockChMgr := NewMockChannelsMgr(s.T())
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
CollectionName: s.collectionName,
Expr: "non_pk == 1",
},
chMgr: mockChMgr,
}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
globalMetaCache = cache
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice()
s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil)
s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil)
mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil)
assert.Error(t, dr.Init(context.Background()))
globalMetaCache = s.mockCache
s.NoError(dr.Init(context.Background()))
s.Require().Equal(1, len(dr.partitionIDs))
s.True(typeutil.NewSet[int64](100, 101).Contain(dr.partitionIDs[0]))
})
t.Run("fail to get collection id", func(t *testing.T) {
s.Run("non_pk > 1, partition key", func() {
mockChMgr := NewMockChannelsMgr(s.T())
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
CollectionName: s.collectionName,
Expr: "non_pk > 1",
},
chMgr: mockChMgr,
}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(int64(0), errors.New("mock GetCollectionID err"))
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice()
s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil)
s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil)
mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil)
globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
globalMetaCache = s.mockCache
s.NoError(dr.Init(context.Background()))
s.Require().Equal(0, len(dr.partitionIDs))
})
t.Run("fail get collection schema", func(t *testing.T) {
dr := deleteRunner{req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
DbName: dbName,
}}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(collectionID, nil)
cache.On("GetCollectionSchema",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(nil, errors.New("mock GetCollectionSchema err"))
s.Run("pk == 1, partition key", func() {
mockChMgr := NewMockChannelsMgr(s.T())
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
Expr: "pk == 1",
},
chMgr: mockChMgr,
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice()
s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil)
s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil)
mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil)
globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
globalMetaCache = s.mockCache
s.NoError(dr.Init(context.Background()))
s.Require().Equal(0, len(dr.partitionIDs))
})
t.Run("partition key mode but delete with partition name", func(t *testing.T) {
dr := deleteRunner{req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
DbName: dbName,
PartitionName: partitionName,
}}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(collectionID, nil)
cache.On("GetCollectionSchema",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(newSchemaInfo(&schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
s.Run("pk == 1, no partition name", func() {
mockChMgr := NewMockChannelsMgr(s.T())
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
Expr: "pk == 1",
},
chMgr: mockChMgr,
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
// Schema without PartitionKey
schema := &schemapb.CollectionSchema{
Name: s.collectionName,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: "pk",
IsPrimaryKey: true,
FieldID: common.StartOfUserFieldID,
Name: "pk",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
IsPartitionKey: false,
},
},
}), nil)
}
s.schema = newSchemaInfo(schema)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Once()
mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil)
globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
globalMetaCache = s.mockCache
s.NoError(dr.Init(context.Background()))
s.Equal(0, len(dr.partitionIDs))
})
t.Run("invalid partition name", func(t *testing.T) {
s.Run("pk == 1, with partition name", func() {
mockChMgr := NewMockChannelsMgr(s.T())
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
DbName: dbName,
CollectionName: s.collectionName,
PartitionName: "part1",
Expr: "pk == 1",
},
chMgr: mockChMgr,
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
// Schema without PartitionKey
schema := &schemapb.CollectionSchema{
Name: s.collectionName,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: "pk",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
DataType: schemapb.DataType_Int64,
IsPartitionKey: false,
},
},
}
s.schema = newSchemaInfo(schema)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Once()
mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil)
s.mockCache.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1000), nil)
globalMetaCache = s.mockCache
s.NoError(dr.Init(context.Background()))
s.Equal(1, len(dr.partitionIDs))
s.EqualValues(1000, dr.partitionIDs[0])
})
}
func (s *DeleteRunnerSuite) TestInitFailure() {
s.Run("empty collection name", func() {
dr := deleteRunner{}
s.Error(dr.Init(context.Background()))
})
s.Run("fail to get database info", func() {
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
},
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("fail to get collection id", func() {
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
},
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(int64(0), fmt.Errorf("mock get collectionID error"))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("fail get collection schema", func() {
dr := deleteRunner{req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
}}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("mock GetCollectionSchema err"))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("create plan failed", func() {
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
Expr: "????",
},
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).
Return(s.schema, nil)
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("delete with always true expression failed", func() {
alwaysTrueExpr := " "
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
Expr: alwaysTrueExpr,
},
}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).
Return(s.schema, nil)
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("partition key mode but delete with partition name", func() {
dr := deleteRunner{req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
PartitionName: s.partitionName,
}}
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).
Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).
Return(s.schema, nil)
// The schema enabled partitionKey
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
s.Run("invalid partition name", func() {
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: s.collectionName,
PartitionName: "???",
Expr: "non_pk in [1, 2, 3]",
},
}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(int64(10000), nil)
cache.On("GetCollectionSchema",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(schema, nil)
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
// Schema without PartitionKey
schema := &schemapb.CollectionSchema{
Name: s.collectionName,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: "pk",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
DataType: schemapb.DataType_Int64,
IsPartitionKey: false,
},
},
}
s.schema = newSchemaInfo(schema)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).
Return(s.schema, nil)
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
t.Run("get partition id failed", func(t *testing.T) {
s.Run("get partition id failed", func() {
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
DbName: dbName,
PartitionName: partitionName,
CollectionName: s.collectionName,
PartitionName: s.partitionName,
Expr: "non_pk in [1, 2, 3]",
},
}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(collectionID, nil)
cache.On("GetCollectionSchema",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(schema, nil)
cache.On("GetPartitionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(int64(0), errors.New("mock GetPartitionID err"))
globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
// Schema without PartitionKey
schema := &schemapb.CollectionSchema{
Name: s.collectionName,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: "pk",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
DataType: schemapb.DataType_Int64,
IsPartitionKey: false,
},
},
}
s.schema = newSchemaInfo(schema)
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil)
s.mockCache.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(int64(0), errors.New("mock GetPartitionID err"))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
t.Run("get vchannel failed", func(t *testing.T) {
chMgr := NewMockChannelsMgr(t)
s.Run("get vchannel failed", func() {
mockChMgr := NewMockChannelsMgr(s.T())
dr := deleteRunner{
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
DbName: dbName,
PartitionName: partitionName,
CollectionName: s.collectionName,
Expr: "non_pk in [1, 2, 3]",
},
chMgr: chMgr,
chMgr: mockChMgr,
}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(collectionID, nil)
cache.On("GetCollectionSchema",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(schema, nil)
cache.On("GetPartitionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(partitionID, nil)
chMgr.On("getVChannels", mock.Anything).Return(nil, fmt.Errorf("mock error"))
s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil)
s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice()
s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil)
s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil)
mockChMgr.EXPECT().getVChannels(mock.Anything).Return(nil, fmt.Errorf("mock error"))
globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
globalMetaCache = s.mockCache
s.Error(dr.Init(context.Background()))
})
}
@ -495,27 +665,19 @@ func TestDeleteRunner_Run(t *testing.T) {
globalMetaCache = nil
}()
t.Run("create plan failed", func(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
dr := deleteRunner{
chMgr: mockMgr,
req: &milvuspb.DeleteRequest{
Expr: "????",
},
schema: schema,
}
assert.Error(t, dr.Run(context.Background()))
})
t.Run("simple delete task failed", func(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
lb := NewMockLBPolicy(t)
expr := "pk in [1,2,3]"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
partitionIDs: []int64{partitionID},
vChannels: channels,
tsoAllocatorIns: tsoAllocator,
idAllocator: idAllocator,
@ -531,8 +693,9 @@ func TestDeleteRunner_Run(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: "pk in [1,2,3]",
Expr: expr,
},
plan: plan,
}
stream := msgstream.NewMockMsgStream(t)
mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil)
@ -543,42 +706,13 @@ func TestDeleteRunner_Run(t *testing.T) {
assert.Equal(t, int64(0), dr.result.DeleteCnt)
})
t.Run("delete with always true expression failed", func(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
lb := NewMockLBPolicy(t)
dr := deleteRunner{
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
vChannels: channels,
tsoAllocatorIns: tsoAllocator,
idAllocator: idAllocator,
queue: queue.dmQueue,
lb: lb,
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
IdField: nil,
},
},
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: " ",
},
}
assert.Error(t, dr.Run(context.Background()))
assert.Equal(t, int64(0), dr.result.DeleteCnt)
})
t.Run("complex delete query rpc failed", func(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
qn := mocks.NewMockQueryNodeClient(t)
lb := NewMockLBPolicy(t)
expr := "pk < 3"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
idAllocator: idAllocator,
@ -587,21 +721,20 @@ func TestDeleteRunner_Run(t *testing.T) {
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
partitionIDs: []int64{partitionID},
vChannels: channels,
lb: lb,
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
IdField: nil,
},
IDs: &schemapb.IDs{},
},
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: "pk < 3",
Expr: expr,
},
plan: plan,
}
lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error {
return workload.exec(ctx, 1, qn, "")
@ -619,13 +752,16 @@ func TestDeleteRunner_Run(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
qn := mocks.NewMockQueryNodeClient(t)
lb := NewMockLBPolicy(t)
expr := "pk < 3"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
queue: queue.dmQueue,
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
partitionIDs: []int64{partitionID},
vChannels: channels,
tsoAllocatorIns: tsoAllocator,
idAllocator: idAllocator,
@ -640,8 +776,9 @@ func TestDeleteRunner_Run(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: "pk < 3",
Expr: expr,
},
plan: plan,
}
stream := msgstream.NewMockMsgStream(t)
mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil)
@ -684,13 +821,16 @@ func TestDeleteRunner_Run(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
qn := mocks.NewMockQueryNodeClient(t)
lb := NewMockLBPolicy(t)
expr := "pk < 3"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
chMgr: mockMgr,
queue: queue.dmQueue,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
partitionIDs: []int64{partitionID},
vChannels: channels,
idAllocator: idAllocator,
tsoAllocatorIns: tsoAllocator,
@ -706,8 +846,9 @@ func TestDeleteRunner_Run(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: "pk < 3",
Expr: expr,
},
plan: plan,
}
lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error {
return workload.exec(ctx, 1, qn, "")
@ -743,13 +884,16 @@ func TestDeleteRunner_Run(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
qn := mocks.NewMockQueryNodeClient(t)
lb := NewMockLBPolicy(t)
expr := "pk < 3"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
chMgr: mockMgr,
queue: queue.dmQueue,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
partitionIDs: []int64{partitionID},
vChannels: channels,
idAllocator: idAllocator,
tsoAllocatorIns: tsoAllocator,
@ -764,8 +908,9 @@ func TestDeleteRunner_Run(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: "pk < 3",
Expr: expr,
},
plan: plan,
}
stream := msgstream.NewMockMsgStream(t)
mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil)
@ -805,13 +950,16 @@ func TestDeleteRunner_Run(t *testing.T) {
mockMgr := NewMockChannelsMgr(t)
qn := mocks.NewMockQueryNodeClient(t)
lb := NewMockLBPolicy(t)
expr := "pk < 3"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
queue: queue.dmQueue,
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionID: partitionID,
partitionIDs: []int64{partitionID},
vChannels: channels,
idAllocator: idAllocator,
tsoAllocatorIns: tsoAllocator,
@ -826,8 +974,9 @@ func TestDeleteRunner_Run(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
Expr: "pk < 3",
Expr: expr,
},
plan: plan,
}
stream := msgstream.NewMockMsgStream(t)
mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil)
@ -865,7 +1014,6 @@ func TestDeleteRunner_Run(t *testing.T) {
partitionMaps["test_0"] = 1
partitionMaps["test_1"] = 2
partitionMaps["test_2"] = 3
indexedPartitions := []string{"test_0", "test_1", "test_2"}
t.Run("complex delete with partitionKey mode success", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
@ -877,26 +1025,22 @@ func TestDeleteRunner_Run(t *testing.T) {
mockCache := NewMockCache(t)
mockCache.EXPECT().GetCollectionID(mock.Anything, dbName, collectionName).Return(collectionID, nil).Maybe()
mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(
partitionMaps, nil)
mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(
schema, nil)
mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).
Return(indexedPartitions, nil)
globalMetaCache = mockCache
defer func() { globalMetaCache = metaCache }()
expr := "non_pk in [2, 3]"
plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil)
require.NoError(t, err)
dr := deleteRunner{
queue: queue.dmQueue,
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionID: int64(-1),
vChannels: channels,
idAllocator: idAllocator,
tsoAllocatorIns: tsoAllocator,
lb: lb,
partitionKeyMode: true,
queue: queue.dmQueue,
chMgr: mockMgr,
schema: schema,
collectionID: collectionID,
partitionIDs: []int64{common.AllPartitionsID},
vChannels: channels,
idAllocator: idAllocator,
tsoAllocatorIns: tsoAllocator,
lb: lb,
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
@ -905,10 +1049,10 @@ func TestDeleteRunner_Run(t *testing.T) {
},
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
PartitionName: "",
DbName: dbName,
Expr: "non_pk in [2, 3]",
Expr: expr,
},
plan: plan,
}
stream := msgstream.NewMockMsgStream(t)
mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil)
@ -941,167 +1085,3 @@ func TestDeleteRunner_Run(t *testing.T) {
assert.Equal(t, int64(3), dr.result.DeleteCnt)
})
}
func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
collectionName := "test_delete"
collectionID := int64(111)
channels := []string{"test_channel"}
dbName := "test_1"
tsoAllocator := &mockTsoAllocator{}
idAllocator := &mockIDAllocatorInterface{}
queue, err := newTaskScheduler(ctx, tsoAllocator, nil)
assert.NoError(t, err)
queue.Start()
defer queue.Close()
collSchema := &schemapb.CollectionSchema{
Name: "test_delete",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: "pk",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: "non_pk",
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
},
},
}
// test partitionKey mode
collSchema.Fields[1].IsPartitionKey = true
schema := newSchemaInfo(collSchema)
partitionMaps := make(map[string]int64)
partitionMaps["test_0"] = 1
partitionMaps["test_1"] = 2
partitionMaps["test_2"] = 3
indexedPartitions := []string{"test_0", "test_1", "test_2"}
t.Run("partitionKey mode parse plan failed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dr := deleteRunner{
schema: schema,
queue: queue.dmQueue,
tsoAllocatorIns: tsoAllocator,
idAllocator: idAllocator,
collectionID: collectionID,
partitionID: int64(-1),
vChannels: channels,
partitionKeyMode: true,
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
IdField: nil,
},
},
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
PartitionName: "",
DbName: dbName,
Expr: "non_pk in [2, 3]",
},
}
qn := mocks.NewMockQueryNodeClient(t)
// witho out plan
queryFunc := dr.getStreamingQueryAndDelteFunc(nil)
assert.Error(t, queryFunc(ctx, 1, qn, ""))
})
t.Run("partitionKey mode get meta failed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dr := deleteRunner{
schema: schema,
tsoAllocatorIns: tsoAllocator,
idAllocator: idAllocator,
collectionID: collectionID,
partitionID: int64(-1),
vChannels: channels,
partitionKeyMode: true,
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
IdField: nil,
},
},
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
PartitionName: "",
DbName: dbName,
Expr: "non_pk in [2, 3]",
},
}
qn := mocks.NewMockQueryNodeClient(t)
mockCache := NewMockCache(t)
mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("mock error"))
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()
schemaHelper, err := typeutil.CreateSchemaHelper(dr.schema.CollectionSchema)
require.NoError(t, err)
plan, err := planparserv2.CreateRetrievePlan(schemaHelper, dr.req.Expr, nil)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn, ""))
})
t.Run("partitionKey mode get partition ID failed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dr := deleteRunner{
schema: schema,
tsoAllocatorIns: tsoAllocator,
idAllocator: idAllocator,
collectionID: collectionID,
partitionID: int64(-1),
vChannels: channels,
partitionKeyMode: true,
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
IdField: nil,
},
},
req: &milvuspb.DeleteRequest{
CollectionName: collectionName,
PartitionName: "",
DbName: dbName,
Expr: "non_pk in [2, 3]",
},
}
qn := mocks.NewMockQueryNodeClient(t)
mockCache := NewMockCache(t)
mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).
Return(indexedPartitions, nil)
mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(
schema, nil)
mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(
nil, fmt.Errorf("mock error"))
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()
schemaHelper, err := typeutil.CreateSchemaHelper(dr.schema.CollectionSchema)
require.NoError(t, err)
plan, err := planparserv2.CreateRetrievePlan(schemaHelper, dr.req.Expr, nil)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn, ""))
})
}


@ -41,6 +41,7 @@ func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
}
func TestCompaction(t *testing.T) {


@ -21,13 +21,9 @@ import (
"fmt"
"time"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
@ -35,68 +31,6 @@ import (
"github.com/milvus-io/milvus/tests/integration"
)
func (s *LevelZeroSuite) createCollection(collection string) {
schema := integration.ConstructSchema(collection, s.dim, false)
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
status, err := s.Cluster.Proxy.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{
CollectionName: collection,
Schema: marshaledSchema,
ShardsNum: 1,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(status))
log.Info("CreateCollection result", zap.Any("status", status))
}
func (s *LevelZeroSuite) generateSegment(collection string, numRows int, startPk int64, seal bool) {
log.Info("=========================Start generate one segment=========================")
pkColumn := integration.NewInt64FieldDataWithStart(integration.Int64Field, numRows, startPk)
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, numRows, s.dim)
hashKeys := integration.GenerateHashKeys(numRows)
insertResult, err := s.Cluster.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{
CollectionName: collection,
FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(numRows),
})
s.Require().NoError(err)
s.True(merr.Ok(insertResult.GetStatus()))
s.Require().EqualValues(numRows, insertResult.GetInsertCnt())
s.Require().EqualValues(numRows, len(insertResult.GetIDs().GetIntId().GetData()))
if seal {
log.Info("=========================Start to flush =========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.NoError(err)
segmentLongArr, has := flushResp.GetCollSegIDs()[collection]
s.Require().True(has)
segmentIDs := segmentLongArr.GetData()
s.Require().NotEmpty(segmentLongArr)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.True(has)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection)
log.Info("=========================Finish to generate one segment=========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
}
}
func (s *LevelZeroSuite) TestDeleteOnGrowing() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
@ -113,7 +47,9 @@ func (s *LevelZeroSuite) TestDeleteOnGrowing() {
)
collectionName := "TestLevelZero_" + funcutil.GenRandomStr()
s.createCollection(collectionName)
s.schema = integration.ConstructSchema(collectionName, s.dim, false)
req := s.buildCreateCollectionRequest(collectionName, s.schema, 0)
s.createCollection(req)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
@ -134,9 +70,9 @@ func (s *LevelZeroSuite) TestDeleteOnGrowing() {
s.Require().NoError(err)
s.WaitForLoad(ctx, collectionName)
s.generateSegment(collectionName, 1, 0, true)
s.generateSegment(collectionName, 2, 1, true)
s.generateSegment(collectionName, 2, 3, false)
s.generateSegment(collectionName, 1, 0, true, -1)
s.generateSegment(collectionName, 2, 1, true, -1)
s.generateSegment(collectionName, 2, 3, false, -1)
checkRowCount := func(rowCount int) {
// query


@ -0,0 +1,171 @@
package levelzero
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/milvus-io/milvus/tests/integration"
)
func (s *LevelZeroSuite) TestDeletePartitionKeyHint() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
const (
indexType = integration.IndexFaissIvfFlat
metricType = metric.L2
vecType = schemapb.DataType_FloatVector
)
collectionName := "TestLevelZero_" + funcutil.GenRandomStr()
// create a collection with partition key field "partition_key"
s.schema = integration.ConstructSchema(collectionName, s.dim, false)
s.schema.Fields = append(s.schema.Fields, &schemapb.FieldSchema{
FieldID: 102,
Name: "partition_key",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
})
req := s.buildCreateCollectionRequest(collectionName, s.schema, 2)
s.createCollection(req)
c := s.Cluster
// create index and load
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(s.dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(loadStatus, err)
s.Require().NoError(err)
s.WaitForLoad(ctx, collectionName)
// Generate 2 growing segments with 2 different partition keys 0 and 1001, with exactly the same PKs starting from 0
s.generateSegment(collectionName, 1000, 0, false, 0)
s.generateSegment(collectionName, 1001, 0, false, 1001)
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 2)
for _, segment := range segments {
s.Require().EqualValues(commonpb.SegmentState_Growing, segment.GetState())
s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
}
L1SegIDs := lo.Map(segments, func(seg *datapb.SegmentInfo, _ int) int64 {
return seg.GetID()
})
L1SegIDSet := typeutil.NewUniqueSet(L1SegIDs...)
checkRowCount := func(rowCount int) {
// query
queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
CollectionName: collectionName,
OutputFields: []string{"count(*)"},
})
err = merr.CheckRPCCall(queryResult, err)
s.NoError(err)
s.EqualValues(rowCount, queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
}
checkRowCount(2001)
// delete all data belonging to partition_key == 1001
// expr: partition_key == 1001 && pk >= 0
// - with the previous implementation, the delete pk >= 0 would touch every segment and leave only 1 row
// - with this enhancement, the expr "pk >= 0" only touches partitions that contain partition_key == 1001
deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
CollectionName: collectionName,
Expr: fmt.Sprintf("partition_key == 1001 && %s >= 0", integration.Int64Field),
})
err = merr.CheckRPCCall(deleteResult, err)
s.NoError(err)
checkRowCount(1000)
// Flush will generate 2 Flushed L1 segments and 1 Flushed L0 segment
s.Flush(collectionName)
segments, err = s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 3)
for _, segment := range segments {
s.Require().EqualValues(commonpb.SegmentState_Flushed, segment.GetState())
// L1 segments
if L1SegIDSet.Contain(segment.GetID()) {
s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
} else { // L0 segment with 1001 delete entries count
s.Require().EqualValues(commonpb.SegmentLevel_L0, segment.GetLevel())
s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum())
}
}
l0Dropped := func() bool {
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().EqualValues(len(segments), 3)
for _, segment := range segments {
// Return if L0 segments not compacted
if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Flushed {
return false
}
// If L0 segment compacted
if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Dropped {
// find the segment belong to partition_key == 1001
// check for the deltalog entries count == 1001
if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1001 {
s.True(L1SegIDSet.Contain(segment.GetID()))
s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum())
}
// find segment of another partition_key == 0
// check compaction doesn't touch it even though delete expression will delete it all
if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1000 {
s.True(L1SegIDSet.Contain(segment.GetID()))
s.Empty(segment.Deltalogs)
}
return true
}
}
return false
}
checkL0CompactionTouchOnePartition := func() {
failT := time.NewTimer(3 * time.Minute)
checkT := time.NewTicker(1 * time.Second)
for {
select {
case <-failT.C:
s.FailNow("L0 compaction timeout")
case <-checkT.C:
if l0Dropped() {
failT.Stop()
return
}
}
}
}
checkL0CompactionTouchOnePartition()
}


@ -17,10 +17,18 @@
package levelzero
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
@ -28,7 +36,8 @@ import (
type LevelZeroSuite struct {
integration.MiniClusterSuite
dim int
schema *schemapb.CollectionSchema
dim int
}
func (s *LevelZeroSuite) SetupSuite() {
@ -45,3 +54,95 @@ func (s *LevelZeroSuite) TearDownSuite() {
func TestLevelZero(t *testing.T) {
suite.Run(t, new(LevelZeroSuite))
}
func (s *LevelZeroSuite) buildCreateCollectionRequest(
collection string,
schema *schemapb.CollectionSchema,
numPartitions int64,
) *milvuspb.CreateCollectionRequest {
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
return &milvuspb.CreateCollectionRequest{
CollectionName: collection,
Schema: marshaledSchema,
ShardsNum: 1,
NumPartitions: numPartitions,
}
}
func (s *LevelZeroSuite) createCollection(req *milvuspb.CreateCollectionRequest) {
status, err := s.Cluster.Proxy.CreateCollection(context.TODO(), req)
s.Require().NoError(err)
s.Require().True(merr.Ok(status))
log.Info("CreateCollection result", zap.Any("status", status))
}
// For the PrimaryKey field, startPK will be the start PK of this generation
// For the PartitionKey field, partitionKey will be the same in this generation
func (s *LevelZeroSuite) buildFieldDataBySchema(schema *schemapb.CollectionSchema, numRows int, startPK int64, partitionKey int64) []*schemapb.FieldData {
var fieldData []*schemapb.FieldData
for _, field := range schema.Fields {
switch field.DataType {
case schemapb.DataType_Int64:
if field.IsPartitionKey {
fieldData = append(fieldData, integration.NewInt64SameFieldData(field.Name, numRows, partitionKey))
} else {
fieldData = append(fieldData, integration.NewInt64FieldDataWithStart(field.Name, numRows, startPK))
}
case schemapb.DataType_FloatVector:
fieldData = append(fieldData, integration.NewFloatVectorFieldData(field.Name, numRows, s.dim))
default:
s.Fail("not supported yet")
}
}
return fieldData
}
func (s *LevelZeroSuite) generateSegment(collection string, numRows int, startPk int64, seal bool, partitionKey int64) {
log.Info("=========================Start generate one segment=========================")
fieldData := s.buildFieldDataBySchema(s.schema, numRows, startPk, partitionKey)
hashKeys := integration.GenerateHashKeys(numRows)
insertResult, err := s.Cluster.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{
CollectionName: collection,
FieldsData: fieldData,
HashKeys: hashKeys,
NumRows: uint32(numRows),
})
s.Require().NoError(err)
s.True(merr.Ok(insertResult.GetStatus()))
s.Require().EqualValues(numRows, insertResult.GetInsertCnt())
s.Require().EqualValues(numRows, len(insertResult.GetIDs().GetIntId().GetData()))
if seal {
log.Info("=========================Start to flush =========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
s.Flush(collection)
log.Info("=========================Finish to generate one segment=========================",
zap.String("collection", collection),
zap.Int("numRows", numRows),
zap.Int64("startPK", startPk),
)
}
}
func (s *LevelZeroSuite) Flush(collection string) {
flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.NoError(err)
segmentLongArr, has := flushResp.GetCollSegIDs()[collection]
s.Require().True(has)
segmentIDs := segmentLongArr.GetData() // segmentIDs might be empty
// s.Require().NotEmpty(segmentLongArr)
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.True(has)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection)
}