// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importutil

import (
	"context"
	"errors"
	"fmt"
	"path"
	"runtime/debug"
	"strconv"
	"strings"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus-proto/go-api/schemapb"
	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/storage"
)

// isCanceled returns true if the outside context has been canceled.
func isCanceled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// initSegmentData creates an empty FieldData container for each field of the schema,
// keyed by field ID. It returns nil if the schema contains an unsupported data type.
func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.FieldID]storage.FieldData {
	segmentData := make(map[storage.FieldID]storage.FieldData)
	// the rowID field is a hidden field with fieldID=0, always auto-generated by IDAllocator;
	// if the primary key is int64 and autoID=true, the primary key field is equal to the rowID field
	segmentData[common.RowIDField] = &storage.Int64FieldData{
		Data:    make([]int64, 0),
		NumRows: []int64{0},
	}

	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]
		switch schema.DataType {
		case schemapb.DataType_Bool:
			segmentData[schema.GetFieldID()] = &storage.BoolFieldData{
				Data:    make([]bool, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_Float:
			segmentData[schema.GetFieldID()] = &storage.FloatFieldData{
				Data:    make([]float32, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_Double:
			segmentData[schema.GetFieldID()] = &storage.DoubleFieldData{
				Data:    make([]float64, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_Int8:
			segmentData[schema.GetFieldID()] = &storage.Int8FieldData{
				Data:    make([]int8, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_Int16:
			segmentData[schema.GetFieldID()] = &storage.Int16FieldData{
				Data:    make([]int16, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_Int32:
			segmentData[schema.GetFieldID()] = &storage.Int32FieldData{
				Data:    make([]int32, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_Int64:
			segmentData[schema.GetFieldID()] = &storage.Int64FieldData{
				Data:    make([]int64, 0),
				NumRows: []int64{0},
			}
		case schemapb.DataType_BinaryVector:
			dim, _ := getFieldDimension(schema)
			segmentData[schema.GetFieldID()] = &storage.BinaryVectorFieldData{
				Data:    make([]byte, 0),
				NumRows: []int64{0},
				Dim:     dim,
			}
		case schemapb.DataType_FloatVector:
			dim, _ := getFieldDimension(schema)
			segmentData[schema.GetFieldID()] = &storage.FloatVectorFieldData{
				Data:    make([]float32, 0),
				NumRows: []int64{0},
				Dim:     dim,
			}
		case schemapb.DataType_String, schemapb.DataType_VarChar:
			segmentData[schema.GetFieldID()] = &storage.StringFieldData{
				Data:    make([]string, 0),
				NumRows: []int64{0},
			}
		default:
			log.Error("Import util: unsupported data type", zap.Int("DataType", int(schema.DataType)))
			return nil
		}
	}

	return segmentData
}
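// exampleBuildShards is a hypothetical usage sketch (not part of the original file):
// an importer typically allocates one segmentData map per shard via initSegmentData,
// then appends converted values into the per-field containers until a flush triggers.
func exampleBuildShards(collectionSchema *schemapb.CollectionSchema, shardNum int) ([]map[storage.FieldID]storage.FieldData, error) {
	shards := make([]map[storage.FieldID]storage.FieldData, 0, shardNum)
	for i := 0; i < shardNum; i++ {
		segmentData := initSegmentData(collectionSchema)
		if segmentData == nil {
			// initSegmentData returns nil when the schema holds an unsupported data type
			return nil, errors.New("failed to initialize FieldData list")
		}
		shards = append(shards, segmentData)
	}
	return shards, nil
}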
// initValidators constructs validator methods and data conversion methods for each field.
func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[storage.FieldID]*Validator) error {
	if collectionSchema == nil {
		return errors.New("collection schema is nil")
	}

	// the json decoder parses all numeric values into float64
	numericValidator := func(obj interface{}) error {
		switch obj.(type) {
		case float64:
			return nil
		default:
			s := fmt.Sprintf("%v", obj)
			msg := "illegal numeric value " + s
			return errors.New(msg)
		}
	}

	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]

		validators[schema.GetFieldID()] = &Validator{}
		validators[schema.GetFieldID()].primaryKey = schema.GetIsPrimaryKey()
		validators[schema.GetFieldID()].autoID = schema.GetAutoID()
		validators[schema.GetFieldID()].fieldName = schema.GetName()
		validators[schema.GetFieldID()].isString = false

		switch schema.DataType {
		case schemapb.DataType_Bool:
			validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
				switch obj.(type) {
				case bool:
					return nil
				default:
					s := fmt.Sprintf("%v", obj)
					msg := "illegal value " + s + " for bool type field " + schema.GetName()
					return errors.New(msg)
				}
			}
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := obj.(bool)
				field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
				field.(*storage.BoolFieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_Float:
			validators[schema.GetFieldID()].validateFunc = numericValidator
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := float32(obj.(float64))
				field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, value)
				field.(*storage.FloatFieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_Double:
			validators[schema.GetFieldID()].validateFunc = numericValidator
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := obj.(float64)
				field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
				field.(*storage.DoubleFieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_Int8:
			validators[schema.GetFieldID()].validateFunc = numericValidator
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := int8(obj.(float64))
				field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, value)
				field.(*storage.Int8FieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_Int16:
			validators[schema.GetFieldID()].validateFunc = numericValidator
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := int16(obj.(float64))
				field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, value)
				field.(*storage.Int16FieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_Int32:
			validators[schema.GetFieldID()].validateFunc = numericValidator
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := int32(obj.(float64))
				field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, value)
				field.(*storage.Int32FieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_Int64:
			validators[schema.GetFieldID()].validateFunc = numericValidator
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := int64(obj.(float64))
				field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
				field.(*storage.Int64FieldData).NumRows[0]++
				return nil
			}
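		// Note on the integer cases above: the JSON decoder delivers every number as
		// float64, so the int8/int16/int32/int64 conversions truncate any fractional
		// part, and numericValidator does not reject out-of-range values.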
		case schemapb.DataType_BinaryVector:
			dim, err := getFieldDimension(schema)
			if err != nil {
				return err
			}
			validators[schema.GetFieldID()].dimension = dim

			validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
				switch vt := obj.(type) {
				case []interface{}:
					if len(vt)*8 != dim {
						msg := "bit size " + strconv.Itoa(len(vt)*8) + " doesn't equal to vector dimension " + strconv.Itoa(dim) + " of field " + schema.GetName()
						return errors.New(msg)
					}
					for i := 0; i < len(vt); i++ {
						if e := numericValidator(vt[i]); e != nil {
							msg := e.Error() + " for binary vector field " + schema.GetName()
							return errors.New(msg)
						}

						t := int(vt[i].(float64))
						if t > 255 || t < 0 {
							msg := "illegal value " + strconv.Itoa(t) + " for binary vector field " + schema.GetName()
							return errors.New(msg)
						}
					}
					return nil
				default:
					s := fmt.Sprintf("%v", obj)
					msg := s + " is not an array for binary vector field " + schema.GetName()
					return errors.New(msg)
				}
			}

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr := obj.([]interface{})
				for i := 0; i < len(arr); i++ {
					value := byte(arr[i].(float64))
					field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, value)
				}

				field.(*storage.BinaryVectorFieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_FloatVector:
			dim, err := getFieldDimension(schema)
			if err != nil {
				return err
			}
			validators[schema.GetFieldID()].dimension = dim

			validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
				switch vt := obj.(type) {
				case []interface{}:
					if len(vt) != dim {
						msg := "array size " + strconv.Itoa(len(vt)) + " doesn't equal to vector dimension " + strconv.Itoa(dim) + " of field " + schema.GetName()
						return errors.New(msg)
					}
					for i := 0; i < len(vt); i++ {
						if e := numericValidator(vt[i]); e != nil {
							msg := e.Error() + " for float vector field " + schema.GetName()
							return errors.New(msg)
						}
					}
					return nil
				default:
					s := fmt.Sprintf("%v", obj)
					msg := s + " is not an array for float vector field " + schema.GetName()
					return errors.New(msg)
				}
			}

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr := obj.([]interface{})
				for i := 0; i < len(arr); i++ {
					value := float32(arr[i].(float64))
					field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, value)
				}

				field.(*storage.FloatVectorFieldData).NumRows[0]++
				return nil
			}
		case schemapb.DataType_String, schemapb.DataType_VarChar:
			validators[schema.GetFieldID()].isString = true

			validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
				switch obj.(type) {
				case string:
					return nil
				default:
					s := fmt.Sprintf("%v", obj)
					msg := s + " is not a string for string type field " + schema.GetName()
					return errors.New(msg)
				}
			}

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				value := obj.(string)
				field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
				field.(*storage.StringFieldData).NumRows[0]++
				return nil
			}
		default:
			return errors.New("unsupported data type: " + strconv.Itoa(int(collectionSchema.Fields[i].DataType)))
		}
	}

	return nil
}
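// exampleValidateAndConvert is a hypothetical sketch (not part of the original file)
// showing how a value parsed from JSON flows through a Validator: validateFunc checks
// the raw value, then convertFunc appends it to the matching FieldData container.
func exampleValidateAndConvert(validators map[storage.FieldID]*Validator,
	segmentData map[storage.FieldID]storage.FieldData,
	fieldID storage.FieldID, rawValue interface{}) error {
	validator, ok := validators[fieldID]
	if !ok {
		return errors.New("no validator for field " + strconv.FormatInt(fieldID, 10))
	}
	if err := validator.validateFunc(rawValue); err != nil {
		return err
	}
	// assumes segmentData was built by initSegmentData from the same schema,
	// so the container type matches what convertFunc expects
	return validator.convertFunc(rawValue, segmentData[fieldID])
}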
// printFieldsDataInfo logs the row count of each field, plus the source files if any.
func printFieldsDataInfo(fieldsData map[storage.FieldID]storage.FieldData, msg string, files []string) {
	stats := make([]zapcore.Field, 0)
	for k, v := range fieldsData {
		stats = append(stats, zap.Int(strconv.FormatInt(k, 10), v.RowNum()))
	}

	if len(files) > 0 {
		stats = append(stats, zap.Any("files", files))
	}
	log.Info(msg, stats...)
}

// GetFileNameAndExt extracts the file name and extension,
// for example: "/a/b/c.ttt" returns "c" and ".ttt"
func GetFileNameAndExt(filePath string) (string, string) {
	fileName := path.Base(filePath)
	fileType := path.Ext(fileName)
	fileNameWithoutExt := strings.TrimSuffix(fileName, fileType)
	return fileNameWithoutExt, fileType
}

// getFieldDimension gets the dimension of a vector field from its "dim" type param.
func getFieldDimension(schema *schemapb.FieldSchema) (int, error) {
	for _, kvPair := range schema.GetTypeParams() {
		key, value := kvPair.GetKey(), kvPair.GetValue()
		if key == "dim" {
			dim, err := strconv.Atoi(value)
			if err != nil {
				return 0, errors.New("vector dimension is invalid")
			}
			return dim, nil
		}
	}

	return 0, errors.New("vector dimension is not defined")
}

// triggerGC triggers the golang gc to return all free memory back to the underlying system at once.
// Note: this operation is expensive and can lead to latency spikes, as it holds the heap lock through the whole process.
func triggerGC() {
	debug.FreeOSMemory()
}
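// exampleInspectFile is a hypothetical sketch (not part of the original file) combining
// the helpers above: split an import file path into name and extension, then read the
// "dim" type param of a vector field. The ".json"/".npy" extensions are assumed values
// for illustration, not a list defined in this file.
func exampleInspectFile(filePath string, vectorField *schemapb.FieldSchema) (int, error) {
	name, ext := GetFileNameAndExt(filePath) // e.g. "/a/b/c.json" -> "c", ".json"
	if ext != ".json" && ext != ".npy" {
		return 0, errors.New("unsupported import file: " + name + ext)
	}
	return getFieldDimension(vectorField)
}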
// tryFlushBlocks does two things:
// 1. if the accumulated data of a block exceeds blockSize, call callFlushFunc to generate a new binlog file
// 2. if the total accumulated data exceeds maxTotalSize, call callFlushFunc to flush the biggest block
func tryFlushBlocks(ctx context.Context,
	blocksData []map[storage.FieldID]storage.FieldData,
	collectionSchema *schemapb.CollectionSchema,
	callFlushFunc ImportFlushFunc,
	blockSize int64,
	maxTotalSize int64,
	force bool) error {
	totalSize := 0
	biggestSize := 0
	biggestItem := -1

	// 1. if the accumulated data of a block exceeds blockSize, call callFlushFunc to generate a new binlog file
	for i := 0; i < len(blocksData); i++ {
		// the outside context might be canceled (service stop, or a future enhancement for canceling an import task)
		if isCanceled(ctx) {
			log.Error("Import util: import task was canceled")
			return errors.New("import task was canceled")
		}

		blockData := blocksData[i]
		// Note: even if rowCount is 0, the size is still non-zero
		size := 0
		rowCount := 0
		for _, fieldData := range blockData {
			size += fieldData.GetMemorySize()
			rowCount = fieldData.RowNum()
		}

		// force flush, called at the end of Read()
		if force && rowCount > 0 {
			printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
			err := callFlushFunc(blockData, i)
			if err != nil {
				log.Error("Import util: failed to force flush block data", zap.Int("shardID", i))
				return err
			}
			log.Info("Import util: force flush", zap.Int("rowCount", rowCount), zap.Int("size", size), zap.Int("shardID", i))

			blocksData[i] = initSegmentData(collectionSchema)
			if blocksData[i] == nil {
				log.Error("Import util: failed to initialize FieldData list")
				return errors.New("failed to initialize FieldData list")
			}
			continue
		}

		// if the block size is larger than the predefined blockSize, flush to create a new binlog file,
		// and initialize a new FieldData list for the next round of batch reading
		if size > int(blockSize) && rowCount > 0 {
			printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
			err := callFlushFunc(blockData, i)
			if err != nil {
				log.Error("Import util: failed to flush block data", zap.Int("shardID", i))
				return err
			}
			log.Info("Import util: block size exceeds limit and flush", zap.Int("rowCount", rowCount), zap.Int("size", size), zap.Int("shardID", i))

			blocksData[i] = initSegmentData(collectionSchema)
			if blocksData[i] == nil {
				log.Error("Import util: failed to initialize FieldData list")
				return errors.New("failed to initialize FieldData list")
			}
			continue
		}

		// calculate the total size (ignoring the flushed blocks)
		// and find the biggest block for step 2
		totalSize += size
		if size > biggestSize {
			biggestSize = size
			biggestItem = i
		}
	}

	// 2. if the total accumulated data exceeds maxTotalSize, call callFlushFunc to flush the biggest block
	if totalSize > int(maxTotalSize) && biggestItem >= 0 {
		// the outside context might be canceled (service stop, or a future enhancement for canceling an import task)
		if isCanceled(ctx) {
			log.Error("Import util: import task was canceled")
			return errors.New("import task was canceled")
		}

		blockData := blocksData[biggestItem]
		// Note: even if rowCount is 0, the size is still non-zero
		size := 0
		rowCount := 0
		for _, fieldData := range blockData {
			size += fieldData.GetMemorySize()
			rowCount = fieldData.RowNum()
		}

		if rowCount > 0 {
			printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
			err := callFlushFunc(blockData, biggestItem)
			if err != nil {
				log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
				return err
			}
			log.Info("Import util: total size exceeds limit and flush", zap.Int("rowCount", rowCount), zap.Int("size", size), zap.Int("totalSize", totalSize), zap.Int("shardID", biggestItem))

			blocksData[biggestItem] = initSegmentData(collectionSchema)
			if blocksData[biggestItem] == nil {
				log.Error("Import util: failed to initialize FieldData list")
				return errors.New("failed to initialize FieldData list")
			}
		}
	}

	return nil
}

// getTypeName returns a human-readable name for a schema data type.
func getTypeName(dt schemapb.DataType) string {
	switch dt {
	case schemapb.DataType_Bool:
		return "Bool"
	case schemapb.DataType_Int8:
		return "Int8"
	case schemapb.DataType_Int16:
		return "Int16"
	case schemapb.DataType_Int32:
		return "Int32"
	case schemapb.DataType_Int64:
		return "Int64"
	case schemapb.DataType_Float:
		return "Float"
	case schemapb.DataType_Double:
		return "Double"
	case schemapb.DataType_VarChar:
		return "Varchar"
	case schemapb.DataType_String:
		return "String"
	case schemapb.DataType_BinaryVector:
		return "BinaryVector"
	case schemapb.DataType_FloatVector:
		return "FloatVector"
	default:
		return "InvalidType"
	}
}
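// exampleTryFlush is a hypothetical sketch (not part of the original file) showing how
// a reader loop would drive tryFlushBlocks: flush opportunistically after each batch,
// then force-flush whatever remains at the end of the input. The size thresholds are
// assumed values for illustration, not constants defined in this file.
func exampleTryFlush(ctx context.Context,
	blocks []map[storage.FieldID]storage.FieldData,
	schema *schemapb.CollectionSchema,
	flush ImportFlushFunc) error {
	const blockSize = 16 << 20     // assumed 16 MB per-shard block threshold
	const maxTotalSize = 256 << 20 // assumed 256 MB total accumulated-data threshold

	// during reading: flush only the blocks that crossed the thresholds
	if err := tryFlushBlocks(ctx, blocks, schema, flush, blockSize, maxTotalSize, false); err != nil {
		return err
	}
	// at the end of input: force-flush all non-empty blocks
	return tryFlushBlocks(ctx, blocks, schema, flush, blockSize, maxTotalSize, true)
}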