mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: [StorageV2] Include partition & clustering key to sys group (#44372)
Related to #44257 This PR makes partition key & clustering candidates of system field group and adds param item controlling the policy --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
2f35db5a09
commit
103db5ae3e
@ -50,7 +50,9 @@ func DefaultPolicies() []ColumnGroupSplitPolicy {
|
||||
paramtable.Init()
|
||||
result := make([]ColumnGroupSplitPolicy, 0, 4)
|
||||
if paramtable.Get().CommonCfg.Stv2SplitSystemColumn.GetAsBool() {
|
||||
result = append(result, NewSystemColumnPolicy(paramtable.Get().CommonCfg.Stv2SystemColumnIncludePK.GetAsBool()))
|
||||
result = append(result, NewSystemColumnPolicy(paramtable.Get().CommonCfg.Stv2SystemColumnIncludePK.GetAsBool(),
|
||||
paramtable.Get().CommonCfg.Stv2SystemColumnIncludePartitionKey.GetAsBool(),
|
||||
paramtable.Get().CommonCfg.Stv2SystemColumnIncludeClusteringKey.GetAsBool()))
|
||||
}
|
||||
if paramtable.Get().CommonCfg.Stv2SplitByAvgSize.GetAsBool() {
|
||||
result = append(result, NewAvgSizePolicy(paramtable.Get().CommonCfg.Stv2SplitAvgSizeThreshold.GetAsInt64()))
|
||||
|
||||
@ -96,11 +96,17 @@ func NewSelectedDataTypePolicy() ColumnGroupSplitPolicy {
|
||||
// systemColumnPolicy split system columns to a new column group
|
||||
// if includePK is true, system columns include primary key column.
|
||||
type systemColumnPolicy struct {
|
||||
includePK bool
|
||||
includePrimaryKey bool
|
||||
includePartitionKey bool
|
||||
includeClusteringKey bool
|
||||
}
|
||||
|
||||
func NewSystemColumnPolicy(includePK bool) ColumnGroupSplitPolicy {
|
||||
return &systemColumnPolicy{includePK: includePK}
|
||||
func NewSystemColumnPolicy(includePK bool, includePartKey bool, includeClusteringKey bool) ColumnGroupSplitPolicy {
|
||||
return &systemColumnPolicy{
|
||||
includePrimaryKey: includePK,
|
||||
includePartitionKey: includePartKey,
|
||||
includeClusteringKey: includeClusteringKey,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *systemColumnPolicy) Split(currentSplit *currentSplit) *currentSplit {
|
||||
@ -109,7 +115,9 @@ func (p *systemColumnPolicy) Split(currentSplit *currentSplit) *currentSplit {
|
||||
|
||||
currentSplit.Range(func(idx int, field *schemapb.FieldSchema) {
|
||||
if field.GetFieldID() < common.StartOfUserFieldID ||
|
||||
(p.includePK && field.GetIsPrimaryKey()) {
|
||||
(p.includePrimaryKey && field.GetIsPrimaryKey()) ||
|
||||
(p.includePartitionKey && field.GetIsPartitionKey()) ||
|
||||
(p.includeClusteringKey && field.GetIsClusteringKey()) {
|
||||
systemFields = append(systemFields, field.GetFieldID())
|
||||
systemFieldIndices = append(systemFieldIndices, idx)
|
||||
}
|
||||
|
||||
@ -139,10 +139,12 @@ func TestWideDataTypePolicy(t *testing.T) {
|
||||
|
||||
func TestSystemColumnPolicy(t *testing.T) {
|
||||
type testCase struct {
|
||||
tag string
|
||||
includePK bool
|
||||
input *currentSplit
|
||||
expect *currentSplit
|
||||
tag string
|
||||
includePK bool
|
||||
includePartKey bool
|
||||
includeClusteringKey bool
|
||||
input *currentSplit
|
||||
expect *currentSplit
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
@ -179,6 +181,84 @@ func TestSystemColumnPolicy(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
tag: "normal_include_partition_key",
|
||||
input: newCurrentSplit([]*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 0,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPartitionKey: true,
|
||||
},
|
||||
}, nil),
|
||||
includePK: true,
|
||||
includePartKey: true,
|
||||
expect: ¤tSplit{
|
||||
processFields: typeutil.NewSet[int64](0, 1, 100, 102),
|
||||
outputGroups: []ColumnGroup{
|
||||
{
|
||||
GroupID: 0,
|
||||
Columns: []int{0, 1, 2, 4},
|
||||
Fields: []int64{0, 1, 100, 102},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
tag: "normal_include_clustering_key",
|
||||
input: newCurrentSplit([]*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 0,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsClusteringKey: true,
|
||||
},
|
||||
}, nil),
|
||||
includePK: true,
|
||||
includeClusteringKey: true,
|
||||
expect: ¤tSplit{
|
||||
processFields: typeutil.NewSet[int64](0, 1, 100, 102),
|
||||
outputGroups: []ColumnGroup{
|
||||
{
|
||||
GroupID: 0,
|
||||
Columns: []int{0, 1, 2, 4},
|
||||
Fields: []int64{0, 1, 100, 102},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
tag: "normal_with_processed_not_include_pk",
|
||||
input: ¤tSplit{
|
||||
@ -230,7 +310,11 @@ func TestSystemColumnPolicy(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
policy := &systemColumnPolicy{includePK: tc.includePK}
|
||||
policy := &systemColumnPolicy{
|
||||
includePrimaryKey: tc.includePK,
|
||||
includePartitionKey: tc.includePartKey,
|
||||
includeClusteringKey: tc.includeClusteringKey,
|
||||
}
|
||||
result := policy.Split(tc.input)
|
||||
|
||||
AssertSplitEqual(t, tc.expect, result)
|
||||
|
||||
@ -280,11 +280,13 @@ type commonConfig struct {
|
||||
MaxWLockConditionalWaitTime ParamItem `refreshable:"true"`
|
||||
|
||||
// storage v2
|
||||
EnableStorageV2 ParamItem `refreshable:"false"`
|
||||
Stv2SplitSystemColumn ParamItem `refreshable:"true"`
|
||||
Stv2SystemColumnIncludePK ParamItem `refreshable:"true"`
|
||||
Stv2SplitByAvgSize ParamItem `refreshable:"true"`
|
||||
Stv2SplitAvgSizeThreshold ParamItem `refreshable:"true"`
|
||||
EnableStorageV2 ParamItem `refreshable:"false"`
|
||||
Stv2SplitSystemColumn ParamItem `refreshable:"true"`
|
||||
Stv2SystemColumnIncludePK ParamItem `refreshable:"true"`
|
||||
Stv2SystemColumnIncludePartitionKey ParamItem `refreshable:"true"`
|
||||
Stv2SystemColumnIncludeClusteringKey ParamItem `refreshable:"true"`
|
||||
Stv2SplitByAvgSize ParamItem `refreshable:"true"`
|
||||
Stv2SplitAvgSizeThreshold ParamItem `refreshable:"true"`
|
||||
|
||||
StoragePathPrefix ParamItem `refreshable:"false"`
|
||||
StorageZstdConcurrency ParamItem `refreshable:"false"`
|
||||
@ -950,6 +952,24 @@ Large numeric passwords require double quotes to avoid yaml parsing precision is
|
||||
}
|
||||
p.Stv2SystemColumnIncludePK.Init(base.mgr)
|
||||
|
||||
p.Stv2SystemColumnIncludePartitionKey = ParamItem{
|
||||
Key: "common.storage.stv2.splitSystemColumn.includePartitionKey",
|
||||
Version: "2.6.2",
|
||||
DefaultValue: "true",
|
||||
Doc: "whether split system column policy include partition key field",
|
||||
Export: false,
|
||||
}
|
||||
p.Stv2SystemColumnIncludePartitionKey.Init(base.mgr)
|
||||
|
||||
p.Stv2SystemColumnIncludeClusteringKey = ParamItem{
|
||||
Key: "common.storage.stv2.splitSystemColumn.includeClusteringKey",
|
||||
Version: "2.6.2",
|
||||
DefaultValue: "true",
|
||||
Doc: "whether split system column policy include clustering key field",
|
||||
Export: false,
|
||||
}
|
||||
p.Stv2SystemColumnIncludeClusteringKey.Init(base.mgr)
|
||||
|
||||
p.Stv2SplitByAvgSize = ParamItem{
|
||||
Key: "common.storage.stv2.splitByAvgSize.enabled",
|
||||
Version: "2.6.2",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user