diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go
index 3fe4e2ccc6..fb9aef029d 100644
--- a/internal/datacoord/compaction_policy_clustering_test.go
+++ b/internal/datacoord/compaction_policy_clustering_test.go
@@ -184,7 +184,7 @@ func (s *ClusteringCompactionPolicySuite) TestCalculateClusteringCompactionConfi
 
 	for _, test := range testCases {
 		s.Run(test.description, func() {
-			expectedSegmentSize := getExpectedSegmentSize(s.meta, test.coll)
+			expectedSegmentSize := getExpectedSegmentSize(s.meta, test.coll.ID, test.coll.Schema)
 			totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(test.coll, test.view, expectedSegmentSize)
 			s.Equal(test.totalRows, totalRows)
 			s.Equal(test.maxSegmentRows, maxSegmentRows)
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 091cf54110..ec4ea6e902 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -374,7 +374,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
 			return err
 		}
 
-		expectedSize := getExpectedSegmentSize(t.meta, coll)
+		expectedSize := getExpectedSegmentSize(t.meta, coll.ID, coll.Schema)
 		plans := t.generatePlans(group.segments, signal, ct, expectedSize)
 		for _, plan := range plans {
 			if !signal.isForce && t.compactionHandler.isFull() {
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 79ab99cad8..65f842cf37 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -24,6 +24,7 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@@ -308,7 +309,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
 		return
 	}
 
-	expectedSegmentSize := getExpectedSegmentSize(m.meta, collection)
+	expectedSegmentSize := getExpectedSegmentSize(m.meta, collection.ID, collection.Schema)
 	totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
 	if err != nil {
 		log.Warn("Failed to calculate cluster compaction config fail", zap.Error(err))
@@ -383,7 +384,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
 		totalRows += s.NumOfRows
 	}
 
-	expectedSize := getExpectedSegmentSize(m.meta, collection)
+	expectedSize := getExpectedSegmentSize(m.meta, collection.ID, collection.Schema)
 	task := &datapb.CompactionTask{
 		PlanID:    startID,
 		TriggerID: view.(*MixSegmentView).triggerID,
@@ -421,13 +422,13 @@
 	)
 }
 
-func getExpectedSegmentSize(meta *meta, collInfo *collectionInfo) int64 {
-	allDiskIndex := meta.indexMeta.AreAllDiskIndex(collInfo.ID, collInfo.Schema)
+func getExpectedSegmentSize(meta *meta, collectionID int64, schema *schemapb.CollectionSchema) int64 {
+	allDiskIndex := meta.indexMeta.AllDenseWithDiskIndex(collectionID, schema)
 	if allDiskIndex {
-		// Only if all vector fields index type are DiskANN, recalc segment max size here.
+		// Only if all dense vector fields are indexed with DiskANN, recalc segment max size here.
 		return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
 	}
-	// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
+	// If some dense vector fields are not indexed with DiskANN, recalc segment max size using the default policy.
 	return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
 }
 
diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go
index 209573fd30..a46ce95748 100644
--- a/internal/datacoord/compaction_trigger_v2_test.go
+++ b/internal/datacoord/compaction_trigger_v2_test.go
@@ -209,7 +209,7 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
 			},
 		}
 
-		s.Equal(int64(200*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
+		s.Equal(int64(200*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection.ID, collection.Schema))
 	})
 
 	s.Run("HNSW & DISKANN", func() {
@@ -264,7 +264,7 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
 			},
 		}
 
-		s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
+		s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection.ID, collection.Schema))
 	})
 
 	s.Run("some vector has no index", func() {
@@ -305,6 +305,6 @@ func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
 			},
 		}
 
-		s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
+		s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection.ID, collection.Schema))
 	})
 }
diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index c1337aa415..33a268c83b 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -33,6 +33,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
 )
 
@@ -275,8 +276,11 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
 		return
 	}
 
-	allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
-	groups := RegroupImportFiles(job, lacks, allDiskIndex)
+	segmentMaxSize := getExpectedSegmentSize(c.meta, job.GetCollectionID(), job.GetSchema())
+	if importutilv2.IsL0Import(job.GetOptions()) {
+		segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
+	}
+	groups := RegroupImportFiles(job, lacks, int(segmentMaxSize))
 	newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta)
 	if err != nil {
 		log.Warn("new import tasks failed", zap.Error(err))
diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index 3368bed387..24494516f4 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -144,15 +144,11 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
 		}
 	}
 
-	isL0Import := importutilv2.IsL0Import(job.GetOptions())
-
-	segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-	if isL0Import {
-		segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
-	}
 	segmentLevel := datapb.SegmentLevel_L1
-	if isL0Import {
+	segmentMaxSize := getExpectedSegmentSize(meta, job.GetCollectionID(), job.GetSchema())
+	if importutilv2.IsL0Import(job.GetOptions()) {
 		segmentLevel = datapb.SegmentLevel_L0
+		segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
 	}
 
 	// alloc new segments
@@ -333,24 +329,11 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
 	}, nil
 }
 
-func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, allDiskIndex bool) [][]*datapb.ImportFileStats {
+func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, segmentMaxSize int) [][]*datapb.ImportFileStats {
 	if len(files) == 0 {
 		return nil
 	}
 
-	var segmentMaxSize int
-	if allDiskIndex {
-		// Only if all vector fields index type are DiskANN, recalc segment max size here.
-		segmentMaxSize = Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt() * 1024 * 1024
-	} else {
-		// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
-		segmentMaxSize = Params.DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024
-	}
-	isL0Import := importutilv2.IsL0Import(job.GetOptions())
-	if isL0Import {
-		segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt()
-	}
-
 	threshold := paramtable.Get().DataCoordCfg.MaxSizeInMBPerImportTask.GetAsInt() * 1024 * 1024
 	maxSizePerFileGroup := segmentMaxSize * len(job.GetPartitionIDs()) * len(job.GetVchannels())
 	if maxSizePerFileGroup > threshold {
diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go
index 51efe695d6..a798f625e0 100644
--- a/internal/datacoord/import_util_test.go
+++ b/internal/datacoord/import_util_test.go
@@ -39,6 +39,7 @@ import (
 	mocks2 "github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
+	"github.com/milvus-io/milvus/pkg/v2/common"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
@@ -108,6 +109,14 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
 					DataType:     schemapb.DataType_Int64,
 					IsPrimaryKey: true,
 				},
+				{
+					FieldID:  101,
+					Name:     "vector",
+					DataType: schemapb.DataType_FloatVector,
+					TypeParams: []*commonpb.KeyValuePair{
+						{Key: common.DimKey, Value: "128"},
+					},
+				},
 			},
 		},
 	},
@@ -236,7 +245,7 @@ func TestImportUtil_RegroupImportFiles(t *testing.T) {
 		},
 	}
 
-	groups := RegroupImportFiles(job, files, false)
+	groups := RegroupImportFiles(job, files, int(dataSize))
 	total := 0
 	for i, fs := range groups {
 		sum := lo.SumBy(fs, func(f *datapb.ImportFileStats) int64 {
diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go
index ed2625534e..0b328ccd22 100644
--- a/internal/datacoord/index_meta.go
+++ b/internal/datacoord/index_meta.go
@@ -1096,10 +1096,10 @@ func (m *indexMeta) GetUnindexedSegments(collectionID int64, segmentIDs []int64)
 	return lo.Without(segmentIDs, indexed...)
 }
 
-func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.CollectionSchema) bool {
+func (m *indexMeta) AllDenseWithDiskIndex(collectionID int64, schema *schemapb.CollectionSchema) bool {
 	indexInfos := m.GetIndexesForCollection(collectionID, "")
 
-	vectorFields := typeutil.GetVectorFieldSchemas(schema)
+	vectorFields := typeutil.GetDenseVectorFieldSchemas(schema)
 	fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
 		return t.FieldID, GetIndexType(t.IndexParams)
 	})
@@ -1110,8 +1110,7 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect
 		return false
 	})
 
-	allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
-	return allDiskIndex
+	return len(vectorFields) == len(vectorFieldsWithDiskIndex)
 }
 
 func (m *indexMeta) HasIndex(collectionID int64) bool {
diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go
index 8466384b53..01d8c0250c 100644
--- a/pkg/util/typeutil/schema.go
+++ b/pkg/util/typeutil/schema.go
@@ -1150,16 +1150,6 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error
 	return nil
 }
 
-// GetVectorFieldSchema get vector field schema from collection schema.
-func GetVectorFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
-	for _, fieldSchema := range schema.GetFields() {
-		if IsVectorType(fieldSchema.DataType) {
-			return fieldSchema, nil
-		}
-	}
-	return nil, errors.New("vector field is not found")
-}
-
 // GetVectorFieldSchemas get vector fields schema from collection schema.
 func GetVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema {
 	ret := make([]*schemapb.FieldSchema, 0)
@@ -1172,6 +1162,17 @@ func GetVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldS
 	return ret
 }
 
+// GetDenseVectorFieldSchemas get dense vector fields schema from collection schema.
+func GetDenseVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema {
+	ret := make([]*schemapb.FieldSchema, 0)
+	for _, fieldSchema := range schema.GetFields() {
+		if IsDenseFloatVectorType(fieldSchema.DataType) || IsBinaryVectorType(fieldSchema.DataType) {
+			ret = append(ret, fieldSchema)
+		}
+	}
+	return ret
+}
+
 // GetPrimaryFieldSchema get primary field schema from collection schema
 func GetPrimaryFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
 	for _, fieldSchema := range schema.GetFields() {
diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go
index b5080e9b08..70143bd2b4 100644
--- a/pkg/util/typeutil/schema_test.go
+++ b/pkg/util/typeutil/schema_test.go
@@ -272,7 +272,7 @@ func TestSchema(t *testing.T) {
 	})
 }
 
-func TestSchema_GetVectorFieldSchema(t *testing.T) {
+func TestSchema_GetVectorFieldSchemas(t *testing.T) {
 	schemaNormal := &schemapb.CollectionSchema{
 		Name:        "testColl",
 		Description: "",
@@ -301,7 +301,7 @@ func TestSchema_GetVectorFieldSchema(t *testing.T) {
 		},
 	}
 
-	t.Run("GetVectorFieldSchema", func(t *testing.T) {
+	t.Run("GetVectorFieldSchemas", func(t *testing.T) {
 		fieldSchema := GetVectorFieldSchemas(schemaNormal)
 		assert.Equal(t, 1, len(fieldSchema))
 		assert.Equal(t, "field_float_vector", fieldSchema[0].Name)
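
Editor's note on the behavior change (commentary, not part of the patch): the gate for the larger DiskANN segment budget now counts only dense vector fields (float and binary vectors), so a sparse vector field no longer disqualifies a collection. The Go sketch below is a minimal, self-contained mirror of that logic under stated assumptions: DataType, FieldSchema, the index-type strings, and the byte sizes are local stand-ins for the schemapb/indexparamcheck types and for DataCoordCfg.DiskSegmentMaxSize / DataCoordCfg.SegmentMaxSize used in the patch, not the real milvus types.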
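
package main

import "fmt"

// Local stand-ins for schemapb.DataType; only the cases the dense-vector
// filter distinguishes are modeled.
type DataType int

const (
	DataTypeInt64 DataType = iota
	DataTypeFloatVector
	DataTypeBinaryVector
	DataTypeSparseFloatVector
)

type FieldSchema struct {
	FieldID  int64
	Name     string
	DataType DataType
}

// getDenseVectorFields mirrors typeutil.GetDenseVectorFieldSchemas from the
// patch: only dense float and binary vectors pass; sparse vectors are skipped.
func getDenseVectorFields(fields []FieldSchema) []FieldSchema {
	ret := make([]FieldSchema, 0)
	for _, f := range fields {
		if f.DataType == DataTypeFloatVector || f.DataType == DataTypeBinaryVector {
			ret = append(ret, f)
		}
	}
	return ret
}

// allDenseWithDiskIndex mirrors indexMeta.AllDenseWithDiskIndex: true only
// when every dense vector field carries a DISKANN index.
func allDenseWithDiskIndex(fields []FieldSchema, indexTypeByField map[int64]string) bool {
	dense := getDenseVectorFields(fields)
	withDisk := 0
	for _, f := range dense {
		if indexTypeByField[f.FieldID] == "DISKANN" {
			withDisk++
		}
	}
	return len(dense) == withDisk
}

// expectedSegmentSize mirrors the shape of the reworked getExpectedSegmentSize;
// the 2048 MB / 1024 MB values are illustrative placeholders, not the
// configured defaults.
func expectedSegmentSize(allDisk bool) int64 {
	if allDisk {
		return 2048 * 1024 * 1024
	}
	return 1024 * 1024 * 1024
}

func main() {
	fields := []FieldSchema{
		{FieldID: 100, Name: "pk", DataType: DataTypeInt64},
		{FieldID: 101, Name: "dense", DataType: DataTypeFloatVector},
		{FieldID: 102, Name: "sparse", DataType: DataTypeSparseFloatVector},
	}
	indexes := map[int64]string{
		101: "DISKANN",
		102: "SPARSE_INVERTED_INDEX",
	}
	// Under the old all-vector-fields check the sparse field would fail the
	// DiskANN test; with the dense-only check this prints true and the
	// larger disk segment budget is chosen.
	allDisk := allDenseWithDiskIndex(fields, indexes)
	fmt.Println(allDisk, expectedSegmentSize(allDisk))
}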
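
One downstream effect visible in import_checker.go and import_util.go: RegroupImportFiles now receives this precomputed budget instead of a bool, and its per-group budget is segmentMaxSize * len(partitionIDs) * len(vchannels), bounded by MaxSizeInMBPerImportTask. For example, a 1024 MB segment budget with 2 partitions and 2 vchannels yields a 4096 MB group budget before the cap. L0 imports keep overriding the budget with DataNodeCfg.FlushDeleteBufferBytes, now at the call sites rather than inside RegroupImportFiles.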