diff --git a/deployments/docker/dev/docker-compose-apple-silicon.yml b/deployments/docker/dev/docker-compose-apple-silicon.yml
index 2c815e7188..6bc68971ae 100644
--- a/deployments/docker/dev/docker-compose-apple-silicon.yml
+++ b/deployments/docker/dev/docker-compose-apple-silicon.yml
@@ -32,12 +32,13 @@ services:
     image: minio/minio:RELEASE.2022-03-17T06-34-49Z
     ports:
       - "9000:9000"
+      - "9001:9001"
     environment:
       MINIO_ACCESS_KEY: minioadmin
       MINIO_SECRET_KEY: minioadmin
     volumes:
       - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
-    command: minio server /minio_data
+    command: minio server /minio_data --console-address ":9001"
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
       interval: 30s
diff --git a/deployments/docker/dev/docker-compose.yml b/deployments/docker/dev/docker-compose.yml
index cbf7e4d3b1..3e7c10e8e8 100644
--- a/deployments/docker/dev/docker-compose.yml
+++ b/deployments/docker/dev/docker-compose.yml
@@ -36,12 +36,13 @@ services:
     image: minio/minio:RELEASE.2022-03-17T06-34-49Z
     ports:
       - "9000:9000"
+      - "9001:9001"
    environment:
       MINIO_ACCESS_KEY: minioadmin
       MINIO_SECRET_KEY: minioadmin
     volumes:
       - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
-    command: minio server /minio_data
+    command: minio server /minio_data --console-address ":9001"
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
       interval: 30s
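Both compose files publish the new MinIO console port (9001) alongside the S3 API port (9000), and pass --console-address ":9001" so the console binds to a fixed port instead of a random one. A quick reachability check after docker-compose up — a minimal Go sketch, not part of this change, assuming only the two port mappings above:

```go
package main

import (
	"fmt"
	"net/http"
)

// Probe both mapped MinIO ports: 9000 serves the S3 API (with the standard
// /minio/health/live liveness endpoint the healthcheck also uses), while
// 9001 now serves the web console.
func main() {
	for _, url := range []string{
		"http://localhost:9000/minio/health/live", // S3 API liveness
		"http://localhost:9001",                   // console landing page
	} {
		resp, err := http.Get(url)
		if err != nil {
			fmt.Printf("%s unreachable: %v\n", url, err)
			continue
		}
		resp.Body.Close()
		fmt.Printf("%s -> %s\n", url, resp.Status)
	}
}
```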
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 05f8669fc4..d2d8ffdc29 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -775,19 +775,39 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		zap.Any("channel names", req.GetImportTask().GetChannelNames()),
 		zap.Any("working dataNodes", req.WorkingNodes))

+	importResult := &rootcoordpb.ImportResult{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		},
+		TaskId:     req.GetImportTask().TaskId,
+		DatanodeId: node.NodeID,
+		State:      commonpb.ImportState_ImportStarted,
+		Segments:   make([]int64, 0),
+		AutoIds:    make([]int64, 0),
+		RowCount:   0,
+	}
+	reportFunc := func(res *rootcoordpb.ImportResult) error {
+		_, err := node.rootCoord.ReportImport(ctx, res)
+		return err
+	}
+
 	if !node.isHealthy() {
-		log.Warn("DataNode.Import failed",
+		log.Warn("DataNode import failed",
 			zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
 			zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
 			zap.Int64("taskID", req.GetImportTask().GetTaskId()),
 			zap.Error(errDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID)))
+		msg := msgDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID)
+		importResult.State = commonpb.ImportState_ImportFailed
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
+		reportFunc(importResult)
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    msgDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID),
+			Reason:    msg,
 		}, nil
 	}

-	rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
+	rep, err := node.rootCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_RequestTSO,
 			MsgID:   0,
@@ -796,11 +816,17 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		},
 		Count: 1,
 	})
+	if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
+		msg := "DataNode alloc ts failed"
+		log.Warn(msg)
+		importResult.State = commonpb.ImportState_ImportFailed
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
+		reportFunc(importResult)
 	if err != nil {
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "DataNode alloc ts failed",
+			Reason:    msg,
 		}, nil
 	}
+	}
@@ -810,35 +836,27 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 	metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId())
 	schema, err := metaService.getCollectionSchema(ctx, req.GetImportTask().GetCollectionId(), 0)
 	if err != nil {
-		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    err.Error(),
-		}, nil
-	}
-	idAllocator, err := allocator2.NewIDAllocator(node.ctx, node.rootCoord, Params.DataNodeCfg.NodeID)
-	importResult := &rootcoordpb.ImportResult{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_Success,
-		},
-		TaskId:     req.GetImportTask().TaskId,
-		DatanodeId: node.NodeID,
-		State:      commonpb.ImportState_ImportPersisted,
-		Segments:   make([]int64, 0),
-		RowCount:   0,
-	}
-	importWrapper := importutil.NewImportWrapper(ctx, schema, 2, Params.DataNodeCfg.FlushInsertBufferSize, idAllocator, node.chunkManager,
-		importFlushReqFunc(node, req, importResult, schema, ts))
-	err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
-	if err != nil {
+		importResult.State = commonpb.ImportState_ImportFailed
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
+		reportFunc(importResult)
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
 		}, nil
 	}

-	// report root coord that the import task has been finished
-	_, err = node.rootCoord.ReportImport(ctx, importResult)
+	// temp id allocator service
+	idAllocator, err := allocator2.NewIDAllocator(node.ctx, node.rootCoord, Params.DataNodeCfg.NodeID)
+	_ = idAllocator.Start()
+	defer idAllocator.Close()
+
+	importWrapper := importutil.NewImportWrapper(ctx, schema, 2, Params.DataNodeCfg.FlushInsertBufferSize, idAllocator, node.chunkManager,
+		importFlushReqFunc(node, req, importResult, schema, ts), importResult, reportFunc)
+	err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
 	if err != nil {
+		importResult.State = commonpb.ImportState_ImportFailed
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
+		reportFunc(importResult)
 		return &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
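With importResult and reportFunc hoisted to the top of Import, every early-exit path now reports ImportFailed (with a "failed_reason" info pair) to RootCoord before returning, instead of reporting only once at the end. A self-contained sketch of that pattern — all types here are stand-ins, not the real Milvus API:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for rootcoordpb.ImportResult; only the shape of the pattern matters.
type ImportResult struct {
	TaskId int64
	State  string // "started" | "failed" | ...
	Infos  map[string]string
}

func runImport(report func(*ImportResult) error) error {
	res := &ImportResult{TaskId: 1, State: "started", Infos: map[string]string{}}

	// fail is the shared early-exit path: mark the task failed, attach the
	// reason, and push a best-effort report before returning the error.
	fail := func(reason string) error {
		res.State = "failed"
		res.Infos["failed_reason"] = reason
		_ = report(res) // report errors are ignored on failure paths, as in the diff
		return errors.New(reason)
	}

	healthy := true
	if !healthy {
		return fail("DataNode is unhealthy")
	}
	// ... alloc timestamp, fetch schema, run the import wrapper,
	// calling fail(...) at each step that can go wrong ...
	return nil
}

func main() {
	err := runImport(func(r *ImportResult) error {
		fmt.Printf("report: task=%d state=%s infos=%v\n", r.TaskId, r.State, r.Infos)
		return nil
	})
	fmt.Println("import finished, err =", err)
}
```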
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index f41f9e752a..fc84c669e1 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -400,6 +400,15 @@ func TestDataNode(t *testing.T) {
 		stat, err := node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
 		assert.NoError(t, err)
 		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.ErrorCode)
+
+		stat, err = node.Import(context.WithValue(ctx, ctxKey{}, returnError), req)
+		assert.NoError(t, err)
+		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
+
+		node.State.Store(internalpb.StateCode_Abnormal)
+		stat, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
+		assert.NoError(t, err)
+		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
 	})

 	t.Run("Test BackGroundGC", func(t *testing.T) {
diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go
index ae2055941a..ec3e1fe6a7 100644
--- a/internal/datanode/mock_test.go
+++ b/internal/datanode/mock_test.go
@@ -890,6 +890,13 @@ func (m *RootCoordFactory) AllocTimestamp(ctx context.Context, in *rootcoordpb.A
 		Status:    &commonpb.Status{},
 		Timestamp: 1000,
 	}
+
+	v := ctx.Value(ctxKey{})
+	if v != nil && v.(string) == returnError {
+		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+		return resp, fmt.Errorf("injected error")
+	}
+
 	return resp, nil
 }
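The new test cases work because the mock RootCoord keys its behavior off a sentinel context value: passing returnError flips AllocTimestamp onto an injected-error branch with no extra mock wiring. The technique in isolation (ctxKey and returnError mirror the test helpers in the diff):

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

const returnError = "ReturnError"

// allocTimestamp mimics the mock in the diff: a sentinel value carried by
// the context switches the call onto its injected-error branch.
func allocTimestamp(ctx context.Context) (uint64, error) {
	if v := ctx.Value(ctxKey{}); v != nil && v.(string) == returnError {
		return 0, fmt.Errorf("injected error")
	}
	return 1000, nil
}

func main() {
	ctx := context.Background()
	ts, err := allocTimestamp(ctx)
	fmt.Println(ts, err) // 1000 <nil>

	ts, err = allocTimestamp(context.WithValue(ctx, ctxKey{}, returnError))
	fmt.Println(ts, err) // 0 injected error
}
```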
diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go
index d6429c90e7..20dc1bfe95 100644
--- a/internal/rootcoord/import_manager.go
+++ b/internal/rootcoord/import_manager.go
@@ -328,6 +328,7 @@ func (m *importManager) updateTaskState(ir *rootcoordpb.ImportResult) (*datapb.I
 			v.State.StateCode = ir.GetState()
 			v.State.Segments = ir.GetSegments()
 			v.State.RowCount = ir.GetRowCount()
+			v.State.RowIds = ir.AutoIds
 			for _, kv := range ir.GetInfos() {
 				if kv.GetKey() == FailedReason {
 					v.State.ErrorMessage = kv.GetValue()
diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go
index a707e74995..6d9e03e488 100644
--- a/internal/util/importutil/import_wrapper.go
+++ b/internal/util/importutil/import_wrapper.go
@@ -14,6 +14,8 @@ import (
 	"github.com/milvus-io/milvus/internal/allocator"
 	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
@@ -34,10 +36,14 @@ type ImportWrapper struct {
 	chunkManager storage.ChunkManager

 	callFlushFunc ImportFlushFunc // call back function to flush a segment
+
+	importResult *rootcoordpb.ImportResult                 // import result
+	reportFunc   func(res *rootcoordpb.ImportResult) error // report import state to rootcoord
 }

 func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.CollectionSchema, shardNum int32, segmentSize int64,
-	idAlloc *allocator.IDAllocator, cm storage.ChunkManager, flushFunc ImportFlushFunc) *ImportWrapper {
+	idAlloc *allocator.IDAllocator, cm storage.ChunkManager, flushFunc ImportFlushFunc,
+	importResult *rootcoordpb.ImportResult, reportFunc func(res *rootcoordpb.ImportResult) error) *ImportWrapper {
 	if collectionSchema == nil {
 		log.Error("import error: collection schema is nil")
 		return nil
@@ -69,6 +75,8 @@ func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.Collection
 		rowIDAllocator: idAlloc,
 		callFlushFunc:  flushFunc,
 		chunkManager:   cm,
+		importResult:   importResult,
+		reportFunc:     reportFunc,
 	}

 	return wrapper
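NewImportWrapper now takes the shared ImportResult and a report callback alongside the flush callback, so progress flows out of the wrapper as it happens rather than only through the final return value. A reduced sketch of that shape — all names here are stand-ins, not the real wrapper:

```go
package main

import "fmt"

// Result is shared mutable state owned by the caller; the wrapper mutates
// it and pushes snapshots through the report hook, mirroring the widened
// NewImportWrapper signature.
type Result struct{ State string }

type Wrapper struct {
	result *Result
	report func(*Result) error
	flush  func(rows int) error
}

func NewWrapper(flush func(int) error, result *Result, report func(*Result) error) *Wrapper {
	return &Wrapper{result: result, report: report, flush: flush}
}

func (w *Wrapper) Run() error {
	w.result.State = "downloaded"
	w.report(w.result) // intermediate reports are fire-and-forget
	if err := w.flush(5); err != nil {
		return err
	}
	w.result.State = "persisted"
	return w.report(w.result) // only the final report's error propagates
}

func main() {
	res := &Result{State: "started"}
	w := NewWrapper(
		func(rows int) error { fmt.Println("flushed", rows, "rows"); return nil },
		res,
		func(r *Result) error { fmt.Println("report:", r.State); return nil },
	)
	_ = w.Run()
}
```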
zap.String("filePath", filePath)) + log.Error("import error: "+err.Error(), zap.String("filePath", filePath)) return err } defer file.Close() + + // report file process state + p.importResult.State = commonpb.ImportState_ImportDownloaded + p.reportFunc(p.importResult) + var id storage.FieldID for _, field := range p.collectionSchema.Fields { if field.GetName() == fileName { @@ -251,21 +293,27 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b parser := NewNumpyParser(p.ctx, p.collectionSchema, flushFunc) err = parser.Parse(file, fileName, onlyValidate) if err != nil { - log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath)) + log.Error("import error: "+err.Error(), zap.String("filePath", filePath)) return err } + + // report file process state + p.importResult.State = commonpb.ImportState_ImportParsed + p.reportFunc(p.importResult) } } // split fields data into segments err := p.splitFieldsData(fieldsData, filePaths) if err != nil { - log.Error("imprort error: " + err.Error()) + log.Error("import error: " + err.Error()) return err } } - return nil + // report file process state + p.importResult.State = commonpb.ImportState_ImportPersisted + return p.reportFunc(p.importResult) } func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storage.FieldData, n int, target storage.FieldData) error { @@ -346,7 +394,7 @@ func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storag func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.FieldData, files []string) error { if len(fieldsData) == 0 { - return errors.New("imprort error: fields data is empty") + return errors.New("import error: fields data is empty") } var primaryKey *schemapb.FieldSchema @@ -357,37 +405,43 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F } else { _, ok := fieldsData[schema.GetFieldID()] if !ok { - return errors.New("imprort error: field " + schema.GetName() + " not provided") + return errors.New("import error: field " + schema.GetName() + " not provided") } } } if primaryKey == nil { - return errors.New("imprort error: primary key field is not found") + return errors.New("import error: primary key field is not found") } rowCount := 0 for _, v := range fieldsData { - rowCount = v.RowNum() - break + if v.RowNum() > rowCount { + rowCount = v.RowNum() + } } primaryData, ok := fieldsData[primaryKey.GetFieldID()] if !ok { - // generate auto id for primary key - if primaryKey.GetAutoID() { - var rowIDBegin typeutil.UniqueID - var rowIDEnd typeutil.UniqueID - rowIDBegin, rowIDEnd, _ = p.rowIDAllocator.Alloc(uint32(rowCount)) + return errors.New("import error: primary key field is not provided") + } - primaryDataArr := primaryData.(*storage.Int64FieldData) - for i := rowIDBegin; i < rowIDEnd; i++ { - primaryDataArr.Data = append(primaryDataArr.Data, rowIDBegin+i) - } + // generate auto id for primary key + if primaryKey.GetAutoID() { + log.Info("import wrapper: generating auto-id", zap.Any("rowCount", rowCount)) + var rowIDBegin typeutil.UniqueID + var rowIDEnd typeutil.UniqueID + rowIDBegin, rowIDEnd, _ = p.rowIDAllocator.Alloc(uint32(rowCount)) + + primaryDataArr := primaryData.(*storage.Int64FieldData) + for i := rowIDBegin; i < rowIDEnd; i++ { + primaryDataArr.Data = append(primaryDataArr.Data, rowIDBegin+i) } + + p.importResult.AutoIds = append(p.importResult.AutoIds, rowIDBegin, rowIDEnd) } if primaryData.RowNum() <= 0 { - return errors.New("imprort error: primary key " + 
@@ -346,7 +394,7 @@ func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storag

 func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.FieldData, files []string) error {
 	if len(fieldsData) == 0 {
-		return errors.New("imprort error: fields data is empty")
+		return errors.New("import error: fields data is empty")
 	}

 	var primaryKey *schemapb.FieldSchema
@@ -357,37 +405,43 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
 		} else {
 			_, ok := fieldsData[schema.GetFieldID()]
 			if !ok {
-				return errors.New("imprort error: field " + schema.GetName() + " not provided")
+				return errors.New("import error: field " + schema.GetName() + " not provided")
 			}
 		}
 	}
 	if primaryKey == nil {
-		return errors.New("imprort error: primary key field is not found")
+		return errors.New("import error: primary key field is not found")
 	}

 	rowCount := 0
 	for _, v := range fieldsData {
-		rowCount = v.RowNum()
-		break
+		if v.RowNum() > rowCount {
+			rowCount = v.RowNum()
+		}
 	}

 	primaryData, ok := fieldsData[primaryKey.GetFieldID()]
 	if !ok {
-		// generate auto id for primary key
-		if primaryKey.GetAutoID() {
-			var rowIDBegin typeutil.UniqueID
-			var rowIDEnd typeutil.UniqueID
-			rowIDBegin, rowIDEnd, _ = p.rowIDAllocator.Alloc(uint32(rowCount))
+		return errors.New("import error: primary key field is not provided")
+	}

-			primaryDataArr := primaryData.(*storage.Int64FieldData)
-			for i := rowIDBegin; i < rowIDEnd; i++ {
-				primaryDataArr.Data = append(primaryDataArr.Data, rowIDBegin+i)
-			}
+	// generate auto id for primary key
+	if primaryKey.GetAutoID() {
+		log.Info("import wrapper: generating auto-id", zap.Any("rowCount", rowCount))
+		var rowIDBegin typeutil.UniqueID
+		var rowIDEnd typeutil.UniqueID
+		rowIDBegin, rowIDEnd, _ = p.rowIDAllocator.Alloc(uint32(rowCount))
+
+		primaryDataArr := primaryData.(*storage.Int64FieldData)
+		for i := rowIDBegin; i < rowIDEnd; i++ {
+			primaryDataArr.Data = append(primaryDataArr.Data, rowIDBegin+i)
 		}
+
+		p.importResult.AutoIds = append(p.importResult.AutoIds, rowIDBegin, rowIDEnd)
 	}

 	if primaryData.RowNum() <= 0 {
-		return errors.New("imprort error: primary key " + primaryKey.GetName() + " not provided")
+		return errors.New("import error: primary key " + primaryKey.GetName() + " not provided")
 	}

 	// prepare segemnts
@@ -406,7 +460,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
 		schema := p.collectionSchema.Fields[i]
 		appendFunc := p.appendFunc(schema)
 		if appendFunc == nil {
-			return errors.New("imprort error: unsupported field data type")
+			return errors.New("import error: unsupported field data type")
 		}
 		appendFunctions[schema.GetName()] = appendFunc
 	}
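Two behavioral changes hide in this hunk: rowCount is now the maximum RowNum across columns (with autoID the primary-key column starts out empty, so "first field seen" was not a safe proxy), and the allocated ID range is recorded in AutoIds as one [begin, end) pair, matching the allocator's half-open loop. A sketch of the intended bookkeeping with plain slices rather than the real storage types:

```go
package main

import "fmt"

func main() {
	fields := map[string][]int64{
		"pk":  {},           // auto-id primary key: filled below
		"age": {30, 41, 25}, // 3 rows
	}

	// rowCount = max RowNum across columns, never the (possibly empty) PK.
	rowCount := 0
	for _, col := range fields {
		if len(col) > rowCount {
			rowCount = len(col)
		}
	}

	// Stand-in for rowIDAllocator.Alloc(uint32(rowCount)): a half-open range.
	rowIDBegin, rowIDEnd := int64(1000), int64(1000+rowCount)
	for i := rowIDBegin; i < rowIDEnd; i++ {
		fields["pk"] = append(fields["pk"], i)
	}

	// One (begin, end) pair per allocation, appended flat.
	autoIds := []int64{rowIDBegin, rowIDEnd}
	fmt.Println(fields["pk"], autoIds) // [1000 1001 1002] [1000 1003]
}
```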
diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go
index a944990448..53ca13ab05 100644
--- a/internal/util/importutil/import_wrapper_test.go
+++ b/internal/util/importutil/import_wrapper_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/dependency"
@@ -29,7 +30,8 @@ func Test_NewImportWrapper(t *testing.T) {
 	ctx := context.Background()
 	cm, err := f.NewVectorStorageChunkManager(ctx)
 	assert.NoError(t, err)
-	wrapper := NewImportWrapper(ctx, nil, 2, 1, nil, cm, nil)
+
+	wrapper := NewImportWrapper(ctx, nil, 2, 1, nil, cm, nil, nil, nil)
 	assert.Nil(t, wrapper)

 	schema := &schemapb.CollectionSchema{
@@ -47,7 +49,7 @@ func Test_NewImportWrapper(t *testing.T) {
 		Description: "int64",
 		DataType:    schemapb.DataType_Int64,
 	})
-	wrapper = NewImportWrapper(ctx, schema, 2, 1, nil, cm, nil)
+	wrapper = NewImportWrapper(ctx, schema, 2, 1, nil, cm, nil, nil, nil)
 	assert.NotNil(t, wrapper)

 	err = wrapper.Cancel()
@@ -93,12 +95,27 @@ func Test_ImportRowBased(t *testing.T) {
 	}

 	// success case
-	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
+	importResult := &rootcoordpb.ImportResult{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		},
+		TaskId:     1,
+		DatanodeId: 1,
+		State:      commonpb.ImportState_ImportStarted,
+		Segments:   make([]int64, 0),
+		AutoIds:    make([]int64, 0),
+		RowCount:   0,
+	}
+	reportFunc := func(res *rootcoordpb.ImportResult) error {
+		return nil
+	}
+	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
 	files := make([]string, 0)
 	files = append(files, filePath)
 	err = wrapper.Import(files, true, false)
 	assert.Nil(t, err)
 	assert.Equal(t, 5, rowCount)
+	assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)

 	// parse error
 	content = []byte(`{
@@ -111,11 +128,13 @@ func Test_ImportRowBased(t *testing.T) {
 	err = cm.Write(filePath, content)
 	assert.NoError(t, err)

-	wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
+	importResult.State = commonpb.ImportState_ImportStarted
+	wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
 	files = make([]string, 0)
 	files = append(files, filePath)
 	err = wrapper.Import(files, true, false)
 	assert.NotNil(t, err)
+	assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)

 	// file doesn't exist
 	files = make([]string, 0)
@@ -178,12 +197,27 @@ func Test_ImportColumnBased_json(t *testing.T) {
 	}

 	// success case
-	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
+	importResult := &rootcoordpb.ImportResult{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		},
+		TaskId:     1,
+		DatanodeId: 1,
+		State:      commonpb.ImportState_ImportStarted,
+		Segments:   make([]int64, 0),
+		AutoIds:    make([]int64, 0),
+		RowCount:   0,
+	}
+	reportFunc := func(res *rootcoordpb.ImportResult) error {
+		return nil
+	}
+	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
 	files := make([]string, 0)
 	files = append(files, filePath)
 	err = wrapper.Import(files, false, false)
 	assert.Nil(t, err)
 	assert.Equal(t, 5, rowCount)
+	assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)

 	// parse error
 	content = []byte(`{
@@ -194,11 +228,13 @@ func Test_ImportColumnBased_json(t *testing.T) {
 	err = cm.Write(filePath, content)
 	assert.NoError(t, err)

-	wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
+	importResult.State = commonpb.ImportState_ImportStarted
+	wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
 	files = make([]string, 0)
 	files = append(files, filePath)
 	err = wrapper.Import(files, false, false)
 	assert.NotNil(t, err)
+	assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)

 	// file doesn't exist
 	files = make([]string, 0)
@@ -268,11 +304,26 @@ func Test_ImportColumnBased_numpy(t *testing.T) {
 	}

 	// success case
-	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
+	importResult := &rootcoordpb.ImportResult{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		},
+		TaskId:     1,
+		DatanodeId: 1,
+		State:      commonpb.ImportState_ImportStarted,
+		Segments:   make([]int64, 0),
+		AutoIds:    make([]int64, 0),
+		RowCount:   0,
+	}
+	reportFunc := func(res *rootcoordpb.ImportResult) error {
+		return nil
+	}
+	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
 	err = wrapper.Import(files, false, false)
 	assert.Nil(t, err)
 	assert.Equal(t, 5, rowCount)
+	assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)

 	// parse error
 	content = []byte(`{
@@ -283,11 +334,12 @@ func Test_ImportColumnBased_numpy(t *testing.T) {
 	err = cm.Write(filePath, content)
 	assert.NoError(t, err)

-	wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
+	wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
 	files = make([]string, 0)
 	files = append(files, filePath)
 	err = wrapper.Import(files, false, false)
 	assert.NotNil(t, err)
+	assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)

 	// file doesn't exist
 	files = make([]string, 0)
@@ -403,7 +455,21 @@ func Test_ImportRowBased_perf(t *testing.T) {

 	schema := perfSchema(dim)

-	wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc)
+	importResult := &rootcoordpb.ImportResult{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		},
+		TaskId:     1,
+		DatanodeId: 1,
+		State:      commonpb.ImportState_ImportStarted,
+		Segments:   make([]int64, 0),
+		AutoIds:    make([]int64, 0),
+		RowCount:   0,
+	}
+	reportFunc := func(res *rootcoordpb.ImportResult) error {
+		return nil
+	}
+	wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc, importResult, reportFunc)
 	files := make([]string, 0)
 	files = append(files, filePath)
 	err = wrapper.Import(files, true, false)
@@ -501,7 +567,21 @@ func Test_ImportColumnBased_perf(t *testing.T) {

 	schema := perfSchema(dim)

-	wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc)
+	importResult := &rootcoordpb.ImportResult{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		},
+		TaskId:     1,
+		DatanodeId: 1,
+		State:      commonpb.ImportState_ImportStarted,
+		Segments:   make([]int64, 0),
+		AutoIds:    make([]int64, 0),
+		RowCount:   0,
+	}
+	reportFunc := func(res *rootcoordpb.ImportResult) error {
+		return nil
+	}
+	wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc, importResult, reportFunc)
 	files := make([]string, 0)
 	files = append(files, filePath1)
 	files = append(files, filePath2)
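The same ImportResult/reportFunc literal is now repeated in four tests. If this grows further, a package-local helper in import_wrapper_test.go could collapse the boilerplate — a hypothetical refactor sketch, not part of the diff, using the same proto types the tests already import:

```go
// newTestImportResult is a hypothetical helper (not in the diff) returning
// the fresh ImportResult and no-op report callback each test currently
// builds inline.
func newTestImportResult() (*rootcoordpb.ImportResult, func(res *rootcoordpb.ImportResult) error) {
	importResult := &rootcoordpb.ImportResult{
		Status:     &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		TaskId:     1,
		DatanodeId: 1,
		State:      commonpb.ImportState_ImportStarted,
		Segments:   make([]int64, 0),
		AutoIds:    make([]int64, 0),
		RowCount:   0,
	}
	reportFunc := func(res *rootcoordpb.ImportResult) error { return nil }
	return importResult, reportFunc
}
```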
diff --git a/internal/util/importutil/json_handler.go b/internal/util/importutil/json_handler.go
index 77ffdb5988..cb2d4b45e0 100644
--- a/internal/util/importutil/json_handler.go
+++ b/internal/util/importutil/json_handler.go
@@ -395,6 +395,7 @@ type JSONRowConsumer struct {
 	segmentsData []map[storage.FieldID]storage.FieldData // in-memory segments data
 	segmentSize  int64                                   // maximum size of a segment(unit:byte)
 	primaryKey   storage.FieldID                         // name of primary key
+	autoIDRange  []int64                                 // auto-generated id range, for example: [1, 10, 20, 25] means id from 1 to 10 and 20 to 25

 	callFlushFunc ImportFlushFunc // call back function to flush segment
 }
@@ -482,6 +483,7 @@ func NewJSONRowConsumer(collectionSchema *schemapb.CollectionSchema, idAlloc *al
 		segmentSize:   segmentSize,
 		rowCounter:    0,
 		primaryKey:    -1,
+		autoIDRange:   make([]int64, 0),
 		callFlushFunc: flushFunc,
 	}

@@ -517,6 +519,10 @@ func NewJSONRowConsumer(collectionSchema *schemapb.CollectionSchema, idAlloc *al
 	return v
 }

+func (v *JSONRowConsumer) IDRange() []int64 {
+	return v.autoIDRange
+}
+
 func (v *JSONRowConsumer) flush(force bool) error {
 	// force flush all data
 	if force {
@@ -580,6 +586,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
 		if rowIDEnd-rowIDBegin != int64(len(rows)) {
 			return errors.New("JSON row consumer: failed to allocate ID for " + strconv.Itoa(len(rows)) + " rows")
 		}
+		v.autoIDRange = append(v.autoIDRange, rowIDBegin, rowIDEnd)
 	}

 	// consume rows
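JSONRowConsumer records one (begin, end) pair per Handle batch and exposes the flat list through IDRange(), which the wrapper copies into ImportResult.AutoIds. The accumulation in miniature — a stand-in consumer, not the real one:

```go
package main

import "fmt"

// consumer mimics JSONRowConsumer's bookkeeping: each batch allocation
// appends one (begin, end) pair to autoIDRange, and IDRange() exposes the
// whole flat list for the wrapper to copy out.
type consumer struct {
	next        int64
	autoIDRange []int64
}

func (c *consumer) Handle(rows int) {
	begin, end := c.next, c.next+int64(rows) // stand-in for idAlloc.Alloc
	c.next = end
	c.autoIDRange = append(c.autoIDRange, begin, end)
}

func (c *consumer) IDRange() []int64 { return c.autoIDRange }

func main() {
	c := &consumer{next: 100}
	c.Handle(5)
	c.Handle(3)
	fmt.Println(c.IDRange()) // [100 105 105 108]: IDs 100..104 and 105..107
}
```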
diff --git a/internal/util/importutil/numpy_parser.go b/internal/util/importutil/numpy_parser.go
index f6ccb3df4d..eff9db01a4 100644
--- a/internal/util/importutil/numpy_parser.go
+++ b/internal/util/importutil/numpy_parser.go
@@ -103,7 +103,7 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
 	// 1. field data type should be consist to numpy data type
 	// 2. vector field dimension should be consist to numpy shape
 	if schemapb.DataType_FloatVector == schema.DataType {
-		if elementType != schemapb.DataType_Float {
+		if elementType != schemapb.DataType_Float && elementType != schemapb.DataType_Double {
 			return errors.New("illegal data type " + adapter.GetType() + " for field " + schema.GetName())
 		}

@@ -247,11 +247,32 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
 			Dim:     p.columnDesc.dimension,
 		}
 	case schemapb.DataType_FloatVector:
-		data, err := adapter.ReadFloat32(p.columnDesc.elementCount)
+		// for float vector, we support float32 and float64 numpy file because python float value is 64 bit
+		// for float64 numpy file, the performance is worse than float32 numpy file
+		// we don't check overflow here
+		elementType, err := convertNumpyType(adapter.GetType())
 		if err != nil {
 			return err
 		}

+		var data []float32
+		if elementType == schemapb.DataType_Float {
+			data, err = adapter.ReadFloat32(p.columnDesc.elementCount)
+			if err != nil {
+				return err
+			}
+		} else if elementType == schemapb.DataType_Double {
+			data = make([]float32, 0, p.columnDesc.elementCount)
+			data64, err := adapter.ReadFloat64(p.columnDesc.elementCount)
+			if err != nil {
+				return err
+			}
+
+			for _, f64 := range data64 {
+				data = append(data, float32(f64))
+			}
+		}
+
 		p.columnData = &storage.FloatVectorFieldData{
 			NumRows: []int64{int64(p.columnDesc.elementCount)},
 			Data:    data,
diff --git a/internal/util/importutil/numpy_parser_test.go b/internal/util/importutil/numpy_parser_test.go
index 84edd18e86..87b8195a09 100644
--- a/internal/util/importutil/numpy_parser_test.go
+++ b/internal/util/importutil/numpy_parser_test.go
@@ -442,7 +442,7 @@ func Test_Parse(t *testing.T) {
 	}
 	checkFunc(data8, "field_binary_vector", flushFunc)

-	// double vector
+	// double vector(element can be float32 or float64)
 	data9 := [][4]float32{{1.1, 2.1, 3.1, 4.1}, {5.2, 6.2, 7.2, 8.2}}
 	flushFunc = func(field storage.FieldData) error {
 		assert.NotNil(t, field)
@@ -458,6 +458,22 @@ func Test_Parse(t *testing.T) {
 		return nil
 	}
 	checkFunc(data9, "field_float_vector", flushFunc)
+
+	data10 := [][4]float64{{1.1, 2.1, 3.1, 4.1}, {5.2, 6.2, 7.2, 8.2}}
+	flushFunc = func(field storage.FieldData) error {
+		assert.NotNil(t, field)
+		assert.Equal(t, len(data10), field.RowNum())
+
+		for i := 0; i < len(data10); i++ {
+			row := field.GetRow(i).([]float32)
+			for k := 0; k < len(row); k++ {
+				assert.Equal(t, float32(data10[i][k]), row[k])
+			}
+		}
+
+		return nil
+	}
+	checkFunc(data10, "field_float_vector", flushFunc)
 }

 func Test_Parse_perf(t *testing.T) {
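The numpy path now accepts float64 arrays for FloatVector fields (Python's float is 64-bit, so float64 .npy files are common) and narrows each value to float32 on read; as the diff's comment notes, the narrowing is lossy and overflow is deliberately unchecked. The conversion in isolation:

```go
package main

import "fmt"

// narrow converts a float64 numpy column to the float32 storage a
// FloatVector field expects. Precision loss and overflow are unchecked,
// matching the behavior described in the diff.
func narrow(data64 []float64) []float32 {
	data := make([]float32, 0, len(data64))
	for _, f64 := range data64 {
		data = append(data, float32(f64))
	}
	return data
}

func main() {
	fmt.Println(narrow([]float64{1.1, 2.1, 3.1, 4.1})) // [1.1 2.1 3.1 4.1] as float32
}
```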