feat: allow users to write pk field when autoid is enabled (#44520)

issue: https://github.com/milvus-io/milvus/issues/44011
pr: https://github.com/milvus-io/milvus/pull/44424

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2025-09-24 20:26:06 +08:00 committed by GitHub
parent 3dc43422be
commit 2e0c0c08bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 413 additions and 20 deletions

View File

@ -174,7 +174,13 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData, rowNum i
for i := 0; i < rowNum; i++ { for i := 0; i < rowNum; i++ {
ids[i] = start + int64(i) ids[i] = start + int64(i)
} }
pkData, ok := data.Data[pkField.GetFieldID()]
allowInsertAutoID, _ := common.IsAllowInsertAutoID(task.req.Schema.GetProperties()...)
if pkField.GetAutoID() { if pkField.GetAutoID() {
if allowInsertAutoID && ok && pkData != nil {
// if allowInsertAutoID is true, and pkData is not nil, skip generating primary key data
return nil
}
switch pkField.GetDataType() { switch pkField.GetDataType() {
case schemapb.DataType_Int64: case schemapb.DataType_Int64:
data.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: ids} data.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: ids}

View File

@ -95,6 +95,54 @@ func Test_AppendSystemFieldsData(t *testing.T) {
assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum()) assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum())
} }
// Test_AppendSystemFieldsData_AllowInsertAutoID_KeepUserPK verifies that
// AppendSystemFieldsData does not replace primary key data that is already
// present in the insert data when the primary key field has AutoID set and the
// schema carries the allow_insert_auto_id property with value "true".
func Test_AppendSystemFieldsData_AllowInsertAutoID_KeepUserPK(t *testing.T) {
	const count = 10

	pkField := &schemapb.FieldSchema{
		FieldID:      100,
		Name:         "pk",
		DataType:     schemapb.DataType_Int64,
		IsPrimaryKey: true,
		AutoID:       true,
	}
	vecField := &schemapb.FieldSchema{
		FieldID:  101,
		Name:     "vec",
		DataType: schemapb.DataType_FloatVector,
		TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "4"},
		},
	}
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{pkField, vecField},
		Properties: []*commonpb.KeyValuePair{
			{Key: common.AllowInsertAutoIDKey, Value: "true"},
		},
	}

	task := &ImportTask{
		req:       &datapb.ImportRequest{Ts: 1000, Schema: schema},
		allocator: allocator.NewLocalAllocator(0, count*2),
	}

	insertData, err := testutil.CreateInsertData(schema, count)
	assert.NoError(t, err)

	// Overwrite the primary key column with values chosen by the "user".
	userPK := make([]int64, count)
	for i := range userPK {
		userPK[i] = 1000 + int64(i)
	}
	insertData.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: userPK}

	rowNum, _ := GetInsertDataRowCount(insertData, task.GetSchema())
	err = AppendSystemFieldsData(task, insertData, rowNum)
	assert.NoError(t, err)

	// The primary key column must still hold exactly the user-provided values.
	got := insertData.Data[pkField.GetFieldID()].(*storage.Int64FieldData)
	assert.Equal(t, count, got.RowNum())
	for i, want := range userPK {
		assert.Equal(t, want, got.Data[i])
	}
}
func Test_UnsetAutoID(t *testing.T) { func Test_UnsetAutoID(t *testing.T) {
pkField := &schemapb.FieldSchema{ pkField := &schemapb.FieldSchema{
FieldID: 100, FieldID: 100,

View File

@ -977,6 +977,10 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
if len(t.GetProperties()) > 0 && len(t.GetDeleteKeys()) > 0 { if len(t.GetProperties()) > 0 && len(t.GetDeleteKeys()) > 0 {
return merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams") return merr.WrapErrParameterInvalidMsg("cannot provide both DeleteKeys and ExtraParams")
} }
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName)
if err != nil {
return err
}
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName)
if err != nil { if err != nil {
@ -995,6 +999,16 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded") return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded")
} }
} }
enabled, _ := common.IsAllowInsertAutoID(t.Properties...)
if enabled {
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(collSchema.CollectionSchema)
if err != nil {
return err
}
if !primaryFieldSchema.AutoID {
return merr.WrapErrParameterInvalidMsg("the value for %s must be false when autoID is false", common.AllowInsertAutoIDKey)
}
}
} else if len(t.GetDeleteKeys()) > 0 { } else if len(t.GetDeleteKeys()) > 0 {
key := hasPropInDeletekeys(t.DeleteKeys) key := hasPropInDeletekeys(t.DeleteKeys)
if key != "" { if key != "" {

View File

@ -10,7 +10,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/testutils" "github.com/milvus-io/milvus/pkg/v2/util/testutils"
@ -312,6 +316,113 @@ func TestMaxInsertSize(t *testing.T) {
}) })
} }
// TestInsertTask_KeepUserPK_WhenAllowInsertAutoIDTrue verifies that
// insertTask.PreExecute reports the user-supplied primary keys in its result
// when the primary key field has AutoID set and the collection schema carries
// the allow_insert_auto_id property with value "true".
func TestInsertTask_KeepUserPK_WhenAllowInsertAutoIDTrue(t *testing.T) {
	paramtable.Init()
	// Run with SkipAutoIDCheck disabled so that accepting the user-provided PK
	// cannot be attributed to that switch.
	Params.Save(Params.ProxyCfg.SkipAutoIDCheck.Key, "false")
	defer Params.Reset(Params.ProxyCfg.SkipAutoIDCheck.Key)

	// The test swaps the package-level meta cache for a mock; restore it on exit
	// so the mock does not leak into other tests.
	prevCache := globalMetaCache
	defer func() { globalMetaCache = prevCache }()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	rc := mocks.NewMockRootCoordClient(t)
	rc.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
		Status: merr.Status(nil),
		ID:     11198,
		Count:  10,
	}, nil)

	idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0)
	// Check the constructor error before using the allocator; starting or
	// closing an allocator that failed to build would hide the real failure.
	if !assert.NoError(t, err) {
		return
	}
	idAllocator.Start()
	defer idAllocator.Close()

	nb := 5
	userIDs := []int64{101, 102, 103, 104, 105}

	collectionName := "TestInsertTask_KeepUserPK"
	schema := &schemapb.CollectionSchema{
		Name:   collectionName,
		AutoID: true,
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
		},
		Properties: []*commonpb.KeyValuePair{
			{Key: common.AllowInsertAutoIDKey, Value: "true"},
		},
	}

	// Column-based payload that contains only the primary key column.
	pkFieldData := &schemapb.FieldData{
		FieldName: "id",
		FieldId:   100,
		Type:      schemapb.DataType_Int64,
		Field: &schemapb.FieldData_Scalars{
			Scalars: &schemapb.ScalarField{
				Data: &schemapb.ScalarField_LongData{
					LongData: &schemapb.LongArray{Data: userIDs},
				},
			},
		},
	}

	task := insertTask{
		ctx: context.Background(),
		insertMsg: &BaseInsertTask{
			InsertRequest: &msgpb.InsertRequest{
				CollectionName: collectionName,
				DbName:         "test_db",
				PartitionName:  "_default",
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Insert,
				},
				Version:    msgpb.InsertDataVersion_ColumnBased,
				FieldsData: []*schemapb.FieldData{pkFieldData},
				NumRows:    uint64(nb),
			},
		},
		idAllocator: idAllocator,
	}

	info := newSchemaInfo(schema)
	cache := NewMockCache(t)
	collectionID := UniqueID(0)
	cache.On("GetCollectionID",
		mock.Anything, // context.Context
		mock.AnythingOfType("string"),
		mock.AnythingOfType("string"),
	).Return(collectionID, nil)

	cache.On("GetCollectionSchema",
		mock.Anything, // context.Context
		mock.AnythingOfType("string"),
		mock.AnythingOfType("string"),
	).Return(info, nil)

	cache.On("GetCollectionInfo",
		mock.Anything,
		mock.Anything,
		mock.Anything,
		mock.Anything,
	).Return(&collectionInfo{schema: info}, nil)

	cache.On("GetDatabaseInfo",
		mock.Anything,
		mock.Anything,
	).Return(&databaseInfo{properties: []*commonpb.KeyValuePair{}}, nil)

	globalMetaCache = cache

	err = task.PreExecute(context.Background())
	// Stop here on failure: the result inspected below is not meaningful when
	// PreExecute returned an error.
	if !assert.NoError(t, err) {
		return
	}

	ids := task.result.IDs
	if ids.GetIntId() == nil {
		t.Fatalf("expected int IDs, got nil")
	}
	got := ids.GetIntId().GetData()
	assert.Equal(t, userIDs, got)
}
func TestInsertTaskForSchemaMismatch(t *testing.T) { func TestInsertTaskForSchemaMismatch(t *testing.T) {
cache := globalMetaCache cache := globalMetaCache
defer func() { globalMetaCache = cache }() defer func() { globalMetaCache = cache }()

View File

@ -397,6 +397,81 @@ func constructCollectionSchemaWithAllType(
} }
} }
// TestAlterCollection_AllowInsertAutoID_Validation exercises
// alterCollectionTask.PreExecute with the allow_insert_auto_id property set to
// "true" against a collection whose primary key either has or does not have
// AutoID set, expecting success in the former case and an error in the latter.
func TestAlterCollection_AllowInsertAutoID_Validation(t *testing.T) {
	ctx := context.Background()

	// newRootCoord builds a root coordinator mock whose DescribeCollection
	// response contains a primary key field with the requested AutoID flag.
	newRootCoord := func(pkAutoID bool) *mocks.MockRootCoordClient {
		rootCoord := mocks.NewMockRootCoordClient(t)
		// InitMetaCache requires ListPolicy
		rootCoord.EXPECT().
			ListPolicy(mock.Anything, mock.Anything, mock.Anything).
			Return(&internalpb.ListPolicyResponse{Status: merr.Success()}, nil).
			Once()
		// Meta cache update path fetches partitions info
		rootCoord.EXPECT().
			ShowPartitions(mock.Anything, mock.Anything, mock.Anything).
			Return(&milvuspb.ShowPartitionsResponse{Status: merr.Success()}, nil).
			Maybe()
		rootCoord.EXPECT().
			DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
			Return(&milvuspb.DescribeCollectionResponse{
				Status:       merr.Success(),
				CollectionID: 1,
				DbName:       dbName,
				Schema: &schemapb.CollectionSchema{
					Name:   "allow_autoid_test",
					AutoID: false,
					Fields: []*schemapb.FieldSchema{
						{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64, AutoID: pkAutoID},
						{FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "8"}}},
					},
				},
			}, nil).
			Maybe()
		return rootCoord
	}

	cases := []struct {
		name     string
		pkAutoID bool
		wantErr  bool
	}{
		{name: "success when PK autoID=true and allow_insert_autoid=true", pkAutoID: true, wantErr: false},
		{name: "error when PK autoID=false and allow_insert_autoid=true", pkAutoID: false, wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// InitMetaCache below replaces the global cache; put it back afterwards.
			saved := globalMetaCache
			defer func() { globalMetaCache = saved }()

			root := newRootCoord(tc.pkAutoID)
			query := mocks.NewMockQueryCoordClient(t)
			query.EXPECT().
				ShowCollections(mock.Anything, mock.Anything).
				Return(&querypb.ShowCollectionsResponse{}, nil).
				Maybe()

			err := InitMetaCache(ctx, root, query, newShardClientMgr())
			assert.NoError(t, err)

			task := &alterCollectionTask{
				AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
					Base:           &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollectionField},
					DbName:         dbName,
					CollectionName: "allow_autoid_test",
					Properties: []*commonpb.KeyValuePair{
						{Key: common.AllowInsertAutoIDKey, Value: "true"},
					},
				},
			}

			err = task.PreExecute(ctx)
			if tc.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
	}
}
func constructPlaceholderGroup( func constructPlaceholderGroup(
nq, dim int, nq, dim int,
) *commonpb.PlaceholderGroup { ) *commonpb.PlaceholderGroup {
@ -4693,3 +4768,52 @@ func TestAlterCollectionField1(t *testing.T) {
}) })
} }
} }
func TestAlterCollectionField_AllowInsertAutoID_AutoIDFalse(t *testing.T) {
rc := NewRootCoordMock()
qc := mocks.NewMockQueryCoordClient(t)
qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe()
InitMetaCache(context.Background(), rc, qc, nil)
ctx := context.Background()
collectionName := "test_alter_allow_insert_autoid_autoid_false"
// fallback: use existing helper to build schema with array field if needed; here use simple schema
schema := &schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64, AutoID: false},
{FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "8"}}},
},
}
schemaBytes, err := proto.Marshal(schema)
assert.NoError(t, err)
createColReq := &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
MsgID: 200,
Timestamp: 200,
},
DbName: dbName,
CollectionName: collectionName,
Schema: schemaBytes,
ShardsNum: 1,
}
_, _ = rc.CreateCollection(ctx, createColReq)
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: collectionName,
Properties: []*commonpb.KeyValuePair{
{Key: common.AllowInsertAutoIDKey, Value: "true"},
},
},
rootCoord: rc,
queryCoord: qc,
}
err = task.PreExecute(ctx)
assert.Error(t, err)
assert.Equal(t, merr.Code(merr.ErrParameterInvalid), merr.Code(err))
}

View File

@ -1652,6 +1652,10 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
dataNameSet.Insert(fieldName) dataNameSet.Insert(fieldName)
} }
allowInsertAutoID, _ := common.IsAllowInsertAutoID(schema.GetProperties()...)
hasPkData := false
needAutoGenPk := false
for _, fieldSchema := range schema.Fields { for _, fieldSchema := range schema.Fields {
if fieldSchema.AutoID && !fieldSchema.IsPrimaryKey { if fieldSchema.AutoID && !fieldSchema.IsPrimaryKey {
log.Warn("not primary key field, but set autoID true", zap.String("field", fieldSchema.GetName())) log.Warn("not primary key field, but set autoID true", zap.String("field", fieldSchema.GetName()))
@ -1660,16 +1664,18 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
if fieldSchema.IsPrimaryKey { if fieldSchema.IsPrimaryKey {
primaryKeyNum++ primaryKeyNum++
hasPkData = dataNameSet.Contain(fieldSchema.GetName())
needAutoGenPk = fieldSchema.AutoID && (!allowInsertAutoID || !hasPkData)
} }
if fieldSchema.GetDefaultValue() != nil && fieldSchema.IsPrimaryKey { if fieldSchema.GetDefaultValue() != nil && fieldSchema.IsPrimaryKey {
return merr.WrapErrParameterInvalidMsg("primary key can't be with default value") return merr.WrapErrParameterInvalidMsg("primary key can't be with default value")
} }
if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert) || fieldSchema.GetIsFunctionOutput() { if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && needAutoGenPk && inInsert) || fieldSchema.GetIsFunctionOutput() {
// when inInsert, no need to pass when pk is autoid and SkipAutoIDCheck is false // when inInsert, no need to pass when pk is autoid and SkipAutoIDCheck is false
autoGenFieldNum++ autoGenFieldNum++
} }
if _, ok := dataNameSet[fieldSchema.GetName()]; !ok { if _, ok := dataNameSet[fieldSchema.GetName()]; !ok {
if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert) || fieldSchema.GetIsFunctionOutput() { if (fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && needAutoGenPk && inInsert) || fieldSchema.GetIsFunctionOutput() {
// autoGenField // autoGenField
continue continue
} }
@ -1729,9 +1735,9 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre
var primaryFieldData *schemapb.FieldData var primaryFieldData *schemapb.FieldData
// when checkPrimaryFieldData in insert // when checkPrimaryFieldData in insert
skipAutoIDCheck := Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && allowInsertAutoID, _ := common.IsAllowInsertAutoID(schema.GetProperties()...)
primaryFieldSchema.AutoID && skipAutoIDCheck := primaryFieldSchema.AutoID &&
typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) && (Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() || allowInsertAutoID)
if !primaryFieldSchema.AutoID || skipAutoIDCheck { if !primaryFieldSchema.AutoID || skipAutoIDCheck {
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema) primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
@ -1742,7 +1748,7 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre
} else { } else {
// check primary key data not exist // check primary key data not exist
if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) { if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) {
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)) return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("can not assign primary field data when auto id enabled and allow_insert_auto_id is false %v", primaryFieldSchema.Name))
} }
// if autoID == true, currently support autoID for int64 and varchar PrimaryField // if autoID == true, currently support autoID for int64 and varchar PrimaryField
primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs()) primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/importutilv2/common"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -71,8 +72,9 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
} }
} }
allowInsertAutoID, _ := pkgcommon.IsAllowInsertAutoID(schema.GetProperties()...)
// check if csv header provides the primary key while it should be auto-generated // check if csv header provides the primary key while it should be auto-generated
if pkField.GetAutoID() && lo.Contains(header, pkField.GetName()) { if pkField.GetAutoID() && lo.Contains(header, pkField.GetName()) && !allowInsertAutoID {
return nil, fmt.Errorf("the primary key '%s' is auto-generated, no need to provide", pkField.GetName()) return nil, fmt.Errorf("the primary key '%s' is auto-generated, no need to provide", pkField.GetName())
} }

View File

@ -314,3 +314,68 @@ func (suite *ReaderSuite) TestVector() {
func TestUtil(t *testing.T) { func TestUtil(t *testing.T) {
suite.Run(t, new(ReaderSuite)) suite.Run(t, new(ReaderSuite))
} }
// TestAllowInsertAutoID_KeepUserPK feeds the reader JSON rows that explicitly
// contain a value for an AutoID primary key. Without the allow_insert_auto_id
// property the read must fail with the "auto-generated" error; once the
// property is set to "true" on the schema the same input must be read
// without error.
func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() {
	const dim = 8

	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      100,
				Name:         "pk",
				IsPrimaryKey: true,
				DataType:     schemapb.DataType_Int64,
				AutoID:       true,
			},
			{
				FieldID:  101,
				Name:     "vec",
				DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{
					{Key: common.DimKey, Value: "8"},
				},
			},
		},
	}

	// Every row carries an explicit pk and a vector of the schema's dimension.
	rows := make([]any, 0, suite.numRows)
	for i := 0; i < suite.numRows; i++ {
		vec := make([]float64, dim)
		for j := range vec {
			vec[j] = float64(j)
		}
		rows = append(rows, map[string]any{
			"pk":  int64(i + 1),
			"vec": vec,
		})
	}
	jsonBytes, err := json.Marshal(rows)
	suite.NoError(err)

	// readOnce creates a fresh chunk manager mock and reader over the same JSON
	// payload, using the schema as it is at call time, and returns the error
	// produced by a single Read call.
	readOnce := func() error {
		cm := mocks.NewChunkManager(suite.T())
		cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, _ string) (storage.FileReader, error) {
			return &mockReader{Reader: strings.NewReader(string(jsonBytes))}, nil
		})
		reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt)
		suite.NoError(err)
		_, err = reader.Read()
		return err
	}

	// allow_insert_autoid=false, providing PK should error
	err = readOnce()
	suite.Error(err)
	suite.Contains(err.Error(), "is auto-generated, no need to provide")

	// allow_insert_autoid=true, providing PK should be allowed
	schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}}
	suite.NoError(readOnce())
}

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/importutilv2/common"
"github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/internal/util/nullutil"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -37,11 +38,12 @@ type RowParser interface {
} }
type rowParser struct { type rowParser struct {
id2Dim map[int64]int id2Dim map[int64]int
id2Field map[int64]*schemapb.FieldSchema id2Field map[int64]*schemapb.FieldSchema
name2FieldID map[string]int64 name2FieldID map[string]int64
pkField *schemapb.FieldSchema pkField *schemapb.FieldSchema
dynamicField *schemapb.FieldSchema dynamicField *schemapb.FieldSchema
allowInsertAutoID bool
} }
func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) { func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
@ -66,9 +68,10 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
} }
dynamicField := typeutil.GetDynamicField(schema) dynamicField := typeutil.GetDynamicField(schema)
allowInsertAutoID, _ := pkgcommon.IsAllowInsertAutoID(schema.GetProperties()...)
name2FieldID := lo.SliceToMap( name2FieldID := lo.SliceToMap(
lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool { lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
return !field.GetIsFunctionOutput() && !typeutil.IsAutoPKField(field) && field.GetName() != dynamicField.GetName() return !field.GetIsFunctionOutput() && (!typeutil.IsAutoPKField(field) || allowInsertAutoID) && field.GetName() != dynamicField.GetName()
}), }),
func(field *schemapb.FieldSchema) (string, int64) { func(field *schemapb.FieldSchema) (string, int64) {
return field.GetName(), field.GetFieldID() return field.GetName(), field.GetFieldID()
@ -76,11 +79,12 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
) )
return &rowParser{ return &rowParser{
id2Dim: id2Dim, id2Dim: id2Dim,
id2Field: id2Field, id2Field: id2Field,
name2FieldID: name2FieldID, name2FieldID: name2FieldID,
pkField: pkField, pkField: pkField,
dynamicField: dynamicField, dynamicField: dynamicField,
allowInsertAutoID: allowInsertAutoID,
}, nil }, nil
} }
@ -106,7 +110,7 @@ func (r *rowParser) Parse(raw any) (Row, error) {
if !ok { if !ok {
return nil, fmt.Errorf("invalid JSON format, each row should be a key-value map, but got type %T", raw) return nil, fmt.Errorf("invalid JSON format, each row should be a key-value map, but got type %T", raw)
} }
if _, ok = stringMap[r.pkField.GetName()]; ok && r.pkField.GetAutoID() { if _, ok = stringMap[r.pkField.GetName()]; ok && r.pkField.GetAutoID() && !r.allowInsertAutoID {
return nil, fmt.Errorf("the primary key '%s' is auto-generated, no need to provide", r.pkField.GetName()) return nil, fmt.Errorf("the primary key '%s' is auto-generated, no need to provide", r.pkField.GetName())
} }
dynamicValues := make(map[string]any) dynamicValues := make(map[string]any)

View File

@ -25,6 +25,7 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
@ -56,6 +57,7 @@ func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, sch
} }
crs := make(map[int64]*FieldReader) crs := make(map[int64]*FieldReader)
allowInsertAutoID, _ := common.IsAllowInsertAutoID(schema.GetProperties()...)
for i, pqField := range pqSchema.Fields() { for i, pqField := range pqSchema.Fields() {
field, ok := nameToField[pqField.Name] field, ok := nameToField[pqField.Name]
if !ok { if !ok {
@ -63,7 +65,7 @@ func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, sch
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field: %s is not in schema, "+ return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field: %s is not in schema, "+
"if it's a dynamic field, please reformat data by bulk_writer", pqField.Name)) "if it's a dynamic field, please reformat data by bulk_writer", pqField.Name))
} }
if typeutil.IsAutoPKField(field) { if typeutil.IsAutoPKField(field) && !allowInsertAutoID {
return nil, merr.WrapErrImportFailed( return nil, merr.WrapErrImportFailed(
fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", field.GetName())) fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", field.GetName()))
} }

View File

@ -215,6 +215,7 @@ const (
ReplicateIDKey = "replicate.id" ReplicateIDKey = "replicate.id"
ReplicateEndTSKey = "replicate.endTS" ReplicateEndTSKey = "replicate.endTS"
IndexNonEncoding = "index.nonEncoding" IndexNonEncoding = "index.nonEncoding"
AllowInsertAutoIDKey = "allow_insert_auto_id"
) )
const ( const (
@ -471,3 +472,13 @@ func AllocAutoID(allocFunc func(uint32) (int64, int64, error), rowNum uint32, cl
return idStart | int64(reversed), idEnd | int64(reversed), nil return idStart | int64(reversed), idEnd | int64(reversed), nil
} }
// IsAllowInsertAutoID looks up AllowInsertAutoIDKey among the given key-value
// pairs.
//
// The first result is the boolean value of the first pair whose key matches;
// a value that strconv.ParseBool cannot parse is reported as false. The second
// result reports whether the key was found at all. Nil entries in kvs are
// skipped instead of causing a nil-pointer dereference.
func IsAllowInsertAutoID(kvs ...*commonpb.KeyValuePair) (bool, bool) {
	for _, kv := range kvs {
		// The generated getters are nil-safe, so a nil entry simply does not match.
		if kv.GetKey() != AllowInsertAutoIDKey {
			continue
		}
		// The parse error is discarded on purpose of keeping the two-value
		// contract: a malformed value yields (false, true).
		enable, _ := strconv.ParseBool(kv.GetValue())
		return enable, true
	}
	return false, false
}