enhance: [2.5] Add channel seal policy based on blocking l0 (#40505) (#40535)

Cherry-pick from 2.5
pr: #40505
Related to #40502

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2025-03-14 10:02:12 +08:00 committed by GitHub
parent bdc0e68aaf
commit d8a2c1a907
5 changed files with 239 additions and 2 deletions

View File

@@ -556,6 +556,12 @@ dataCoord:
# The size threshold in MB, if the total size of growing segments of each shard
# exceeds this threshold, the largest growing segment will be sealed.
growingSegmentsMemSize: 4096
# If the total entry number of l0 logs of each shard
# exceeds this threshold, the earliest growing segments will be sealed.
blockingL0EntryNum: 5000000
# The size threshold in MB, if the total size of l0 logs of each shard
# exceeds this threshold, the earliest growing segments will be sealed.
blockingL0SizeInMB: 64
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flushing operations on the same segment
# Switch value to control if to enable segment compaction.
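
The two new knobs sit next to growingSegmentsMemSize in the channel seal policy section. A minimal milvus.yaml sketch, assuming the nesting mirrors the dataCoord.sealPolicy.channel keys defined in the paramtable hunk further down (values are the defaults; a negative value disables that check):

dataCoord:
  sealPolicy:
    channel:
      # seal the earliest growing segments once the blocking l0 delta logs
      # of a shard hold more than 5,000,000 entries ...
      blockingL0EntryNum: 5000000
      # ... or more than 64 MB of delta log data; set to -1 to disable
      blockingL0SizeInMB: 64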

View File

@@ -17,6 +17,7 @@
package datacoord
import (
"context"
"fmt"
"math/rand"
"sort"
@@ -250,6 +251,81 @@ func sealByTotalGrowingSegmentsSize() channelSealPolicy {
}
}
func sealByBlockingL0(meta *meta) channelSealPolicy {
return func(channel string, segments []*SegmentInfo, _ Timestamp) ([]*SegmentInfo, string) {
if len(segments) == 0 {
return nil, ""
}
sizeLimit := paramtable.Get().DataCoordCfg.BlockingL0SizeInMB.GetAsInt64() * 1024 * 1024 // MB to bytes
entryNumLimit := paramtable.Get().DataCoordCfg.BlockingL0EntryNum.GetAsInt64()
if sizeLimit < 0 && entryNumLimit < 0 {
// both policies are disabled, just return
return nil, ""
}
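// isLimitMet returns true when the blocking l0 size and entry number are both still below every enabled limit,
// i.e. no further growing segment needs to be sealed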
isLimitMet := func(blockingSize, blockingEntryNum int64) bool {
return (sizeLimit < 0 || blockingSize < sizeLimit) &&
(entryNumLimit < 0 || blockingEntryNum < entryNumLimit)
}
l0segments := meta.SelectSegments(context.TODO(), WithChannel(channel), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment.GetLevel() == datapb.SegmentLevel_L0
}))
// sort growing by start pos
sortSegmentsByStartPosition(segments)
// example:
// G1 [0-----------------------------
// G2 [7-------------------
// G3 [4-------------------------
// G4 [10--------
// L0a [0-----5]
// L0b [6-------9]
// L0c [10------20]
// say L0a & L0b make the total size/entry number exceed the limit,
// then G1, G2 and G3 shall be sealed, since their ts ranges overlap these l0 segments and block l0 compaction
// calculate size & num
id2Size := lo.SliceToMap(l0segments, func(l0Segment *SegmentInfo) (int64, int64) {
return l0Segment.GetID(), int64(GetBinlogSizeAsBytes(l0Segment.GetDeltalogs()))
})
id2EntryNum := lo.SliceToMap(l0segments, func(l0Segment *SegmentInfo) (int64, int64) {
return l0Segment.GetID(), int64(GetBinlogEntriesNum(l0Segment.GetDeltalogs()))
})
// util func to calculate blocking statistics
blockingStats := func(l0Segments []*SegmentInfo, minStartTs uint64) (blockingSize int64, blockingEntryNum int64) {
for _, l0Segment := range l0Segments {
// only l0 segments whose dml position is not earlier than minStartTs still block compaction
if l0Segment.GetDmlPosition().GetTimestamp() >= minStartTs {
blockingSize += id2Size[l0Segment.GetID()]
blockingEntryNum += id2EntryNum[l0Segment.GetID()]
}
}
return blockingSize, blockingEntryNum
}
candidates := segments
var result []*SegmentInfo
for len(candidates) > 0 {
// candidates[0] holds the minimal start position, since growing segments are sorted
blockingSize, blockingEntryNum := blockingStats(l0segments, candidates[0].GetStartPosition().GetTimestamp())
// if the remaining blocking size and entry number are both below the configured limits, stop sealing
if isLimitMet(blockingSize, blockingEntryNum) {
break
}
result = append(result, candidates[0])
candidates = candidates[1:]
}
return result, "seal segments due to blocking l0 size/num"
}
}
// sortSegmentsByLastExpires sorts segmentStatus with lastExpireTime in ascending order
func sortSegmentsByLastExpires(segs []*SegmentInfo) {
sort.Slice(segs, func(i, j int) bool {
@@ -257,6 +333,13 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) {
})
}
// sortSegmentsByStartPosition sorts segments by start position timestamp in ascending order
func sortSegmentsByStartPosition(segs []*SegmentInfo) {
sort.Slice(segs, func(i, j int) bool {
return segs[i].GetStartPosition().GetTimestamp() < segs[j].GetStartPosition().GetTimestamp()
})
}
type flushPolicy func(segment *SegmentInfo, t Timestamp) bool
func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool {
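
To make the seal loop above concrete, here is a small standalone sketch that replays the G1..G4 / L0a..L0c diagram from the code comment. All numbers (entry counts, the limit) are made up for illustration and are not taken from the PR:

package main

import (
    "fmt"
    "sort"
)

type l0Seg struct {
    dmlTs   uint64
    entries int64
}

type growingSeg struct {
    name    string
    startTs uint64
}

func main() {
    // L0a, L0b, L0c from the diagram, 40 entries each; entry limit 50
    l0s := []l0Seg{{dmlTs: 5, entries: 40}, {dmlTs: 9, entries: 40}, {dmlTs: 20, entries: 40}}
    growings := []growingSeg{{"G1", 0}, {"G2", 7}, {"G3", 4}, {"G4", 10}}
    const entryLimit = int64(50)

    // sort growing segments by start position, as sortSegmentsByStartPosition does
    sort.Slice(growings, func(i, j int) bool { return growings[i].startTs < growings[j].startTs })

    // blocking entry count: l0 segments whose dml position is not older than
    // the earliest remaining growing segment's start position
    blocking := func(minStart uint64) (n int64) {
        for _, s := range l0s {
            if s.dmlTs >= minStart {
                n += s.entries
            }
        }
        return
    }

    var sealed []string
    for len(growings) > 0 && blocking(growings[0].startTs) >= entryLimit {
        sealed = append(sealed, growings[0].name)
        growings = growings[1:]
    }
    fmt.Println(sealed) // [G1 G3 G2]: G1, G2 and G3 get sealed, G4 keeps growing
}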

View File

@@ -18,9 +18,11 @@ package datacoord
import (
"math/rand"
"strconv"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -303,3 +305,126 @@ func TestFlushPolicyWithZeroCurRows(t *testing.T) {
flushed := flushPolicyL1(seg, tsoutil.GetCurrentTime())
assert.True(t, flushed)
}
func Test_sealByBlockingL0(t *testing.T) {
paramtable.Init()
pt := paramtable.Get()
type testCase struct {
tag string
channel string
sizeLimit int64
entryNumLimit int64
l0Segments []*SegmentInfo
growingSegments []*SegmentInfo
expected []int64
}
l0_1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1001,
InsertChannel: "channel_1",
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 50, MemorySize: 1 * 1024 * 1024},
},
},
},
Level: datapb.SegmentLevel_L0,
StartPosition: &msgpb.MsgPosition{Timestamp: 10},
DmlPosition: &msgpb.MsgPosition{Timestamp: 20},
},
}
l0_2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1002,
InsertChannel: "channel_1",
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 60, MemorySize: 2 * 1024 * 1024},
},
},
},
Level: datapb.SegmentLevel_L0,
StartPosition: &msgpb.MsgPosition{Timestamp: 30},
DmlPosition: &msgpb.MsgPosition{Timestamp: 40},
},
}
growing_1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2001,
InsertChannel: "channel_1",
StartPosition: &msgpb.MsgPosition{Timestamp: 10},
},
}
growing_2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2002,
InsertChannel: "channel_1",
StartPosition: &msgpb.MsgPosition{Timestamp: 35},
},
}
testCases := []*testCase{
{
tag: "seal_by_entrynum",
channel: "channel_1",
sizeLimit: -1,
entryNumLimit: 100,
l0Segments: []*SegmentInfo{l0_1, l0_2}, // ts: [10,20] [30, 40], entryNum: 50, 60
growingSegments: []*SegmentInfo{growing_1, growing_2}, // start ts: 10, 35
expected: []int64{2001},
},
{
tag: "seal_by_size",
channel: "channel_1",
sizeLimit: 1, // 1MB
entryNumLimit: -1,
l0Segments: []*SegmentInfo{l0_1, l0_2}, // ts: [10,20] [30, 40], size: 1MB, 2MB
growingSegments: []*SegmentInfo{growing_1, growing_2}, // start ts: 10, 35
expected: []int64{2001, 2002},
},
{
tag: "empty_input",
channel: "channel_1",
growingSegments: []*SegmentInfo{growing_1, growing_2},
sizeLimit: 1,
entryNumLimit: 50,
expected: []int64{},
},
{
tag: "all_disabled",
channel: "channel_1",
l0Segments: []*SegmentInfo{l0_1, l0_2},
growingSegments: []*SegmentInfo{growing_1, growing_2},
sizeLimit: -1,
entryNumLimit: -1,
expected: []int64{},
},
}
for _, tc := range testCases {
t.Run(tc.tag, func(t *testing.T) {
pt.Save(pt.DataCoordCfg.BlockingL0SizeInMB.Key, strconv.FormatInt(tc.sizeLimit, 10))
defer pt.Reset(pt.DataCoordCfg.BlockingL0SizeInMB.Key)
pt.Save(pt.DataCoordCfg.BlockingL0EntryNum.Key, strconv.FormatInt(tc.entryNumLimit, 10))
defer pt.Reset(pt.DataCoordCfg.BlockingL0EntryNum.Key)
segments := NewSegmentsInfo()
for _, l0segment := range tc.l0Segments {
segments.SetSegment(l0segment.GetID(), l0segment)
}
meta := &meta{
segments: segments,
}
result, _ := sealByBlockingL0(meta)(tc.channel, tc.growingSegments, 0)
sealedIDs := lo.Map(result, func(segment *SegmentInfo, _ int) int64 {
return segment.GetID()
})
assert.ElementsMatch(t, tc.expected, sealedIDs)
})
}
}

View File

@@ -204,9 +204,10 @@ func defaultSegmentSealPolicy() []SegmentSealPolicy {
}
}
func defaultChannelSealPolicy() []channelSealPolicy {
func defaultChannelSealPolicy(meta *meta) []channelSealPolicy {
return []channelSealPolicy{
sealByTotalGrowingSegmentsSize(),
sealByBlockingL0(meta),
}
}
@@ -226,7 +227,7 @@ func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocO
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAllocatePolicy(),
segmentSealPolicies: defaultSegmentSealPolicy(),
channelSealPolicies: defaultChannelSealPolicy(),
channelSealPolicies: defaultChannelSealPolicy(meta),
flushPolicy: defaultFlushPolicy(),
}
for _, opt := range opts {
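
For context, every channelSealPolicy in the chain above shares the signature shown in the policy file: it receives the channel name, that channel's growing segments and a timestamp, and returns the segments to seal plus a reason string. A minimal sketch of how a caller could walk the default chain; applyChannelSealPolicies is a hypothetical helper for illustration, the segment manager's real seal loop is not part of this diff:

// applyChannelSealPolicies is hypothetical; the actual segment manager applies
// channelSealPolicies inside its own seal/flush loop.
func applyChannelSealPolicies(m *meta, channel string, growing []*SegmentInfo, ts Timestamp) []*SegmentInfo {
    var toSeal []*SegmentInfo
    for _, policy := range defaultChannelSealPolicy(m) {
        segments, reason := policy(channel, growing, ts)
        if len(segments) > 0 {
            _ = reason // the reason ("seal segments due to blocking l0 size/num", ...) would typically be logged
            toSeal = append(toSeal, segments...)
        }
    }
    return toSeal
}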

View File

@@ -3412,6 +3412,8 @@ type dataCoordConfig struct {
GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"`
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
SegmentFlushInterval ParamItem `refreshable:"true"`
BlockingL0EntryNum ParamItem `refreshable:"true"`
BlockingL0SizeInMB ParamItem `refreshable:"true"`
// compaction
EnableCompaction ParamItem `refreshable:"false"`
@@ -3678,6 +3680,26 @@ exceeds this threshold, the largest growing segment will be sealed.`,
}
p.GrowingSegmentsMemSizeInMB.Init(base.mgr)
p.BlockingL0EntryNum = ParamItem{
Key: "dataCoord.sealPolicy.channel.blockingL0EntryNum",
Version: "2.5.7",
DefaultValue: "5000000",
Doc: `If the total entry number of l0 logs of each shard
exceeds this threshold, the earliest growing segments will be sealed.`,
Export: true,
}
p.BlockingL0EntryNum.Init(base.mgr)
p.BlockingL0SizeInMB = ParamItem{
Key: "dataCoord.sealPolicy.channel.blockingL0SizeInMB",
Version: "2.5.7",
DefaultValue: "64",
Doc: `The size threshold in MB, if the total size of l0 logs of each shard
exceeds this threshold, the earliest growing segments will be sealed.`,
Export: true,
}
p.BlockingL0SizeInMB.Init(base.mgr)
p.EnableCompaction = ParamItem{
Key: "dataCoord.enableCompaction",
Version: "2.0.0",