fix: nil part stats without l2 compaction (#34977)

related: #34923
pr: https://github.com/milvus-io/milvus/pull/34992

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
Chun Han 2024-07-25 14:21:46 +08:00 committed by GitHub
parent 0dd9af8f13
commit 6283fd0b46
2 changed files with 248 additions and 53 deletions
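
For orientation before the diff: the core change is purely defensive nil handling. The sketch below is a minimal, standalone Go illustration of that pattern (the type and function names are illustrative stand-ins, not the actual Milvus APIs): return early when the partition-stats map itself is nil, and skip any per-partition snapshot that is missing or nil instead of dereferencing it, so pruning silently degrades to keeping all segments.

package main

import "fmt"

// partitionStatsSnapshot is an illustrative stand-in; the real code uses
// storage.PartitionStatsSnapshot, whose SegmentStats map holds per-segment stats.
type partitionStatsSnapshot struct {
	SegmentStats map[int64]int64 // segmentID -> row count (simplified)
}

// collectSegmentIDs mirrors the guarded lookups added by this fix: a nil map or a
// nil per-partition snapshot yields no candidates, so the caller keeps all segments
// instead of hitting a nil-pointer dereference.
func collectSegmentIDs(partitionStats map[int64]*partitionStatsSnapshot, partitionIDs []int64) []int64 {
	if partitionStats == nil {
		return nil // no stats at all: prune nothing
	}
	segIDs := make([]int64, 0, 32)
	if len(partitionIDs) > 0 {
		for _, partID := range partitionIDs {
			partStats, exist := partitionStats[partID]
			if exist && partStats != nil {
				for segID := range partStats.SegmentStats {
					segIDs = append(segIDs, segID)
				}
			}
		}
		return segIDs
	}
	for _, partStats := range partitionStats {
		if partStats != nil {
			for segID := range partStats.SegmentStats {
				segIDs = append(segIDs, segID)
			}
		}
	}
	return segIDs
}

func main() {
	// A partition loaded without L2 (clustering) compaction has no stats snapshot.
	stats := map[int64]*partitionStatsSnapshot{1: nil}
	fmt.Println(collectSegmentIDs(stats, []int64{1})) // [] - nothing pruned, no panic
	fmt.Println(collectSegmentIDs(nil, nil))          // [] - nil map tolerated as well
}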


@@ -43,6 +43,9 @@ func PruneSegments(ctx context.Context,
) {
_, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "segmentPrune")
defer span.End()
+ if partitionStats == nil {
+ return
+ }
// 1. select collection, partitions and expr
clusteringKeyField := clustering.GetClusteringKeyField(schema)
if clusteringKeyField == nil {
@@ -113,17 +116,21 @@ func PruneSegments(ctx context.Context,
targetSegmentIDs := make([]int64, 0, 32)
if len(partitionIDs) > 0 {
for _, partID := range partitionIDs {
- partStats := partitionStats[partID]
- for segID, segStat := range partStats.SegmentStats {
- targetSegmentIDs = append(targetSegmentIDs, segID)
- targetSegmentStats = append(targetSegmentStats, segStat)
+ partStats, exist := partitionStats[partID]
+ if exist && partStats != nil {
+ for segID, segStat := range partStats.SegmentStats {
+ targetSegmentIDs = append(targetSegmentIDs, segID)
+ targetSegmentStats = append(targetSegmentStats, segStat)
+ }
}
}
} else {
for _, partStats := range partitionStats {
- for segID, segStat := range partStats.SegmentStats {
- targetSegmentIDs = append(targetSegmentIDs, segID)
- targetSegmentStats = append(targetSegmentStats, segStat)
+ if partStats != nil {
+ for segID, segStat := range partStats.SegmentStats {
+ targetSegmentIDs = append(targetSegmentIDs, segID)
+ targetSegmentStats = append(targetSegmentStats, segStat)
+ }
}
}
}
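
The test-file changes below fix two things: the copy destination is now pre-sized (Go's copy transfers only min(len(dst), len(src)) elements, so copying into a zero-length slice copies nothing), and PruneSegments plus the assertions now operate on the copied testSegments instead of the shared sealedSegments. A tiny standalone illustration of the copy pitfall (an assumed example, not from the patch):

package main

import "fmt"

func main() {
	src := []int{1, 2, 3, 4}

	// Old test pattern: the destination has length 0, so copy transfers nothing.
	empty := make([]int, 0)
	fmt.Println(copy(empty, src), empty) // 0 []

	// Fixed pattern: pre-size the destination so the snapshot is actually cloned.
	clone := make([]int, len(src))
	fmt.Println(copy(clone, src), clone) // 4 [1 2 3 4]
}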


@@ -761,7 +761,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int8 > 128"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -771,13 +771,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(2, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(2, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int8 < -129"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -787,13 +787,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(2, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(2, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int8 > 50"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -803,9 +803,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(0, len(sealedSegments[0].Segments))
- sps.Equal(1, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(0, len(testSegments[0].Segments))
+ sps.Equal(1, len(testSegments[1].Segments))
}
}
{
@@ -910,7 +910,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int16 > 32768"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -920,13 +920,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(2, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(2, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int16 < -32769"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -936,13 +936,12 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(2, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(2, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int16 > 2550"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -952,9 +951,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(0, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(0, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
}
@@ -1060,7 +1059,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int32 > 2147483648"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -1070,13 +1069,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(2, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(2, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int32 < -2147483649"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -1086,13 +1085,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(2, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(2, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "int32 > 12550"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -1102,9 +1101,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(0, len(sealedSegments[0].Segments))
- sps.Equal(1, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(0, len(testSegments[0].Segments))
+ sps.Equal(1, len(testSegments[1].Segments))
}
}
}
@@ -1227,7 +1226,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() {
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "float > 3.5"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -1237,9 +1236,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(0, len(sealedSegments[0].Segments))
- sps.Equal(2, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(0, len(testSegments[0].Segments))
+ sps.Equal(2, len(testSegments[1].Segments))
}
}
@@ -1344,8 +1343,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() {
sealedSegments = append(sealedSegments, item2)
{
// test out bound int expr, fallback to common search
- testSegments := make([]SnapshotItem, 0)
+ testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "double < -1.5"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
@@ -1355,13 +1353,203 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() {
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
}
- PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
- sps.Equal(1, len(sealedSegments[0].Segments))
- sps.Equal(0, len(sealedSegments[1].Segments))
+ PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
+ sps.Equal(1, len(testSegments[0].Segments))
+ sps.Equal(0, len(testSegments[1].Segments))
}
}
}
func (sps *SegmentPrunerSuite) TestPruneSegmentsWithoutPartitionStats() {
paramtable.Init()
collectionName := "test_segment_prune"
primaryFieldName := "pk"
dim := 8
const FLOAT = "float"
const DOUBLE = "double"
const VEC = "vec"
fieldName2DataType := make(map[string]schemapb.DataType)
fieldName2DataType[primaryFieldName] = schemapb.DataType_Int64
fieldName2DataType[FLOAT] = schemapb.DataType_Float
fieldName2DataType[DOUBLE] = schemapb.DataType_Double
fieldName2DataType[VEC] = schemapb.DataType_FloatVector
// set up segment distribution
sealedSegments := make([]SnapshotItem, 0)
item1 := SnapshotItem{
NodeID: 1,
Segments: []SegmentEntry{
{
NodeID: 1,
SegmentID: 1,
},
{
NodeID: 1,
SegmentID: 2,
},
},
}
item2 := SnapshotItem{
NodeID: 2,
Segments: []SegmentEntry{
{
NodeID: 2,
SegmentID: 3,
},
{
NodeID: 2,
SegmentID: 4,
},
},
}
sealedSegments = append(sealedSegments, item1)
sealedSegments = append(sealedSegments, item2)
clusterFieldName := DOUBLE
schema := testutil.ConstructCollectionSchemaWithKeys(collectionName,
fieldName2DataType,
primaryFieldName,
"",
clusterFieldName,
false,
dim)
{
// test for empty partition stats
partitionStats := make(map[UniqueID]*storage.PartitionStatsSnapshot)
testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "double < -1.5"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr)
sps.NoError(err)
serializedPlan, _ := proto.Marshal(planNode)
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
PartitionIDs: []int64{1},
}
PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
// without valid partition stats, we should get all segments targeted
sps.Equal(2, len(testSegments[0].Segments))
sps.Equal(2, len(testSegments[1].Segments))
}
{
// test for nil partition stat
partitionStats := make(map[UniqueID]*storage.PartitionStatsSnapshot)
partitionStats[1] = nil
testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "double < -1.5"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr)
sps.NoError(err)
serializedPlan, _ := proto.Marshal(planNode)
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
PartitionIDs: []int64{1},
}
PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
// without valid partition stats, we should get all segments targeted
sps.Equal(2, len(testSegments[0].Segments))
sps.Equal(2, len(testSegments[1].Segments))
}
{
// test for nil partition stats map
testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "double < -1.5"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr)
sps.NoError(err)
serializedPlan, _ := proto.Marshal(planNode)
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
PartitionIDs: []int64{1},
}
PruneSegments(context.TODO(), nil, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
// without valid partition stats, we should get all segments targeted
sps.Equal(2, len(testSegments[0].Segments))
sps.Equal(2, len(testSegments[1].Segments))
}
{
// test for nil schema
var clusteringKeyFieldID int64 = 0
for _, field := range schema.GetFields() {
if field.IsClusteringKey {
clusteringKeyFieldID = field.FieldID
break
}
}
// set up part stats
segStats := make(map[UniqueID]storage.SegmentStats)
{
fieldStats := make([]storage.FieldStats, 0)
fieldStat1 := storage.FieldStats{
FieldID: clusteringKeyFieldID,
Type: schemapb.DataType_Double,
Min: storage.NewDoubleFieldValue(-5.0),
Max: storage.NewDoubleFieldValue(-2.0),
}
fieldStats = append(fieldStats, fieldStat1)
segStats[1] = *storage.NewSegmentStats(fieldStats, 80)
}
{
fieldStats := make([]storage.FieldStats, 0)
fieldStat1 := storage.FieldStats{
FieldID: clusteringKeyFieldID,
Type: schemapb.DataType_Double,
Min: storage.NewDoubleFieldValue(-1.0),
Max: storage.NewDoubleFieldValue(2.0),
}
fieldStats = append(fieldStats, fieldStat1)
segStats[2] = *storage.NewSegmentStats(fieldStats, 80)
}
{
fieldStats := make([]storage.FieldStats, 0)
fieldStat1 := storage.FieldStats{
FieldID: clusteringKeyFieldID,
Type: schemapb.DataType_Double,
Min: storage.NewDoubleFieldValue(3.0),
Max: storage.NewDoubleFieldValue(6.0),
}
fieldStats = append(fieldStats, fieldStat1)
segStats[3] = *storage.NewSegmentStats(fieldStats, 80)
}
{
fieldStats := make([]storage.FieldStats, 0)
fieldStat1 := storage.FieldStats{
FieldID: clusteringKeyFieldID,
Type: schemapb.DataType_Double,
Min: storage.NewDoubleFieldValue(7.0),
Max: storage.NewDoubleFieldValue(8.0),
}
fieldStats = append(fieldStats, fieldStat1)
segStats[4] = *storage.NewSegmentStats(fieldStats, 80)
}
partitionStats := make(map[UniqueID]*storage.PartitionStatsSnapshot)
targetPartition := int64(1)
partitionStats[targetPartition] = &storage.PartitionStatsSnapshot{
SegmentStats: segStats,
}
testSegments := make([]SnapshotItem, len(sealedSegments))
copy(testSegments, sealedSegments)
exprStr := "double < -1.5"
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr)
sps.NoError(err)
serializedPlan, _ := proto.Marshal(planNode)
queryReq := &internalpb.RetrieveRequest{
SerializedExprPlan: serializedPlan,
PartitionIDs: []int64{targetPartition},
}
PruneSegments(context.TODO(), partitionStats, nil, queryReq, nil, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
sps.Equal(2, len(testSegments[0].Segments))
sps.Equal(2, len(testSegments[1].Segments))
}
}
func TestSegmentPrunerSuite(t *testing.T) {
suite.Run(t, new(SegmentPrunerSuite))
}