diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 28796af6a5..e91d3a8f16 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -556,6 +556,12 @@ dataCoord:
       # The size threshold in MB, if the total size of growing segments of each shard
       # exceeds this threshold, the largest growing segment will be sealed.
       growingSegmentsMemSize: 4096
+      # If the total entry number of l0 logs of each shard
+      # exceeds this threshold, the earliest growing segments will be sealed.
+      blockingL0EntryNum: 5000000
+      # The size threshold in MB, if the total size of l0 logs of each shard
+      # exceeds this threshold, the earliest growing segments will be sealed.
+      blockingL0SizeInMB: 64
   autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
   segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment
   # Switch value to control if to enable segment compaction.
diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go
index d80ba9db47..a4e7b4f45e 100644
--- a/internal/datacoord/segment_allocation_policy.go
+++ b/internal/datacoord/segment_allocation_policy.go
@@ -17,6 +17,7 @@ package datacoord
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"sort"
 
@@ -250,6 +251,81 @@ func sealByTotalGrowingSegmentsSize() channelSealPolicy {
 	}
 }
 
+func sealByBlockingL0(meta *meta) channelSealPolicy {
+	return func(channel string, segments []*SegmentInfo, _ Timestamp) ([]*SegmentInfo, string) {
+		if len(segments) == 0 {
+			return nil, ""
+		}
+
+		sizeLimit := paramtable.Get().DataCoordCfg.BlockingL0SizeInMB.GetAsInt64() * 1024 * 1024 // MB to bytes
+		entryNumLimit := paramtable.Get().DataCoordCfg.BlockingL0EntryNum.GetAsInt64()
+
+		if sizeLimit < 0 && entryNumLimit < 0 {
+			// both policies are disabled, just return
+			return nil, ""
+		}
+
+		isLimitMet := func(blockingSize, blockingEntryNum int64) bool {
+			return (sizeLimit < 0 || blockingSize < sizeLimit) &&
+				(entryNumLimit < 0 || blockingEntryNum < entryNumLimit)
+		}
+
+		l0segments := meta.SelectSegments(context.TODO(), WithChannel(channel), SegmentFilterFunc(func(segment *SegmentInfo) bool {
+			return segment.GetLevel() == datapb.SegmentLevel_L0
+		}))
+
+		// sort growing by start pos
+		sortSegmentsByStartPosition(segments)
+
+		// example:
+		// G1 [0-----------------------------
+		// G2 [7-------------------
+		// G3 [4-------------------------
+		// G4 [10--------
+		// L0a [0-----5]
+		// L0b [6-------9]
+		// L0c [10------20]
+		// say L0a&L0b make total size/num exceed limit,
+		// we shall seal G1,G2,G3 since they have overlap ts range blocking l0 compaction
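+		//
+		// a concrete walk-through of the example above with illustrative numbers
+		// (assume entryNumLimit=100, sizeLimit disabled, L0a/L0b/L0c hold 30/110/10 entries):
+		//   G1 (start ts 0):  L0a,L0b,L0c all end at ts >= 0 -> 30+110+10=150 >= 100, seal G1
+		//   G3 (start ts 4):  L0a still ends at ts 5 >= 4    -> 150 >= 100, seal G3
+		//   G2 (start ts 7):  only L0b,L0c end at ts >= 7    -> 120 >= 100, seal G2
+		//   G4 (start ts 10): only L0c ends at ts >= 10      ->  10 <  100, stop
+		// note that a negative sizeLimit/entryNumLimit disables the corresponding check (see isLimitMet)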
+
+		// calculate size & num
+		id2Size := lo.SliceToMap(l0segments, func(l0Segment *SegmentInfo) (int64, int64) {
+			return l0Segment.GetID(), int64(GetBinlogSizeAsBytes(l0Segment.GetDeltalogs()))
+		})
+		id2EntryNum := lo.SliceToMap(l0segments, func(l0Segment *SegmentInfo) (int64, int64) {
+			return l0Segment.GetID(), int64(GetBinlogEntriesNum(l0Segment.GetDeltalogs()))
+		})
+
+		// util func to calculate blocking statistics
+		blockingStats := func(l0Segments []*SegmentInfo, minStartTs uint64) (blockingSize int64, blockingEntryNum int64) {
+			for _, l0Segment := range l0Segments {
+				// an l0 segment is blocking if its dml position is not earlier than the growing segment's start ts
+				if l0Segment.GetDmlPosition().GetTimestamp() >= minStartTs {
+					blockingSize += id2Size[l0Segment.GetID()]
+					blockingEntryNum += id2EntryNum[l0Segment.GetID()]
+				}
+			}
+			return blockingSize, blockingEntryNum
+		}
+
+		candidates := segments
+
+		var result []*SegmentInfo
+		for len(candidates) > 0 {
+			// candidates[0] has the minimal start position, since growing segments are sorted
+			blockingSize, blockingEntryNum := blockingStats(l0segments, candidates[0].GetStartPosition().GetTimestamp())
+
+			// if remaining blocking size and num are both less than the configured limits, stop sealing segments
+			if isLimitMet(blockingSize, blockingEntryNum) {
+				break
+			}
+			result = append(result, candidates[0])
+			candidates = candidates[1:]
+		}
+		return result, "seal segments due to blocking l0 size/num"
+	}
+}
+
 // sortSegmentsByLastExpires sort segmentStatus with lastExpireTime ascending order
 func sortSegmentsByLastExpires(segs []*SegmentInfo) {
 	sort.Slice(segs, func(i, j int) bool {
@@ -257,6 +333,13 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) {
 	})
 }
 
+// sortSegmentsByStartPosition sorts segments by start position in ascending order
+func sortSegmentsByStartPosition(segs []*SegmentInfo) {
+	sort.Slice(segs, func(i, j int) bool {
+		return segs[i].GetStartPosition().GetTimestamp() < segs[j].GetStartPosition().GetTimestamp()
+	})
+}
+
 type flushPolicy func(segment *SegmentInfo, t Timestamp) bool
 
 func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool {
diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go
index b3bf6b4b54..44449b33a5 100644
--- a/internal/datacoord/segment_allocation_policy_test.go
+++ b/internal/datacoord/segment_allocation_policy_test.go
@@ -18,9 +18,11 @@ package datacoord
 
 import (
 	"math/rand"
+	"strconv"
 	"testing"
 	"time"
 
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -303,3 +305,126 @@ func TestFlushPolicyWithZeroCurRows(t *testing.T) {
 	flushed := flushPolicyL1(seg, tsoutil.GetCurrentTime())
 	assert.True(t, flushed)
 }
+
+func Test_sealByBlockingL0(t *testing.T) {
+	paramtable.Init()
+	pt := paramtable.Get()
+	type testCase struct {
+		tag             string
+		channel         string
+		sizeLimit       int64
+		entryNumLimit   int64
+		l0Segments      []*SegmentInfo
+		growingSegments []*SegmentInfo
+		expected        []int64
+	}
+
+	l0_1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            1001,
+			InsertChannel: "channel_1",
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 50, MemorySize: 1 * 1024 * 1024},
+					},
+				},
+			},
+			Level:         datapb.SegmentLevel_L0,
+			StartPosition: &msgpb.MsgPosition{Timestamp: 10},
+			DmlPosition:   &msgpb.MsgPosition{Timestamp: 20},
+		},
+	}
+	l0_2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            1002,
+			InsertChannel: "channel_1",
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 60, MemorySize: 2 * 1024 * 1024},
+					},
+				},
+			},
+			Level:         datapb.SegmentLevel_L0,
+			StartPosition: &msgpb.MsgPosition{Timestamp: 30},
+			DmlPosition:   &msgpb.MsgPosition{Timestamp: 40},
+		},
+	}
+	growing_1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            2001,
+			InsertChannel: "channel_1",
+			StartPosition: &msgpb.MsgPosition{Timestamp: 10},
+		},
+	}
+	growing_2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            2002,
+			InsertChannel: "channel_1",
+			StartPosition: &msgpb.MsgPosition{Timestamp: 35},
+		},
+	}
+
+	testCases := []*testCase{
+		{
+			tag:             "seal_by_entrynum",
+			channel:         "channel_1",
+			sizeLimit:       -1,
+			entryNumLimit:   100,
+			l0Segments:      []*SegmentInfo{l0_1, l0_2}, // ts: [10,20] [30,40], entryNum: 50, 60
+			growingSegments: []*SegmentInfo{growing_1, growing_2}, // start ts: 10, 35
+			expected:        []int64{2001},
+		},
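+		// seal_by_entrynum: growing_1 (start ts 10) is blocked by l0_1 (dml ts 20) and
+		// l0_2 (dml ts 40): 50+60=110 >= 100, so it is sealed; growing_2 (start ts 35)
+		// is blocked by l0_2 only: 60 < 100, so sealing stops there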
+		{
+			tag:             "seal_by_size",
+			channel:         "channel_1",
+			sizeLimit:       1, // 1MB
+			entryNumLimit:   -1,
+			l0Segments:      []*SegmentInfo{l0_1, l0_2}, // ts: [10,20] [30,40], size: 1MB, 2MB
+			growingSegments: []*SegmentInfo{growing_1, growing_2}, // start ts: 10, 35
+			expected:        []int64{2001, 2002},
+		},
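+		// seal_by_size: growing_1 sees 1MB+2MB=3MB >= 1MB of blocking deltalogs and is
+		// sealed; growing_2 (start ts 35) still sees l0_2's 2MB >= 1MB, so both are sealed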
+		{
+			tag:             "empty_input",
+			channel:         "channel_1",
+			growingSegments: []*SegmentInfo{growing_1, growing_2},
+			sizeLimit:       1,
+			entryNumLimit:   50,
+			expected:        []int64{},
+		},
+		{
+			tag:             "all_disabled",
+			channel:         "channel_1",
+			l0Segments:      []*SegmentInfo{l0_1, l0_2},
+			growingSegments: []*SegmentInfo{growing_1, growing_2},
+			sizeLimit:       -1,
+			entryNumLimit:   -1,
+			expected:        []int64{},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.tag, func(t *testing.T) {
+			pt.Save(pt.DataCoordCfg.BlockingL0SizeInMB.Key, strconv.FormatInt(tc.sizeLimit, 10))
+			defer pt.Reset(pt.DataCoordCfg.BlockingL0SizeInMB.Key)
+			pt.Save(pt.DataCoordCfg.BlockingL0EntryNum.Key, strconv.FormatInt(tc.entryNumLimit, 10))
+			defer pt.Reset(pt.DataCoordCfg.BlockingL0EntryNum.Key)
+
+			segments := NewSegmentsInfo()
+			for _, l0segment := range tc.l0Segments {
+				segments.SetSegment(l0segment.GetID(), l0segment)
+			}
+
+			meta := &meta{
+				segments: segments,
+			}
+
+			result, _ := sealByBlockingL0(meta)(tc.channel, tc.growingSegments, 0)
+			sealedIDs := lo.Map(result, func(segment *SegmentInfo, _ int) int64 {
+				return segment.GetID()
+			})
+			assert.ElementsMatch(t, tc.expected, sealedIDs)
+		})
+	}
+}
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index 0177ae221b..24a4511509 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -204,9 +204,10 @@ func defaultSegmentSealPolicy() []SegmentSealPolicy {
 	}
 }
 
-func defaultChannelSealPolicy() []channelSealPolicy {
+func defaultChannelSealPolicy(meta *meta) []channelSealPolicy {
 	return []channelSealPolicy{
 		sealByTotalGrowingSegmentsSize(),
+		sealByBlockingL0(meta),
 	}
 }
 
@@ -226,7 +227,7 @@ func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocO
 		estimatePolicy:      defaultCalUpperLimitPolicy(),
 		allocPolicy:         defaultAllocatePolicy(),
 		segmentSealPolicies: defaultSegmentSealPolicy(),
-		channelSealPolicies: defaultChannelSealPolicy(),
+		channelSealPolicies: defaultChannelSealPolicy(meta),
 		flushPolicy:         defaultFlushPolicy(),
 	}
 	for _, opt := range opts {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index ff8647df55..408accd9d6 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -3412,6 +3412,8 @@ type dataCoordConfig struct {
 	GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"`
 	AutoUpgradeSegmentIndex    ParamItem `refreshable:"true"`
 	SegmentFlushInterval       ParamItem `refreshable:"true"`
+	BlockingL0EntryNum         ParamItem `refreshable:"true"`
+	BlockingL0SizeInMB         ParamItem `refreshable:"true"`
 
 	// compaction
 	EnableCompaction ParamItem `refreshable:"false"`
@@ -3678,6 +3680,26 @@ exceeds this threshold, the largest growing segment will be sealed.`,
 	}
 	p.GrowingSegmentsMemSizeInMB.Init(base.mgr)
 
+	p.BlockingL0EntryNum = ParamItem{
+		Key:          "dataCoord.sealPolicy.channel.blockingL0EntryNum",
+		Version:      "2.5.7",
+		DefaultValue: "5000000",
+		Doc: `If the total entry number of l0 logs of each shard
+exceeds this threshold, the earliest growing segments will be sealed.`,
+		Export: true,
+	}
+	p.BlockingL0EntryNum.Init(base.mgr)
+
+	p.BlockingL0SizeInMB = ParamItem{
+		Key:          "dataCoord.sealPolicy.channel.blockingL0SizeInMB",
+		Version:      "2.5.7",
+		DefaultValue: "64",
+		Doc: `The size threshold in MB, if the total size of l0 logs of each shard
+exceeds this threshold, the earliest growing segments will be sealed.`,
+		Export: true,
+	}
+	p.BlockingL0SizeInMB.Init(base.mgr)
+
 	p.EnableCompaction = ParamItem{
 		Key:     "dataCoord.enableCompaction",
 		Version: "2.0.0",