Fix bug of bulkinsert for dynamic field (#24569)

Signed-off-by: yhmo <yihua.mo@zilliz.com>

commit 3022e37298 (parent 5caa654622)
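
Summary of the change, as read from the hunks below: fillDynamicData now skips the dynamic field itself when deriving a block's row count, creates the dynamic column when it is absent or nil, and pads an empty dynamic column with "{}" placeholders so every field carries the same number of rows. Its invocation in tryFlushBlocks is hoisted to run once per block before size and row-count accounting, replacing three duplicated call sites inside the flush branches. The numpy parser additionally tolerates a missing .npy file for the dynamic field in checkRowCount and splitFieldsData, since that column can be auto-generated later.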
@@ -401,24 +401,33 @@ func fillDynamicData(blockData map[storage.FieldID]storage.FieldData, collection
 	rowCount := 0
 	if len(blockData) > 0 {
-		for _, v := range blockData {
+		for id, v := range blockData {
+			if id == dynamicFieldID {
+				continue
+			}
 			rowCount = v.RowNum()
 		}
 	}
 
-	_, ok := blockData[dynamicFieldID]
-	if !ok {
-		data := &storage.JSONFieldData{
+	dynamicData, ok := blockData[dynamicFieldID]
+	if !ok || dynamicData == nil {
+		// dynamic field data is not provided, create new one
+		dynamicData = &storage.JSONFieldData{
 			Data: make([][]byte, 0),
 		}
+	}
+
+	if dynamicData.RowNum() == 0 {
+		// fill the dynamic data by row count
+		data := dynamicData.(*storage.JSONFieldData)
 		bs := []byte("{}")
 		for i := 0; i < rowCount; i++ {
 			data.Data = append(data.Data, bs)
 		}
-		blockData[dynamicFieldID] = data
 	}
 
+	blockData[dynamicFieldID] = dynamicData
+
 	return nil
 }
 
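For orientation, here is a minimal, self-contained Go sketch of the padding behavior introduced above. The types are simplified stand-ins for the storage package, not the real Milvus APIs:

package main

import "fmt"

// fieldData is a simplified stand-in for the storage.FieldData interface.
type fieldData interface {
	RowNum() int
}

type int64FieldData struct{ Data []int64 }

func (f *int64FieldData) RowNum() int { return len(f.Data) }

type jsonFieldData struct{ Data [][]byte }

func (f *jsonFieldData) RowNum() int { return len(f.Data) }

// fillDynamic mirrors the patched fillDynamicData: derive the row count from
// any non-dynamic column, create the dynamic column if the caller did not
// provide one, and pad an empty dynamic column with "{}" documents so that
// every column in the block ends up with the same number of rows.
func fillDynamic(block map[int64]fieldData, dynamicFieldID int64) {
	rowCount := 0
	for id, v := range block {
		// Skip the dynamic column itself so a pre-existing (possibly empty)
		// dynamic column cannot distort the derived row count.
		if id == dynamicFieldID {
			continue
		}
		rowCount = v.RowNum()
	}

	dynamicData, ok := block[dynamicFieldID]
	if !ok || dynamicData == nil {
		// Dynamic field data was not provided; create a fresh column.
		dynamicData = &jsonFieldData{Data: make([][]byte, 0)}
	}

	if data, isJSON := dynamicData.(*jsonFieldData); isJSON && data.RowNum() == 0 {
		// Pad the empty dynamic column with one "{}" document per row.
		for i := 0; i < rowCount; i++ {
			data.Data = append(data.Data, []byte("{}"))
		}
	}

	block[dynamicFieldID] = dynamicData
}

func main() {
	block := map[int64]fieldData{101: &int64FieldData{Data: []int64{1, 2, 4}}}
	fillDynamic(block, 102)
	fmt.Println(block[102].RowNum()) // prints 3: one "{}" per existing row
}

Running the sketch prints 3: the dynamic column is created and padded to match the three rows of the int64 column, which is exactly the invariant the patched function establishes for each block.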
@@ -446,6 +455,12 @@ func tryFlushBlocks(ctx context.Context,
 		}
 
 		blockData := blocksData[i]
+		err := fillDynamicData(blockData, collectionSchema)
+		if err != nil {
+			log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+			return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+		}
+
 		// Note: even rowCount is 0, the size is still non-zero
 		size := 0
 		rowCount := 0
@@ -457,12 +472,7 @@ func tryFlushBlocks(ctx context.Context,
 		// force to flush, called at the end of Read()
 		if force && rowCount > 0 {
 			printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
-			err := fillDynamicData(blockData, collectionSchema)
-			if err != nil {
-				log.Error("Import util: failed to fill dynamic field", zap.Error(err))
-				return fmt.Errorf("failed to fill dynamic field, error: %w", err)
-			}
-			err = callFlushFunc(blockData, i)
+			err := callFlushFunc(blockData, i)
 			if err != nil {
 				log.Error("Import util: failed to force flush block data", zap.Int("shardID", i), zap.Error(err))
 				return fmt.Errorf("failed to force flush block data for shard id %d, error: %w", i, err)
@@ -481,12 +491,7 @@ func tryFlushBlocks(ctx context.Context,
 		// initialize a new FieldData list for next round batch read
 		if size > int(blockSize) && rowCount > 0 {
 			printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
-			err := fillDynamicData(blockData, collectionSchema)
-			if err != nil {
-				log.Error("Import util: failed to fill dynamic field", zap.Error(err))
-				return fmt.Errorf("failed to fill dynamic field, error: %w", err)
-			}
-			err = callFlushFunc(blockData, i)
+			err := callFlushFunc(blockData, i)
 			if err != nil {
 				log.Error("Import util: failed to flush block data", zap.Int("shardID", i), zap.Error(err))
 				return fmt.Errorf("failed to flush block data for shard id %d, error: %w", i, err)
@@ -520,6 +525,12 @@ func tryFlushBlocks(ctx context.Context,
 	}
 
 	blockData := blocksData[biggestItem]
+	err := fillDynamicData(blockData, collectionSchema)
+	if err != nil {
+		log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+		return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+	}
+
 	// Note: even rowCount is 0, the size is still non-zero
 	size := 0
 	rowCount := 0
@@ -530,11 +541,6 @@ func tryFlushBlocks(ctx context.Context,
 
 	if rowCount > 0 {
 		printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
-		err := fillDynamicData(blockData, collectionSchema)
-		if err != nil {
-			log.Error("Import util: failed to fill dynamic field", zap.Error(err))
-			return fmt.Errorf("failed to fill dynamic field, error: %w", err)
-		}
 		err = callFlushFunc(blockData, biggestItem)
 		if err != nil {
 			log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
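Design note on the hunks above: with fillDynamicData hoisted to the top of each block's handling (and likewise before the biggest-block flush), the dynamic column is created and padded before size and rowCount are computed, so its bytes count toward the block size. The now-redundant per-branch fillDynamicData calls in the force-flush, oversize-flush, and biggest-block paths are removed, which also lets the later branches reuse the already-declared err variable.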
@@ -738,6 +738,10 @@ func (p *NumpyParser) checkRowCount(fieldsData map[storage.FieldID]storage.Field
 		if !schema.GetAutoID() {
 			v, ok := fieldsData[schema.GetFieldID()]
 			if !ok {
+				if schema.GetIsDynamic() {
+					// user might not provide numpy file for dynamic field, skip it, will auto-generate later
+					continue
+				}
 				log.Error("Numpy parser: field not provided", zap.String("fieldName", schema.GetName()))
 				return 0, nil, fmt.Errorf("field '%s' not provided", schema.GetName())
 			}
@@ -852,12 +856,16 @@ func (p *NumpyParser) splitFieldsData(fieldsData map[storage.FieldID]storage.Fie
 			schema := p.collectionSchema.Fields[k]
 			srcData := fieldsData[schema.GetFieldID()]
 			targetData := shards[shard][schema.GetFieldID()]
+			if srcData == nil && schema.GetIsDynamic() {
+				// user might not provide numpy file for dynamic field, skip it, will auto-generate later
+				continue
+			}
 			if srcData == nil || targetData == nil {
 				log.Error("Numpy parser: cannot append data since source or target field data is nil",
 					zap.String("FieldName", schema.GetName()),
 					zap.Bool("sourceNil", srcData == nil), zap.Bool("targetNil", targetData == nil))
-				return fmt.Errorf("cannot append data for field '%s' since source or target field data is nil",
-					primaryKey.GetName())
+				return fmt.Errorf("cannot append data for field '%s', possibly no any fields corresponding to this numpy file, or a required numpy file is not provided",
+					schema.GetName())
 			}
 			appendFunc := appendFunctions[schema.GetName()]
 			err := appendFunc(srcData, i, targetData)
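Reader-side counterpart: since a user may omit the .npy file for the dynamic field, checkRowCount no longer treats the missing column as an error and splitFieldsData skips appending it; the column is generated later by fillDynamicData when blocks are flushed. The error message for a genuinely missing field now also reports the offending field's own name (schema.GetName()) rather than the primary key's.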
@@ -129,6 +129,36 @@ func Test_NumpyParserValidateFileNames(t *testing.T) {
 	fileNames = append(fileNames, "FieldFloatVector.npy")
 	err = parser.validateFileNames(fileNames)
 	assert.NoError(t, err)
+
+	// has dynamic field
+	parser.collectionSchema = &schemapb.CollectionSchema{
+		Name:               "schema",
+		Description:        "schema",
+		AutoID:             true,
+		EnableDynamicField: true,
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      101,
+				Name:         "FieldInt64",
+				IsPrimaryKey: true,
+				AutoID:       false,
+				DataType:     schemapb.DataType_Int64,
+			},
+			{
+				FieldID:   102,
+				Name:      "FieldDynamic",
+				IsDynamic: true,
+				DataType:  schemapb.DataType_JSON,
+			},
+		},
+	}
+	fileNames = []string{"FieldInt64.npy"}
+	err = parser.validateFileNames(fileNames)
+	assert.NoError(t, err)
+
+	fileNames = append(fileNames, "FieldDynamic.npy")
+	err = parser.validateFileNames(fileNames)
+	assert.NoError(t, err)
 }
 
 func Test_NumpyParserValidateHeader(t *testing.T) {
@@ -641,6 +671,37 @@ func Test_NumpyParserCheckRowCount(t *testing.T) {
 	assert.Error(t, err)
 	assert.Zero(t, rowCount)
 	assert.Nil(t, primaryKey)
+
+	// has dynamic field
+	parser.collectionSchema = &schemapb.CollectionSchema{
+		Name:               "schema",
+		Description:        "schema",
+		AutoID:             true,
+		EnableDynamicField: true,
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      101,
+				Name:         "FieldInt64",
+				IsPrimaryKey: true,
+				AutoID:       false,
+				DataType:     schemapb.DataType_Int64,
+			},
+			{
+				FieldID:   102,
+				Name:      "FieldDynamic",
+				IsDynamic: true,
+				DataType:  schemapb.DataType_JSON,
+			},
+		},
+	}
+	segmentData[101] = &storage.Int64FieldData{
+		Data: []int64{1, 2, 4},
+	}
+
+	rowCount, primaryKey, err = parser.checkRowCount(segmentData)
+	assert.NoError(t, err)
+	assert.Equal(t, 3, rowCount)
+	assert.NotNil(t, primaryKey)
 }
 
 func Test_NumpyParserSplitFieldsData(t *testing.T) {
@@ -729,6 +790,41 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 
 		schema.AutoID = false
 	})
+
+	t.Run("has dynamic field", func(t *testing.T) {
+		parser.collectionSchema = &schemapb.CollectionSchema{
+			Name:               "schema",
+			Description:        "schema",
+			AutoID:             true,
+			EnableDynamicField: true,
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:      101,
+					Name:         "FieldInt64",
+					IsPrimaryKey: true,
+					AutoID:       false,
+					DataType:     schemapb.DataType_Int64,
+				},
+				{
+					FieldID:   102,
+					Name:      "FieldDynamic",
+					IsDynamic: true,
+					DataType:  schemapb.DataType_JSON,
+				},
+			},
+		}
+		shards = make([]map[storage.FieldID]storage.FieldData, 0, parser.shardNum)
+		for i := 0; i < int(parser.shardNum); i++ {
+			segmentData := initSegmentData(parser.collectionSchema)
+			shards = append(shards, segmentData)
+		}
+		segmentData = make(map[storage.FieldID]storage.FieldData)
+		segmentData[101] = &storage.Int64FieldData{
+			Data: []int64{1, 2, 4},
+		}
+		err = parser.splitFieldsData(segmentData, shards)
+		assert.NoError(t, err)
+	})
 }
 
 func Test_NumpyParserCalcRowCountPerBlock(t *testing.T) {
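The three new test cases mirror these code paths: validateFileNames accepts a file list with or without FieldDynamic.npy, checkRowCount derives rowCount = 3 from the primary-key column alone when the dynamic column is absent, and splitFieldsData splits successfully without dynamic-field source data.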