feat: allow users to write pk field when autoid is enabled (#44424)
Related issue: https://github.com/milvus-io/milvus/issues/44425
Signed-off-by: sunby <sunbingyi1992@gmail.com>
Parent: 71563d5d0e · Commit: 96e1de4e22
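In short, the patch adds a collection property, allow_insert_auto_id, that lets a client supply primary-key values even though the PK field has autoID enabled; with the property unset or false the old behaviour (server-generated IDs, explicit PK data rejected) is unchanged. The sketch below is illustrative only — the package main wrapper and the comments are mine — and uses the AllowInsertAutoIDKey constant and IsAllowInsertAutoID helper that this commit adds to pkg/v2/common at the end of the diff:

package main

import (
    "fmt"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/pkg/v2/common"
)

func main() {
    // A collection whose primary key is auto-ID, plus the new property.
    schema := &schemapb.CollectionSchema{
        Fields: []*schemapb.FieldSchema{
            {FieldID: 100, Name: "pk", IsPrimaryKey: true, AutoID: true, DataType: schemapb.DataType_Int64},
        },
        Properties: []*commonpb.KeyValuePair{
            {Key: common.AllowInsertAutoIDKey, Value: "true"},
        },
    }

    // IsAllowInsertAutoID returns (enabled, found): the parsed boolean and
    // whether the key was present on the schema at all.
    enabled, found := common.IsAllowInsertAutoID(schema.GetProperties()...)
    fmt.Println(enabled, found) // true true -> user-supplied PK values are kept instead of being auto-generated
}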
@@ -176,7 +176,13 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData, rowNum i
     for i := 0; i < rowNum; i++ {
         ids[i] = start + int64(i)
     }
+    pkData, ok := data.Data[pkField.GetFieldID()]
+    allowInsertAutoID, _ := common.IsAllowInsertAutoID(task.req.Schema.GetProperties()...)
     if pkField.GetAutoID() {
+        if allowInsertAutoID && ok && pkData != nil {
+            // if allowInsertAutoID is true, and pkData is not nil, skip generating primary key data
+            return nil
+        }
         switch pkField.GetDataType() {
         case schemapb.DataType_Int64:
             data.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: ids}
@@ -95,6 +95,54 @@ func Test_AppendSystemFieldsData(t *testing.T) {
     assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum())
 }
+
+func Test_AppendSystemFieldsData_AllowInsertAutoID_KeepUserPK(t *testing.T) {
+    const count = 10
+
+    pkField := &schemapb.FieldSchema{
+        FieldID:      100,
+        Name:         "pk",
+        DataType:     schemapb.DataType_Int64,
+        IsPrimaryKey: true,
+        AutoID:       true,
+    }
+    vecField := &schemapb.FieldSchema{
+        FieldID:  101,
+        Name:     "vec",
+        DataType: schemapb.DataType_FloatVector,
+        TypeParams: []*commonpb.KeyValuePair{
+            {Key: common.DimKey, Value: "4"},
+        },
+    }
+
+    schema := &schemapb.CollectionSchema{}
+    schema.Fields = []*schemapb.FieldSchema{pkField, vecField}
+    schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}}
+
+    task := &ImportTask{
+        req:       &datapb.ImportRequest{Ts: 1000, Schema: schema},
+        allocator: allocator.NewLocalAllocator(0, count*2),
+    }
+
+    insertData, err := testutil.CreateInsertData(schema, count)
+    assert.NoError(t, err)
+
+    userPK := make([]int64, count)
+    for i := 0; i < count; i++ {
+        userPK[i] = 1000 + int64(i)
+    }
+    insertData.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: userPK}
+
+    rowNum, _ := GetInsertDataRowCount(insertData, task.GetSchema())
+    err = AppendSystemFieldsData(task, insertData, rowNum)
+    assert.NoError(t, err)
+
+    got := insertData.Data[pkField.GetFieldID()].(*storage.Int64FieldData)
+    assert.Equal(t, count, got.RowNum())
+    for i := 0; i < count; i++ {
+        assert.Equal(t, userPK[i], got.Data[i])
+    }
+}

 func Test_UnsetAutoID(t *testing.T) {
     pkField := &schemapb.FieldSchema{
         FieldID:      100,
@@ -1343,6 +1343,7 @@ var allowedAlterProps = []string{
     common.MaxLengthKey,
     common.MmapEnabledKey,
     common.MaxCapacityKey,
+    common.AllowInsertAutoIDKey,
 }

 var allowedDropProps = []string{
@@ -1472,6 +1473,18 @@ func (t *alterCollectionFieldTask) PreExecute(ctx context.Context) error {
             if maxCapacityPerRow > defaultMaxArrayCapacity || maxCapacityPerRow <= 0 {
                 return merr.WrapErrParameterInvalidMsg("the maximum capacity specified for a Array should be in (0, %d]", defaultMaxArrayCapacity)
             }
+        case common.AllowInsertAutoIDKey:
+            allowInsertAutoID, err := strconv.ParseBool(prop.Value)
+            if err != nil {
+                return merr.WrapErrParameterInvalidMsg("the value for %s must be a boolean", common.AllowInsertAutoIDKey)
+            }
+            primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(collSchema.CollectionSchema)
+            if err != nil {
+                return err
+            }
+            if allowInsertAutoID && !primaryFieldSchema.AutoID {
+                return merr.WrapErrParameterInvalidMsg("the value for %s must be false when autoID is false", common.AllowInsertAutoIDKey)
+            }
         }
     }
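Put plainly, the validation added above enforces two things: the property value must parse as a boolean, and it may only be set to true when the collection's primary key actually has autoID enabled. A minimal sketch of that rule in isolation (the helper name validateAllowInsertAutoID and the plain fmt errors are mine; the production code uses the merr wrappers shown above):

package main

import (
    "fmt"
    "strconv"
)

// validateAllowInsertAutoID mirrors the PreExecute rule above: the value must
// be a boolean, and "true" is only legal when the primary-key field has
// autoID enabled.
func validateAllowInsertAutoID(value string, pkAutoID bool) error {
    allowInsertAutoID, err := strconv.ParseBool(value)
    if err != nil {
        return fmt.Errorf("the value for allow_insert_auto_id must be a boolean, got %q", value)
    }
    if allowInsertAutoID && !pkAutoID {
        return fmt.Errorf("allow_insert_auto_id must be false when autoID is false")
    }
    return nil
}

func main() {
    fmt.Println(validateAllowInsertAutoID("true", true))  // <nil>
    fmt.Println(validateAllowInsertAutoID("true", false)) // error: PK does not use autoID
    fmt.Println(validateAllowInsertAutoID("yes", true))   // error: not a boolean
}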
@@ -314,6 +314,113 @@ func TestMaxInsertSize(t *testing.T) {
     })
 }
+
+func TestInsertTask_KeepUserPK_WhenAllowInsertAutoIDTrue(t *testing.T) {
+    paramtable.Init()
+    // run auto-id path with field count check; allow user to pass PK
+    Params.Save(Params.ProxyCfg.SkipAutoIDCheck.Key, "false")
+    defer Params.Reset(Params.ProxyCfg.SkipAutoIDCheck.Key)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    rc := mocks.NewMockRootCoordClient(t)
+    rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
+        Status: merr.Status(nil),
+        ID:     11198,
+        Count:  10,
+    }, nil)
+    idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0)
+    idAllocator.Start()
+    defer idAllocator.Close()
+    assert.NoError(t, err)
+
+    nb := 5
+    userIDs := []int64{101, 102, 103, 104, 105}
+
+    collectionName := "TestInsertTask_KeepUserPK"
+    schema := &schemapb.CollectionSchema{
+        Name:   collectionName,
+        AutoID: true,
+        Fields: []*schemapb.FieldSchema{
+            {FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
+        },
+        Properties: []*commonpb.KeyValuePair{
+            {Key: common.AllowInsertAutoIDKey, Value: "true"},
+        },
+    }
+
+    pkFieldData := &schemapb.FieldData{
+        FieldName: "id",
+        FieldId:   100,
+        Type:      schemapb.DataType_Int64,
+        Field: &schemapb.FieldData_Scalars{
+            Scalars: &schemapb.ScalarField{
+                Data: &schemapb.ScalarField_LongData{
+                    LongData: &schemapb.LongArray{Data: userIDs},
+                },
+            },
+        },
+    }
+
+    task := insertTask{
+        ctx: context.Background(),
+        insertMsg: &BaseInsertTask{
+            InsertRequest: &msgpb.InsertRequest{
+                CollectionName: collectionName,
+                DbName:         "test_db",
+                PartitionName:  "_default",
+                Base: &commonpb.MsgBase{
+                    MsgType: commonpb.MsgType_Insert,
+                },
+                Version:    msgpb.InsertDataVersion_ColumnBased,
+                FieldsData: []*schemapb.FieldData{pkFieldData},
+                NumRows:    uint64(nb),
+            },
+        },
+        idAllocator: idAllocator,
+    }
+
+    info := newSchemaInfo(schema)
+    cache := NewMockCache(t)
+    collectionID := UniqueID(0)
+    cache.On("GetCollectionID",
+        mock.Anything, // context.Context
+        mock.AnythingOfType("string"),
+        mock.AnythingOfType("string"),
+    ).Return(collectionID, nil)
+
+    cache.On("GetCollectionSchema",
+        mock.Anything, // context.Context
+        mock.AnythingOfType("string"),
+        mock.AnythingOfType("string"),
+    ).Return(info, nil)
+
+    cache.On("GetCollectionInfo",
+        mock.Anything,
+        mock.Anything,
+        mock.Anything,
+        mock.Anything,
+    ).Return(&collectionInfo{schema: info}, nil)
+
+    cache.On("GetDatabaseInfo",
+        mock.Anything,
+        mock.Anything,
+    ).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil)
+
+    globalMetaCache = cache
+
+    err = task.PreExecute(context.Background())
+    assert.NoError(t, err)
+
+    ids := task.result.IDs
+    if ids.GetIntId() == nil {
+        t.Fatalf("expected int IDs, got nil")
+    }
+    got := ids.GetIntId().GetData()
+
+    assert.Equal(t, userIDs, got)
+}

 func TestInsertTask_Function(t *testing.T) {
     paramtable.Init()
     paramtable.Get().CredentialCfg.Credential.GetFunc = func() map[string]string {
@@ -428,6 +428,77 @@ func constructCollectionSchemaWithAllType(
     return schema
 }
+
+func TestAlterCollection_AllowInsertAutoID_Validation(t *testing.T) {
+    ctx := context.Background()
+
+    buildRoot := func(autoID bool) *mocks.MockMixCoordClient {
+        root := mocks.NewMockMixCoordClient(t)
+        // InitMetaCache requires ListPolicy
+        root.EXPECT().ListPolicy(mock.Anything, mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{Status: merr.Success()}, nil).Once()
+        // Meta cache update path fetches partitions info
+        root.EXPECT().ShowPartitions(mock.Anything, mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{Status: merr.Success()}, nil).Maybe()
+        // DescribeCollection returns a schema with PK autoID configurable
+        root.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
+            Status:       merr.Success(),
+            CollectionID: 1,
+            DbName:       dbName,
+            Schema: &schemapb.CollectionSchema{
+                Name:   "allow_autoid_test",
+                AutoID: false,
+                Fields: []*schemapb.FieldSchema{
+                    {FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64, AutoID: autoID},
+                    {FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "8"}}},
+                },
+            },
+        }, nil).Maybe()
+        return root
+    }
+
+    t.Run("success when PK autoID=true and allow_insert_autoid=true", func(t *testing.T) {
+        cache := globalMetaCache
+        defer func() { globalMetaCache = cache }()
+        root := buildRoot(true)
+        mgr := newShardClientMgr()
+        err := InitMetaCache(ctx, root, mgr)
+        assert.NoError(t, err)
+
+        task := &alterCollectionFieldTask{
+            AlterCollectionFieldRequest: &milvuspb.AlterCollectionFieldRequest{
+                Base:           &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollectionField},
+                DbName:         dbName,
+                CollectionName: "allow_autoid_test",
+                Properties: []*commonpb.KeyValuePair{
+                    {Key: common.AllowInsertAutoIDKey, Value: "true"},
+                },
+            },
+        }
+        err = task.PreExecute(ctx)
+        assert.NoError(t, err)
+    })
+
+    t.Run("error when PK autoID=false and allow_insert_autoid=true", func(t *testing.T) {
+        cache := globalMetaCache
+        defer func() { globalMetaCache = cache }()
+        root := buildRoot(false)
+        mgr := newShardClientMgr()
+        err := InitMetaCache(ctx, root, mgr)
+        assert.NoError(t, err)
+
+        task := &alterCollectionFieldTask{
+            AlterCollectionFieldRequest: &milvuspb.AlterCollectionFieldRequest{
+                Base:           &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollectionField},
+                DbName:         dbName,
+                CollectionName: "allow_autoid_test",
+                Properties: []*commonpb.KeyValuePair{
+                    {Key: common.AllowInsertAutoIDKey, Value: "true"},
+                },
+            },
+        }
+        err = task.PreExecute(ctx)
+        assert.Error(t, err)
+    })
+}

 func constructPlaceholderGroup(
     nq, dim int,
 ) *commonpb.PlaceholderGroup {
@@ -5269,3 +5340,43 @@ func TestDescribeCollectionTaskWithStructArrayField(t *testing.T) {
         assert.NoError(t, err)
     })
 }
+
+func TestAlterCollectionField_AllowInsertAutoID_AutoIDFalse(t *testing.T) {
+    qc := NewMixCoordMock()
+    InitMetaCache(context.Background(), qc, nil)
+    ctx := context.Background()
+    collectionName := "test_alter_allow_insert_autoid_autoid_false"
+
+    schema := constructCollectionSchemaWithStructArrayField(collectionName, testStructArrayField, false)
+    schemaBytes, err := proto.Marshal(schema)
+    assert.NoError(t, err)
+
+    createColReq := &milvuspb.CreateCollectionRequest{
+        Base: &commonpb.MsgBase{
+            MsgType:   commonpb.MsgType_CreateCollection,
+            MsgID:     200,
+            Timestamp: 200,
+        },
+        DbName:         dbName,
+        CollectionName: collectionName,
+        Schema:         schemaBytes,
+        ShardsNum:      1,
+    }
+    qc.CreateCollection(ctx, createColReq)
+
+    task := &alterCollectionFieldTask{
+        AlterCollectionFieldRequest: &milvuspb.AlterCollectionFieldRequest{
+            Base:           &commonpb.MsgBase{},
+            CollectionName: collectionName,
+            FieldName:      "",
+            Properties: []*commonpb.KeyValuePair{
+                {Key: common.AllowInsertAutoIDKey, Value: "true"},
+            },
+        },
+        mixCoord: qc,
+    }
+
+    err = task.PreExecute(ctx)
+    assert.Error(t, err)
+    assert.Equal(t, merr.Code(merr.ErrParameterInvalid), merr.Code(err))
+}
@@ -1749,6 +1749,10 @@ func checkFieldsDataBySchema(allFields []*schemapb.FieldSchema, schema *schemapb
         dataNameSet.Insert(fieldName)
     }

+    allowInsertAutoID, _ := common.IsAllowInsertAutoID(schema.GetProperties()...)
+    hasPkData := false
+    needAutoGenPk := false
+
     for _, fieldSchema := range allFields {
         if fieldSchema.AutoID && !fieldSchema.IsPrimaryKey {
             log.Warn("not primary key field, but set autoID true", zap.String("field", fieldSchema.GetName()))
@@ -1757,16 +1761,18 @@

         if fieldSchema.IsPrimaryKey {
             primaryKeyNum++
+            hasPkData = dataNameSet.Contain(fieldSchema.GetName())
+            needAutoGenPk = fieldSchema.AutoID && (!allowInsertAutoID || !hasPkData)
         }
         if fieldSchema.GetDefaultValue() != nil && fieldSchema.IsPrimaryKey {
             return merr.WrapErrParameterInvalidMsg("primary key can't be with default value")
         }
-        if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert) || IsBM25FunctionOutputField(fieldSchema, schema) {
+        if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && needAutoGenPk && inInsert) || IsBM25FunctionOutputField(fieldSchema, schema) {
             // when inInsert, no need to pass when pk is autoid and SkipAutoIDCheck is false
             autoGenFieldNum++
         }
         if _, ok := dataNameSet[fieldSchema.GetName()]; !ok {
-            if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert) || IsBM25FunctionOutputField(fieldSchema, schema) {
+            if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && needAutoGenPk && inInsert) || IsBM25FunctionOutputField(fieldSchema, schema) {
                 // autoGenField
                 continue
             }
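The needAutoGenPk flag above is the crux of the proxy-side change: an auto-ID primary key is still generated server-side unless the collection allows explicit inserts and the request actually carries a PK column. A tiny sketch (the standalone function name is mine; the boolean expression is the one assigned above) walking the cases:

package main

import "fmt"

// needAutoGenPK reproduces the expression used above: generate the PK
// server-side unless allow_insert_auto_id is on AND the request supplied
// a primary-key column.
func needAutoGenPK(pkAutoID, allowInsertAutoID, hasPkData bool) bool {
    return pkAutoID && (!allowInsertAutoID || !hasPkData)
}

func main() {
    fmt.Println(needAutoGenPK(true, false, false)) // true: property off, IDs are generated as before
    fmt.Println(needAutoGenPK(true, true, false))  // true: property on but no PK column supplied, IDs are generated
    fmt.Println(needAutoGenPK(true, true, true))   // false: property on and PK supplied, user values are kept
    fmt.Println(needAutoGenPK(false, true, true))  // false: PK is not auto-ID, the caller must always supply it
}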
@@ -1909,9 +1915,9 @@ func checkPrimaryFieldData(allFields []*schemapb.FieldSchema, schema *schemapb.C
     var primaryFieldData *schemapb.FieldData
     // when checkPrimaryFieldData in insert

-    skipAutoIDCheck := Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() &&
-        primaryFieldSchema.AutoID &&
-        typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema)
+    allowInsertAutoID, _ := common.IsAllowInsertAutoID(schema.GetProperties()...)
+    skipAutoIDCheck := primaryFieldSchema.AutoID &&
+        typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) && (Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() || allowInsertAutoID)

     if !primaryFieldSchema.AutoID || skipAutoIDCheck {
         primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
@@ -1922,7 +1928,7 @@
     } else {
         // check primary key data not exist
         if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) {
-            return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name))
+            return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("can not assign primary field data when auto id enabled and allow_insert_auto_id is false %v", primaryFieldSchema.Name))
         }
         // if autoID == true, currently support autoID for int64 and varchar PrimaryField
         primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
@@ -369,3 +369,57 @@ func (suite *ReaderSuite) TestReadLoop() {
 func TestCsvReader(t *testing.T) {
     suite.Run(t, new(ReaderSuite))
 }
+
+func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() {
+    schema := &schemapb.CollectionSchema{
+        Fields: []*schemapb.FieldSchema{
+            {
+                FieldID:      100,
+                Name:         "pk",
+                IsPrimaryKey: true,
+                DataType:     schemapb.DataType_Int64,
+                AutoID:       true,
+            },
+            {
+                FieldID:    101,
+                Name:       "vec",
+                DataType:   schemapb.DataType_FloatVector,
+                TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "8"}},
+            },
+        },
+    }
+
+    // prepare csv content with header including pk
+    content := "pk,vec\n"
+    // one row is enough; vec must be a quoted JSON array to avoid CSV splitting
+    content += "1,\"[0,1,2,3,4,5,6,7]\"\n"
+
+    // allow_insert_autoid=false, providing PK in header should error
+    {
+        cm := mocks.NewChunkManager(suite.T())
+        cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
+            reader := strings.NewReader(content)
+            r := &mockReader{Reader: reader, Closer: io.NopCloser(reader)}
+            return r, nil
+        })
+        _, err := NewReader(context.Background(), cm, schema, "dummy path", 1024, ',', "")
+        suite.Error(err)
+        suite.Contains(err.Error(), "is auto-generated, no need to provide")
+    }
+
+    // allow_insert_autoid=true, providing PK in header should be allowed
+    {
+        schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}}
+        cm := mocks.NewChunkManager(suite.T())
+        cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
+            reader := strings.NewReader(content)
+            r := &mockReader{Reader: reader, Closer: io.NopCloser(reader)}
+            return r, nil
+        })
+        reader, err := NewReader(context.Background(), cm, schema, "dummy path", 1024, ',', "")
+        suite.NoError(err)
+        // call Read once to ensure parsing proceeds
+        _, err = reader.Read()
+        suite.NoError(err)
+    }
+}
@@ -27,6 +27,7 @@ import (
     "github.com/milvus-io/milvus/internal/json"
     "github.com/milvus-io/milvus/internal/util/importutilv2/common"
     "github.com/milvus-io/milvus/internal/util/nullutil"
+    pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/parameterutil"
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -99,10 +100,11 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
     for _, name := range header {
         headerMap[name] = true
     }
+    allowInsertAutoID, _ := pkgcommon.IsAllowInsertAutoID(schema.GetProperties()...)

     // check if csv header provides the primary key while it should be auto-generated
     _, pkInHeader := headerMap[pkField.GetName()]
-    if pkInHeader && pkField.GetAutoID() {
+    if pkInHeader && pkField.GetAutoID() && !allowInsertAutoID {
         return nil, merr.WrapErrImportFailed(
             fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", pkField.GetName()))
     }
@@ -426,3 +426,68 @@ func (suite *ReaderSuite) TestReadCount() {
 func TestJsonReader(t *testing.T) {
     suite.Run(t, new(ReaderSuite))
 }
+
+func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() {
+    schema := &schemapb.CollectionSchema{
+        Fields: []*schemapb.FieldSchema{
+            {
+                FieldID:      100,
+                Name:         "pk",
+                IsPrimaryKey: true,
+                DataType:     schemapb.DataType_Int64,
+                AutoID:       true,
+            },
+            {
+                FieldID:  101,
+                Name:     "vec",
+                DataType: schemapb.DataType_FloatVector,
+                TypeParams: []*commonpb.KeyValuePair{
+                    {Key: common.DimKey, Value: "8"},
+                },
+            },
+        },
+    }
+
+    // build rows that explicitly include pk and a valid vec of dim 8
+    rows := make([]any, 0, suite.numRows)
+    for i := 0; i < suite.numRows; i++ {
+        row := make(map[string]any)
+        row["pk"] = int64(i + 1)
+        vec := make([]float64, 8)
+        for j := 0; j < 8; j++ {
+            vec[j] = float64(j)
+        }
+        row["vec"] = vec
+        rows = append(rows, row)
+    }
+    jsonBytes, err := json.Marshal(rows)
+    suite.NoError(err)
+
+    // allow_insert_autoid=false, providing PK should error
+    {
+        cm := mocks.NewChunkManager(suite.T())
+        cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
+            r := &mockReader{Reader: strings.NewReader(string(jsonBytes))}
+            return r, nil
+        })
+        reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt)
+        suite.NoError(err)
+        _, err = reader.Read()
+        suite.Error(err)
+        suite.Contains(err.Error(), "is auto-generated, no need to provide")
+    }
+
+    // allow_insert_autoid=true, providing PK should be allowed
+    {
+        schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}}
+        cm := mocks.NewChunkManager(suite.T())
+        cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
+            r := &mockReader{Reader: strings.NewReader(string(jsonBytes))}
+            return r, nil
+        })
+        reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt)
+        suite.NoError(err)
+        _, err = reader.Read()
+        suite.NoError(err)
+    }
+}
@@ -26,6 +26,7 @@ import (
     "github.com/milvus-io/milvus/internal/json"
     "github.com/milvus-io/milvus/internal/util/importutilv2/common"
     "github.com/milvus-io/milvus/internal/util/nullutil"
+    pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/parameterutil"
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -43,7 +44,8 @@ type rowParser struct {
     dynamicField         *schemapb.FieldSchema
     functionOutputFields map[string]int64

     structArrays map[string]interface{}
+    allowInsertAutoID bool
 }

 func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
@@ -82,6 +84,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
             return field.GetName(), field.GetFieldID()
         },
     )
+    allowInsertAutoID, _ := pkgcommon.IsAllowInsertAutoID(schema.GetProperties()...)

     sturctArrays := lo.SliceToMap(
         schema.GetStructArrayFields(),
@@ -98,6 +101,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
         dynamicField:         dynamicField,
         functionOutputFields: functionOutputFields,
         structArrays:         sturctArrays,
+        allowInsertAutoID:    allowInsertAutoID,
     }, nil
 }

@@ -163,7 +167,7 @@ func (r *rowParser) Parse(raw any) (Row, error) {
         return nil, merr.WrapErrImportFailed(
             fmt.Sprintf("invalid JSON format, each row should be a key-value map, but got type %T", raw))
     }
-    if _, ok = stringMap[r.pkField.GetName()]; ok && r.pkField.GetAutoID() {
+    if _, ok = stringMap[r.pkField.GetName()]; ok && r.pkField.GetAutoID() && !r.allowInsertAutoID {
         return nil, merr.WrapErrImportFailed(
             fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", r.pkField.GetName()))
     }
@@ -823,6 +823,13 @@ func TestParquetReaderError(t *testing.T) {
     schema.Fields[0].AutoID = true
     checkFunc(schema, filePath, false)
+
+    // allow_insert_autoid=true should allow providing PK even if AutoID
+    schema.Fields[0].AutoID = true
+    schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}}
+    checkFunc(schema, filePath, true)
+    // reset properties
+    schema.Properties = nil

     // now set the vec to be FunctionOutput
     // NewReader will return error "the field is output by function, no need to provide"
     schema.Fields[0].AutoID = false
@@ -27,6 +27,7 @@ import (

     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/storage"
+    "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/log"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -85,6 +86,7 @@ func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, sch
     // this loop is for "how many fields are provided by this parquet file?"
     readFields := make(map[string]int64)
     crs := make(map[int64]*FieldReader)
+    allowInsertAutoID, _ := common.IsAllowInsertAutoID(schema.GetProperties()...)
     for i, pqField := range pqSchema.Fields() {
         field, ok := nameToField[pqField.Name]
         if !ok {
@@ -93,7 +95,7 @@
         }

         // auto-id field must not provided
-        if typeutil.IsAutoPKField(field) {
+        if typeutil.IsAutoPKField(field) && !allowInsertAutoID {
             return nil, merr.WrapErrImportFailed(
                 fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", field.GetName()))
         }
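The CSV, JSON, and Parquet import readers above all gate a provided primary-key column the same way. A condensed sketch of that shared rule (the helper name pkColumnAccepted is hypothetical; it restates the readers' condition using the IsAllowInsertAutoID helper defined in the final hunk below):

package main

import (
    "fmt"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/pkg/v2/common"
)

// pkColumnAccepted restates the import-side gate: a primary-key column in the
// source file is rejected only when the field is an auto-ID PK and
// allow_insert_auto_id is not enabled on the collection schema.
func pkColumnAccepted(pk *schemapb.FieldSchema, props []*commonpb.KeyValuePair) bool {
    allowInsertAutoID, _ := common.IsAllowInsertAutoID(props...)
    return !(pk.GetIsPrimaryKey() && pk.GetAutoID()) || allowInsertAutoID
}

func main() {
    pk := &schemapb.FieldSchema{Name: "pk", IsPrimaryKey: true, AutoID: true, DataType: schemapb.DataType_Int64}

    fmt.Println(pkColumnAccepted(pk, nil)) // false: the file may not carry an auto-ID PK column
    fmt.Println(pkColumnAccepted(pk, []*commonpb.KeyValuePair{
        {Key: common.AllowInsertAutoIDKey, Value: "true"},
    })) // true: with the property set, the file's own PK values are accepted
}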
@@ -239,6 +239,7 @@ const (
     // timezone releated
     DatabaseDefaultTimezone   = "database.timezone"
     CollectionDefaultTimezone = "collection.timezone"
+    AllowInsertAutoIDKey      = "allow_insert_auto_id"
 )

 const (
@@ -543,3 +544,13 @@ func AllocAutoID(allocFunc func(uint32) (int64, int64, error), rowNum uint32, cl

     return idStart | int64(reversed), idEnd | int64(reversed), nil
 }
+
+func IsAllowInsertAutoID(kvs ...*commonpb.KeyValuePair) (bool, bool) {
+    for _, kv := range kvs {
+        if kv.Key == AllowInsertAutoIDKey {
+            enable, _ := strconv.ParseBool(kv.Value)
+            return enable, true
+        }
+    }
+    return false, false
+}