diff --git a/internal/util/importutil/binlog_adapter.go b/internal/util/importutil/binlog_adapter.go index 3ba81c9257..29b2ef8b44 100644 --- a/internal/util/importutil/binlog_adapter.go +++ b/internal/util/importutil/binlog_adapter.go @@ -305,14 +305,20 @@ func (p *BinlogAdapter) readDeltalogs(segmentHolder *SegmentFilesHolder) (map[in if primaryKey.GetDataType() == schemapb.DataType_Int64 { deletedIDDict := make(map[int64]uint64) for _, deleteLog := range deleteLogs { - deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts + _, exist := deletedIDDict[deleteLog.Pk.GetValue().(int64)] + if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(int64)] { + deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts + } } log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict))) return deletedIDDict, nil, nil } else if primaryKey.GetDataType() == schemapb.DataType_VarChar { deletedIDDict := make(map[string]uint64) for _, deleteLog := range deleteLogs { - deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts + _, exist := deletedIDDict[deleteLog.Pk.GetValue().(string)] + if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(string)] { + deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts + } } log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict))) return nil, deletedIDDict, nil @@ -530,9 +536,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64, continue } - _, deleted := intDeletedList[key] + deleteTs, deleted := intDeletedList[key] // if the key exists in intDeletedList, that means this entity has been deleted - if deleted { + // only skip entity when delete happen after insert + if deleted && deleteTs > uint64(ts) { shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity actualDeleted++ } else { @@ -584,9 +591,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string, continue } - _, deleted := strDeletedList[key] + deleteTs, deleted := strDeletedList[key] // if exists in strDeletedList, that means this entity has been deleted - if deleted { + // only skip entity when delete happen after insert + if deleted && deleteTs > uint64(ts) { shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity actualDeleted++ } else { diff --git a/internal/util/importutil/binlog_adapter_test.go b/internal/util/importutil/binlog_adapter_test.go index 2bfde09ed9..615bfd868f 100644 --- a/internal/util/importutil/binlog_adapter_test.go +++ b/internal/util/importutil/binlog_adapter_test.go @@ -35,7 +35,7 @@ const ( baseTimestamp = 43757345 ) -func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) []byte { +func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool, startTimestamp uint64) []byte { deleteData := &storage.DeleteData{ Pks: make([]storage.PrimaryKey, 0), Tss: make([]storage.Timestamp, 0), @@ -47,7 +47,7 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [ assert.NotNil(t, deltaData) for i, id := range deltaData { deleteData.Pks = append(deleteData.Pks, storage.NewVarCharPrimaryKey(id)) - deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i)) + deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i)) deleteData.RowCount++ } } else { @@ -55,7 +55,7 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [ assert.NotNil(t, deltaData) for i, id := range deltaData { deleteData.Pks = append(deleteData.Pks, storage.NewInt64PrimaryKey(id)) - deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i)) + deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i)) deleteData.RowCount++ } } @@ -171,7 +171,7 @@ func Test_BinlogAdapterReadDeltalog(t *testing.T) { ctx := context.Background() deleteItems := []int64{1001, 1002, 1003} - buf := createDeltalogBuf(t, deleteItems, false) + buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp) chunkManager := &MockChunkManager{ readBuf: map[string][]byte{ "dummy": buf, @@ -212,7 +212,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) { ctx := context.Background() deleteItems := []int64{1001, 1002, 1003, 1004, 1005} - buf := createDeltalogBuf(t, deleteItems, false) + buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp) chunkManager := &MockChunkManager{ readBuf: map[string][]byte{ "dummy": buf, @@ -244,7 +244,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) { // wrong data type of delta log chunkManager.readBuf = map[string][]byte{ - "dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true), + "dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp), } adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64) @@ -317,7 +317,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) { ctx := context.Background() deleteItems := []int64{1001, 1002, 1003, 1004, 1005} - buf := createDeltalogBuf(t, deleteItems, false) + buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp) chunkManager := &MockChunkManager{ readBuf: map[string][]byte{ "dummy": buf, @@ -374,7 +374,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) { collectionInfo.resetSchema(schema) chunkManager.readBuf = map[string][]byte{ - "dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true), + "dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp), } adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64) @@ -462,7 +462,7 @@ func Test_BinlogAdapterReadTimestamp(t *testing.T) { // succeed rowCount := 10 - fieldsData := createFieldsData(sampleSchema(), rowCount) + fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp) chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64)) ts, err = adapter.readTimestamp("dummy") assert.NoError(t, err) @@ -502,7 +502,7 @@ func Test_BinlogAdapterReadPrimaryKeys(t *testing.T) { // wrong primary key type rowCount := 10 - fieldsData := createFieldsData(sampleSchema(), rowCount) + fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp) chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Bool, fieldsData[102].([]bool)) adapter.collectionInfo.PrimaryKey.DataType = schemapb.DataType_Bool @@ -545,7 +545,7 @@ func Test_BinlogAdapterShardListInt64(t *testing.T) { assert.NotNil(t, adapter) assert.NoError(t, err) - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{1}) // wrong input @@ -587,7 +587,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) { assert.NotNil(t, adapter) assert.NoError(t, err) - fieldsData := createFieldsData(strKeySchema(), 0) + fieldsData := createFieldsData(strKeySchema(), 0, baseTimestamp) shardsData := createShardsData(strKeySchema(), fieldsData, shardNum, []int64{1}) // wrong input shardList, err := adapter.getShardingListByPrimaryVarchar([]string{"1"}, []int64{1, 2}, shardsData, map[string]uint64{}) @@ -615,6 +615,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) { func Test_BinlogAdapterReadInt64PK(t *testing.T) { ctx := context.Background() + paramtable.Init() chunkManager := &MockChunkManager{} @@ -677,7 +678,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) { // prepare binlog data rowCount := 1000 - fieldsData := createFieldsData(sampleSchema(), rowCount) + fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp) deletedItems := []int64{41, 51, 100, 400, 600} chunkManager.readBuf = map[string][]byte{ @@ -693,7 +694,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) { "111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)), "112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)), "113_insertlog": createBinlogBuf(t, schemapb.DataType_Array, fieldsData[113].([]*schemapb.ScalarField)), - "deltalog": createDeltalogBuf(t, deletedItems, false), + "deltalog": createDeltalogBuf(t, deletedItems, false, baseTimestamp+300), } // failed to read primary keys @@ -708,15 +709,18 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) { // succeed flush chunkManager.readBuf["1_insertlog"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[1].([]int64)) - adapter.tsEndPoint = baseTimestamp + uint64(499) // 4 entities deleted, 500 entities excluded + // as we createDeltalogBuf with baseTimestamp+300. deletedata pk = {41, 51, 100, 400, 600} ts = {341, 351, 400, 700, 900} + // ts = {341, 351, 400} < 499 will be deleted + adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded err = adapter.Read(holder) assert.NoError(t, err) assert.Equal(t, shardNum, int32(flushCounter)) - assert.Equal(t, rowCount-4-500, flushRowCount) + assert.Equal(t, rowCount-3-500, flushRowCount) } func Test_BinlogAdapterReadVarcharPK(t *testing.T) { ctx := context.Background() + paramtable.Init() chunkManager := &MockChunkManager{} @@ -788,7 +792,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) { "104_insertlog": createBinlogBuf(t, schemapb.DataType_VarChar, varcharData), "105_insertlog": createBinlogBuf(t, schemapb.DataType_Bool, boolData), "106_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, floatVecData), - "deltalog": createDeltalogBuf(t, deletedItems, true), + "deltalog": createDeltalogBuf(t, deletedItems, true, baseTimestamp+300), } // succeed @@ -800,7 +804,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) { assert.NotNil(t, adapter) assert.NoError(t, err) - adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped + adapter.tsEndPoint = baseTimestamp + uint64(499) // 2 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped err = adapter.Read(holder) assert.NoError(t, err) assert.Equal(t, shardNum, int32(flushCounter)) @@ -823,7 +827,7 @@ func Test_BinlogAdapterDispatch(t *testing.T) { // prepare empty in-memory segments data partitionID := int64(1) - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID}) shardList := []int32{0, -1, 1} @@ -1146,7 +1150,7 @@ func Test_BinlogAdapterVerifyField(t *testing.T) { shardNum := int32(2) partitionID := int64(1) - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID}) flushFunc := func(fields BlockData, shardID int, partID int64) error { @@ -1173,7 +1177,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) { shardNum := int32(2) partitionID := int64(1) - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID}) flushFunc := func(fields BlockData, shardID int, partID int64) error { @@ -1205,7 +1209,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) { // prepare binlog data rowCount := 3 - fieldsData = createFieldsData(sampleSchema(), rowCount) + fieldsData = createFieldsData(sampleSchema(), rowCount, baseTimestamp) failedFunc := func(fieldID int64, fieldName string, fieldType schemapb.DataType, wrongField int64, wrongType schemapb.DataType) { // row count mismatch diff --git a/internal/util/importutil/binlog_file_test.go b/internal/util/importutil/binlog_file_test.go index ec12a754d1..807e1b360c 100644 --- a/internal/util/importutil/binlog_file_test.go +++ b/internal/util/importutil/binlog_file_test.go @@ -331,7 +331,7 @@ func Test_BinlogFileBool(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -388,7 +388,7 @@ func Test_BinlogFileInt8(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -446,7 +446,7 @@ func Test_BinlogFileInt16(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -503,7 +503,7 @@ func Test_BinlogFileInt32(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -560,7 +560,7 @@ func Test_BinlogFileInt64(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -617,7 +617,7 @@ func Test_BinlogFileFloat(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -674,7 +674,7 @@ func Test_BinlogFileDouble(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -778,7 +778,7 @@ func Test_BinlogFileJSON(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -858,7 +858,7 @@ func Test_BinlogFileArray(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -937,7 +937,7 @@ func Test_BinlogFileBinaryVector(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -1004,7 +1004,7 @@ func Test_BinlogFileFloatVector(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) @@ -1072,7 +1072,7 @@ func Test_BinlogFileFloat16Vector(t *testing.T) { binlogFile.Close() // wrong log type - chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp) err = binlogFile.Open("dummy") assert.NoError(t, err) diff --git a/internal/util/importutil/binlog_parser_test.go b/internal/util/importutil/binlog_parser_test.go index afd7ce2b19..2ee1b8c3ab 100644 --- a/internal/util/importutil/binlog_parser_test.go +++ b/internal/util/importutil/binlog_parser_test.go @@ -321,7 +321,7 @@ func Test_BinlogParserParse(t *testing.T) { // progress rowCount := 100 - fieldsData := createFieldsData(sampleSchema(), rowCount) + fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp) chunkManager.listResult["deltaPath"] = []string{} chunkManager.listResult["insertPath"] = []string{ "123/0/a", diff --git a/internal/util/importutil/import_util_test.go b/internal/util/importutil/import_util_test.go index 8ec2819d60..16bf8ebfe5 100644 --- a/internal/util/importutil/import_util_test.go +++ b/internal/util/importutil/import_util_test.go @@ -240,7 +240,7 @@ func jsonNumber(value string) json.Number { return json.Number(value) } -func createFieldsData(collectionSchema *schemapb.CollectionSchema, rowCount int) map[storage.FieldID]interface{} { +func createFieldsData(collectionSchema *schemapb.CollectionSchema, rowCount int, startTimestamp int64) map[storage.FieldID]interface{} { fieldsData := make(map[storage.FieldID]interface{}) // internal fields @@ -248,7 +248,7 @@ func createFieldsData(collectionSchema *schemapb.CollectionSchema, rowCount int) timestampData := make([]int64, 0) for i := 0; i < rowCount; i++ { rowIDData = append(rowIDData, int64(i)) - timestampData = append(timestampData, baseTimestamp+int64(i)) + timestampData = append(timestampData, startTimestamp+int64(i)) } fieldsData[0] = rowIDData fieldsData[1] = timestampData @@ -1083,7 +1083,7 @@ func Test_TryFlushBlocks(t *testing.T) { // prepare flush data, 3 shards, each shard 10 rows rowCount := 10 - fieldsData := createFieldsData(schema, rowCount) + fieldsData := createFieldsData(schema, rowCount, baseTimestamp) shardsData := createShardsData(schema, fieldsData, shardNum, []int64{partitionID}) t.Run("non-force flush", func(t *testing.T) { diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 3cb4247899..d4cb5ac887 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -991,7 +991,7 @@ func Test_ImportWrapperFlushFunc(t *testing.T) { assert.NoError(t, err) }) - fieldsData := createFieldsData(schema, 5) + fieldsData := createFieldsData(schema, 5, baseTimestamp) blockData := createBlockData(schema, fieldsData) t.Run("fieldsData is not empty", func(t *testing.T) { err = wrapper.flushFunc(blockData, shardID, partitionID) diff --git a/internal/util/importutil/json_handler_test.go b/internal/util/importutil/json_handler_test.go index 7f5db26db0..71306b00ab 100644 --- a/internal/util/importutil/json_handler_test.go +++ b/internal/util/importutil/json_handler_test.go @@ -207,7 +207,7 @@ func Test_JSONRowConsumerHandleIntPK(t *testing.T) { consumer.rowIDAllocator = newIDAllocator(ctx, t, errors.New("error")) waitFlushRowCount := 10 - fieldsData := createFieldsData(schema, waitFlushRowCount) + fieldsData := createFieldsData(schema, waitFlushRowCount, baseTimestamp) consumer.shardsData = createShardsData(schema, fieldsData, shardNum, []int64{partitionID}) // nil input will trigger force flush, flushErrFunc returns error diff --git a/internal/util/importutil/numpy_parser_test.go b/internal/util/importutil/numpy_parser_test.go index 62b89fa39b..656831395b 100644 --- a/internal/util/importutil/numpy_parser_test.go +++ b/internal/util/importutil/numpy_parser_test.go @@ -824,7 +824,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) { } t.Run("shards number mismatch", func(t *testing.T) { - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shards := createShardsData(sampleSchema(), fieldsData, 1, []int64{1}) segmentData := genFieldsDataFunc() parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator) @@ -861,7 +861,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) { } parser.collectionInfo.resetSchema(schema) parser.collectionInfo.ShardNum = 2 - fieldsData := createFieldsData(schema, 0) + fieldsData := createFieldsData(schema, 0, baseTimestamp) shards := createShardsData(schema, fieldsData, 2, []int64{1}) parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator) assert.Error(t, err) @@ -871,7 +871,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) { ctx := context.Background() parser.rowIDAllocator = newIDAllocator(ctx, t, errors.New("dummy error")) parser.collectionInfo.resetSchema(sampleSchema()) - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shards := createShardsData(sampleSchema(), fieldsData, 2, []int64{1}) segmentData := genFieldsDataFunc() parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator) @@ -885,7 +885,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) { schema.AutoID = true partitionID := int64(1) - fieldsData := createFieldsData(sampleSchema(), 0) + fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp) shards := createShardsData(sampleSchema(), fieldsData, 2, []int64{partitionID}) segmentData := genFieldsDataFunc() parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator) @@ -929,7 +929,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) { }, } parser.collectionInfo.resetSchema(schema) - fieldsData := createFieldsData(schema, 0) + fieldsData := createFieldsData(schema, 0, baseTimestamp) shards := createShardsData(schema, fieldsData, 2, []int64{1}) segmentData := make(BlockData) segmentData[101] = &storage.Int64FieldData{ @@ -1198,7 +1198,7 @@ func Test_NumpyParserHashToPartition(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, parser) - fieldsData := createFieldsData(schema, 5) + fieldsData := createFieldsData(schema, 5, baseTimestamp) blockData := createBlockData(schema, fieldsData) // no partition key, partition ID list greater than 1, return error