diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 03f47d88d3..6306eeb881 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -927,6 +927,8 @@ common: BeamWidthRatio: 4 gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency. gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time. + namespace: + enabled: false # whether to enable namespace, this parameter may be deprecated in the future. Just keep it for compatibility. storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead # Default value: auto # Valid values: [auto, avx512, avx2, avx, sse4_2] diff --git a/internal/distributed/proxy/httpserver/utils.go b/internal/distributed/proxy/httpserver/utils.go index 4076e989fd..70ef200114 100644 --- a/internal/distributed/proxy/httpserver/utils.go +++ b/internal/distributed/proxy/httpserver/utils.go @@ -179,7 +179,7 @@ func checkGetPrimaryKey(coll *schemapb.CollectionSchema, idResult gjson.Result) func printFields(fields []*schemapb.FieldSchema) []gin.H { res := make([]gin.H, 0, len(fields)) for _, field := range fields { - if field.Name == common.MetaFieldName { + if field.Name == common.MetaFieldName || field.Name == common.NamespaceFieldName { continue } fieldDetail := printFieldDetail(field, true) @@ -191,7 +191,7 @@ func printFields(fields []*schemapb.FieldSchema) []gin.H { func printFieldsV2(fields []*schemapb.FieldSchema) []gin.H { res := make([]gin.H, 0, len(fields)) for _, field := range fields { - if field.Name == common.MetaFieldName { + if field.Name == common.MetaFieldName || field.Name == common.NamespaceFieldName { continue } fieldDetail := printFieldDetail(field, false) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 2f93737c96..990f93f082 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -541,7 +541,7 @@ func (t *addCollectionFieldTask) PreExecute(ctx context.Context) error { if typeutil.IsVectorType(t.fieldSchema.DataType) { return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support to add vector field, field name = %s", t.fieldSchema.Name)) } - if funcutil.SliceContain([]string{common.RowIDFieldName, common.TimeStampFieldName, common.MetaFieldName}, t.fieldSchema.GetName()) { + if funcutil.SliceContain([]string{common.RowIDFieldName, common.TimeStampFieldName, common.MetaFieldName, common.NamespaceFieldName}, t.fieldSchema.GetName()) { return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support to add system field, field name = %s", t.fieldSchema.Name)) } if t.fieldSchema.IsPrimaryKey { diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 28f4eaa01b..6dc203df4f 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -178,12 +179,13 @@ func (t *createCollectionTask) validateSchema(ctx context.Context, schema *schem return err } - if hasSystemFields(schema, []string{RowIDFieldName, TimeStampFieldName, MetaFieldName}) { + if hasSystemFields(schema, []string{RowIDFieldName, TimeStampFieldName, MetaFieldName, NamespaceFieldName}) { log.Ctx(ctx).Error("schema contains system field", zap.String("RowIDFieldName", RowIDFieldName), zap.String("TimeStampFieldName", TimeStampFieldName), - zap.String("MetaFieldName", MetaFieldName)) - msg := fmt.Sprintf("schema contains system field: %s, %s, %s", RowIDFieldName, TimeStampFieldName, MetaFieldName) + zap.String("MetaFieldName", MetaFieldName), + zap.String("NamespaceFieldName", NamespaceFieldName)) + msg := fmt.Sprintf("schema contains system field: %s, %s, %s, %s", RowIDFieldName, TimeStampFieldName, MetaFieldName, NamespaceFieldName) return merr.WrapErrParameterInvalid("schema don't contains system field", "contains", msg) } @@ -252,6 +254,77 @@ func (t *createCollectionTask) appendDynamicField(ctx context.Context, schema *s } } +func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema *schemapb.CollectionSchema) error { + if !Params.CommonCfg.EnableNamespace.GetAsBool() { + return nil + } + + hasIsolation := hasIsolationProperty(t.Req.Properties...) + _, err := typeutil.GetPartitionKeyFieldSchema(schema) + hasPartitionKey := err == nil + enabled, has, err := parseNamespaceProp(t.Req.Properties...) + if err != nil { + return err + } + if !has || !enabled { + return nil + } + + if hasIsolation { + iso, err := common.IsPartitionKeyIsolationKvEnabled(t.Req.Properties...) + if err != nil { + return err + } + if !iso { + return merr.WrapErrCollectionIllegalSchema(t.Req.CollectionName, + "isolation property is false when namespace enabled") + } + } + + if hasPartitionKey { + return merr.WrapErrParameterInvalidMsg("namespace is not supported with partition key mode") + } + + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + Name: common.NamespaceFieldName, + IsPartitionKey: true, + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.MaxLengthKey, Value: fmt.Sprintf("%d", paramtable.Get().ProxyCfg.MaxVarCharLength.GetAsInt())}, + }, + }) + schema.Properties = append(schema.Properties, &commonpb.KeyValuePair{ + Key: common.PartitionKeyIsolationKey, + Value: "true", + }) + log.Ctx(ctx).Info("added namespace field", + zap.String("collectionName", t.Req.CollectionName), + zap.String("fieldName", common.NamespaceFieldName)) + return nil +} + +func parseNamespaceProp(props ...*commonpb.KeyValuePair) (value bool, has bool, err error) { + for _, p := range props { + if p.GetKey() == common.NamespaceEnabledKey { + value, err := strconv.ParseBool(p.GetValue()) + if err != nil { + return false, false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid namespace prop value: %s", p.GetValue())) + } + return value, true, nil + } + } + return false, false, nil +} + +func hasIsolationProperty(props ...*commonpb.KeyValuePair) bool { + for _, p := range props { + if p.GetKey() == common.PartitionKeyIsolationKey { + return true + } + } + return false +} + func (t *createCollectionTask) appendSysFields(schema *schemapb.CollectionSchema) { schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ FieldID: int64(RowIDField), @@ -278,6 +351,9 @@ func (t *createCollectionTask) prepareSchema(ctx context.Context) error { return err } t.appendDynamicField(ctx, &schema) + if err := t.handleNamespaceField(ctx, &schema); err != nil { + return err + } if err := t.assignFieldAndFunctionID(&schema); err != nil { return err diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index 4cf3b4005c..8e8131a16f 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -1651,3 +1651,152 @@ func Test_createCollectionTask_PartitionKey(t *testing.T) { assert.NoError(t, err) }) } + +func TestNamespaceProperty(t *testing.T) { + paramtable.Init() + paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("false") + ctx := context.Background() + prefix := "TestNamespaceProperty" + collectionName := prefix + funcutil.GenRandomStr() + + initSchema := func() *schemapb.CollectionSchema { + return &schemapb.CollectionSchema{ + Name: collectionName, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "field1", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: strconv.Itoa(1024), + }, + }, + }, + }, + } + } + hasNamespaceField := func(schema *schemapb.CollectionSchema) bool { + for _, f := range schema.Fields { + if f.Name == common.NamespaceFieldName { + return true + } + } + return false + } + + t.Run("test namespace enabled", func(t *testing.T) { + schema := initSchema() + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + task := &createCollectionTask{ + Req: &milvuspb.CreateCollectionRequest{ + CollectionName: collectionName, + Schema: marshaledSchema, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.NamespaceEnabledKey, + Value: "true", + }, + }, + }, + } + + err = task.handleNamespaceField(ctx, schema) + assert.NoError(t, err) + assert.True(t, hasNamespaceField(schema)) + }) + + t.Run("test namespace disabled with isolation and partition key", func(t *testing.T) { + schema := initSchema() + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + FieldID: 102, + Name: "field2", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + }) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + task := &createCollectionTask{ + Req: &milvuspb.CreateCollectionRequest{ + CollectionName: collectionName, + Schema: marshaledSchema, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.PartitionKeyIsolationKey, + Value: "true", + }, + }, + }, + } + + err = task.handleNamespaceField(ctx, schema) + assert.NoError(t, err) + assert.False(t, hasNamespaceField(schema)) + }) + + t.Run("test namespace enabled with isolation", func(t *testing.T) { + schema := initSchema() + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + task := &createCollectionTask{ + Req: &milvuspb.CreateCollectionRequest{ + CollectionName: collectionName, + Schema: marshaledSchema, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.NamespaceEnabledKey, + Value: "true", + }, + { + Key: common.PartitionKeyIsolationKey, + Value: "true", + }, + }, + }, + } + + err = task.handleNamespaceField(ctx, schema) + assert.NoError(t, err) + assert.True(t, hasNamespaceField(schema)) + }) + + t.Run("test namespace enabled with partition key", func(t *testing.T) { + schema := initSchema() + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + FieldID: 102, + Name: "field2", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + }) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + task := &createCollectionTask{ + Req: &milvuspb.CreateCollectionRequest{ + CollectionName: collectionName, + Schema: marshaledSchema, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.NamespaceEnabledKey, + Value: "true", + }, + }, + }, + } + + err = task.handleNamespaceField(ctx, schema) + assert.Error(t, err) + }) +} diff --git a/internal/rootcoord/field_id.go b/internal/rootcoord/field_id.go index c511ce0629..da7a075faf 100644 --- a/internal/rootcoord/field_id.go +++ b/internal/rootcoord/field_id.go @@ -46,4 +46,7 @@ const ( // MetaFieldName name of the dynamic schema field MetaFieldName = common.MetaFieldName + + // NamespaceFieldName name of the namespace field + NamespaceFieldName = common.NamespaceFieldName ) diff --git a/pkg/common/common.go b/pkg/common/common.go index a7db5931d7..5a15e3b454 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -56,6 +56,9 @@ const ( // TimeStampFieldName defines the name of the Timestamp field TimeStampFieldName = "Timestamp" + // NamespaceFieldName defines the name of the Namespace field + NamespaceFieldName = "$namespace_id" + // MetaFieldName is the field name of dynamic schema MetaFieldName = "$meta" @@ -230,6 +233,7 @@ const ( ReplicateEndTSKey = "replicate.endTS" IndexNonEncoding = "index.nonEncoding" EnableDynamicSchemaKey = `dynamicfield.enabled` + NamespaceEnabledKey = "namespace.enabled" ) const ( diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6ecf547195..faef4d318b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -238,6 +238,7 @@ type commonConfig struct { BeamWidthRatio ParamItem `refreshable:"true"` GracefulTime ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` + EnableNamespace ParamItem `refreshable:"false"` StorageType ParamItem `refreshable:"false"` SimdType ParamItem `refreshable:"false"` @@ -624,6 +625,15 @@ This configuration is only used by querynode and indexnode, it selects CPU instr } p.GracefulStopTimeout.Init(base.mgr) + p.EnableNamespace = ParamItem{ + Key: "common.namespace.enabled", + Version: "2.6.0", + DefaultValue: "false", + Doc: "whether to enable namespace, this parameter may be deprecated in the future. Just keep it for compatibility.", + Export: true, + } + p.EnableNamespace.Init(base.mgr) + p.StorageType = ParamItem{ Key: "common.storageType", Version: "2.0.0",