mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
feat: Add namespace prop (#43962)
issue: https://github.com/milvus-io/milvus/issues/44011 namespace is an alias for tenant. if this property is enabled, milvus will add a __namespace_id field. Modifications in the future will use this property to do compaction and search. --------- Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
6624011927
commit
e3ecacca9e
@ -927,6 +927,8 @@ common:
|
||||
BeamWidthRatio: 4
|
||||
gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency.
|
||||
gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time.
|
||||
namespace:
|
||||
enabled: false # whether to enable namespace, this parameter may be deprecated in the future. Just keep it for compatibility.
|
||||
storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead
|
||||
# Default value: auto
|
||||
# Valid values: [auto, avx512, avx2, avx, sse4_2]
|
||||
|
||||
@ -179,7 +179,7 @@ func checkGetPrimaryKey(coll *schemapb.CollectionSchema, idResult gjson.Result)
|
||||
func printFields(fields []*schemapb.FieldSchema) []gin.H {
|
||||
res := make([]gin.H, 0, len(fields))
|
||||
for _, field := range fields {
|
||||
if field.Name == common.MetaFieldName {
|
||||
if field.Name == common.MetaFieldName || field.Name == common.NamespaceFieldName {
|
||||
continue
|
||||
}
|
||||
fieldDetail := printFieldDetail(field, true)
|
||||
@ -191,7 +191,7 @@ func printFields(fields []*schemapb.FieldSchema) []gin.H {
|
||||
func printFieldsV2(fields []*schemapb.FieldSchema) []gin.H {
|
||||
res := make([]gin.H, 0, len(fields))
|
||||
for _, field := range fields {
|
||||
if field.Name == common.MetaFieldName {
|
||||
if field.Name == common.MetaFieldName || field.Name == common.NamespaceFieldName {
|
||||
continue
|
||||
}
|
||||
fieldDetail := printFieldDetail(field, false)
|
||||
|
||||
@ -541,7 +541,7 @@ func (t *addCollectionFieldTask) PreExecute(ctx context.Context) error {
|
||||
if typeutil.IsVectorType(t.fieldSchema.DataType) {
|
||||
return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support to add vector field, field name = %s", t.fieldSchema.Name))
|
||||
}
|
||||
if funcutil.SliceContain([]string{common.RowIDFieldName, common.TimeStampFieldName, common.MetaFieldName}, t.fieldSchema.GetName()) {
|
||||
if funcutil.SliceContain([]string{common.RowIDFieldName, common.TimeStampFieldName, common.MetaFieldName, common.NamespaceFieldName}, t.fieldSchema.GetName()) {
|
||||
return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support to add system field, field name = %s", t.fieldSchema.Name))
|
||||
}
|
||||
if t.fieldSchema.IsPrimaryKey {
|
||||
|
||||
@ -44,6 +44,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||
)
|
||||
|
||||
@ -178,12 +179,13 @@ func (t *createCollectionTask) validateSchema(ctx context.Context, schema *schem
|
||||
return err
|
||||
}
|
||||
|
||||
if hasSystemFields(schema, []string{RowIDFieldName, TimeStampFieldName, MetaFieldName}) {
|
||||
if hasSystemFields(schema, []string{RowIDFieldName, TimeStampFieldName, MetaFieldName, NamespaceFieldName}) {
|
||||
log.Ctx(ctx).Error("schema contains system field",
|
||||
zap.String("RowIDFieldName", RowIDFieldName),
|
||||
zap.String("TimeStampFieldName", TimeStampFieldName),
|
||||
zap.String("MetaFieldName", MetaFieldName))
|
||||
msg := fmt.Sprintf("schema contains system field: %s, %s, %s", RowIDFieldName, TimeStampFieldName, MetaFieldName)
|
||||
zap.String("MetaFieldName", MetaFieldName),
|
||||
zap.String("NamespaceFieldName", NamespaceFieldName))
|
||||
msg := fmt.Sprintf("schema contains system field: %s, %s, %s, %s", RowIDFieldName, TimeStampFieldName, MetaFieldName, NamespaceFieldName)
|
||||
return merr.WrapErrParameterInvalid("schema don't contains system field", "contains", msg)
|
||||
}
|
||||
|
||||
@ -252,6 +254,77 @@ func (t *createCollectionTask) appendDynamicField(ctx context.Context, schema *s
|
||||
}
|
||||
}
|
||||
|
||||
func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema *schemapb.CollectionSchema) error {
|
||||
if !Params.CommonCfg.EnableNamespace.GetAsBool() {
|
||||
return nil
|
||||
}
|
||||
|
||||
hasIsolation := hasIsolationProperty(t.Req.Properties...)
|
||||
_, err := typeutil.GetPartitionKeyFieldSchema(schema)
|
||||
hasPartitionKey := err == nil
|
||||
enabled, has, err := parseNamespaceProp(t.Req.Properties...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !has || !enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
if hasIsolation {
|
||||
iso, err := common.IsPartitionKeyIsolationKvEnabled(t.Req.Properties...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !iso {
|
||||
return merr.WrapErrCollectionIllegalSchema(t.Req.CollectionName,
|
||||
"isolation property is false when namespace enabled")
|
||||
}
|
||||
}
|
||||
|
||||
if hasPartitionKey {
|
||||
return merr.WrapErrParameterInvalidMsg("namespace is not supported with partition key mode")
|
||||
}
|
||||
|
||||
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
|
||||
Name: common.NamespaceFieldName,
|
||||
IsPartitionKey: true,
|
||||
DataType: schemapb.DataType_VarChar,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: common.MaxLengthKey, Value: fmt.Sprintf("%d", paramtable.Get().ProxyCfg.MaxVarCharLength.GetAsInt())},
|
||||
},
|
||||
})
|
||||
schema.Properties = append(schema.Properties, &commonpb.KeyValuePair{
|
||||
Key: common.PartitionKeyIsolationKey,
|
||||
Value: "true",
|
||||
})
|
||||
log.Ctx(ctx).Info("added namespace field",
|
||||
zap.String("collectionName", t.Req.CollectionName),
|
||||
zap.String("fieldName", common.NamespaceFieldName))
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseNamespaceProp(props ...*commonpb.KeyValuePair) (value bool, has bool, err error) {
|
||||
for _, p := range props {
|
||||
if p.GetKey() == common.NamespaceEnabledKey {
|
||||
value, err := strconv.ParseBool(p.GetValue())
|
||||
if err != nil {
|
||||
return false, false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid namespace prop value: %s", p.GetValue()))
|
||||
}
|
||||
return value, true, nil
|
||||
}
|
||||
}
|
||||
return false, false, nil
|
||||
}
|
||||
|
||||
func hasIsolationProperty(props ...*commonpb.KeyValuePair) bool {
|
||||
for _, p := range props {
|
||||
if p.GetKey() == common.PartitionKeyIsolationKey {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *createCollectionTask) appendSysFields(schema *schemapb.CollectionSchema) {
|
||||
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
|
||||
FieldID: int64(RowIDField),
|
||||
@ -278,6 +351,9 @@ func (t *createCollectionTask) prepareSchema(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
t.appendDynamicField(ctx, &schema)
|
||||
if err := t.handleNamespaceField(ctx, &schema); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := t.assignFieldAndFunctionID(&schema); err != nil {
|
||||
return err
|
||||
|
||||
@ -1651,3 +1651,152 @@ func Test_createCollectionTask_PartitionKey(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespaceProperty(t *testing.T) {
|
||||
paramtable.Init()
|
||||
paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("true")
|
||||
defer paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("false")
|
||||
ctx := context.Background()
|
||||
prefix := "TestNamespaceProperty"
|
||||
collectionName := prefix + funcutil.GenRandomStr()
|
||||
|
||||
initSchema := func() *schemapb.CollectionSchema {
|
||||
return &schemapb.CollectionSchema{
|
||||
Name: collectionName,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "field1",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "vector",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
Value: strconv.Itoa(1024),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
hasNamespaceField := func(schema *schemapb.CollectionSchema) bool {
|
||||
for _, f := range schema.Fields {
|
||||
if f.Name == common.NamespaceFieldName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
t.Run("test namespace enabled", func(t *testing.T) {
|
||||
schema := initSchema()
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := &createCollectionTask{
|
||||
Req: &milvuspb.CreateCollectionRequest{
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.NamespaceEnabledKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = task.handleNamespaceField(ctx, schema)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, hasNamespaceField(schema))
|
||||
})
|
||||
|
||||
t.Run("test namespace disabled with isolation and partition key", func(t *testing.T) {
|
||||
schema := initSchema()
|
||||
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
|
||||
FieldID: 102,
|
||||
Name: "field2",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPartitionKey: true,
|
||||
})
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := &createCollectionTask{
|
||||
Req: &milvuspb.CreateCollectionRequest{
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.PartitionKeyIsolationKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = task.handleNamespaceField(ctx, schema)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, hasNamespaceField(schema))
|
||||
})
|
||||
|
||||
t.Run("test namespace enabled with isolation", func(t *testing.T) {
|
||||
schema := initSchema()
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := &createCollectionTask{
|
||||
Req: &milvuspb.CreateCollectionRequest{
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.NamespaceEnabledKey,
|
||||
Value: "true",
|
||||
},
|
||||
{
|
||||
Key: common.PartitionKeyIsolationKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = task.handleNamespaceField(ctx, schema)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, hasNamespaceField(schema))
|
||||
})
|
||||
|
||||
t.Run("test namespace enabled with partition key", func(t *testing.T) {
|
||||
schema := initSchema()
|
||||
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
|
||||
FieldID: 102,
|
||||
Name: "field2",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPartitionKey: true,
|
||||
})
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task := &createCollectionTask{
|
||||
Req: &milvuspb.CreateCollectionRequest{
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.NamespaceEnabledKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = task.handleNamespaceField(ctx, schema)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
@ -46,4 +46,7 @@ const (
|
||||
|
||||
// MetaFieldName name of the dynamic schema field
|
||||
MetaFieldName = common.MetaFieldName
|
||||
|
||||
// NamespaceFieldName name of the namespace field
|
||||
NamespaceFieldName = common.NamespaceFieldName
|
||||
)
|
||||
|
||||
@ -56,6 +56,9 @@ const (
|
||||
// TimeStampFieldName defines the name of the Timestamp field
|
||||
TimeStampFieldName = "Timestamp"
|
||||
|
||||
// NamespaceFieldName defines the name of the Namespace field
|
||||
NamespaceFieldName = "$namespace_id"
|
||||
|
||||
// MetaFieldName is the field name of dynamic schema
|
||||
MetaFieldName = "$meta"
|
||||
|
||||
@ -230,6 +233,7 @@ const (
|
||||
ReplicateEndTSKey = "replicate.endTS"
|
||||
IndexNonEncoding = "index.nonEncoding"
|
||||
EnableDynamicSchemaKey = `dynamicfield.enabled`
|
||||
NamespaceEnabledKey = "namespace.enabled"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@ -238,6 +238,7 @@ type commonConfig struct {
|
||||
BeamWidthRatio ParamItem `refreshable:"true"`
|
||||
GracefulTime ParamItem `refreshable:"true"`
|
||||
GracefulStopTimeout ParamItem `refreshable:"true"`
|
||||
EnableNamespace ParamItem `refreshable:"false"`
|
||||
|
||||
StorageType ParamItem `refreshable:"false"`
|
||||
SimdType ParamItem `refreshable:"false"`
|
||||
@ -624,6 +625,15 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
|
||||
}
|
||||
p.GracefulStopTimeout.Init(base.mgr)
|
||||
|
||||
p.EnableNamespace = ParamItem{
|
||||
Key: "common.namespace.enabled",
|
||||
Version: "2.6.0",
|
||||
DefaultValue: "false",
|
||||
Doc: "whether to enable namespace, this parameter may be deprecated in the future. Just keep it for compatibility.",
|
||||
Export: true,
|
||||
}
|
||||
p.EnableNamespace.Init(base.mgr)
|
||||
|
||||
p.StorageType = ParamItem{
|
||||
Key: "common.storageType",
|
||||
Version: "2.0.0",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user