milvus/tests/integration/levelzero/delete_partition_key_test.go
Zhen Ye ecb24e7232
enhance: use multi-process framework in integration test (#42976)
issue: #41609

- add env `MILVUS_NODE_ID_FOR_TESTING` to set up a node id for milvus
process.
- add env `MILVUS_CONFIG_REFRESH_INTERVAL` to set up the refresh
interval of paramtable.
- Init paramtable when calling `paramtable.Get()`.
- add new multi process framework for integration test.
- change all integration tests to the multi-process framework.
- merge some test cases into one suite to speed them up.
- modify some tests, which need to wait for issues #42966 and #42685.
- remove the waittssync for delete collection to fix issue: #42989

---------

Signed-off-by: chyezh <chyezh@outlook.com>
2025-06-30 14:22:43 +08:00

182 lines
5.9 KiB
Go

package levelzero
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metric"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
"github.com/milvus-io/milvus/tests/integration"
)
// TestDeletePartitionKeyHint verifies that a delete whose expression pins a
// partition-key value (e.g. "partition_key == 1001 && pk >= 0") only produces
// delete entries for the partition matching that key, so the subsequent L0
// compaction touches one partition's L1 segment and leaves the other intact.
func (s *LevelZeroSuite) TestDeletePartitionKeyHint() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()

	const (
		indexType  = integration.IndexFaissIvfFlat
		metricType = metric.L2
		vecType    = schemapb.DataType_FloatVector
	)

	collectionName := "TestLevelZero_" + funcutil.GenRandomStr()

	// Create a collection with an extra int64 partition key field "partition_key".
	s.schema = integration.ConstructSchema(collectionName, s.dim, false)
	s.schema.Fields = append(s.schema.Fields, &schemapb.FieldSchema{
		FieldID:        102,
		Name:           "partition_key",
		DataType:       schemapb.DataType_Int64,
		IsPartitionKey: true,
	})
	req := s.buildCreateCollectionRequest(collectionName, s.schema, 2)
	s.createCollection(req)

	c := s.Cluster

	// Create a vector index on the float-vector field, then load the collection.
	createIndexStatus, err := c.MilvusClient.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
		CollectionName: collectionName,
		FieldName:      integration.FloatVecField,
		IndexName:      "_default",
		ExtraParams:    integration.ConstructIndexParam(s.dim, indexType, metricType),
	})
	err = merr.CheckRPCCall(createIndexStatus, err)
	s.NoError(err)
	s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)

	loadStatus, err := c.MilvusClient.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
		CollectionName: collectionName,
	})
	err = merr.CheckRPCCall(loadStatus, err)
	s.Require().NoError(err)
	s.WaitForLoad(ctx, collectionName)

	// Generate 2 growing segments with 2 different partition keys (0 and 1001),
	// both with exactly the same PK range starting from 0, so a bare "pk >= 0"
	// predicate would match rows in both partitions.
	s.generateSegment(collectionName, 1000, 0, false, 0)
	s.generateSegment(collectionName, 1001, 0, false, 1001)
	// NOTE(review): err has not been reassigned since the load check above,
	// so this re-asserts an already-verified error (effectively a no-op).
	s.Require().NoError(err)

	// Wait until both growing L1 segments are visible via ShowSegments.
	var segments []*datapb.SegmentInfo
	assert.Eventually(s.T(), func() bool {
		var err error
		segments, err = c.ShowSegments(collectionName)
		s.NoError(err)
		if len(segments) == 2 {
			for _, segment := range segments {
				s.Require().EqualValues(commonpb.SegmentState_Growing, segment.GetState())
				s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
			}
			return true
		}
		return false
	}, 5*time.Second, 100*time.Millisecond)

	// Remember the two L1 segment IDs; any other segment appearing later
	// must be the L0 delete segment.
	L1SegIDs := lo.Map(segments, func(seg *datapb.SegmentInfo, _ int) int64 {
		return seg.GetID()
	})
	L1SegIDSet := typeutil.NewUniqueSet(L1SegIDs...)

	// checkRowCount queries count(*) and asserts the visible row count.
	checkRowCount := func(rowCount int) {
		// query
		queryResult, err := c.MilvusClient.Query(ctx, &milvuspb.QueryRequest{
			CollectionName: collectionName,
			OutputFields:   []string{"count(*)"},
		})
		err = merr.CheckRPCCall(queryResult, err)
		s.NoError(err)
		s.EqualValues(rowCount, queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
	}
	checkRowCount(2001)

	// Delete all data belonging to partition_key == 1001.
	// expr: partition_key == 1001 && pk >= 0
	// - in the previous implementation, the "pk >= 0" delete would touch every segment and leave only 1 numRows
	// - with the latest enhancements, the expr only touches partitions that contain partition key == 1001
	deleteResult, err := c.MilvusClient.Delete(ctx, &milvuspb.DeleteRequest{
		CollectionName: collectionName,
		Expr:           fmt.Sprintf("partition_key == 1001 && %s >= 0", integration.Int64Field),
	})
	err = merr.CheckRPCCall(deleteResult, err)
	s.NoError(err)
	checkRowCount(1000)

	// Flush will generate 2 Flushed L1 segments and 1 Flushed L0 segment.
	s.Flush(collectionName)
	segments, err = s.Cluster.ShowSegments(collectionName)
	s.Require().NoError(err)
	s.Require().EqualValues(len(segments), 3)
	for _, segment := range segments {
		s.Require().EqualValues(commonpb.SegmentState_Flushed, segment.GetState())
		// L1 segments
		if L1SegIDSet.Contain(segment.GetID()) {
			s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel())
		} else { // L0 segment with 1001 delete entries count
			s.Require().EqualValues(commonpb.SegmentLevel_L0, segment.GetLevel())
			s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum())
		}
	}

	// l0Dropped reports whether the L0 delete segment has been compacted
	// away (state Dropped); while it is still Flushed, compaction is pending.
	l0Dropped := func() bool {
		segments, err := s.Cluster.ShowSegments(collectionName)
		s.Require().NoError(err)
		s.Require().EqualValues(len(segments), 3)
		for _, segment := range segments {
			// Return if L0 segments not compacted
			if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Flushed {
				return false
			}
			// If L0 segment compacted
			if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Dropped {
				// find the segment belonging to partition_key == 1001
				// check for the deltalog entries count == 1001
				// NOTE(review): within this branch the segment is NOT in
				// L1SegIDSet, so the two level==L1 checks below can never
				// match this segment — they look intended to run against the
				// outer loop's L1 segments; confirm the nesting upstream.
				if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1001 {
					s.True(L1SegIDSet.Contain(segment.GetID()))
					s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum())
				}
				// find segment of another partition_key == 0
				// check compaction doesn't touch it even though delete expression will delete it all
				if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1000 {
					s.True(L1SegIDSet.Contain(segment.GetID()))
					s.Empty(segment.Deltalogs)
				}
				return true
			}
		}
		return false
	}

	// checkL0CompactionTouchOnePartition polls once per second until the L0
	// segment is compacted, failing the test after a 3-minute timeout.
	// NOTE(review): checkT is never Stop()ed; harmless here since the test
	// function returns shortly after, but a defer checkT.Stop() would be tidier.
	checkL0CompactionTouchOnePartition := func() {
		failT := time.NewTimer(3 * time.Minute)
		checkT := time.NewTicker(1 * time.Second)
		for {
			select {
			case <-failT.C:
				s.FailNow("L0 compaction timeout")
			case <-checkT.C:
				if l0Dropped() {
					failT.Stop()
					return
				}
			}
		}
	}
	checkL0CompactionTouchOnePartition()
}