diff --git a/internal/util/importutil/import_util.go b/internal/util/importutil/import_util.go
index d23ace8f17..c44d072c8c 100644
--- a/internal/util/importutil/import_util.go
+++ b/internal/util/importutil/import_util.go
@@ -401,24 +401,33 @@ func fillDynamicData(blockData map[storage.FieldID]storage.FieldData, collection
 	rowCount := 0
 	if len(blockData) > 0 {
-		for _, v := range blockData {
+		for id, v := range blockData {
+			if id == dynamicFieldID {
+				continue
+			}
 			rowCount = v.RowNum()
 		}
 	}
 
-	_, ok := blockData[dynamicFieldID]
-	if !ok {
-		data := &storage.JSONFieldData{
+	dynamicData, ok := blockData[dynamicFieldID]
+	if !ok || dynamicData == nil {
+		// dynamic field data is not provided, create a new one
+		dynamicData = &storage.JSONFieldData{
 			Data: make([][]byte, 0),
 		}
+	}
+
+	if dynamicData.RowNum() == 0 {
+		// fill the dynamic data by row count
+		data := dynamicData.(*storage.JSONFieldData)
 		bs := []byte("{}")
 		for i := 0; i < rowCount; i++ {
 			data.Data = append(data.Data, bs)
 		}
-		blockData[dynamicFieldID] = data
 	}
+
+	blockData[dynamicFieldID] = dynamicData
 	return nil
 }
@@ -446,6 +455,12 @@ func tryFlushBlocks(ctx context.Context,
 		}
 		blockData := blocksData[i]
 
+		err := fillDynamicData(blockData, collectionSchema)
+		if err != nil {
+			log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+			return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+		}
+
 		// Note: even rowCount is 0, the size is still non-zero
 		size := 0
 		rowCount := 0
@@ -457,12 +472,7 @@ func tryFlushBlocks(ctx context.Context,
 		// force to flush, called at the end of Read()
 		if force && rowCount > 0 {
 			printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
-			err := fillDynamicData(blockData, collectionSchema)
-			if err != nil {
-				log.Error("Import util: failed to fill dynamic field", zap.Error(err))
-				return fmt.Errorf("failed to fill dynamic field, error: %w", err)
-			}
-			err = callFlushFunc(blockData, i)
+			err := callFlushFunc(blockData, i)
 			if err != nil {
 				log.Error("Import util: failed to force flush block data", zap.Int("shardID", i), zap.Error(err))
 				return fmt.Errorf("failed to force flush block data for shard id %d, error: %w", i, err)
@@ -481,12 +491,7 @@ func tryFlushBlocks(ctx context.Context,
 		// initialize a new FieldData list for next round batch read
 		if size > int(blockSize) && rowCount > 0 {
 			printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
-			err := fillDynamicData(blockData, collectionSchema)
-			if err != nil {
-				log.Error("Import util: failed to fill dynamic field", zap.Error(err))
-				return fmt.Errorf("failed to fill dynamic field, error: %w", err)
-			}
-			err = callFlushFunc(blockData, i)
+			err := callFlushFunc(blockData, i)
 			if err != nil {
 				log.Error("Import util: failed to flush block data", zap.Int("shardID", i), zap.Error(err))
 				return fmt.Errorf("failed to flush block data for shard id %d, error: %w", i, err)
@@ -520,6 +525,12 @@ func tryFlushBlocks(ctx context.Context,
 	}
 	blockData := blocksData[biggestItem]
 
+	err := fillDynamicData(blockData, collectionSchema)
+	if err != nil {
+		log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+		return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+	}
+
 	// Note: even rowCount is 0, the size is still non-zero
 	size := 0
 	rowCount := 0
@@ -530,11 +541,6 @@ func tryFlushBlocks(ctx context.Context,
 
 	if rowCount > 0 {
 		printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
-		err := fillDynamicData(blockData, collectionSchema)
-		if err != nil {
-			log.Error("Import util: failed to fill dynamic field", zap.Error(err))
-			return fmt.Errorf("failed to fill dynamic field, error: %w", err)
-		}
 		err = callFlushFunc(blockData, biggestItem)
 		if err != nil {
 			log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
diff --git a/internal/util/importutil/numpy_parser.go b/internal/util/importutil/numpy_parser.go
index a664c47d83..3d245471ae 100644
--- a/internal/util/importutil/numpy_parser.go
+++ b/internal/util/importutil/numpy_parser.go
@@ -738,6 +738,10 @@ func (p *NumpyParser) checkRowCount(fieldsData map[storage.FieldID]storage.Field
 		if !schema.GetAutoID() {
 			v, ok := fieldsData[schema.GetFieldID()]
 			if !ok {
+				if schema.GetIsDynamic() {
+					// the user might not provide a numpy file for the dynamic field; skip it, it will be auto-generated later
+					continue
+				}
 				log.Error("Numpy parser: field not provided", zap.String("fieldName", schema.GetName()))
 				return 0, nil, fmt.Errorf("field '%s' not provided", schema.GetName())
 			}
@@ -852,12 +856,16 @@ func (p *NumpyParser) splitFieldsData(fieldsData map[storage.FieldID]storage.Fie
 			schema := p.collectionSchema.Fields[k]
 			srcData := fieldsData[schema.GetFieldID()]
 			targetData := shards[shard][schema.GetFieldID()]
+			if srcData == nil && schema.GetIsDynamic() {
+				// the user might not provide a numpy file for the dynamic field; skip it, it will be auto-generated later
+				continue
+			}
 			if srcData == nil || targetData == nil {
 				log.Error("Numpy parser: cannot append data since source or target field data is nil",
 					zap.String("FieldName", schema.GetName()),
 					zap.Bool("sourceNil", srcData == nil), zap.Bool("targetNil", targetData == nil))
-				return fmt.Errorf("cannot append data for field '%s' since source or target field data is nil",
-					primaryKey.GetName())
+				return fmt.Errorf("cannot append data for field '%s', possibly no field corresponds to this numpy file, or a required numpy file is not provided",
+					schema.GetName())
 			}
 			appendFunc := appendFunctions[schema.GetName()]
 			err := appendFunc(srcData, i, targetData)
diff --git a/internal/util/importutil/numpy_parser_test.go b/internal/util/importutil/numpy_parser_test.go
index c62518b253..d457301afc 100644
--- a/internal/util/importutil/numpy_parser_test.go
+++ b/internal/util/importutil/numpy_parser_test.go
@@ -125,10 +125,40 @@ func Test_NumpyParserValidateFileNames(t *testing.T) {
 	err = parser.validateFileNames(fileNames)
 	assert.Error(t, err)
 
-	//valid
+	// valid
 	fileNames = append(fileNames, "FieldFloatVector.npy")
 	err = parser.validateFileNames(fileNames)
 	assert.NoError(t, err)
+
+	// has dynamic field
+	parser.collectionSchema = &schemapb.CollectionSchema{
+		Name:               "schema",
+		Description:        "schema",
+		AutoID:             true,
+		EnableDynamicField: true,
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      101,
+				Name:         "FieldInt64",
+				IsPrimaryKey: true,
+				AutoID:       false,
+				DataType:     schemapb.DataType_Int64,
+			},
+			{
+				FieldID:   102,
+				Name:      "FieldDynamic",
+				IsDynamic: true,
+				DataType:  schemapb.DataType_JSON,
+			},
+		},
+	}
+	fileNames = []string{"FieldInt64.npy"}
+	err = parser.validateFileNames(fileNames)
+	assert.NoError(t, err)
+
+	fileNames = append(fileNames, "FieldDynamic.npy")
+	err = parser.validateFileNames(fileNames)
+	assert.NoError(t, err)
 }
 
 func Test_NumpyParserValidateHeader(t *testing.T) {
@@ -641,6 +671,37 @@ func Test_NumpyParserCheckRowCount(t *testing.T) {
 	assert.Error(t, err)
 	assert.Zero(t, rowCount)
 	assert.Nil(t, primaryKey)
+
+	// has dynamic field
+	parser.collectionSchema = &schemapb.CollectionSchema{
+		Name:               "schema",
+		Description:        "schema",
+		AutoID:             true,
+		EnableDynamicField: true,
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      101,
+				Name:         "FieldInt64",
+				IsPrimaryKey: true,
+				AutoID:       false,
+				DataType:     schemapb.DataType_Int64,
+			},
+			{
+				FieldID:   102,
+				Name:      "FieldDynamic",
+				IsDynamic: true,
+				DataType:  schemapb.DataType_JSON,
+			},
+		},
+	}
+	segmentData[101] = &storage.Int64FieldData{
+		Data: []int64{1, 2, 4},
+	}
+
+	rowCount, primaryKey, err = parser.checkRowCount(segmentData)
+	assert.NoError(t, err)
+	assert.Equal(t, 3, rowCount)
+	assert.NotNil(t, primaryKey)
 }
 
 func Test_NumpyParserSplitFieldsData(t *testing.T) {
@@ -729,6 +790,41 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
 
 		schema.AutoID = false
 	})
+
+	t.Run("has dynamic field", func(t *testing.T) {
+		parser.collectionSchema = &schemapb.CollectionSchema{
+			Name:               "schema",
+			Description:        "schema",
+			AutoID:             true,
+			EnableDynamicField: true,
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:      101,
+					Name:         "FieldInt64",
+					IsPrimaryKey: true,
+					AutoID:       false,
+					DataType:     schemapb.DataType_Int64,
+				},
+				{
+					FieldID:   102,
+					Name:      "FieldDynamic",
+					IsDynamic: true,
+					DataType:  schemapb.DataType_JSON,
+				},
+			},
+		}
+		shards = make([]map[storage.FieldID]storage.FieldData, 0, parser.shardNum)
+		for i := 0; i < int(parser.shardNum); i++ {
+			segmentData := initSegmentData(parser.collectionSchema)
+			shards = append(shards, segmentData)
+		}
+		segmentData = make(map[storage.FieldID]storage.FieldData)
+		segmentData[101] = &storage.Int64FieldData{
+			Data: []int64{1, 2, 4},
+		}
+		err = parser.splitFieldsData(segmentData, shards)
+		assert.NoError(t, err)
+	})
 }
 
 func Test_NumpyParserCalcRowCountPerBlock(t *testing.T) {
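For reference, a minimal standalone sketch of the backfill behavior this patch centralizes in fillDynamicData: when no numpy file is supplied for the dynamic field (or the provided data is empty), the field is padded with one empty JSON object "{}" per row, so every field in a block reports the same row count before flushing. The types below (jsonFieldData, fillDynamic) are simplified stand-ins for illustration only, not the real storage.FieldData implementations from the Milvus storage package.

package main

import "fmt"

// jsonFieldData is a simplified stand-in for storage.JSONFieldData.
type jsonFieldData struct {
	Data [][]byte
}

func (j *jsonFieldData) RowNum() int { return len(j.Data) }

// fillDynamic mirrors the padding loop in fillDynamicData: create the
// field data if absent, then pad it to rowCount with empty JSON objects.
func fillDynamic(dynamic *jsonFieldData, rowCount int) *jsonFieldData {
	if dynamic == nil {
		// dynamic field data is not provided, create a new one
		dynamic = &jsonFieldData{Data: make([][]byte, 0)}
	}
	if dynamic.RowNum() == 0 {
		// fill the dynamic data by row count
		bs := []byte("{}")
		for i := 0; i < rowCount; i++ {
			dynamic.Data = append(dynamic.Data, bs)
		}
	}
	return dynamic
}

func main() {
	padded := fillDynamic(nil, 3)
	fmt.Println(padded.RowNum(), string(padded.Data[0])) // prints: 3 {}
}

Hoisting the fillDynamicData call to the top of tryFlushBlocks also means the padding happens once, before size and rowCount are computed, rather than separately in each flush branch as before.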