From ca2e27f5769da61e8afb72021c583fe79a75bb41 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Sat, 13 Dec 2025 02:58:46 +0800 Subject: [PATCH] enhance: remove uncessary segment size estimation and make it configurable (#46302) fix #46300 remove unused segment size estimation, and make size estimation configurable Signed-off-by: xiaofanluan --- configs/milvus.yaml | 4 + .../datacoord/compaction_policy_clustering.go | 44 ++++- .../compaction_policy_clustering_test.go | 8 + internal/datacoord/compaction_trigger.go | 20 +-- internal/datacoord/compaction_trigger_test.go | 154 ++++++++---------- .../datacoord/segment_allocation_policy.go | 31 ---- .../segment_allocation_policy_test.go | 52 ------ pkg/util/paramtable/component_param.go | 48 +++++- pkg/util/typeutil/field_length_limit.go | 88 ++++++++++ pkg/util/typeutil/schema.go | 13 +- 10 files changed, 270 insertions(+), 192 deletions(-) create mode 100644 pkg/util/typeutil/field_length_limit.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9797ab6868..aefe5815c1 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1003,6 +1003,10 @@ common: session: ttl: 15 # ttl value when session granting a lease to register service retryTimes: 30 # retry times when session sending etcd requests + estimate: + varCharLengthAvg: 256 # average length considered per VarChar/Text field when estimating record size + dynamicFieldLengthAvg: 512 # average length considered per JSON/Array/Geometry field when estimating record size + sparseFloatVectorSize: 1200 # fallback size (bytes) used when estimating sparse float vector fields locks: metrics: enable: false # whether gather statistics for metrics locks diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go index 00583a14aa..1bf6989b8c 100644 --- a/internal/datacoord/compaction_policy_clustering.go +++ b/internal/datacoord/compaction_policy_clustering.go @@ -205,21 +205,57 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle } func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView, expectedSegmentSize int64) (totalRows, maxSegmentRows, preferSegmentRows int64, err error) { - for _, s := range view.GetSegmentsView() { + segments := view.GetSegmentsView() + for _, s := range segments { totalRows += s.NumOfRows } clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat() clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat() - maxRows, err := calBySegmentSizePolicy(coll.Schema, expectedSegmentSize) + segmentRows, err := estimateRowsBySegmentSize(segments, expectedSegmentSize) if err != nil { return 0, 0, 0, err } - maxSegmentRows = int64(float64(maxRows) * clusteringMaxSegmentSizeRatio) - preferSegmentRows = int64(float64(maxRows) * clusteringPreferSegmentSizeRatio) + maxSegmentRows = int64(float64(segmentRows) * clusteringMaxSegmentSizeRatio) + preferSegmentRows = int64(float64(segmentRows) * clusteringPreferSegmentSizeRatio) return } +func estimateRowsBySegmentSize(segments []*SegmentView, expectedSegmentSize int64) (int64, error) { + if expectedSegmentSize <= 0 { + return 0, fmt.Errorf("invalid expected segment size %d", expectedSegmentSize) + } + + var totalSize float64 + var totalRows int64 + for _, segment := range segments { + if segment == nil { + continue + } + if segment.NumOfRows <= 0 || 
segment.Size <= 0 { + continue + } + totalSize += segment.Size + totalRows += segment.NumOfRows + } + + if totalRows == 0 || totalSize == 0 { + return 0, fmt.Errorf("segment view does not contain size info to estimate row count") + } + + rowSize := totalSize / float64(totalRows) + if rowSize <= 0 { + return 0, fmt.Errorf("invalid row size calculated from segment view") + } + + rows := float64(expectedSegmentSize) / rowSize + if rows <= 0 { + return 0, fmt.Errorf("estimated max row count is not positive") + } + + return int64(rows), nil +} + func triggerClusteringCompactionPolicy(ctx context.Context, meta *meta, collectionID int64, partitionID int64, channel string, segments []*SegmentInfo) (bool, error) { log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) currentVersion := meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index adcd7cfc0d..d29d869708 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -185,6 +185,14 @@ func (s *ClusteringCompactionPolicySuite) TestCalculateClusteringCompactionConfi for _, test := range testCases { s.Run(test.description, func() { expectedSegmentSize := getExpectedSegmentSize(s.meta, test.coll.ID, test.coll.Schema) + if view, ok := test.view.(*ClusteringSegmentsView); ok { + for _, segment := range view.segments { + if segment == nil || segment.NumOfRows <= 0 || test.maxSegmentRows == 0 { + continue + } + segment.Size = float64(expectedSegmentSize) * float64(segment.NumOfRows) / float64(test.maxSegmentRows) + } + } totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(test.coll, test.view, expectedSegmentSize) s.Equal(test.totalRows, totalRows) s.Equal(test.maxSegmentRows, maxSegmentRows) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index c289608be5..888a44da77 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -128,8 +128,6 @@ type compactionTrigger struct { indexEngineVersionManager IndexEngineVersionManager - estimateNonDiskSegmentPolicy calUpperLimitPolicy - estimateDiskSegmentPolicy calUpperLimitPolicy // A sloopy hack, so we can test with different segment row count without worrying that // they are re-calculated in every compaction. 
testingOnly bool @@ -143,16 +141,14 @@ func newCompactionTrigger( indexVersionManager IndexEngineVersionManager, ) *compactionTrigger { return &compactionTrigger{ - meta: meta, - allocator: allocator, - signals: make(chan *compactionSignal, 100), - manualSignals: make(chan *compactionSignal, 100), - inspector: inspector, - indexEngineVersionManager: indexVersionManager, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - handler: handler, - closeCh: lifetime.NewSafeChan(), + meta: meta, + allocator: allocator, + signals: make(chan *compactionSignal, 100), + manualSignals: make(chan *compactionSignal, 100), + inspector: inspector, + indexEngineVersionManager: indexVersionManager, + handler: handler, + closeCh: lifetime.NewSafeChan(), } } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 4a4718faed..16879f9db2 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -642,17 +642,15 @@ func Test_compactionTrigger_force(t *testing.T) { tt.fields.inspector.(*spyCompactionInspector).meta = tt.fields.meta t.Run(tt.name, func(t *testing.T) { tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - manualSignals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: make(chan *compactionSignal, 100), + manualSignals: make(chan *compactionSignal, 100), + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } tr.closeWaiter.Add(1) go func() { @@ -687,17 +685,15 @@ func Test_compactionTrigger_force(t *testing.T) { tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment } tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - manualSignals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: make(chan *compactionSignal, 100), + manualSignals: make(chan *compactionSignal, 100), + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } tr.closeWaiter.Add(1) go func() { @@ -726,17 +722,15 @@ func Test_compactionTrigger_force(t *testing.T) { segment.CollectionID = 1111 } tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - manualSignals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - 
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: make(chan *compactionSignal, 100), + manualSignals: make(chan *compactionSignal, 100), + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } tr.closeWaiter.Add(1) go func() { @@ -976,17 +970,15 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { (tt.fields.inspector).(*spyCompactionInspector).meta = tt.fields.meta t.Run(tt.name, func(t *testing.T) { tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - manualSignals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: make(chan *compactionSignal, 100), + manualSignals: make(chan *compactionSignal, 100), + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } tr.closeWaiter.Add(1) go func() { @@ -1138,16 +1130,14 @@ func Test_compactionTrigger_noplan(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: make(chan *compactionSignal, 100), + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } tr.start() defer tr.stop() @@ -1476,17 +1466,15 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { MsgID: []byte{1, 2, 3, 4}, } tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - indexEngineVersionManager: newMockVersionManager(), - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: make(chan *compactionSignal, 100), + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + indexEngineVersionManager: newMockVersionManager(), + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } tr.start() defer tr.stop() @@ -2773,16 +2761,14 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t 
*testing.T) { tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: tt.fields.signals, - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: tt.fields.signals, + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } if got := tr.generatePlans(tt.args.segments, tt.args.signal, tt.args.compactTime, tt.args.expectedSize); !reflect.DeepEqual(got, tt.want) { @@ -3071,16 +3057,14 @@ func Test_compactionTrigger_generatePlansByTime(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: tt.fields.signals, - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: tt.fields.signals, + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, } got := tr.generatePlans(tt.args.segments, tt.args.signal, tt.args.compactTime, tt.args.expectedSize) diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index e77bec4a74..864a9e3231 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -52,37 +52,6 @@ func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) { return int(threshold / float64(sizePerRecord)), nil } -func calBySchemaPolicyWithDiskIndex(schema *schemapb.CollectionSchema) (int, error) { - if schema == nil { - return -1, errors.New("nil schema") - } - sizePerRecord, err := typeutil.EstimateSizePerRecord(schema) - if err != nil { - return -1, err - } - // check zero value, preventing panicking - if sizePerRecord == 0 { - return -1, errors.New("zero size record schema found") - } - threshold := Params.DataCoordCfg.DiskSegmentMaxSize.GetAsFloat() * 1024 * 1024 - return int(threshold / float64(sizePerRecord)), nil -} - -func calBySegmentSizePolicy(schema *schemapb.CollectionSchema, segmentSize int64) (int, error) { - if schema == nil { - return -1, errors.New("nil schema") - } - sizePerRecord, err := typeutil.EstimateSizePerRecord(schema) - if err != nil { - return -1, err - } - // check zero value, preventing panicking - if sizePerRecord == 0 { - return -1, errors.New("zero size record schema found") - } - return int(segmentSize) / sizePerRecord, nil -} - // AllocatePolicy helper function definition to allocate Segment space type AllocatePolicy func(segments []*SegmentInfo, count int64, maxCountPerL1Segment int64, level datapb.SegmentLevel) ([]*Allocation, []*Allocation) diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go index b4debf265b..964036ca43 100644 --- 
a/internal/datacoord/segment_allocation_policy_test.go +++ b/internal/datacoord/segment_allocation_policy_test.go @@ -143,58 +143,6 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) { } } -func TestCalBySegmentSizePolicy(t *testing.T) { - t.Run("nil schema", func(t *testing.T) { - rows, err := calBySegmentSizePolicy(nil, 1024) - - assert.Error(t, err) - assert.Equal(t, -1, rows) - }) - - t.Run("get dim failed", func(t *testing.T) { - schema := &schemapb.CollectionSchema{ - Name: "coll1", - Description: "", - Fields: []*schemapb.FieldSchema{ - {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, - {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "fake"}}}, - }, - EnableDynamicField: false, - Properties: nil, - } - - rows, err := calBySegmentSizePolicy(schema, 1024) - assert.Error(t, err) - assert.Equal(t, -1, rows) - }) - - t.Run("sizePerRecord is zero", func(t *testing.T) { - schema := &schemapb.CollectionSchema{Fields: nil} - rows, err := calBySegmentSizePolicy(schema, 1024) - - assert.Error(t, err) - assert.Equal(t, -1, rows) - }) - - t.Run("normal case", func(t *testing.T) { - schema := &schemapb.CollectionSchema{ - Name: "coll1", - Description: "", - Fields: []*schemapb.FieldSchema{ - {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, - {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, - }, - EnableDynamicField: false, - Properties: nil, - } - - rows, err := calBySegmentSizePolicy(schema, 1200) - assert.NoError(t, err) - // 1200/(4*8+8) - assert.Equal(t, 30, rows) - }) -} - func TestSortSegmentsByLastExpires(t *testing.T) { segs := make([]*SegmentInfo, 0, 10) for i := 0; i < 10; i++ { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 3b07f73aec..10209bbdea 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -17,6 +17,7 @@ package paramtable import ( + "context" "fmt" "os" "path" @@ -270,7 +271,10 @@ type commonConfig struct { TopicNames ParamItem `refreshable:"true"` TimeTicker ParamItem `refreshable:"true"` - JSONMaxLength ParamItem `refreshable:"false"` + JSONMaxLength ParamItem `refreshable:"false"` + VarCharLengthAvg ParamItem `refreshable:"true"` + DynamicFieldLengthAvg ParamItem `refreshable:"true"` + SparseFloatVectorSize ParamItem `refreshable:"true"` MetricsPort ParamItem `refreshable:"false"` @@ -888,6 +892,48 @@ Large numeric passwords require double quotes to avoid yaml parsing precision is } p.JSONMaxLength.Init(base.mgr) + p.VarCharLengthAvg = ParamItem{ + Key: "common.estimate.varCharLengthAvg", + Version: "2.6.8", + DefaultValue: fmt.Sprint(typeutil.GetVarCharEstimateLength()), + Doc: "average length considered per VarChar/Text field when estimating record size", + Export: true, + } + p.VarCharLengthAvg.Init(base.mgr) + typeutil.SetVarCharEstimateLength(p.VarCharLengthAvg.GetAsInt()) + p.VarCharLengthAvg.RegisterCallback(func(_ context.Context, _, _, _ string) error { + typeutil.SetVarCharEstimateLength(p.VarCharLengthAvg.GetAsInt()) + return nil + }) + + p.DynamicFieldLengthAvg = ParamItem{ + Key: "common.estimate.dynamicFieldLengthAvg", + Version: "2.6.8", + DefaultValue: fmt.Sprint(typeutil.GetDynamicFieldEstimateLength()), + Doc: "average length considered per JSON/Array/Geometry field when estimating 
record size", + Export: true, + } + p.DynamicFieldLengthAvg.Init(base.mgr) + typeutil.SetDynamicFieldEstimateLength(p.DynamicFieldLengthAvg.GetAsInt()) + p.DynamicFieldLengthAvg.RegisterCallback(func(_ context.Context, _, _, _ string) error { + typeutil.SetDynamicFieldEstimateLength(p.DynamicFieldLengthAvg.GetAsInt()) + return nil + }) + + p.SparseFloatVectorSize = ParamItem{ + Key: "common.estimate.sparseFloatVectorSize", + Version: "2.6.8", + DefaultValue: fmt.Sprint(typeutil.GetSparseFloatVectorEstimateLength()), + Doc: "fallback size (bytes) used when estimating sparse float vector fields", + Export: true, + } + p.SparseFloatVectorSize.Init(base.mgr) + typeutil.SetSparseFloatVectorEstimateLength(p.SparseFloatVectorSize.GetAsInt()) + p.SparseFloatVectorSize.RegisterCallback(func(_ context.Context, _, _, _ string) error { + typeutil.SetSparseFloatVectorEstimateLength(p.SparseFloatVectorSize.GetAsInt()) + return nil + }) + p.MetricsPort = ParamItem{ Key: "common.MetricsPort", Version: "2.3.0", diff --git a/pkg/util/typeutil/field_length_limit.go b/pkg/util/typeutil/field_length_limit.go new file mode 100644 index 0000000000..05f38c061f --- /dev/null +++ b/pkg/util/typeutil/field_length_limit.go @@ -0,0 +1,88 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package typeutil + +import "sync/atomic" + +const ( + defaultVarCharEstimateLength = 256 + defaultDynamicFieldEstimateLength = 512 + defaultSparseFloatEstimateLength = 1200 +) + +var ( + varCharEstimateLength atomic.Int64 + dynamicFieldEstimateLength atomic.Int64 + sparseEstimateLength atomic.Int64 +) + +func init() { + SetVarCharEstimateLength(defaultVarCharEstimateLength) + SetDynamicFieldEstimateLength(defaultDynamicFieldEstimateLength) + SetSparseFloatVectorEstimateLength(defaultSparseFloatEstimateLength) +} + +// SetVarCharEstimateLength updates the global cap applied when estimating record sizes for VarChar fields. +func SetVarCharEstimateLength(length int) { + if length <= 0 { + length = defaultVarCharEstimateLength + } + varCharEstimateLength.Store(int64(length)) +} + +// GetVarCharEstimateLength returns the current cap used when estimating VarChar field sizes. +func GetVarCharEstimateLength() int { + length := int(varCharEstimateLength.Load()) + if length <= 0 { + return defaultVarCharEstimateLength + } + return length +} + +// SetDynamicFieldEstimateLength updates the global cap used for dynamic fields (JSON/Array/Geometry). +func SetDynamicFieldEstimateLength(length int) { + if length <= 0 { + length = defaultDynamicFieldEstimateLength + } + dynamicFieldEstimateLength.Store(int64(length)) +} + +// GetDynamicFieldEstimateLength returns the current cap for dynamic fields. 
+func GetDynamicFieldEstimateLength() int { + length := int(dynamicFieldEstimateLength.Load()) + if length <= 0 { + return defaultDynamicFieldEstimateLength + } + return length +} + +// SetSparseFloatVectorEstimateLength updates the fallback size used when estimating sparse float vector fields. +func SetSparseFloatVectorEstimateLength(length int) { + if length <= 0 { + length = defaultSparseFloatEstimateLength + } + sparseEstimateLength.Store(int64(length)) +} + +// GetSparseFloatVectorEstimateLength returns the current fallback size used for sparse float vector fields. +func GetSparseFloatVectorEstimateLength() int { + length := int(sparseEstimateLength.Load()) + if length <= 0 { + return defaultSparseFloatEstimateLength + } + return length +} diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 465ed5c430..0e77e37e48 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -37,8 +37,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/common" ) -const DynamicFieldMaxLength = 512 - type getVariableFieldLengthPolicy int const ( @@ -75,16 +73,17 @@ func getVarFieldLength(fieldSchema *schemapb.FieldSchema, policy getVariableFiel // TODO this is a hack and may not accurate, we should rely on estimate size per record // However we should report size and datacoord calculate based on size // https://github.com/milvus-io/milvus/issues/17687 - if maxLength > 256 { - return 256, nil + estimateLimit := GetVarCharEstimateLength() + if maxLength > estimateLimit { + return estimateLimit, nil } return maxLength, nil default: return 0, fmt.Errorf("unrecognized getVariableFieldLengthPolicy %v", policy) } - // geometry field max length now consider the same as json field, which is 512 bytes + // geometry field max length now consider the same as json field, which is 512 bytes case schemapb.DataType_Array, schemapb.DataType_JSON, schemapb.DataType_Geometry: - return DynamicFieldMaxLength, nil + return GetDynamicFieldEstimateLength(), nil default: return 0, fmt.Errorf("field %s is not a variable-length type", fieldSchema.DataType.String()) } @@ -160,7 +159,7 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe // varies depending on the number of non-zeros. Using sparse vector // generated by SPLADE as reference and returning size of a sparse // vector with 150 non-zeros. - res += 1200 + res += GetSparseFloatVectorEstimateLength() case schemapb.DataType_Int8Vector: for _, kv := range fs.TypeParams { if kv.Key == common.DimKey {
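
For reference, a minimal standalone sketch of the arithmetic behind the new estimateRowsBySegmentSize helper introduced above: an average row size is derived from the segments that report both a row count and a size, and the expected segment size is divided by that average. This is the observed-size estimate that replaces the schema-based calBySegmentSizePolicy in the clustering compaction path. The segmentStats type and estimateRows function below are illustrative stand-ins, not identifiers from the patch.

// Illustrative sketch only -- mirrors the estimation logic in
// estimateRowsBySegmentSize; type and function names are hypothetical stand-ins.
package main

import (
	"errors"
	"fmt"
)

type segmentStats struct {
	numOfRows int64   // observed row count of an existing segment
	size      float64 // observed size of that segment, in bytes
}

// estimateRows derives an average row size from segments with usable stats,
// then divides the expected segment size by that average.
func estimateRows(segments []segmentStats, expectedSegmentSize int64) (int64, error) {
	if expectedSegmentSize <= 0 {
		return 0, fmt.Errorf("invalid expected segment size %d", expectedSegmentSize)
	}
	var totalSize float64
	var totalRows int64
	for _, s := range segments {
		if s.numOfRows <= 0 || s.size <= 0 {
			continue // skip segments without size/row statistics
		}
		totalSize += s.size
		totalRows += s.numOfRows
	}
	if totalRows == 0 || totalSize == 0 {
		return 0, errors.New("no segment carries size info to estimate row count")
	}
	rowSize := totalSize / float64(totalRows)
	return int64(float64(expectedSegmentSize) / rowSize), nil
}

func main() {
	segs := []segmentStats{
		{numOfRows: 10_000, size: 4_000_000}, // ~400 bytes per row
		{numOfRows: 20_000, size: 8_000_000},
	}
	// With a 1 GiB expected segment size and ~400 bytes per row,
	// the estimate comes out to roughly 2.68 million rows.
	rows, err := estimateRows(segs, 1<<30)
	fmt.Println(rows, err)
}

The schema-based estimate is not removed entirely; the new common.estimate.varCharLengthAvg, common.estimate.dynamicFieldLengthAvg, and common.estimate.sparseFloatVectorSize options in milvus.yaml tune the per-field lengths that the typeutil size estimation above assumes for VarChar/Text, JSON/Array/Geometry, and sparse float vector fields where only the schema is available.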