From 1a65f1e01cfd73dfae526f78065fada65cbb5cae Mon Sep 17 00:00:00 2001
From: jaime
Date: Thu, 2 Feb 2023 20:25:44 +0800
Subject: [PATCH] Add a segment seal policy by number of binlog files

Signed-off-by: jaime
---
 configs/milvus.yaml                         |  3 +
 .../datacoord/segment_allocation_policy.go  | 14 ++-
 internal/datacoord/segment_manager.go       |  1 +
 internal/datacoord/segment_manager_test.go  | 89 ++++++++++++++++++-
 internal/util/paramtable/component_param.go |  7 ++
 5 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index f98195feac..fd0388eb30 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -279,6 +279,9 @@ dataCoord:
     # `minSizeFromIdleToSealed`, Milvus will automatically seal it.
     maxIdleTime: 600 # The max idle time of segment in seconds, 10*60.
     minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
+    # The max number of binlog files for one segment; the segment will be sealed if
+    # the number of binlog files reaches this value.
+    maxBinlogFileNumber: 256
     smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
     # (smallProportion * segment max # of rows).
     compactableProportion: 0.85 # A compaction will happen on small segments if the segment after compaction will have
diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go
index 478d116ea2..a051871b00 100644
--- a/internal/datacoord/segment_allocation_policy.go
+++ b/internal/datacoord/segment_allocation_policy.go
@@ -116,7 +116,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
 	}
 }
 
-// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
+// sealByLifetimePolicy gets a segmentSealPolicy with a lifetime limit that compares ts - segment.lastExpireTime
 func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
 	return func(segment *SegmentInfo, ts Timestamp) bool {
 		pts, _ := tsoutil.ParseTS(ts)
@@ -126,6 +126,18 @@ func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
 	}
 }
 
+// sealByMaxBinlogFileNumberPolicy seals the segment if its number of binlog files reaches the configured max number
+func sealByMaxBinlogFileNumberPolicy(maxBinlogFileNumber int) segmentSealPolicy {
+	return func(segment *SegmentInfo, ts Timestamp) bool {
+		logFileCounter := 0
+		for _, fieldBinlog := range segment.Binlogs {
+			logFileCounter += len(fieldBinlog.GetBinlogs())
+		}
+
+		return logFileCounter >= maxBinlogFileNumber
+	}
+}
+
 // sealLongTimeIdlePolicy seal segment if the segment has been written with a high frequency before.
 // serve for this case:
 // If users insert entities into segment continuously within a certain period of time, but they forgot to flush/(seal)
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index e9932c5b38..f562fa413b 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -184,6 +184,7 @@ func defaultAllocatePolicy() AllocatePolicy {
 
 func defaultSegmentSealPolicy() []segmentSealPolicy {
 	return []segmentSealPolicy{
+		sealByMaxBinlogFileNumberPolicy(Params.DataCoordCfg.SegmentMaxBinlogFileNumber),
 		sealByLifetimePolicy(Params.DataCoordCfg.SegmentMaxLifetime),
 		getSegmentCapacityPolicy(Params.DataCoordCfg.SegmentSealProportion),
 		sealLongTimeIdlePolicy(Params.DataCoordCfg.SegmentMaxIdleTime, Params.DataCoordCfg.SegmentMinSizeFromIdleToSealed, Params.DataCoordCfg.SegmentMaxSize),
diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go
index 3d92b3bd19..02f72302b0 100644
--- a/internal/datacoord/segment_manager_test.go
+++ b/internal/datacoord/segment_manager_test.go
@@ -23,12 +23,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
 	"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/stretchr/testify/assert"
+	"github.com/milvus-io/milvus/internal/util/metautil"
 )
 
 func TestManagerOptions(t *testing.T) {
@@ -487,6 +489,91 @@ func TestTryToSealSegment(t *testing.T) {
 		}
 	})
 
+	t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) {
+		Params.Init()
+		mockAllocator := newMockAllocator()
+		meta, err := newMemoryMeta()
+		assert.Nil(t, err)
+
+		schema := newTestSchema()
+		collID, err := mockAllocator.allocID(context.Background())
+		assert.Nil(t, err)
+		meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
+		segmentManager := newSegmentManager(meta, mockAllocator, nil)
+		allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
+		assert.Nil(t, err)
+		assert.EqualValues(t, 1, len(allocations))
+
+		ts, err := segmentManager.allocator.allocTimestamp(context.Background())
+		assert.Nil(t, err)
+
+		// No seal policies
+		{
+			err = segmentManager.tryToSealSegment(ts, "c1")
+			assert.Nil(t, err)
+			segments := segmentManager.meta.segments.segments
+			assert.Equal(t, 1, len(segments))
+			for _, seg := range segments {
+				assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
+			}
+		}
+
+		// Does not trigger seal
+		{
+			segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
+			segments := segmentManager.meta.segments.segments
+			assert.Equal(t, 1, len(segments))
+			for _, seg := range segments {
+				seg.Binlogs = []*datapb.FieldBinlog{
+					{
+						FieldID: 2,
+						Binlogs: []*datapb.Binlog{
+							{
+								EntriesNum: 10,
+								LogID:      3,
+								LogPath:    metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
+							},
+						},
+					},
+				}
+				err = segmentManager.tryToSealSegment(ts, "c1")
+				assert.Nil(t, err)
+				seg = segmentManager.meta.segments.segments[seg.ID]
+				assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
+			}
+		}
+
+		// Triggers seal
+		{
+			segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
+			segments := segmentManager.meta.segments.segments
+			assert.Equal(t, 1, len(segments))
+			for _, seg := range segments {
+				seg.Binlogs = []*datapb.FieldBinlog{
+					{
+						FieldID: 1,
+						Binlogs: []*datapb.Binlog{
+							{
+								EntriesNum: 10,
+								LogID:      1,
+								LogPath:    metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
+							},
+							{
+								EntriesNum: 20,
+								LogID:      2,
+								LogPath:    metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
+							},
+						},
+					},
+				}
+				err = segmentManager.tryToSealSegment(ts, "c1")
+				assert.Nil(t, err)
+				seg = segmentManager.meta.segments.segments[seg.ID]
+				assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
+			}
+		}
+	})
+
 	t.Run("seal with segment policy with kv fails", func(t *testing.T) {
 		Params.Init()
 		mockAllocator := newMockAllocator()
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index a317bd0bdf..3ac44992e0 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -1196,6 +1196,7 @@ type dataCoordConfig struct {
 	SegmentMaxLifetime             time.Duration
 	SegmentMaxIdleTime             time.Duration
 	SegmentMinSizeFromIdleToSealed float64
+	SegmentMaxBinlogFileNumber     int
 
 	CreatedTime time.Time
 	UpdatedTime time.Time
@@ -1237,6 +1238,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
 	p.initSegmentMaxLifetime()
 	p.initSegmentMaxIdleTime()
 	p.initSegmentMinSizeFromIdleToSealed()
+	p.initSegmentMaxBinlogFileNumber()
 
 	p.initEnableCompaction()
 	p.initEnableAutoCompaction()
@@ -1293,6 +1295,11 @@ func (p *dataCoordConfig) initSegmentMinSizeFromIdleToSealed() {
 	log.Info("init segment min size from idle to sealed", zap.Float64("value", p.SegmentMinSizeFromIdleToSealed))
 }
 
+func (p *dataCoordConfig) initSegmentMaxBinlogFileNumber() {
+	p.SegmentMaxBinlogFileNumber = p.Base.ParseIntWithDefault("dataCoord.segment.maxBinlogFileNumber", 256)
+	log.Info("init segment max binlog file number", zap.Int("value", p.SegmentMaxBinlogFileNumber))
+}
+
 func (p *dataCoordConfig) initChannelWatchPrefix() {
 	// WARN: this value should not be put to milvus.yaml. It's a default value for channel watch path.
 	// This will be removed after we reconstruct our config module.
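Note for reviewers: below is a minimal, self-contained sketch of the seal-by-binlog-count logic this patch adds, runnable outside the repo. SegmentInfo, FieldBinlog, and Binlog here are simplified stand-ins for the real datacoord/datapb types, and the policy's unused Timestamp parameter is dropped; treat it as an illustration under those assumptions, not the production code.

package main

import "fmt"

// Binlog and FieldBinlog are simplified stand-ins for the datapb types used
// by the real policy; only the fields needed for counting are kept.
type Binlog struct{ LogID int64 }

type FieldBinlog struct{ Binlogs []*Binlog }

// SegmentInfo holds per-field binlogs, mirroring the shape the policy walks.
type SegmentInfo struct {
	ID      int64
	Binlogs []*FieldBinlog
}

// segmentSealPolicy decides whether a segment should be sealed (the real
// signature also takes a Timestamp, which this particular policy ignores).
type segmentSealPolicy func(segment *SegmentInfo) bool

// sealByMaxBinlogFileNumberPolicy mirrors the patch: sum the binlog files
// across all fields and seal once the total reaches the configured maximum.
func sealByMaxBinlogFileNumberPolicy(maxBinlogFileNumber int) segmentSealPolicy {
	return func(segment *SegmentInfo) bool {
		logFileCounter := 0
		for _, fieldBinlog := range segment.Binlogs {
			logFileCounter += len(fieldBinlog.Binlogs)
		}
		return logFileCounter >= maxBinlogFileNumber
	}
}

func main() {
	shouldSeal := sealByMaxBinlogFileNumberPolicy(2)

	// One binlog file: below the max, the segment keeps growing.
	one := &SegmentInfo{ID: 1, Binlogs: []*FieldBinlog{
		{Binlogs: []*Binlog{{LogID: 1}}},
	}}
	// Two binlog files on one field: the total reaches the max, so seal.
	two := &SegmentInfo{ID: 2, Binlogs: []*FieldBinlog{
		{Binlogs: []*Binlog{{LogID: 1}, {LogID: 2}}},
	}}

	fmt.Println(shouldSeal(one)) // false
	fmt.Println(shouldSeal(two)) // true
}

With the same threshold of 2 used in the unit test, the first segment stays Growing and the second is sealed, matching the "Does not trigger seal" and "Triggers seal" cases above.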