Add a segment seal policy by number of binlog files (#21263)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2022-12-20 14:09:25 +08:00 committed by GitHub
parent 856bceec27
commit d401608899
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 2 deletions

View File

@ -283,6 +283,9 @@ dataCoord:
# `minSizeFromIdleToSealed`, Milvus will automatically seal it. # `minSizeFromIdleToSealed`, Milvus will automatically seal it.
maxIdleTime: 600 # The max idle time of segment in seconds, 10*60. maxIdleTime: 600 # The max idle time of segment in seconds, 10*60.
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed. minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
# The max number of binlog files for one segment; the segment will be sealed once
# its number of binlog files reaches this value.
maxBinlogFileNumber: 256
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows). # (smallProportion * segment max # of rows).
compactableProportion: 0.5 # A compaction will happen on small segments if the segment after compaction will have compactableProportion: 0.5 # A compaction will happen on small segments if the segment after compaction will have

View File

@ -116,7 +116,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
} }
} }
// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime // sealByLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy { func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool { return func(segment *SegmentInfo, ts Timestamp) bool {
pts, _ := tsoutil.ParseTS(ts) pts, _ := tsoutil.ParseTS(ts)
@ -126,6 +126,18 @@ func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
} }
} }
// sealByMaxBinlogFileNumberPolicy builds a segmentSealPolicy that reports true
// once the total number of binlog files of a segment, summed over every entry
// in segment.Binlogs, is greater than or equal to maxBinlogFileNumber.
// The timestamp handed to the returned policy is not consulted.
func sealByMaxBinlogFileNumberPolicy(maxBinlogFileNumber int) segmentSealPolicy {
	return func(segment *SegmentInfo, _ Timestamp) bool {
		total := 0
		for i := range segment.Binlogs {
			// Each entry contributes as many files as it has binlogs.
			total += len(segment.Binlogs[i].GetBinlogs())
		}
		return total >= maxBinlogFileNumber
	}
}
// sealLongTimeIdlePolicy seal segment if the segment has been written with a high frequency before. // sealLongTimeIdlePolicy seal segment if the segment has been written with a high frequency before.
// serve for this case: // serve for this case:
// If users insert entities into segment continuously within a certain period of time, but they forgot to flush/(seal) // If users insert entities into segment continuously within a certain period of time, but they forgot to flush/(seal)

View File

@ -185,6 +185,7 @@ func defaultAllocatePolicy() AllocatePolicy {
func defaultSegmentSealPolicy() []segmentSealPolicy { func defaultSegmentSealPolicy() []segmentSealPolicy {
return []segmentSealPolicy{ return []segmentSealPolicy{
sealByMaxBinlogFileNumberPolicy(Params.DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt()),
sealByLifetimePolicy(Params.DataCoordCfg.SegmentMaxLifetime.GetAsDuration(time.Second)), sealByLifetimePolicy(Params.DataCoordCfg.SegmentMaxLifetime.GetAsDuration(time.Second)),
getSegmentCapacityPolicy(Params.DataCoordCfg.SegmentSealProportion.GetAsFloat()), getSegmentCapacityPolicy(Params.DataCoordCfg.SegmentSealProportion.GetAsFloat()),
sealLongTimeIdlePolicy(Params.DataCoordCfg.SegmentMaxIdleTime.GetAsDuration(time.Second), Params.DataCoordCfg.SegmentMinSizeFromIdleToSealed.GetAsFloat(), Params.DataCoordCfg.SegmentMaxSize.GetAsFloat()), sealLongTimeIdlePolicy(Params.DataCoordCfg.SegmentMaxIdleTime.GetAsDuration(time.Second), Params.DataCoordCfg.SegmentMinSizeFromIdleToSealed.GetAsFloat(), Params.DataCoordCfg.SegmentMaxSize.GetAsFloat()),

View File

@ -23,12 +23,14 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus-proto/go-api/schemapb"
memkv "github.com/milvus-io/milvus/internal/kv/mem" memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/internal/util/metautil"
) )
func TestManagerOptions(t *testing.T) { func TestManagerOptions(t *testing.T) {
@ -487,6 +489,91 @@ func TestTryToSealSegment(t *testing.T) {
} }
}) })
t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta()
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
// No seal policies
{
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
}
}
// Not trigger seal
{
segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
seg.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 10,
LogID: 3,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
},
},
},
}
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
seg = segmentManager.meta.segments.segments[seg.ID]
assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
}
}
// Trigger seal
{
segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
seg.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 10,
LogID: 1,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
},
{
EntriesNum: 20,
LogID: 2,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
},
},
},
}
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
seg = segmentManager.meta.segments.segments[seg.ID]
assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
}
}
})
t.Run("seal with segment policy with kv fails", func(t *testing.T) { t.Run("seal with segment policy with kv fails", func(t *testing.T) {
Params.Init() Params.Init()
mockAllocator := newMockAllocator() mockAllocator := newMockAllocator()

View File

@ -1298,6 +1298,7 @@ type dataCoordConfig struct {
SegmentMaxLifetime ParamItem `refreshable:"false"` SegmentMaxLifetime ParamItem `refreshable:"false"`
SegmentMaxIdleTime ParamItem `refreshable:"false"` SegmentMaxIdleTime ParamItem `refreshable:"false"`
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"` SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
// compaction // compaction
EnableCompaction ParamItem `refreshable:"false"` EnableCompaction ParamItem `refreshable:"false"`
@ -1382,6 +1383,13 @@ func (p *dataCoordConfig) init(base *BaseTable) {
} }
p.SegmentMinSizeFromIdleToSealed.Init(base.mgr) p.SegmentMinSizeFromIdleToSealed.Init(base.mgr)
p.SegmentMaxBinlogFileNumber = ParamItem{
Key: "dataCoord.segment.maxBinlogFileNumber",
Version: "2.0.0",
DefaultValue: "256",
}
p.SegmentMaxBinlogFileNumber.Init(base.mgr)
p.EnableCompaction = ParamItem{ p.EnableCompaction = ParamItem{
Key: "dataCoord.enableCompaction", Key: "dataCoord.enableCompaction",
Version: "2.0.0", Version: "2.0.0",