diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 313781d2b9..db4bb1dcae 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -283,6 +283,9 @@ dataCoord:
     # `minSizeFromIdleToSealed`, Milvus will automatically seal it.
     maxIdleTime: 600 # The max idle time of segment in seconds, 10*60.
     minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
+    # The max number of binlog files for one segment. The segment will be sealed if
+    # the number of binlog files reaches the max value.
+    maxBinlogFileNumber: 256
     smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
     # (smallProportion * segment max # of rows).
     compactableProportion: 0.5 # A compaction will happen on small segments if the segment after compaction will have
diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go
index cc3400ad20..abcea9a69d 100644
--- a/internal/datacoord/segment_allocation_policy.go
+++ b/internal/datacoord/segment_allocation_policy.go
@@ -116,7 +116,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
 	}
 }
 
-// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
+// sealByLifetimePolicy gets a segmentSealPolicy with a lifetime limit, comparing ts - segment.lastExpireTime
 func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
 	return func(segment *SegmentInfo, ts Timestamp) bool {
 		pts, _ := tsoutil.ParseTS(ts)
@@ -126,6 +126,18 @@ func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
 	}
 }
 
+// sealByMaxBinlogFileNumberPolicy seals a segment once its total binlog file count reaches the configured max number
+func sealByMaxBinlogFileNumberPolicy(maxBinlogFileNumber int) segmentSealPolicy {
+	return func(segment *SegmentInfo, ts Timestamp) bool {
+		logFileCounter := 0
+		for _, fieldBinlog := range segment.Binlogs {
+			logFileCounter += len(fieldBinlog.GetBinlogs())
+		}
+
+		return logFileCounter >= maxBinlogFileNumber
+	}
+}
+
 // sealLongTimeIdlePolicy seal segment if the segment has been written with a high frequency before.
 // serve for this case:
 // If users insert entities into segment continuously within a certain period of time, but they forgot to flush/(seal)
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index fc705892a8..b02a7c0f2b 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -185,6 +185,7 @@ func defaultAllocatePolicy() AllocatePolicy {
 
 func defaultSegmentSealPolicy() []segmentSealPolicy {
 	return []segmentSealPolicy{
+		sealByMaxBinlogFileNumberPolicy(Params.DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt()),
 		sealByLifetimePolicy(Params.DataCoordCfg.SegmentMaxLifetime.GetAsDuration(time.Second)),
 		getSegmentCapacityPolicy(Params.DataCoordCfg.SegmentSealProportion.GetAsFloat()),
 		sealLongTimeIdlePolicy(Params.DataCoordCfg.SegmentMaxIdleTime.GetAsDuration(time.Second), Params.DataCoordCfg.SegmentMinSizeFromIdleToSealed.GetAsFloat(), Params.DataCoordCfg.SegmentMaxSize.GetAsFloat()),
diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go
index 3d92b3bd19..02f72302b0 100644
--- a/internal/datacoord/segment_manager_test.go
+++ b/internal/datacoord/segment_manager_test.go
@@ -23,12 +23,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
 	"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/stretchr/testify/assert"
+	"github.com/milvus-io/milvus/internal/util/metautil"
 )
 
 func TestManagerOptions(t *testing.T) {
@@ -487,6 +489,91 @@ func TestTryToSealSegment(t *testing.T) {
 		}
 	})
 
+	t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) {
+		Params.Init()
+		mockAllocator := newMockAllocator()
+		meta, err := newMemoryMeta()
+		assert.Nil(t, err)
+
+		schema := newTestSchema()
+		collID, err := mockAllocator.allocID(context.Background())
+		assert.Nil(t, err)
+		meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
+		segmentManager := newSegmentManager(meta, mockAllocator, nil)
+		allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
+		assert.Nil(t, err)
+		assert.EqualValues(t, 1, len(allocations))
+
+		ts, err := segmentManager.allocator.allocTimestamp(context.Background())
+		assert.Nil(t, err)
+
+		// No seal policies
+		{
+			err = segmentManager.tryToSealSegment(ts, "c1")
+			assert.Nil(t, err)
+			segments := segmentManager.meta.segments.segments
+			assert.Equal(t, 1, len(segments))
+			for _, seg := range segments {
+				assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
+			}
+		}
+
+		// Not trigger seal
+		{
+			segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
+			segments := segmentManager.meta.segments.segments
+			assert.Equal(t, 1, len(segments))
+			for _, seg := range segments {
+				seg.Binlogs = []*datapb.FieldBinlog{
+					{
+						FieldID: 2,
+						Binlogs: []*datapb.Binlog{
+							{
+								EntriesNum: 10,
+								LogID:      3,
+								LogPath:    metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
+							},
+						},
+					},
+				}
+				err = segmentManager.tryToSealSegment(ts, "c1")
+				assert.Nil(t, err)
+				seg = segmentManager.meta.segments.segments[seg.ID]
+				assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
+			}
+		}
+
+		// Trigger seal
+		{
+			segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
+			segments := segmentManager.meta.segments.segments
+			assert.Equal(t, 1, len(segments))
+			for _, seg := range segments {
+				seg.Binlogs = []*datapb.FieldBinlog{
+					{
+						FieldID: 1,
+						Binlogs: []*datapb.Binlog{
+							{
+								EntriesNum: 10,
+								LogID:      1,
+								LogPath:    metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
+							},
+							{
+								EntriesNum: 20,
+								LogID:      2,
+								LogPath:    metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
+							},
+						},
+					},
+				}
+				err = segmentManager.tryToSealSegment(ts, "c1")
+				assert.Nil(t, err)
+				seg = segmentManager.meta.segments.segments[seg.ID]
+				assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
+			}
+		}
+	})
+
 	t.Run("seal with segment policy with kv fails", func(t *testing.T) {
 		Params.Init()
 		mockAllocator := newMockAllocator()
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index 7245f74803..18ff262248 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -1298,6 +1298,7 @@ type dataCoordConfig struct {
 	SegmentMaxLifetime             ParamItem `refreshable:"false"`
 	SegmentMaxIdleTime             ParamItem `refreshable:"false"`
 	SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
+	SegmentMaxBinlogFileNumber     ParamItem `refreshable:"false"`
 
 	// compaction
 	EnableCompaction ParamItem `refreshable:"false"`
@@ -1382,6 +1383,13 @@ func (p *dataCoordConfig) init(base *BaseTable) {
 	}
 	p.SegmentMinSizeFromIdleToSealed.Init(base.mgr)
 
+	p.SegmentMaxBinlogFileNumber = ParamItem{
+		Key:          "dataCoord.segment.maxBinlogFileNumber",
+		Version:      "2.0.0",
+		DefaultValue: "256",
+	}
+	p.SegmentMaxBinlogFileNumber.Init(base.mgr)
+
 	p.EnableCompaction = ParamItem{
 		Key:     "dataCoord.enableCompaction",
 		Version: "2.0.0",
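
Note for reviewers: the snippet below is a minimal, self-contained sketch of the sealing behavior this patch introduces, for reading the diff without the surrounding datacoord code. The segment, fieldBinlog, and sealPolicy types are simplified, hypothetical stand-ins for SegmentInfo, datapb.FieldBinlog, and segmentSealPolicy; the counting logic matches sealByMaxBinlogFileNumberPolicy above, and shouldSeal mirrors, in simplified form, how tryToSealSegment applies the configured policy list (any single policy returning true is enough to seal). It is an illustration only, not part of the patch.

package main

import "fmt"

// Simplified, hypothetical stand-ins for datapb.FieldBinlog and SegmentInfo
// (illustration only; the real structs live in the Milvus internal packages).
type fieldBinlog struct {
	fieldID int64
	binlogs []string // one entry per binlog file of this field
}

type segment struct {
	id      int64
	binlogs []fieldBinlog
}

// sealPolicy is a simplified segmentSealPolicy without the timestamp argument.
type sealPolicy func(seg *segment) bool

// sealByMaxBinlogFileNumber seals a segment once the total number of binlog
// files across all of its fields reaches the configured maximum, matching the
// logic added in segment_allocation_policy.go.
func sealByMaxBinlogFileNumber(maxFiles int) sealPolicy {
	return func(seg *segment) bool {
		count := 0
		for _, fb := range seg.binlogs {
			count += len(fb.binlogs)
		}
		return count >= maxFiles
	}
}

// shouldSeal walks the policy list; any policy returning true seals the segment.
func shouldSeal(seg *segment, policies []sealPolicy) bool {
	for _, p := range policies {
		if p(seg) {
			return true
		}
	}
	return false
}

func main() {
	seg := &segment{
		id: 1,
		binlogs: []fieldBinlog{
			{fieldID: 1, binlogs: []string{"binlog-1", "binlog-2"}},
		},
	}
	policies := []sealPolicy{sealByMaxBinlogFileNumber(2)}
	fmt.Println(shouldSeal(seg, policies)) // true: 2 binlog files >= max of 2
}

With a max of 2, a segment whose fields hold two binlog files in total is sealed while a segment with a single file keeps growing, which is exactly the boundary the new "test sealByMaxBinlogFileNumberPolicy" case in segment_manager_test.go exercises.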