diff --git a/go.mod b/go.mod index 3cad7c2c3c..154f13514d 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.1 + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903080546-f1a74984d9e4 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 // indirect github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 diff --git a/go.sum b/go.sum index 2ccd136936..1f69a4f346 100644 --- a/go.sum +++ b/go.sum @@ -788,6 +788,10 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.1 h1:NLoSWXvlJD8t91G3CUsooXqYnm5nfsBngztQYYT58V0= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903074146-fe93017b6822 h1:aLZGg4Hfy37RDTqMXsVAcqv+CqzpRtN13tLfgcGK1nQ= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903074146-fe93017b6822/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903080546-f1a74984d9e4 h1:9hoFUhw6wVRGBZFPDNjEaUihvZy5DEOV+SxRIWLEbRM= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903080546-f1a74984d9e4/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/internal/mocks/mock_proxy.go b/internal/mocks/mock_proxy.go index 1f2146c813..27e5919148 100644 --- a/internal/mocks/mock_proxy.go +++ b/internal/mocks/mock_proxy.go @@ -1272,6 +1272,52 @@ func (_c *MockProxy_CreatePrivilegeGroup_Call) RunAndReturn(run func(context.Con return _c } +// CreateReplicateStream provides a mock function with given fields: _a0 +func (_m *MockProxy) CreateReplicateStream(_a0 milvuspb.MilvusService_CreateReplicateStreamServer) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for CreateReplicateStream") + } + + var r0 error + if rf, ok := ret.Get(0).(func(milvuspb.MilvusService_CreateReplicateStreamServer) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockProxy_CreateReplicateStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateReplicateStream' +type MockProxy_CreateReplicateStream_Call struct { + *mock.Call +} + +// CreateReplicateStream is a helper method to define mock.On call +// - _a0 milvuspb.MilvusService_CreateReplicateStreamServer +func (_e *MockProxy_Expecter) CreateReplicateStream(_a0 interface{}) *MockProxy_CreateReplicateStream_Call { + return &MockProxy_CreateReplicateStream_Call{Call: _e.mock.On("CreateReplicateStream", _a0)} +} + +func (_c *MockProxy_CreateReplicateStream_Call) Run(run func(_a0 milvuspb.MilvusService_CreateReplicateStreamServer)) *MockProxy_CreateReplicateStream_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(milvuspb.MilvusService_CreateReplicateStreamServer)) + }) + return _c +} + +func (_c *MockProxy_CreateReplicateStream_Call) Return(_a0 error) *MockProxy_CreateReplicateStream_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockProxy_CreateReplicateStream_Call) RunAndReturn(run func(milvuspb.MilvusService_CreateReplicateStreamServer) error) *MockProxy_CreateReplicateStream_Call { + _c.Call.Return(run) + return _c +} + // CreateResourceGroup provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) CreateResourceGroup(_a0 context.Context, _a1 *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) @@ -4029,6 +4075,65 @@ func (_c *MockProxy_GetReplicas_Call) RunAndReturn(run func(context.Context, *mi return _c } +// GetReplicateInfo provides a mock function with given fields: _a0, _a1 +func (_m *MockProxy) GetReplicateInfo(_a0 context.Context, _a1 *milvuspb.GetReplicateInfoRequest) (*milvuspb.GetReplicateInfoResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetReplicateInfo") + } + + var r0 *milvuspb.GetReplicateInfoResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.GetReplicateInfoRequest) (*milvuspb.GetReplicateInfoResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.GetReplicateInfoRequest) *milvuspb.GetReplicateInfoResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.GetReplicateInfoResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.GetReplicateInfoRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockProxy_GetReplicateInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetReplicateInfo' +type MockProxy_GetReplicateInfo_Call struct { + *mock.Call +} + +// GetReplicateInfo is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.GetReplicateInfoRequest +func (_e *MockProxy_Expecter) GetReplicateInfo(_a0 interface{}, _a1 interface{}) *MockProxy_GetReplicateInfo_Call { + return &MockProxy_GetReplicateInfo_Call{Call: _e.mock.On("GetReplicateInfo", _a0, _a1)} +} + +func (_c *MockProxy_GetReplicateInfo_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.GetReplicateInfoRequest)) *MockProxy_GetReplicateInfo_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.GetReplicateInfoRequest)) + }) + return _c +} + +func (_c *MockProxy_GetReplicateInfo_Call) Return(_a0 *milvuspb.GetReplicateInfoResponse, _a1 error) *MockProxy_GetReplicateInfo_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockProxy_GetReplicateInfo_Call) RunAndReturn(run func(context.Context, *milvuspb.GetReplicateInfoRequest) (*milvuspb.GetReplicateInfoResponse, error)) *MockProxy_GetReplicateInfo_Call { + _c.Call.Return(run) + return _c +} + // GetSegmentsInfo provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) GetSegmentsInfo(_a0 context.Context, _a1 *internalpb.GetSegmentsInfoRequest) (*internalpb.GetSegmentsInfoResponse, error) { ret := _m.Called(_a0, _a1) @@ -7494,6 +7599,65 @@ func (_c *MockProxy_UpdateCredentialCache_Call) RunAndReturn(run func(context.Co return _c } +// UpdateReplicateConfiguration provides a mock function with given fields: _a0, _a1 +func (_m *MockProxy) UpdateReplicateConfiguration(_a0 context.Context, _a1 *milvuspb.UpdateReplicateConfigurationRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for UpdateReplicateConfiguration") + } + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockProxy_UpdateReplicateConfiguration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateReplicateConfiguration' +type MockProxy_UpdateReplicateConfiguration_Call struct { + *mock.Call +} + +// UpdateReplicateConfiguration is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.UpdateReplicateConfigurationRequest +func (_e *MockProxy_Expecter) UpdateReplicateConfiguration(_a0 interface{}, _a1 interface{}) *MockProxy_UpdateReplicateConfiguration_Call { + return &MockProxy_UpdateReplicateConfiguration_Call{Call: _e.mock.On("UpdateReplicateConfiguration", _a0, _a1)} +} + +func (_c *MockProxy_UpdateReplicateConfiguration_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.UpdateReplicateConfigurationRequest)) *MockProxy_UpdateReplicateConfiguration_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.UpdateReplicateConfigurationRequest)) + }) + return _c +} + +func (_c *MockProxy_UpdateReplicateConfiguration_Call) Return(_a0 *commonpb.Status, _a1 error) *MockProxy_UpdateReplicateConfiguration_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockProxy_UpdateReplicateConfiguration_Call) RunAndReturn(run func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) (*commonpb.Status, error)) *MockProxy_UpdateReplicateConfiguration_Call { + _c.Call.Return(run) + return _c +} + // UpdateResourceGroups provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) UpdateResourceGroups(_a0 context.Context, _a1 *milvuspb.UpdateResourceGroupsRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) @@ -7650,8 +7814,7 @@ func (_c *MockProxy_Upsert_Call) RunAndReturn(run func(context.Context, *milvusp func NewMockProxy(t interface { mock.TestingT Cleanup(func()) -}, -) *MockProxy { +}) *MockProxy { mock := &MockProxy{} mock.Mock.Test(t) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 37dff08d1f..0c05b2ab9f 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2318,6 +2318,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) FieldsData: request.FieldsData, NumRows: uint64(request.NumRows), Version: msgpb.InsertDataVersion_ColumnBased, + Namespace: request.Namespace, }, }, idAllocator: node.rowIDAllocator, diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 4cbd8201b6..3168084cf6 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -212,6 +212,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error { } } + if Params.CommonCfg.EnableNamespace.GetAsBool() { + err = addNamespaceData(it.schema, it.insertMsg) + if err != nil { + return err + } + } + err = checkAndFlattenStructFieldData(it.schema, it.insertMsg) if err != nil { return err diff --git a/internal/proxy/task_insert_test.go b/internal/proxy/task_insert_test.go index 6d542df62e..2597b93ebe 100644 --- a/internal/proxy/task_insert_test.go +++ b/internal/proxy/task_insert_test.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/util/function/embedding" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -484,3 +485,134 @@ func TestInsertTaskForSchemaMismatch(t *testing.T) { assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch) }) } + +func TestInsertTask_Namespace(t *testing.T) { + paramtable.Init() + paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("false") + cache := NewMockCache(t) + globalMetaCache = cache + cache.On("GetDatabaseInfo", + mock.Anything, + mock.Anything, + ).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil).Maybe() + cache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil).Maybe() + ctx := context.Background() + rc := mocks.NewMockRootCoordClient(t) + rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{ + Status: merr.Status(nil), + ID: 11198, + Count: 10, + }, nil) + idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0) + idAllocator.Start() + defer idAllocator.Close() + assert.NoError(t, err) + + schemaWithNamespaceEnabled := &schemapb.CollectionSchema{ + Name: "test", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true}, + {FieldID: 101, Name: common.NamespaceFieldName, DataType: schemapb.DataType_VarChar, IsPartitionKey: true, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.MaxLengthKey, Value: "100"}, + }}, + }, + Properties: []*commonpb.KeyValuePair{ + {Key: common.NamespaceEnabledKey, Value: "true"}, + }, + } + + schemaWithNamespaceDisabled := &schemapb.CollectionSchema{ + Name: "test", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true}, + }, + } + + t.Run("test insert with namespace enabled", func(t *testing.T) { + cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() + cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Unset() + cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ + schema: newSchemaInfo(schemaWithNamespaceEnabled), + }, nil).Maybe() + cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(newSchemaInfo(schemaWithNamespaceEnabled), nil).Maybe() + namespace := "test" + it := insertTask{ + ctx: context.Background(), + insertMsg: &msgstream.InsertMsg{ + InsertRequest: &msgpb.InsertRequest{ + CollectionName: "test", + Namespace: &namespace, + NumRows: 100, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + }, + schema: schemaWithNamespaceEnabled, + idAllocator: idAllocator, + } + err := it.PreExecute(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int64(101), it.insertMsg.FieldsData[0].FieldId) + + // namespace data is not set + it = insertTask{ + ctx: context.Background(), + insertMsg: &msgstream.InsertMsg{ + InsertRequest: &msgpb.InsertRequest{ + CollectionName: "test", + NumRows: 100, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + }, + idAllocator: idAllocator, + } + err = it.PreExecute(context.Background()) + assert.Error(t, err) + }) + + t.Run("test insert with namespace disabled", func(t *testing.T) { + cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() + cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Unset() + cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{ + schema: newSchemaInfo(schemaWithNamespaceDisabled), + }, nil).Maybe() + cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(newSchemaInfo(schemaWithNamespaceDisabled), nil).Maybe() + cache.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&partitionInfo{ + name: "p1", + partitionID: 10, + createdTimestamp: 10001, + createdUtcTimestamp: 10002, + }, nil).Maybe() + it := insertTask{ + ctx: context.Background(), + insertMsg: &msgstream.InsertMsg{ + InsertRequest: &msgpb.InsertRequest{ + CollectionName: "test", + NumRows: 100, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + }, + schema: schemaWithNamespaceDisabled, + idAllocator: idAllocator, + } + err := it.PreExecute(context.Background()) + assert.NoError(t, err) + + // namespace data is set + namespace := "test" + it = insertTask{ + ctx: context.Background(), + insertMsg: &msgstream.InsertMsg{ + InsertRequest: &msgpb.InsertRequest{ + CollectionName: "test", + Namespace: &namespace, + NumRows: 100, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + }, + idAllocator: idAllocator, + } + err = it.PreExecute(context.Background()) + assert.Error(t, err) + }) +} diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 67c5e6a787..49ff44512a 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -744,6 +744,13 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { } } + if Params.CommonCfg.EnableNamespace.GetAsBool() { + err := addNamespaceData(it.schema.CollectionSchema, it.upsertMsg.InsertMsg) + if err != nil { + return err + } + } + err := checkAndFlattenStructFieldData(it.schema.CollectionSchema, it.upsertMsg.InsertMsg) if err != nil { return err @@ -946,6 +953,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { NumRows: uint64(it.req.NumRows), Version: msgpb.InsertDataVersion_ColumnBased, DbName: it.req.DbName, + Namespace: it.req.Namespace, }, }, DeleteMsg: &msgstream.DeleteMsg{ diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 4053a6570a..46749c33e8 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -2332,6 +2332,59 @@ func checkDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre return nil } +func addNamespaceData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error { + namespaceEnabeld, _, err := common.ParseNamespaceProp(schema.Properties...) + if err != nil { + return err + } + namespaceIsSet := insertMsg.InsertRequest.Namespace != nil + + if namespaceEnabeld != namespaceIsSet { + if namespaceIsSet { + return fmt.Errorf("namespace data is set but namespace disabled") + } + return fmt.Errorf("namespace data is not set but namespace enabled") + } + if !namespaceEnabeld { + return nil + } + + // check namespace field exists + namespaceField := typeutil.GetFieldByName(schema, common.NamespaceFieldName) + if namespaceField == nil { + return fmt.Errorf("namespace field not found") + } + + // check namespace field data is already set + for _, fieldData := range insertMsg.FieldsData { + if fieldData.FieldId == namespaceField.FieldID { + return fmt.Errorf("namespace field data is already set by users") + } + } + + // set namespace field data + namespaceData := make([]string, insertMsg.NRows()) + namespace := *insertMsg.InsertRequest.Namespace + for i := range namespaceData { + namespaceData[i] = namespace + } + insertMsg.FieldsData = append(insertMsg.FieldsData, &schemapb.FieldData{ + FieldName: namespaceField.Name, + FieldId: namespaceField.FieldID, + Type: namespaceField.DataType, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: namespaceData, + }, + }, + }, + }, + }) + return nil +} + func GetCachedCollectionSchema(ctx context.Context, dbName string, colName string) (*schemaInfo, error) { if globalMetaCache != nil { return globalMetaCache.GetCollectionSchema(ctx, dbName, colName) diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 6dc203df4f..8948cf2900 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -262,7 +262,7 @@ func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema hasIsolation := hasIsolationProperty(t.Req.Properties...) _, err := typeutil.GetPartitionKeyFieldSchema(schema) hasPartitionKey := err == nil - enabled, has, err := parseNamespaceProp(t.Req.Properties...) + enabled, has, err := common.ParseNamespaceProp(t.Req.Properties...) if err != nil { return err } @@ -303,19 +303,6 @@ func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema return nil } -func parseNamespaceProp(props ...*commonpb.KeyValuePair) (value bool, has bool, err error) { - for _, p := range props { - if p.GetKey() == common.NamespaceEnabledKey { - value, err := strconv.ParseBool(p.GetValue()) - if err != nil { - return false, false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid namespace prop value: %s", p.GetValue())) - } - return value, true, nil - } - } - return false, false, nil -} - func hasIsolationProperty(props ...*commonpb.KeyValuePair) bool { for _, p := range props { if p.GetKey() == common.PartitionKeyIsolationKey { diff --git a/pkg/common/common.go b/pkg/common/common.go index 5a15e3b454..31e6f1fe82 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -502,3 +502,16 @@ func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, isVectorField bool, inde } return nil } + +func ParseNamespaceProp(props ...*commonpb.KeyValuePair) (value bool, has bool, err error) { + for _, p := range props { + if p.GetKey() == NamespaceEnabledKey { + value, err := strconv.ParseBool(p.GetValue()) + if err != nil { + return false, false, fmt.Errorf("invalid namespace prop value: %s", p.GetValue()) + } + return value, true, nil + } + } + return false, false, nil +}