mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
fix: [2.5] Ignore growing segment without start pos for seal policy (#41131)
Cherry-pick from master pr: #41130 Related to #41129 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
64255e229a
commit
281a4b0300
@ -301,7 +301,6 @@ func sealByBlockingL0(meta *meta) channelSealPolicy {
|
|||||||
// util func to calculate blocking statistics
|
// util func to calculate blocking statistics
|
||||||
blockingStats := func(l0Segments []*SegmentInfo, minStartTs uint64) (blockingSize int64, blockingEntryNum int64) {
|
blockingStats := func(l0Segments []*SegmentInfo, minStartTs uint64) (blockingSize int64, blockingEntryNum int64) {
|
||||||
for _, l0Segment := range l0Segments {
|
for _, l0Segment := range l0Segments {
|
||||||
// GetBinlogSizeAsBytes()
|
|
||||||
if l0Segment.GetDmlPosition().GetTimestamp() >= minStartTs {
|
if l0Segment.GetDmlPosition().GetTimestamp() >= minStartTs {
|
||||||
blockingSize += id2Size[l0Segment.GetID()]
|
blockingSize += id2Size[l0Segment.GetID()]
|
||||||
blockingEntryNum += id2EntryNum[l0Segment.GetID()]
|
blockingEntryNum += id2EntryNum[l0Segment.GetID()]
|
||||||
@ -314,6 +313,11 @@ func sealByBlockingL0(meta *meta) channelSealPolicy {
|
|||||||
|
|
||||||
var result []*SegmentInfo
|
var result []*SegmentInfo
|
||||||
for len(candidates) > 0 {
|
for len(candidates) > 0 {
|
||||||
|
// skip segments with nil start position
|
||||||
|
if candidates[0].GetStartPosition() == nil {
|
||||||
|
candidates = candidates[1:]
|
||||||
|
continue
|
||||||
|
}
|
||||||
// minStartPos must be [0], since growing is sorted
|
// minStartPos must be [0], since growing is sorted
|
||||||
blockingSize, blockingEntryNum := blockingStats(l0segments, candidates[0].GetStartPosition().GetTimestamp())
|
blockingSize, blockingEntryNum := blockingStats(l0segments, candidates[0].GetStartPosition().GetTimestamp())
|
||||||
|
|
||||||
|
|||||||
@ -349,6 +349,12 @@ func Test_sealByBlockingL0(t *testing.T) {
|
|||||||
StartPosition: &msgpb.MsgPosition{Timestamp: 35},
|
StartPosition: &msgpb.MsgPosition{Timestamp: 35},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
growing_3 := &SegmentInfo{
|
||||||
|
SegmentInfo: &datapb.SegmentInfo{
|
||||||
|
ID: 2003,
|
||||||
|
InsertChannel: "channel_1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
testCases := []*testCase{
|
testCases := []*testCase{
|
||||||
{
|
{
|
||||||
@ -386,6 +392,13 @@ func Test_sealByBlockingL0(t *testing.T) {
|
|||||||
entryNumLimit: -1,
|
entryNumLimit: -1,
|
||||||
expected: []int64{},
|
expected: []int64{},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
tag: "growing_segment_with_nil_start_position",
|
||||||
|
channel: "channel_1",
|
||||||
|
l0Segments: []*SegmentInfo{l0_1, l0_2},
|
||||||
|
growingSegments: []*SegmentInfo{growing_3},
|
||||||
|
expected: []int64{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user