mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
feat: Auto add namespace field data if namespace is enabled (#44198)
issue: #44011 Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
0c9b1597f2
commit
e2eb8562f1
2
go.mod
2
go.mod
@ -21,7 +21,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/klauspost/compress v1.17.9
|
||||
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.1
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903080546-f1a74984d9e4
|
||||
github.com/minio/minio-go/v7 v7.0.73
|
||||
github.com/panjf2000/ants/v2 v2.11.3 // indirect
|
||||
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
|
||||
|
||||
4
go.sum
4
go.sum
@ -788,6 +788,10 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.1 h1:NLoSWXvlJD8t91G3CUsooXqYnm5nfsBngztQYYT58V0=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903074146-fe93017b6822 h1:aLZGg4Hfy37RDTqMXsVAcqv+CqzpRtN13tLfgcGK1nQ=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903074146-fe93017b6822/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903080546-f1a74984d9e4 h1:9hoFUhw6wVRGBZFPDNjEaUihvZy5DEOV+SxRIWLEbRM=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250903080546-f1a74984d9e4/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
|
||||
|
||||
@ -1272,6 +1272,52 @@ func (_c *MockProxy_CreatePrivilegeGroup_Call) RunAndReturn(run func(context.Con
|
||||
return _c
|
||||
}
|
||||
|
||||
// CreateReplicateStream provides a mock function with given fields: _a0
|
||||
func (_m *MockProxy) CreateReplicateStream(_a0 milvuspb.MilvusService_CreateReplicateStreamServer) error {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for CreateReplicateStream")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(milvuspb.MilvusService_CreateReplicateStreamServer) error); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockProxy_CreateReplicateStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateReplicateStream'
|
||||
type MockProxy_CreateReplicateStream_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CreateReplicateStream is a helper method to define mock.On call
|
||||
// - _a0 milvuspb.MilvusService_CreateReplicateStreamServer
|
||||
func (_e *MockProxy_Expecter) CreateReplicateStream(_a0 interface{}) *MockProxy_CreateReplicateStream_Call {
|
||||
return &MockProxy_CreateReplicateStream_Call{Call: _e.mock.On("CreateReplicateStream", _a0)}
|
||||
}
|
||||
|
||||
func (_c *MockProxy_CreateReplicateStream_Call) Run(run func(_a0 milvuspb.MilvusService_CreateReplicateStreamServer)) *MockProxy_CreateReplicateStream_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(milvuspb.MilvusService_CreateReplicateStreamServer))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxy_CreateReplicateStream_Call) Return(_a0 error) *MockProxy_CreateReplicateStream_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxy_CreateReplicateStream_Call) RunAndReturn(run func(milvuspb.MilvusService_CreateReplicateStreamServer) error) *MockProxy_CreateReplicateStream_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// CreateResourceGroup provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockProxy) CreateResourceGroup(_a0 context.Context, _a1 *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
@ -4029,6 +4075,65 @@ func (_c *MockProxy_GetReplicas_Call) RunAndReturn(run func(context.Context, *mi
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetReplicateInfo provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockProxy) GetReplicateInfo(_a0 context.Context, _a1 *milvuspb.GetReplicateInfoRequest) (*milvuspb.GetReplicateInfoResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for GetReplicateInfo")
|
||||
}
|
||||
|
||||
var r0 *milvuspb.GetReplicateInfoResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.GetReplicateInfoRequest) (*milvuspb.GetReplicateInfoResponse, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.GetReplicateInfoRequest) *milvuspb.GetReplicateInfoResponse); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.GetReplicateInfoResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.GetReplicateInfoRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockProxy_GetReplicateInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetReplicateInfo'
|
||||
type MockProxy_GetReplicateInfo_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// GetReplicateInfo is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *milvuspb.GetReplicateInfoRequest
|
||||
func (_e *MockProxy_Expecter) GetReplicateInfo(_a0 interface{}, _a1 interface{}) *MockProxy_GetReplicateInfo_Call {
|
||||
return &MockProxy_GetReplicateInfo_Call{Call: _e.mock.On("GetReplicateInfo", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockProxy_GetReplicateInfo_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.GetReplicateInfoRequest)) *MockProxy_GetReplicateInfo_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.GetReplicateInfoRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxy_GetReplicateInfo_Call) Return(_a0 *milvuspb.GetReplicateInfoResponse, _a1 error) *MockProxy_GetReplicateInfo_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxy_GetReplicateInfo_Call) RunAndReturn(run func(context.Context, *milvuspb.GetReplicateInfoRequest) (*milvuspb.GetReplicateInfoResponse, error)) *MockProxy_GetReplicateInfo_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetSegmentsInfo provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockProxy) GetSegmentsInfo(_a0 context.Context, _a1 *internalpb.GetSegmentsInfoRequest) (*internalpb.GetSegmentsInfoResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
@ -7494,6 +7599,65 @@ func (_c *MockProxy_UpdateCredentialCache_Call) RunAndReturn(run func(context.Co
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateReplicateConfiguration provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockProxy) UpdateReplicateConfiguration(_a0 context.Context, _a1 *milvuspb.UpdateReplicateConfigurationRequest) (*commonpb.Status, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for UpdateReplicateConfiguration")
|
||||
}
|
||||
|
||||
var r0 *commonpb.Status
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) (*commonpb.Status, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) *commonpb.Status); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*commonpb.Status)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockProxy_UpdateReplicateConfiguration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateReplicateConfiguration'
|
||||
type MockProxy_UpdateReplicateConfiguration_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// UpdateReplicateConfiguration is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *milvuspb.UpdateReplicateConfigurationRequest
|
||||
func (_e *MockProxy_Expecter) UpdateReplicateConfiguration(_a0 interface{}, _a1 interface{}) *MockProxy_UpdateReplicateConfiguration_Call {
|
||||
return &MockProxy_UpdateReplicateConfiguration_Call{Call: _e.mock.On("UpdateReplicateConfiguration", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockProxy_UpdateReplicateConfiguration_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.UpdateReplicateConfigurationRequest)) *MockProxy_UpdateReplicateConfiguration_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.UpdateReplicateConfigurationRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxy_UpdateReplicateConfiguration_Call) Return(_a0 *commonpb.Status, _a1 error) *MockProxy_UpdateReplicateConfiguration_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxy_UpdateReplicateConfiguration_Call) RunAndReturn(run func(context.Context, *milvuspb.UpdateReplicateConfigurationRequest) (*commonpb.Status, error)) *MockProxy_UpdateReplicateConfiguration_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateResourceGroups provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockProxy) UpdateResourceGroups(_a0 context.Context, _a1 *milvuspb.UpdateResourceGroupsRequest) (*commonpb.Status, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
@ -7650,8 +7814,7 @@ func (_c *MockProxy_Upsert_Call) RunAndReturn(run func(context.Context, *milvusp
|
||||
func NewMockProxy(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
},
|
||||
) *MockProxy {
|
||||
}) *MockProxy {
|
||||
mock := &MockProxy{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
|
||||
@ -2318,6 +2318,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
||||
FieldsData: request.FieldsData,
|
||||
NumRows: uint64(request.NumRows),
|
||||
Version: msgpb.InsertDataVersion_ColumnBased,
|
||||
Namespace: request.Namespace,
|
||||
},
|
||||
},
|
||||
idAllocator: node.rowIDAllocator,
|
||||
|
||||
@ -212,6 +212,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if Params.CommonCfg.EnableNamespace.GetAsBool() {
|
||||
err = addNamespaceData(it.schema, it.insertMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = checkAndFlattenStructFieldData(it.schema, it.insertMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/util/function/embedding"
|
||||
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
@ -484,3 +485,134 @@ func TestInsertTaskForSchemaMismatch(t *testing.T) {
|
||||
assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch)
|
||||
})
|
||||
}
|
||||
|
||||
func TestInsertTask_Namespace(t *testing.T) {
|
||||
paramtable.Init()
|
||||
paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("true")
|
||||
defer paramtable.Get().CommonCfg.EnableNamespace.SwapTempValue("false")
|
||||
cache := NewMockCache(t)
|
||||
globalMetaCache = cache
|
||||
cache.On("GetDatabaseInfo",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil).Maybe()
|
||||
cache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil).Maybe()
|
||||
ctx := context.Background()
|
||||
rc := mocks.NewMockRootCoordClient(t)
|
||||
rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
|
||||
Status: merr.Status(nil),
|
||||
ID: 11198,
|
||||
Count: 10,
|
||||
}, nil)
|
||||
idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0)
|
||||
idAllocator.Start()
|
||||
defer idAllocator.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
schemaWithNamespaceEnabled := &schemapb.CollectionSchema{
|
||||
Name: "test",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
|
||||
{FieldID: 101, Name: common.NamespaceFieldName, DataType: schemapb.DataType_VarChar, IsPartitionKey: true, TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: common.MaxLengthKey, Value: "100"},
|
||||
}},
|
||||
},
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{Key: common.NamespaceEnabledKey, Value: "true"},
|
||||
},
|
||||
}
|
||||
|
||||
schemaWithNamespaceDisabled := &schemapb.CollectionSchema{
|
||||
Name: "test",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("test insert with namespace enabled", func(t *testing.T) {
|
||||
cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset()
|
||||
cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Unset()
|
||||
cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
|
||||
schema: newSchemaInfo(schemaWithNamespaceEnabled),
|
||||
}, nil).Maybe()
|
||||
cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(newSchemaInfo(schemaWithNamespaceEnabled), nil).Maybe()
|
||||
namespace := "test"
|
||||
it := insertTask{
|
||||
ctx: context.Background(),
|
||||
insertMsg: &msgstream.InsertMsg{
|
||||
InsertRequest: &msgpb.InsertRequest{
|
||||
CollectionName: "test",
|
||||
Namespace: &namespace,
|
||||
NumRows: 100,
|
||||
Version: msgpb.InsertDataVersion_ColumnBased,
|
||||
},
|
||||
},
|
||||
schema: schemaWithNamespaceEnabled,
|
||||
idAllocator: idAllocator,
|
||||
}
|
||||
err := it.PreExecute(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(101), it.insertMsg.FieldsData[0].FieldId)
|
||||
|
||||
// namespace data is not set
|
||||
it = insertTask{
|
||||
ctx: context.Background(),
|
||||
insertMsg: &msgstream.InsertMsg{
|
||||
InsertRequest: &msgpb.InsertRequest{
|
||||
CollectionName: "test",
|
||||
NumRows: 100,
|
||||
Version: msgpb.InsertDataVersion_ColumnBased,
|
||||
},
|
||||
},
|
||||
idAllocator: idAllocator,
|
||||
}
|
||||
err = it.PreExecute(context.Background())
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("test insert with namespace disabled", func(t *testing.T) {
|
||||
cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset()
|
||||
cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Unset()
|
||||
cache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{
|
||||
schema: newSchemaInfo(schemaWithNamespaceDisabled),
|
||||
}, nil).Maybe()
|
||||
cache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(newSchemaInfo(schemaWithNamespaceDisabled), nil).Maybe()
|
||||
cache.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&partitionInfo{
|
||||
name: "p1",
|
||||
partitionID: 10,
|
||||
createdTimestamp: 10001,
|
||||
createdUtcTimestamp: 10002,
|
||||
}, nil).Maybe()
|
||||
it := insertTask{
|
||||
ctx: context.Background(),
|
||||
insertMsg: &msgstream.InsertMsg{
|
||||
InsertRequest: &msgpb.InsertRequest{
|
||||
CollectionName: "test",
|
||||
NumRows: 100,
|
||||
Version: msgpb.InsertDataVersion_ColumnBased,
|
||||
},
|
||||
},
|
||||
schema: schemaWithNamespaceDisabled,
|
||||
idAllocator: idAllocator,
|
||||
}
|
||||
err := it.PreExecute(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
// namespace data is set
|
||||
namespace := "test"
|
||||
it = insertTask{
|
||||
ctx: context.Background(),
|
||||
insertMsg: &msgstream.InsertMsg{
|
||||
InsertRequest: &msgpb.InsertRequest{
|
||||
CollectionName: "test",
|
||||
Namespace: &namespace,
|
||||
NumRows: 100,
|
||||
Version: msgpb.InsertDataVersion_ColumnBased,
|
||||
},
|
||||
},
|
||||
idAllocator: idAllocator,
|
||||
}
|
||||
err = it.PreExecute(context.Background())
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
@ -744,6 +744,13 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if Params.CommonCfg.EnableNamespace.GetAsBool() {
|
||||
err := addNamespaceData(it.schema.CollectionSchema, it.upsertMsg.InsertMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := checkAndFlattenStructFieldData(it.schema.CollectionSchema, it.upsertMsg.InsertMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -946,6 +953,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
|
||||
NumRows: uint64(it.req.NumRows),
|
||||
Version: msgpb.InsertDataVersion_ColumnBased,
|
||||
DbName: it.req.DbName,
|
||||
Namespace: it.req.Namespace,
|
||||
},
|
||||
},
|
||||
DeleteMsg: &msgstream.DeleteMsg{
|
||||
|
||||
@ -2332,6 +2332,59 @@ func checkDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre
|
||||
return nil
|
||||
}
|
||||
|
||||
func addNamespaceData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
|
||||
namespaceEnabeld, _, err := common.ParseNamespaceProp(schema.Properties...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
namespaceIsSet := insertMsg.InsertRequest.Namespace != nil
|
||||
|
||||
if namespaceEnabeld != namespaceIsSet {
|
||||
if namespaceIsSet {
|
||||
return fmt.Errorf("namespace data is set but namespace disabled")
|
||||
}
|
||||
return fmt.Errorf("namespace data is not set but namespace enabled")
|
||||
}
|
||||
if !namespaceEnabeld {
|
||||
return nil
|
||||
}
|
||||
|
||||
// check namespace field exists
|
||||
namespaceField := typeutil.GetFieldByName(schema, common.NamespaceFieldName)
|
||||
if namespaceField == nil {
|
||||
return fmt.Errorf("namespace field not found")
|
||||
}
|
||||
|
||||
// check namespace field data is already set
|
||||
for _, fieldData := range insertMsg.FieldsData {
|
||||
if fieldData.FieldId == namespaceField.FieldID {
|
||||
return fmt.Errorf("namespace field data is already set by users")
|
||||
}
|
||||
}
|
||||
|
||||
// set namespace field data
|
||||
namespaceData := make([]string, insertMsg.NRows())
|
||||
namespace := *insertMsg.InsertRequest.Namespace
|
||||
for i := range namespaceData {
|
||||
namespaceData[i] = namespace
|
||||
}
|
||||
insertMsg.FieldsData = append(insertMsg.FieldsData, &schemapb.FieldData{
|
||||
FieldName: namespaceField.Name,
|
||||
FieldId: namespaceField.FieldID,
|
||||
Type: namespaceField.DataType,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_StringData{
|
||||
StringData: &schemapb.StringArray{
|
||||
Data: namespaceData,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetCachedCollectionSchema(ctx context.Context, dbName string, colName string) (*schemaInfo, error) {
|
||||
if globalMetaCache != nil {
|
||||
return globalMetaCache.GetCollectionSchema(ctx, dbName, colName)
|
||||
|
||||
@ -262,7 +262,7 @@ func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema
|
||||
hasIsolation := hasIsolationProperty(t.Req.Properties...)
|
||||
_, err := typeutil.GetPartitionKeyFieldSchema(schema)
|
||||
hasPartitionKey := err == nil
|
||||
enabled, has, err := parseNamespaceProp(t.Req.Properties...)
|
||||
enabled, has, err := common.ParseNamespaceProp(t.Req.Properties...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -303,19 +303,6 @@ func (t *createCollectionTask) handleNamespaceField(ctx context.Context, schema
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseNamespaceProp(props ...*commonpb.KeyValuePair) (value bool, has bool, err error) {
|
||||
for _, p := range props {
|
||||
if p.GetKey() == common.NamespaceEnabledKey {
|
||||
value, err := strconv.ParseBool(p.GetValue())
|
||||
if err != nil {
|
||||
return false, false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid namespace prop value: %s", p.GetValue()))
|
||||
}
|
||||
return value, true, nil
|
||||
}
|
||||
}
|
||||
return false, false, nil
|
||||
}
|
||||
|
||||
func hasIsolationProperty(props ...*commonpb.KeyValuePair) bool {
|
||||
for _, p := range props {
|
||||
if p.GetKey() == common.PartitionKeyIsolationKey {
|
||||
|
||||
@ -502,3 +502,16 @@ func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, isVectorField bool, inde
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ParseNamespaceProp(props ...*commonpb.KeyValuePair) (value bool, has bool, err error) {
|
||||
for _, p := range props {
|
||||
if p.GetKey() == NamespaceEnabledKey {
|
||||
value, err := strconv.ParseBool(p.GetValue())
|
||||
if err != nil {
|
||||
return false, false, fmt.Errorf("invalid namespace prop value: %s", p.GetValue())
|
||||
}
|
||||
return value, true, nil
|
||||
}
|
||||
}
|
||||
return false, false, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user