From 1165a5300f1987572bbb67b4db7d448d2e583f97 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Fri, 18 Jul 2025 11:16:52 +0800
Subject: [PATCH] fix: [cp25] Use diskSegmentMaxSize for coll with sparse and dense vectors (#43195)

Previously, diskSegmentMaxSize was used if and only if all of the collection's vector fields were indexed with DiskANN. With the introduction of sparse vectors, which cannot be indexed with DiskANN, collections containing both dense and sparse vectors would therefore use maxSize instead.

This PR changes the requirement for using diskSegmentMaxSize to: all dense vector fields are indexed with DiskANN, ignoring sparse vector fields.

See also: #43193

pr: #43194

---------

Signed-off-by: yangxuan
---
 .../compaction_policy_clustering_test.go | 2 +-
 internal/datacoord/compaction_trigger.go | 2 +-
 internal/datacoord/compaction_trigger_v2.go | 13 +++++-----
 .../datacoord/compaction_trigger_v2_test.go | 6 ++---
 internal/datacoord/import_checker.go | 8 ++++--
 internal/datacoord/import_util.go | 25 +++----------------
 internal/datacoord/import_util_test.go | 11 +++++++-
 internal/datacoord/index_meta.go | 7 +++---
 pkg/util/typeutil/schema.go | 20 +++++++--------
 pkg/util/typeutil/schema_test.go | 4 +--
 10 files changed, 47 insertions(+), 51 deletions(-)

diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index 3fe4e2ccc6..fb9aef029d 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -184,7 +184,7 @@ func (s *ClusteringCompactionPolicySuite) TestCalculateClusteringCompactionConfi for _, test := range testCases { s.Run(test.description, func() { - expectedSegmentSize := getExpectedSegmentSize(s.meta, test.coll) + expectedSegmentSize := getExpectedSegmentSize(s.meta, test.coll.ID, test.coll.Schema) totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(test.coll, test.view, expectedSegmentSize) s.Equal(test.totalRows, totalRows) s.Equal(test.maxSegmentRows, maxSegmentRows) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 091cf54110..ec4ea6e902 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -374,7 +374,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error { return err } - expectedSize := getExpectedSegmentSize(t.meta, coll) + expectedSize := getExpectedSegmentSize(t.meta, coll.ID, coll.Schema) plans := t.generatePlans(group.segments, signal, ct, expectedSize) for _, plan := range plans { if !signal.isForce && t.compactionHandler.isFull() { diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 79ab99cad8..65f842cf37 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -24,6 +24,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -308,7 +309,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C return } - expectedSegmentSize := getExpectedSegmentSize(m.meta, collection) + expectedSegmentSize := getExpectedSegmentSize(m.meta, collection.ID, collection.Schema) totalRows, maxSegmentRows, preferSegmentRows, err :=
calculateClusteringCompactionConfig(collection, view, expectedSegmentSize) if err != nil { log.Warn("Failed to calculate cluster compaction config fail", zap.Error(err)) @@ -383,7 +384,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte totalRows += s.NumOfRows } - expectedSize := getExpectedSegmentSize(m.meta, collection) + expectedSize := getExpectedSegmentSize(m.meta, collection.ID, collection.Schema) task := &datapb.CompactionTask{ PlanID: startID, TriggerID: view.(*MixSegmentView).triggerID, @@ -421,13 +422,13 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte ) } -func getExpectedSegmentSize(meta *meta, collInfo *collectionInfo) int64 { - allDiskIndex := meta.indexMeta.AreAllDiskIndex(collInfo.ID, collInfo.Schema) +func getExpectedSegmentSize(meta *meta, collectionID int64, schema *schemapb.CollectionSchema) int64 { + allDiskIndex := meta.indexMeta.AllDenseWithDiskIndex(collectionID, schema) if allDiskIndex { - // Only if all vector fields index type are DiskANN, recalc segment max size here. + // Only if all dense vector fields index type are DiskANN, recalc segment max size here. return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 } - // If some vector fields index type are not DiskANN, recalc segment max size using default policy. + // If some dense vector fields index type are not DiskANN, recalc segment max size using default policy. return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 209573fd30..a46ce95748 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -209,7 +209,7 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() { }, } - s.Equal(int64(200*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection)) + s.Equal(int64(200*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection.ID, collection.Schema)) }) s.Run("HNSW & DISKANN", func() { @@ -264,7 +264,7 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() { }, } - s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection)) + s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection.ID, collection.Schema)) }) s.Run("some vector has no index", func() { @@ -305,6 +305,6 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() { }, } - s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection)) + s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection.ID, collection.Schema)) }) } diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index c1337aa415..33a268c83b 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) @@ -275,8 +276,11 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { return } - allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema()) - groups := RegroupImportFiles(job, lacks, allDiskIndex) + segmentMaxSize := 
getExpectedSegmentSize(c.meta, job.GetCollectionID(), job.GetSchema()) + if importutilv2.IsL0Import(job.GetOptions()) { + segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() + } + groups := RegroupImportFiles(job, lacks, int(segmentMaxSize)) newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta) if err != nil { log.Warn("new import tasks failed", zap.Error(err)) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 3368bed387..24494516f4 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -144,15 +144,11 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m } } - isL0Import := importutilv2.IsL0Import(job.GetOptions()) - - segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 - if isL0Import { - segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() - } segmentLevel := datapb.SegmentLevel_L1 - if isL0Import { + segmentMaxSize := getExpectedSegmentSize(meta, job.GetCollectionID(), job.GetSchema()) + if importutilv2.IsL0Import(job.GetOptions()) { segmentLevel = datapb.SegmentLevel_L0 + segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() } // alloc new segments @@ -333,24 +329,11 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all }, nil } -func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, allDiskIndex bool) [][]*datapb.ImportFileStats { +func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, segmentMaxSize int) [][]*datapb.ImportFileStats { if len(files) == 0 { return nil } - var segmentMaxSize int - if allDiskIndex { - // Only if all vector fields index type are DiskANN, recalc segment max size here. - segmentMaxSize = Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt() * 1024 * 1024 - } else { - // If some vector fields index type are not DiskANN, recalc segment max size using default policy. 
- segmentMaxSize = Params.DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024 - } - isL0Import := importutilv2.IsL0Import(job.GetOptions()) - if isL0Import { - segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt() - } - threshold := paramtable.Get().DataCoordCfg.MaxSizeInMBPerImportTask.GetAsInt() * 1024 * 1024 maxSizePerFileGroup := segmentMaxSize * len(job.GetPartitionIDs()) * len(job.GetVchannels()) if maxSizePerFileGroup > threshold { diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 51efe695d6..a798f625e0 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -39,6 +39,7 @@ import ( mocks2 "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" @@ -108,6 +109,14 @@ func TestImportUtil_NewImportTasks(t *testing.T) { DataType: schemapb.DataType_Int64, IsPrimaryKey: true, }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, }, }, }, @@ -236,7 +245,7 @@ func TestImportUtil_RegroupImportFiles(t *testing.T) { }, } - groups := RegroupImportFiles(job, files, false) + groups := RegroupImportFiles(job, files, int(dataSize)) total := 0 for i, fs := range groups { sum := lo.SumBy(fs, func(f *datapb.ImportFileStats) int64 { diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index ed2625534e..0b328ccd22 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -1096,10 +1096,10 @@ func (m *indexMeta) GetUnindexedSegments(collectionID int64, segmentIDs []int64) return lo.Without(segmentIDs, indexed...) } -func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.CollectionSchema) bool { +func (m *indexMeta) AllDenseWithDiskIndex(collectionID int64, schema *schemapb.CollectionSchema) bool { indexInfos := m.GetIndexesForCollection(collectionID, "") - vectorFields := typeutil.GetVectorFieldSchemas(schema) + vectorFields := typeutil.GetDenseVectorFieldSchemas(schema) fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) { return t.FieldID, GetIndexType(t.IndexParams) }) @@ -1110,8 +1110,7 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect return false }) - allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex) - return allDiskIndex + return len(vectorFields) == len(vectorFieldsWithDiskIndex) } func (m *indexMeta) HasIndex(collectionID int64) bool { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 8466384b53..01d8c0250c 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -1150,16 +1150,6 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error return nil } -// GetVectorFieldSchema get vector field schema from collection schema. 
-func GetVectorFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) { - for _, fieldSchema := range schema.GetFields() { - if IsVectorType(fieldSchema.DataType) { - return fieldSchema, nil - } - } - return nil, errors.New("vector field is not found") -} - // GetVectorFieldSchemas get vector fields schema from collection schema. func GetVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema { ret := make([]*schemapb.FieldSchema, 0) @@ -1172,6 +1162,16 @@ func GetVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldS return ret } +func GetDenseVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema { + ret := make([]*schemapb.FieldSchema, 0) + for _, fieldSchema := range schema.GetFields() { + if IsDenseFloatVectorType(fieldSchema.DataType) || IsBinaryVectorType(fieldSchema.DataType) { + ret = append(ret, fieldSchema) + } + } + return ret +} + // GetPrimaryFieldSchema get primary field schema from collection schema func GetPrimaryFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) { for _, fieldSchema := range schema.GetFields() { diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index b5080e9b08..70143bd2b4 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -272,7 +272,7 @@ func TestSchema(t *testing.T) { }) } -func TestSchema_GetVectorFieldSchema(t *testing.T) { +func TestSchema_GetVectorFieldSchemas(t *testing.T) { schemaNormal := &schemapb.CollectionSchema{ Name: "testColl", Description: "", @@ -301,7 +301,7 @@ func TestSchema_GetVectorFieldSchema(t *testing.T) { }, } - t.Run("GetVectorFieldSchema", func(t *testing.T) { + t.Run("GetVectorFieldSchemas", func(t *testing.T) { fieldSchema := GetVectorFieldSchemas(schemaNormal) assert.Equal(t, 1, len(fieldSchema)) assert.Equal(t, "field_float_vector", fieldSchema[0].Name)
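For illustration, a minimal, self-contained Go sketch of the sizing rule described in the commit message above. The simplified field/index types and the expectedSegmentSize helper here are illustrative stand-ins, not the actual schemapb, indexMeta, or paramtable APIs touched by this patch.

package main

import "fmt"

// Simplified stand-ins for the schema/index metadata used by the patch.
type fieldType int

const (
	floatVector       fieldType = iota // dense
	binaryVector                       // dense
	sparseFloatVector                  // sparse: ignored by the DiskANN check
	int64Field                         // scalar, e.g. the primary key
)

type field struct {
	name      string
	typ       fieldType
	indexType string // e.g. "DISKANN", "HNSW", or "" when unindexed
}

func isDenseVector(t fieldType) bool {
	return t == floatVector || t == binaryVector
}

// expectedSegmentSize applies the rule this patch describes: the larger
// disk-index segment size is used only when every dense vector field is
// indexed with DISKANN; sparse vector fields do not count against it.
func expectedSegmentSize(fields []field, segmentMaxSize, diskSegmentMaxSize int64) int64 {
	for _, f := range fields {
		if isDenseVector(f.typ) && f.indexType != "DISKANN" {
			return segmentMaxSize // some dense vector field is not DiskANN-indexed
		}
	}
	return diskSegmentMaxSize
}

func main() {
	coll := []field{
		{name: "pk", typ: int64Field},
		{name: "dense", typ: floatVector, indexType: "DISKANN"},
		{name: "sparse", typ: sparseFloatVector, indexType: "SPARSE_INVERTED_INDEX"},
	}
	// Before this patch the sparse field (never DiskANN) forced the default
	// segmentMaxSize; with the new rule the collection gets diskSegmentMaxSize.
	fmt.Println(expectedSegmentSize(coll, 1024, 2048)) // prints 2048
}

This mirrors the intent of AllDenseWithDiskIndex and GetDenseVectorFieldSchemas in the diff: sparse vector fields no longer disqualify a collection from the DiskANN segment size.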