diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 506637f233..e03615195d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1010,6 +1010,13 @@ common: maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional storage: enablev2: true + stv2: + splitSystemColumn: + enabled: true # enable split system column policy in storage v2 + includePK: true # whether split system column policy include pk field + splitByAvgSize: + enabled: false # enable split by average size policy in storage v2 + threshold: 1024 # split by average size policy threshold(in bytes) in storage v2 # Whether to disable the internal time messaging mechanism for the system. # If disabled (set to false), the system will not allow DML operations, including insertion, deletion, queries, and searches. # This helps Milvus-CDC synchronize incremental data diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 8922f3f9c8..9c98456f7b 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -239,7 +239,28 @@ func (t *SyncTask) getColumnGroups(segmentInfo *metacache.SegmentInfo) []storage // TODO calculate field stats policies := storagecommon.DefaultPolicies() - return storagecommon.SplitColumns(allFields, map[int64]storagecommon.ColumnStats{}, policies...) + return storagecommon.SplitColumns(allFields, t.calcColumnStats(), policies...) 
+} + +// calcColumnStats reports, for every field in the pack's insert data, the summed GetMemorySize divided by the summed RowNum. +// Fields whose total row count is zero are left out of the result. +func (t *SyncTask) calcColumnStats() map[int64]storagecommon.ColumnStats { + totalBytes := make(map[int64]int64) + totalRows := make(map[int64]int64) + for _, insertData := range t.pack.insertData { + for id, column := range insertData.Data { + totalBytes[id] += int64(column.GetMemorySize()) + totalRows[id] += int64(column.RowNum()) + } + } + + stats := make(map[int64]storagecommon.ColumnStats) + for id, rows := range totalRows { + if rows > 0 { + stats[id] = storagecommon.ColumnStats{AvgSize: totalBytes[id] / rows} + } + } + return stats } // writeMeta updates segments via meta writer in option. diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go index 1663ba0264..1552771bbf 100644 --- a/internal/storage/serde_events_v2.go +++ b/internal/storage/serde_events_v2.go @@ -351,7 +351,7 @@ func (pw *PackedBinlogRecordWriter) initWriters(r Record) error { if pw.writer == nil { if len(pw.columnGroups) == 0 { allFields := typeutil.GetAllFieldSchemas(pw.schema) - pw.columnGroups = storagecommon.SplitColumns(allFields, map[int64]storagecommon.ColumnStats{}, storagecommon.DefaultPolicies()...) + pw.columnGroups = storagecommon.SplitColumns(allFields, pw.getColumnStatsFromRecord(r, allFields), storagecommon.DefaultPolicies()...) 
} logIdStart, _, err := pw.allocator.Alloc(uint32(len(pw.columnGroups))) if err != nil { @@ -371,6 +371,18 @@ func (pw *PackedBinlogRecordWriter) initWriters(r Record) error { return nil } +// getColumnStatsFromRecord estimates the average per-row byte size of each schema field from record r. +// Fields missing from r, or present with zero rows, are skipped so the average never divides by zero. +func (pw *PackedBinlogRecordWriter) getColumnStatsFromRecord(r Record, allFields []*schemapb.FieldSchema) map[int64]storagecommon.ColumnStats { + result := make(map[int64]storagecommon.ColumnStats) + for _, field := range allFields { + if arr := r.Column(field.FieldID); arr != nil && arr.Len() > 0 { + result[field.FieldID] = storagecommon.ColumnStats{AvgSize: int64(arr.Data().SizeInBytes()) / int64(arr.Len())} + } + } + return result +} + func (pw *PackedBinlogRecordWriter) GetWrittenUncompressed() uint64 { return pw.writtenUncompressed } diff --git a/internal/storagecommon/column_group_splitter.go b/internal/storagecommon/column_group_splitter.go index 96377d74b9..7b6b8a2a34 100644 --- a/internal/storagecommon/column_group_splitter.go +++ b/internal/storagecommon/column_group_splitter.go @@ -20,6 +20,7 @@ import ( "sort" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -46,11 +47,18 @@ func SplitColumns(fields []*schemapb.FieldSchema, stats map[int64]ColumnStats, p } func DefaultPolicies() []ColumnGroupSplitPolicy { - return []ColumnGroupSplitPolicy{ - NewSystemColumnPolicy(true), - NewSelectedDataTypePolicy(), - NewRemanentShortPolicy(-1), + paramtable.Init() + result := make([]ColumnGroupSplitPolicy, 0, 4) + if paramtable.Get().CommonCfg.Stv2SplitSystemColumn.GetAsBool() { + result = append(result, NewSystemColumnPolicy(paramtable.Get().CommonCfg.Stv2SystemColumnIncludePK.GetAsBool())) } + if paramtable.Get().CommonCfg.Stv2SplitByAvgSize.GetAsBool() { + result = append(result, NewAvgSizePolicy(paramtable.Get().CommonCfg.Stv2SplitAvgSizeThreshold.GetAsInt64())) + } + result = append(result, + NewSelectedDataTypePolicy(), + NewRemanentShortPolicy(-1)) + return result } 
func IsVectorDataType(dataType schemapb.DataType) bool { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2543db88e8..b67cfedb7d 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -279,7 +279,13 @@ type commonConfig struct { LockSlowLogWarnThreshold ParamItem `refreshable:"true"` MaxWLockConditionalWaitTime ParamItem `refreshable:"true"` + // storage v2 EnableStorageV2 ParamItem `refreshable:"false"` + Stv2SplitSystemColumn ParamItem `refreshable:"true"` + Stv2SystemColumnIncludePK ParamItem `refreshable:"true"` + Stv2SplitByAvgSize ParamItem `refreshable:"true"` + Stv2SplitAvgSizeThreshold ParamItem `refreshable:"true"` + StoragePathPrefix ParamItem `refreshable:"false"` StorageZstdConcurrency ParamItem `refreshable:"false"` TTMsgEnabled ParamItem `refreshable:"true"` @@ -926,6 +932,42 @@ Large numeric passwords require double quotes to avoid yaml parsing precision is } p.EnableStorageV2.Init(base.mgr) + p.Stv2SplitSystemColumn = ParamItem{ + Key: "common.storage.stv2.splitSystemColumn.enabled", + Version: "2.6.2", + DefaultValue: "true", + Doc: "enable split system column policy in storage v2", + Export: true, + } + p.Stv2SplitSystemColumn.Init(base.mgr) + + p.Stv2SystemColumnIncludePK = ParamItem{ + Key: "common.storage.stv2.splitSystemColumn.includePK", + Version: "2.6.2", + DefaultValue: "true", + Doc: "whether split system column policy include pk field", + Export: true, + } + p.Stv2SystemColumnIncludePK.Init(base.mgr) + + p.Stv2SplitByAvgSize = ParamItem{ + Key: "common.storage.stv2.splitByAvgSize.enabled", + Version: "2.6.2", + DefaultValue: "false", + Doc: "enable split by average size policy in storage v2", + Export: true, + } + p.Stv2SplitByAvgSize.Init(base.mgr) + + p.Stv2SplitAvgSizeThreshold = ParamItem{ + Key: "common.storage.stv2.splitByAvgSize.threshold", + Version: "2.6.2", + DefaultValue: "1024", + Doc: "split by average size policy threshold(in 
bytes) in storage v2", + Export: true, + } + p.Stv2SplitAvgSizeThreshold.Init(base.mgr) + p.StoragePathPrefix = ParamItem{ Key: "common.storage.pathPrefix", Version: "2.3.4",