enhance: support configurable estimation on variable length field (#46301)

fix: #46300
pr: #46302

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2025-12-12 20:30:40 +08:00 committed by GitHub
parent 9b4b0cb808
commit 08bc450518
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 169 additions and 9 deletions

View File

@ -269,7 +269,10 @@ type commonConfig struct {
TopicNames ParamItem `refreshable:"true"`
TimeTicker ParamItem `refreshable:"true"`
JSONMaxLength ParamItem `refreshable:"false"`
JSONMaxLength ParamItem `refreshable:"false"`
DynamicFieldAvgLength ParamItem `refreshable:"true"`
SparseFloatVectorEstimateSize ParamItem `refreshable:"true"`
VarCharEstimateLengthAvg ParamItem `refreshable:"true"`
MetricsPort ParamItem `refreshable:"false"`

View File

@ -0,0 +1,88 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package typeutil
import "sync/atomic"
// Built-in estimates used whenever no positive value has been configured for
// the corresponding kind of variable-length field.
const (
	defaultVarCharEstimateLength      = 256
	defaultDynamicFieldEstimateLength = 512
	defaultSparseFloatEstimateLength  = 1200
)

// Process-wide estimate settings. They are stored atomically so the setters
// and getters below can be called from different goroutines.
var (
	varCharEstimateLength      atomic.Int64
	dynamicFieldEstimateLength atomic.Int64
	sparseEstimateLength       atomic.Int64
)

// init seeds every setting with its built-in default.
func init() {
	varCharEstimateLength.Store(defaultVarCharEstimateLength)
	dynamicFieldEstimateLength.Store(defaultDynamicFieldEstimateLength)
	sparseEstimateLength.Store(defaultSparseFloatEstimateLength)
}

// storeEstimateLength writes length into slot, substituting fallback when
// length is zero or negative.
func storeEstimateLength(slot *atomic.Int64, length, fallback int) {
	if length > 0 {
		slot.Store(int64(length))
		return
	}
	slot.Store(int64(fallback))
}

// loadEstimateLength reads slot and returns its value, or fallback when the
// stored value is zero or negative (for example before init has run).
func loadEstimateLength(slot *atomic.Int64, fallback int) int {
	if current := int(slot.Load()); current > 0 {
		return current
	}
	return fallback
}

// SetVarCharEstimateLength sets the global cap applied to VarChar fields when
// estimating record sizes. A non-positive length resets the cap to its default.
func SetVarCharEstimateLength(length int) {
	storeEstimateLength(&varCharEstimateLength, length, defaultVarCharEstimateLength)
}

// GetVarCharEstimateLength reports the cap currently applied to VarChar fields
// when estimating record sizes.
func GetVarCharEstimateLength() int {
	return loadEstimateLength(&varCharEstimateLength, defaultVarCharEstimateLength)
}

// SetDynamicFieldEstimateLength sets the global estimate used for dynamic
// fields (JSON/Array/Geometry). A non-positive length resets it to its default.
func SetDynamicFieldEstimateLength(length int) {
	storeEstimateLength(&dynamicFieldEstimateLength, length, defaultDynamicFieldEstimateLength)
}

// GetDynamicFieldEstimateLength reports the estimate currently used for
// dynamic fields.
func GetDynamicFieldEstimateLength() int {
	return loadEstimateLength(&dynamicFieldEstimateLength, defaultDynamicFieldEstimateLength)
}

// SetSparseFloatVectorEstimateLength sets the fallback size used when
// estimating sparse float vector fields. A non-positive length resets it to
// its default.
func SetSparseFloatVectorEstimateLength(length int) {
	storeEstimateLength(&sparseEstimateLength, length, defaultSparseFloatEstimateLength)
}

// GetSparseFloatVectorEstimateLength reports the fallback size currently used
// for sparse float vector fields.
func GetSparseFloatVectorEstimateLength() int {
	return loadEstimateLength(&sparseEstimateLength, defaultSparseFloatEstimateLength)
}

View File

@ -37,8 +37,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
)
const DynamicFieldMaxLength = 512
type getVariableFieldLengthPolicy int
const (
@ -75,16 +73,17 @@ func getVarFieldLength(fieldSchema *schemapb.FieldSchema, policy getVariableFiel
// TODO this is a hack and may not be accurate, we should rely on estimate size per record
// However we should report size and datacoord calculate based on size
// https://github.com/milvus-io/milvus/issues/17687
if maxLength > 256 {
return 256, nil
estimateLimit := GetVarCharEstimateLength()
if maxLength > estimateLimit {
return estimateLimit, nil
}
return maxLength, nil
default:
return 0, fmt.Errorf("unrecognized getVariableFieldLengthPolicy %v", policy)
}
// geometry field max length now consider the same as json field, which is 512 bytes
// geometry fields use the same estimated length as json fields (configurable, 512 bytes by default)
case schemapb.DataType_Array, schemapb.DataType_JSON, schemapb.DataType_Geometry:
return DynamicFieldMaxLength, nil
return GetDynamicFieldEstimateLength(), nil
default:
return 0, fmt.Errorf("field %s is not a variable-length type", fieldSchema.DataType.String())
}
@ -160,7 +159,7 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe
// varies depending on the number of non-zeros. Using sparse vector
// generated by SPLADE as reference and returning size of a sparse
// vector with 150 non-zeros.
res += 1200
res += GetSparseFloatVectorEstimateLength()
}
}
return res, nil

View File

@ -177,11 +177,81 @@ func TestSchema(t *testing.T) {
}
t.Run("EstimateSizePerRecord", func(t *testing.T) {
limit := GetDynamicFieldEstimateLength()
size, err := EstimateSizePerRecord(schema)
assert.Equal(t, 680+DynamicFieldMaxLength*4, size)
assert.Equal(t, 680+limit*4, size)
assert.NoError(t, err)
})
t.Run("VarCharEstimateLengthLimit", func(t *testing.T) {
originalLimit := GetVarCharEstimateLength()
t.Cleanup(func() { SetVarCharEstimateLength(originalLimit) })
field := &schemapb.FieldSchema{
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
Value: "1024",
},
},
}
SetVarCharEstimateLength(128)
length, err := getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 128, length)
SetVarCharEstimateLength(4096)
length, err = getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 1024, length)
})
t.Run("DynamicFieldMaxLengthLimit", func(t *testing.T) {
originalLimit := GetDynamicFieldEstimateLength()
t.Cleanup(func() { SetDynamicFieldEstimateLength(originalLimit) })
field := &schemapb.FieldSchema{
DataType: schemapb.DataType_JSON,
}
SetDynamicFieldEstimateLength(2048)
length, err := getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 2048, length)
SetDynamicFieldEstimateLength(128)
length, err = getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 128, length)
})
t.Run("SparseFloatVectorEstimateSize", func(t *testing.T) {
original := GetSparseFloatVectorEstimateLength()
t.Cleanup(func() { SetSparseFloatVectorEstimateLength(original) })
schemaWithSparse := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 1200,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
},
}
SetSparseFloatVectorEstimateLength(2048)
size, err := EstimateSizePerRecord(schemaWithSparse)
assert.NoError(t, err)
assert.Equal(t, 2048, size)
SetSparseFloatVectorEstimateLength(64)
size, err = EstimateSizePerRecord(schemaWithSparse)
assert.NoError(t, err)
assert.Equal(t, 64, size)
})
t.Run("SchemaHelper", func(t *testing.T) {
_, err := CreateSchemaHelper(nil)
assert.Error(t, err)