diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6fd187ae95..bd265c7310 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -269,7 +269,10 @@ type commonConfig struct { TopicNames ParamItem `refreshable:"true"` TimeTicker ParamItem `refreshable:"true"` - JSONMaxLength ParamItem `refreshable:"false"` + JSONMaxLength ParamItem `refreshable:"false"` + DynamicFieldAvgLength ParamItem `refreshable:"true"` + SparseFloatVectorEstimateSize ParamItem `refreshable:"true"` + VarCharEstimateLengthAvg ParamItem `refreshable:"true"` MetricsPort ParamItem `refreshable:"false"` diff --git a/pkg/util/typeutil/field_length_limit.go b/pkg/util/typeutil/field_length_limit.go new file mode 100644 index 0000000000..05f38c061f --- /dev/null +++ b/pkg/util/typeutil/field_length_limit.go @@ -0,0 +1,88 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package typeutil + +import "sync/atomic" + +const ( + defaultVarCharEstimateLength = 256 + defaultDynamicFieldEstimateLength = 512 + defaultSparseFloatEstimateLength = 1200 +) + +var ( + varCharEstimateLength atomic.Int64 + dynamicFieldEstimateLength atomic.Int64 + sparseEstimateLength atomic.Int64 +) + +func init() { + SetVarCharEstimateLength(defaultVarCharEstimateLength) + SetDynamicFieldEstimateLength(defaultDynamicFieldEstimateLength) + SetSparseFloatVectorEstimateLength(defaultSparseFloatEstimateLength) +} + +// SetVarCharEstimateLength updates the global cap applied when estimating record sizes for VarChar fields; a non-positive length resets it to defaultVarCharEstimateLength. +func SetVarCharEstimateLength(length int) { + if length <= 0 { + length = defaultVarCharEstimateLength + } + varCharEstimateLength.Store(int64(length)) +} + +// GetVarCharEstimateLength returns the current cap used when estimating VarChar field sizes, or defaultVarCharEstimateLength if the stored value is not positive. +func GetVarCharEstimateLength() int { + length := int(varCharEstimateLength.Load()) + if length <= 0 { + return defaultVarCharEstimateLength + } + return length +} + +// SetDynamicFieldEstimateLength updates the global estimated length used for Array/JSON/Geometry fields; a non-positive length resets it to defaultDynamicFieldEstimateLength. +func SetDynamicFieldEstimateLength(length int) { + if length <= 0 { + length = defaultDynamicFieldEstimateLength + } + dynamicFieldEstimateLength.Store(int64(length)) +} + +// GetDynamicFieldEstimateLength returns the current estimated length for Array/JSON/Geometry fields, or defaultDynamicFieldEstimateLength if the stored value is not positive. +func GetDynamicFieldEstimateLength() int { + length := int(dynamicFieldEstimateLength.Load()) + if length <= 0 { + return defaultDynamicFieldEstimateLength + } + return length +} + +// SetSparseFloatVectorEstimateLength updates the fallback size used when estimating sparse float vector fields; a non-positive length resets it to defaultSparseFloatEstimateLength. +func SetSparseFloatVectorEstimateLength(length int) { + if length <= 0 { + length = defaultSparseFloatEstimateLength + } + sparseEstimateLength.Store(int64(length)) +} + +// GetSparseFloatVectorEstimateLength returns the current fallback size used for sparse float vector fields, or defaultSparseFloatEstimateLength if the stored value is not positive. 
+func GetSparseFloatVectorEstimateLength() int { + length := int(sparseEstimateLength.Load()) + if length <= 0 { + return defaultSparseFloatEstimateLength + } + return length +} diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index a6eda5366a..fa06dec533 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -37,8 +37,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" ) -const DynamicFieldMaxLength = 512 - type getVariableFieldLengthPolicy int const ( @@ -75,16 +73,17 @@ func getVarFieldLength(fieldSchema *schemapb.FieldSchema, policy getVariableFiel // TODO this is a hack and may not accurate, we should rely on estimate size per record // However we should report size and datacoord calculate based on size // https://github.com/milvus-io/milvus/issues/17687 - if maxLength > 256 { - return 256, nil + estimateLimit := GetVarCharEstimateLength() + if maxLength > estimateLimit { + return estimateLimit, nil } return maxLength, nil default: return 0, fmt.Errorf("unrecognized getVariableFieldLengthPolicy %v", policy) } - // geometry field max length now consider the same as json field, which is 512 bytes + // geometry fields are estimated like array/json fields, using the configurable dynamic field estimate length (512 bytes by default) case schemapb.DataType_Array, schemapb.DataType_JSON, schemapb.DataType_Geometry: - return DynamicFieldMaxLength, nil + return GetDynamicFieldEstimateLength(), nil default: return 0, fmt.Errorf("field %s is not a variable-length type", fieldSchema.DataType.String()) } @@ -160,7 +159,7 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe // varies depending on the number of non-zeros. Using sparse vector // generated by SPLADE as reference and returning size of a sparse // vector with 150 non-zeros. 
- res += 1200 + res += GetSparseFloatVectorEstimateLength() } } return res, nil diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index 3812d40673..766fb776c2 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -177,11 +177,81 @@ func TestSchema(t *testing.T) { } t.Run("EstimateSizePerRecord", func(t *testing.T) { + limit := GetDynamicFieldEstimateLength() size, err := EstimateSizePerRecord(schema) - assert.Equal(t, 680+DynamicFieldMaxLength*4, size) + assert.Equal(t, 680+limit*4, size) assert.NoError(t, err) }) + t.Run("VarCharEstimateLengthLimit", func(t *testing.T) { + originalLimit := GetVarCharEstimateLength() + t.Cleanup(func() { SetVarCharEstimateLength(originalLimit) }) + + field := &schemapb.FieldSchema{ + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "1024", + }, + }, + } + + SetVarCharEstimateLength(128) + length, err := getVarFieldLength(field, custom) + assert.NoError(t, err) + assert.Equal(t, 128, length) + + SetVarCharEstimateLength(4096) + length, err = getVarFieldLength(field, custom) + assert.NoError(t, err) + assert.Equal(t, 1024, length) + }) + + t.Run("DynamicFieldMaxLengthLimit", func(t *testing.T) { + originalLimit := GetDynamicFieldEstimateLength() + t.Cleanup(func() { SetDynamicFieldEstimateLength(originalLimit) }) + + field := &schemapb.FieldSchema{ + DataType: schemapb.DataType_JSON, + } + + SetDynamicFieldEstimateLength(2048) + length, err := getVarFieldLength(field, custom) + assert.NoError(t, err) + assert.Equal(t, 2048, length) + + SetDynamicFieldEstimateLength(128) + length, err = getVarFieldLength(field, custom) + assert.NoError(t, err) + assert.Equal(t, 128, length) + }) + + t.Run("SparseFloatVectorEstimateSize", func(t *testing.T) { + original := GetSparseFloatVectorEstimateLength() + t.Cleanup(func() { SetSparseFloatVectorEstimateLength(original) }) + + schemaWithSparse := 
&schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 1200, + Name: "sparse", + DataType: schemapb.DataType_SparseFloatVector, + }, + }, + } + + SetSparseFloatVectorEstimateLength(2048) + size, err := EstimateSizePerRecord(schemaWithSparse) + assert.NoError(t, err) + assert.Equal(t, 2048, size) + + SetSparseFloatVectorEstimateLength(64) + size, err = EstimateSizePerRecord(schemaWithSparse) + assert.NoError(t, err) + assert.Equal(t, 64, size) + }) + t.Run("SchemaHelper", func(t *testing.T) { _, err := CreateSchemaHelper(nil) assert.Error(t, err)