Report bulk load state (#16555)

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 2022-04-21 21:37:42 +08:00 committed by GitHub
parent 91e84ffedf
commit a6a3b69d91
11 changed files with 285 additions and 70 deletions

View File

@ -32,12 +32,13 @@ services:
image: minio/minio:RELEASE.2022-03-17T06-34-49Z
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s

View File

@ -36,12 +36,13 @@ services:
image: minio/minio:RELEASE.2022-03-17T06-34-49Z
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s

View File

@ -775,19 +775,39 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
zap.Any("channel names", req.GetImportTask().GetChannelNames()),
zap.Any("working dataNodes", req.WorkingNodes))
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: req.GetImportTask().TaskId,
DatanodeId: node.NodeID,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
_, err := node.rootCoord.ReportImport(ctx, res)
return err
}
if !node.isHealthy() {
log.Warn("DataNode.Import failed",
log.Warn("DataNode import failed",
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.Int64("taskID", req.GetImportTask().GetTaskId()),
zap.Error(errDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID)))
msg := msgDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID)
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
reportFunc(importResult)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID),
Reason: msg,
}, nil
}
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
rep, err := node.rootCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: 0,
@ -796,11 +816,17 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
},
Count: 1,
})
if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
msg := "DataNode alloc ts failed"
log.Warn(msg)
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
reportFunc(importResult)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "DataNode alloc ts failed",
Reason: msg,
}, nil
}
}
@ -810,35 +836,27 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId())
schema, err := metaService.getCollectionSchema(ctx, req.GetImportTask().GetCollectionId(), 0)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
}
idAllocator, err := allocator2.NewIDAllocator(node.ctx, node.rootCoord, Params.DataNodeCfg.NodeID)
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: req.GetImportTask().TaskId,
DatanodeId: node.NodeID,
State: commonpb.ImportState_ImportPersisted,
Segments: make([]int64, 0),
RowCount: 0,
}
importWrapper := importutil.NewImportWrapper(ctx, schema, 2, Params.DataNodeCfg.FlushInsertBufferSize, idAllocator, node.chunkManager,
importFlushReqFunc(node, req, importResult, schema, ts))
err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
if err != nil {
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
reportFunc(importResult)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
}
// report root coord that the import task has been finished
_, err = node.rootCoord.ReportImport(ctx, importResult)
// temp id allocator service
idAllocator, err := allocator2.NewIDAllocator(node.ctx, node.rootCoord, Params.DataNodeCfg.NodeID)
_ = idAllocator.Start()
defer idAllocator.Close()
importWrapper := importutil.NewImportWrapper(ctx, schema, 2, Params.DataNodeCfg.FlushInsertBufferSize, idAllocator, node.chunkManager,
importFlushReqFunc(node, req, importResult, schema, ts), importResult, reportFunc)
err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
if err != nil {
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
reportFunc(importResult)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
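
A minimal sketch of the failure-reporting pattern this hunk introduces (the helper name is hypothetical, not part of the commit): mark the task failed, attach the reason as a "failed_reason" info pair, push the result to RootCoord through the report callback, and return the error status to the RPC caller.

package datanode

import (
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
)

// reportImportFailure condenses the pattern used above for unhealthy-node,
// alloc-ts and import errors: update the shared ImportResult, report it,
// then hand back the error status.
func reportImportFailure(res *rootcoordpb.ImportResult, reason string,
	report func(res *rootcoordpb.ImportResult) error) *commonpb.Status {
	res.State = commonpb.ImportState_ImportFailed
	res.Infos = append(res.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: reason})
	_ = report(res) // best-effort: the original failure reason is still returned below
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    reason,
	}
}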

View File

@ -400,6 +400,15 @@ func TestDataNode(t *testing.T) {
stat, err := node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.ErrorCode)
stat, err = node.Import(context.WithValue(ctx, ctxKey{}, returnError), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
node.State.Store(internalpb.StateCode_Abnormal)
stat, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
})
t.Run("Test BackGroundGC", func(t *testing.T) {

View File

@ -890,6 +890,13 @@ func (m *RootCoordFactory) AllocTimestamp(ctx context.Context, in *rootcoordpb.A
Status: &commonpb.Status{},
Timestamp: 1000,
}
v := ctx.Value(ctxKey{})
if v != nil && v.(string) == returnError {
resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
return resp, fmt.Errorf("injected error")
}
return resp, nil
}
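
The mock above keys error injection off a context value; a standalone sketch of the same idea follows (ctxKey and returnError mirror the test helpers, but the concrete string value here is an assumption, not the repo's definition).

package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

const returnError = "ReturnError"

// allocTimestamp mimics the mock: succeed by default, fail only when the
// caller tagged the context for error injection.
func allocTimestamp(ctx context.Context) (uint64, error) {
	if v, ok := ctx.Value(ctxKey{}).(string); ok && v == returnError {
		return 0, fmt.Errorf("injected error")
	}
	return 1000, nil
}

func main() {
	_, err := allocTimestamp(context.WithValue(context.Background(), ctxKey{}, returnError))
	fmt.Println(err) // "injected error": exercises the failure path in DataNode.Import
}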

View File

@ -328,6 +328,7 @@ func (m *importManager) updateTaskState(ir *rootcoordpb.ImportResult) (*datapb.I
v.State.StateCode = ir.GetState()
v.State.Segments = ir.GetSegments()
v.State.RowCount = ir.GetRowCount()
v.State.RowIds = ir.AutoIds
for _, kv := range ir.GetInfos() {
if kv.GetKey() == FailedReason {
v.State.ErrorMessage = kv.GetValue()

View File

@ -14,6 +14,8 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -34,10 +36,14 @@ type ImportWrapper struct {
chunkManager storage.ChunkManager
callFlushFunc ImportFlushFunc // call back function to flush a segment
importResult *rootcoordpb.ImportResult // import result
reportFunc func(res *rootcoordpb.ImportResult) error // report import state to rootcoord
}
func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.CollectionSchema, shardNum int32, segmentSize int64,
idAlloc *allocator.IDAllocator, cm storage.ChunkManager, flushFunc ImportFlushFunc) *ImportWrapper {
idAlloc *allocator.IDAllocator, cm storage.ChunkManager, flushFunc ImportFlushFunc,
importResult *rootcoordpb.ImportResult, reportFunc func(res *rootcoordpb.ImportResult) error) *ImportWrapper {
if collectionSchema == nil {
log.Error("import error: collection schema is nil")
return nil
@ -69,6 +75,8 @@ func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.Collection
rowIDAllocator: idAlloc,
callFlushFunc: flushFunc,
chunkManager: cm,
importResult: importResult,
reportFunc: reportFunc,
}
return wrapper
@ -110,16 +118,23 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
for i := 0; i < len(filePaths); i++ {
filePath := filePaths[i]
_, fileType := getFileNameAndExt(filePath)
log.Info("imprort wrapper: row-based file ", zap.Any("filePath", filePath), zap.Any("fileType", fileType))
log.Info("import wrapper: row-based file ", zap.Any("filePath", filePath), zap.Any("fileType", fileType))
if fileType == JSONFileExt {
err := func() error {
// for minio storage, chunkManager will download file into local memory
// for local storage, chunkManager open the file directly
file, err := p.chunkManager.Reader(filePath)
if err != nil {
return err
}
defer file.Close()
// report file process state
p.importResult.State = commonpb.ImportState_ImportDownloaded
p.reportFunc(p.importResult)
// parse file
reader := bufio.NewReader(file)
parser := NewJSONParser(p.ctx, p.collectionSchema)
var consumer *JSONRowConsumer
@ -131,17 +146,26 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
consumer = NewJSONRowConsumer(p.collectionSchema, p.rowIDAllocator, p.shardNum, p.segmentSize, flushFunc)
}
validator := NewJSONRowValidator(p.collectionSchema, consumer)
err = parser.ParseRows(reader, validator)
if err != nil {
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
// for row-based files, auto-id is generated within JSONRowConsumer
if consumer != nil {
p.importResult.AutoIds = append(p.importResult.AutoIds, consumer.IDRange()...)
}
// report file process state
p.importResult.State = commonpb.ImportState_ImportParsed
p.reportFunc(p.importResult)
return nil
}()
if err != nil {
log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath))
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
}
@ -160,7 +184,7 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
return nil
}
p.printFieldsDataInfo(fields, "imprort wrapper: combine field data", nil)
p.printFieldsDataInfo(fields, "import wrapper: combine field data", nil)
fieldNames := make([]storage.FieldID, 0)
for k, v := range fields {
@ -193,17 +217,24 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
for i := 0; i < len(filePaths); i++ {
filePath := filePaths[i]
fileName, fileType := getFileNameAndExt(filePath)
log.Info("imprort wrapper: column-based file ", zap.Any("filePath", filePath), zap.Any("fileType", fileType))
log.Info("import wrapper: column-based file ", zap.Any("filePath", filePath), zap.Any("fileType", fileType))
if fileType == JSONFileExt {
err := func() error {
// for minio storage, chunkManager will download file into local memory
// for local storage, chunkManager open the file directly
file, err := p.chunkManager.Reader(filePath)
if err != nil {
log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath))
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
defer file.Close()
// report file process state
p.importResult.State = commonpb.ImportState_ImportDownloaded
p.reportFunc(p.importResult)
// parse file
reader := bufio.NewReader(file)
parser := NewJSONParser(p.ctx, p.collectionSchema)
var consumer *JSONColumnConsumer
@ -214,24 +245,35 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
err = parser.ParseColumns(reader, validator)
if err != nil {
log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath))
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
// report file process state
p.importResult.State = commonpb.ImportState_ImportParsed
p.reportFunc(p.importResult)
return nil
}()
if err != nil {
log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath))
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
} else if fileType == NumpyFileExt {
// for minio storage, chunkManager will download file into local memory
// for local storage, chunkManager open the file directly
file, err := p.chunkManager.Reader(filePath)
if err != nil {
log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath))
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
defer file.Close()
// report file process state
p.importResult.State = commonpb.ImportState_ImportDownloaded
p.reportFunc(p.importResult)
var id storage.FieldID
for _, field := range p.collectionSchema.Fields {
if field.GetName() == fileName {
@ -251,21 +293,27 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
parser := NewNumpyParser(p.ctx, p.collectionSchema, flushFunc)
err = parser.Parse(file, fileName, onlyValidate)
if err != nil {
log.Error("imprort error: "+err.Error(), zap.String("filePath", filePath))
log.Error("import error: "+err.Error(), zap.String("filePath", filePath))
return err
}
// report file process state
p.importResult.State = commonpb.ImportState_ImportParsed
p.reportFunc(p.importResult)
}
}
// split fields data into segments
err := p.splitFieldsData(fieldsData, filePaths)
if err != nil {
log.Error("imprort error: " + err.Error())
log.Error("import error: " + err.Error())
return err
}
}
return nil
// report file process state
p.importResult.State = commonpb.ImportState_ImportPersisted
return p.reportFunc(p.importResult)
}
func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storage.FieldData, n int, target storage.FieldData) error {
@ -346,7 +394,7 @@ func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storag
func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.FieldData, files []string) error {
if len(fieldsData) == 0 {
return errors.New("imprort error: fields data is empty")
return errors.New("import error: fields data is empty")
}
var primaryKey *schemapb.FieldSchema
@ -357,37 +405,43 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
} else {
_, ok := fieldsData[schema.GetFieldID()]
if !ok {
return errors.New("imprort error: field " + schema.GetName() + " not provided")
return errors.New("import error: field " + schema.GetName() + " not provided")
}
}
}
if primaryKey == nil {
return errors.New("imprort error: primary key field is not found")
return errors.New("import error: primary key field is not found")
}
rowCount := 0
for _, v := range fieldsData {
rowCount = v.RowNum()
break
if v.RowNum() > rowCount {
rowCount = v.RowNum()
}
}
primaryData, ok := fieldsData[primaryKey.GetFieldID()]
if !ok {
// generate auto id for primary key
if primaryKey.GetAutoID() {
var rowIDBegin typeutil.UniqueID
var rowIDEnd typeutil.UniqueID
rowIDBegin, rowIDEnd, _ = p.rowIDAllocator.Alloc(uint32(rowCount))
return errors.New("import error: primary key field is not provided")
}
primaryDataArr := primaryData.(*storage.Int64FieldData)
for i := rowIDBegin; i < rowIDEnd; i++ {
primaryDataArr.Data = append(primaryDataArr.Data, rowIDBegin+i)
}
// generate auto id for primary key
if primaryKey.GetAutoID() {
log.Info("import wrapper: generating auto-id", zap.Any("rowCount", rowCount))
var rowIDBegin typeutil.UniqueID
var rowIDEnd typeutil.UniqueID
rowIDBegin, rowIDEnd, _ = p.rowIDAllocator.Alloc(uint32(rowCount))
primaryDataArr := primaryData.(*storage.Int64FieldData)
for i := rowIDBegin; i < rowIDEnd; i++ {
primaryDataArr.Data = append(primaryDataArr.Data, rowIDBegin+i)
}
p.importResult.AutoIds = append(p.importResult.AutoIds, rowIDBegin, rowIDEnd)
}
if primaryData.RowNum() <= 0 {
return errors.New("imprort error: primary key " + primaryKey.GetName() + " not provided")
return errors.New("import error: primary key " + primaryKey.GetName() + " not provided")
}
// prepare segemnts
@ -406,7 +460,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
schema := p.collectionSchema.Fields[i]
appendFunc := p.appendFunc(schema)
if appendFunc == nil {
return errors.New("imprort error: unsupported field data type")
return errors.New("import error: unsupported field data type")
}
appendFunctions[schema.GetName()] = appendFunc
}
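
For orientation, a rough sketch of how a caller wires the two new constructor parameters (ctx, schema, segmentSize, idAllocator, chunkManager, flushFunc and files are assumed to be prepared as in the tests above; the state-recording reportFunc is illustrative, not repo code).

// Record every state the wrapper reports (ImportDownloaded, ImportParsed,
// ImportPersisted or ImportFailed) instead of forwarding it to RootCoord.
states := make([]commonpb.ImportState, 0)
reportFunc := func(res *rootcoordpb.ImportResult) error {
	states = append(states, res.GetState())
	return nil
}
importResult := &rootcoordpb.ImportResult{
	Status:   &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
	TaskId:   1,
	Segments: make([]int64, 0),
	AutoIds:  make([]int64, 0),
}
wrapper := importutil.NewImportWrapper(ctx, schema, 2, segmentSize,
	idAllocator, chunkManager, flushFunc, importResult, reportFunc)
if err := wrapper.Import(files, true /*rowBased*/, false /*onlyValidate*/); err != nil {
	// on failure the wrapper has already pushed an ImportFailed state
}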

View File

@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
@ -29,7 +30,8 @@ func Test_NewImportWrapper(t *testing.T) {
ctx := context.Background()
cm, err := f.NewVectorStorageChunkManager(ctx)
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, nil, 2, 1, nil, cm, nil)
wrapper := NewImportWrapper(ctx, nil, 2, 1, nil, cm, nil, nil, nil)
assert.Nil(t, wrapper)
schema := &schemapb.CollectionSchema{
@ -47,7 +49,7 @@ func Test_NewImportWrapper(t *testing.T) {
Description: "int64",
DataType: schemapb.DataType_Int64,
})
wrapper = NewImportWrapper(ctx, schema, 2, 1, nil, cm, nil)
wrapper = NewImportWrapper(ctx, schema, 2, 1, nil, cm, nil, nil, nil)
assert.NotNil(t, wrapper)
err = wrapper.Cancel()
@ -93,12 +95,27 @@ func Test_ImportRowBased(t *testing.T) {
}
// success case
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
files := make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, true, false)
assert.Nil(t, err)
assert.Equal(t, 5, rowCount)
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
// parse error
content = []byte(`{
@ -111,11 +128,13 @@ func Test_ImportRowBased(t *testing.T) {
err = cm.Write(filePath, content)
assert.NoError(t, err)
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
importResult.State = commonpb.ImportState_ImportStarted
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
files = make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, true, false)
assert.NotNil(t, err)
assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)
// file doesn't exist
files = make([]string, 0)
@ -178,12 +197,27 @@ func Test_ImportColumnBased_json(t *testing.T) {
}
// success case
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
files := make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, false, false)
assert.Nil(t, err)
assert.Equal(t, 5, rowCount)
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
// parse error
content = []byte(`{
@ -194,11 +228,13 @@ func Test_ImportColumnBased_json(t *testing.T) {
err = cm.Write(filePath, content)
assert.NoError(t, err)
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
importResult.State = commonpb.ImportState_ImportStarted
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
files = make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, false, false)
assert.NotNil(t, err)
assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)
// file doesn't exist
files = make([]string, 0)
@ -268,11 +304,26 @@ func Test_ImportColumnBased_numpy(t *testing.T) {
}
// success case
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
err = wrapper.Import(files, false, false)
assert.Nil(t, err)
assert.Equal(t, 5, rowCount)
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
// parse error
content = []byte(`{
@ -283,11 +334,12 @@ func Test_ImportColumnBased_numpy(t *testing.T) {
err = cm.Write(filePath, content)
assert.NoError(t, err)
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc)
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, flushFunc, importResult, reportFunc)
files = make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, false, false)
assert.NotNil(t, err)
assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)
// file doesn't exist
files = make([]string, 0)
@ -403,7 +455,21 @@ func Test_ImportRowBased_perf(t *testing.T) {
schema := perfSchema(dim)
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc)
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc, importResult, reportFunc)
files := make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, true, false)
@ -501,7 +567,21 @@ func Test_ImportColumnBased_perf(t *testing.T) {
schema := perfSchema(dim)
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc)
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, flushFunc, importResult, reportFunc)
files := make([]string, 0)
files = append(files, filePath1)
files = append(files, filePath2)

View File

@ -395,6 +395,7 @@ type JSONRowConsumer struct {
segmentsData []map[storage.FieldID]storage.FieldData // in-memory segments data
segmentSize int64 // maximum size of a segment(unit:byte)
primaryKey storage.FieldID // name of primary key
autoIDRange []int64 // auto-generated id range, for example: [1, 10, 20, 25] means id from 1 to 10 and 20 to 25
callFlushFunc ImportFlushFunc // call back function to flush segment
}
@ -482,6 +483,7 @@ func NewJSONRowConsumer(collectionSchema *schemapb.CollectionSchema, idAlloc *al
segmentSize: segmentSize,
rowCounter: 0,
primaryKey: -1,
autoIDRange: make([]int64, 0),
callFlushFunc: flushFunc,
}
@ -517,6 +519,10 @@ func NewJSONRowConsumer(collectionSchema *schemapb.CollectionSchema, idAlloc *al
return v
}
func (v *JSONRowConsumer) IDRange() []int64 {
return v.autoIDRange
}
func (v *JSONRowConsumer) flush(force bool) error {
// force flush all data
if force {
@ -580,6 +586,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
if rowIDEnd-rowIDBegin != int64(len(rows)) {
return errors.New("JSON row consumer: failed to allocate ID for " + strconv.Itoa(len(rows)) + " rows")
}
v.autoIDRange = append(v.autoIDRange, rowIDBegin, rowIDEnd)
}
// consume rows
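
The autoIDRange slice stores allocated IDs as pairs rather than individual values; a small standalone sketch (not repo code) of expanding such pairs, treating each pair as a half-open [begin, end) interval, consistent with rowIDEnd-rowIDBegin == len(rows) in the Handle hunk above.

package main

import "fmt"

// expandIDRanges turns pairs like [1, 10, 20, 25] into the concrete IDs
// 1..9 and 20..24; each pair is a half-open [begin, end) interval as
// returned by the ID allocator.
func expandIDRanges(ranges []int64) []int64 {
	ids := make([]int64, 0)
	for i := 0; i+1 < len(ranges); i += 2 {
		for id := ranges[i]; id < ranges[i+1]; id++ {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	fmt.Println(expandIDRanges([]int64{1, 10, 20, 25})) // 1..9 then 20..24
}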

View File

@ -103,7 +103,7 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
// 1. field data type should be consist to numpy data type
// 2. vector field dimension should be consist to numpy shape
if schemapb.DataType_FloatVector == schema.DataType {
if elementType != schemapb.DataType_Float {
if elementType != schemapb.DataType_Float && elementType != schemapb.DataType_Double {
return errors.New("illegal data type " + adapter.GetType() + " for field " + schema.GetName())
}
@ -247,11 +247,32 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
Dim: p.columnDesc.dimension,
}
case schemapb.DataType_FloatVector:
data, err := adapter.ReadFloat32(p.columnDesc.elementCount)
// for float vector, we support float32 and float64 numpy file because python float value is 64 bit
// for float64 numpy file, the performance is worse than float32 numpy file
// we don't check overflow here
elementType, err := convertNumpyType(adapter.GetType())
if err != nil {
return err
}
var data []float32
if elementType == schemapb.DataType_Float {
data, err = adapter.ReadFloat32(p.columnDesc.elementCount)
if err != nil {
return err
}
} else if elementType == schemapb.DataType_Double {
data = make([]float32, 0, p.columnDesc.elementCount)
data64, err := adapter.ReadFloat64(p.columnDesc.elementCount)
if err != nil {
return err
}
for _, f64 := range data64 {
data = append(data, float32(f64))
}
}
p.columnData = &storage.FloatVectorFieldData{
NumRows: []int64{int64(p.columnDesc.elementCount)},
Data: data,
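
A minimal standalone illustration of the down-conversion the new FloatVector branch performs for float64 numpy files (outside the repo; precision loss is accepted and overflow is not checked, mirroring the comment above).

package main

import "fmt"

// toFloat32 narrows a float64 column to float32, as the branch above does
// for double-typed numpy files.
func toFloat32(data64 []float64) []float32 {
	data := make([]float32, 0, len(data64))
	for _, f64 := range data64 {
		data = append(data, float32(f64))
	}
	return data
}

func main() {
	fmt.Println(toFloat32([]float64{1.1, 2.1, 3.1, 4.1}))
}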

View File

@ -442,7 +442,7 @@ func Test_Parse(t *testing.T) {
}
checkFunc(data8, "field_binary_vector", flushFunc)
// double vector
// double vector(element can be float32 or float64)
data9 := [][4]float32{{1.1, 2.1, 3.1, 4.1}, {5.2, 6.2, 7.2, 8.2}}
flushFunc = func(field storage.FieldData) error {
assert.NotNil(t, field)
@ -458,6 +458,22 @@ func Test_Parse(t *testing.T) {
return nil
}
checkFunc(data9, "field_float_vector", flushFunc)
data10 := [][4]float64{{1.1, 2.1, 3.1, 4.1}, {5.2, 6.2, 7.2, 8.2}}
flushFunc = func(field storage.FieldData) error {
assert.NotNil(t, field)
assert.Equal(t, len(data10), field.RowNum())
for i := 0; i < len(data10); i++ {
row := field.GetRow(i).([]float32)
for k := 0; k < len(row); k++ {
assert.Equal(t, float32(data10[i][k]), row[k])
}
}
return nil
}
checkFunc(data10, "field_float_vector", flushFunc)
}
func Test_Parse_perf(t *testing.T) {