mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
ref: https://github.com/milvus-io/milvus/issues/42148 --------- Signed-off-by: SpadeA <tangchenjie1210@gmail.com> Signed-off-by: SpadeA-Tang <tangchenjie1210@gmail.com>
435 lines
12 KiB
Go
435 lines
12 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/apache/arrow/go/v17/arrow"
|
|
"github.com/apache/arrow/go/v17/arrow/array"
|
|
"github.com/apache/arrow/go/v17/arrow/memory"
|
|
"github.com/samber/lo"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) (uint64, error) {
|
|
switch b := builder.(type) {
|
|
case *array.BooleanBuilder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(defaultValue.GetBoolData())
|
|
return 1, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
ba, ok := a.(*array.Boolean)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ba.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(ba.Value(idx))
|
|
return 1, nil
|
|
}
|
|
case *array.Int8Builder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(int8(defaultValue.GetIntData()))
|
|
return 1, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
ia, ok := a.(*array.Int8)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ia.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(ia.Value(idx))
|
|
return 1, nil
|
|
}
|
|
case *array.Int16Builder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(int16(defaultValue.GetIntData()))
|
|
return 2, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
ia, ok := a.(*array.Int16)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ia.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(ia.Value(idx))
|
|
return 2, nil
|
|
}
|
|
case *array.Int32Builder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(defaultValue.GetIntData())
|
|
return 4, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
ia, ok := a.(*array.Int32)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ia.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(ia.Value(idx))
|
|
return 4, nil
|
|
}
|
|
case *array.Int64Builder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(defaultValue.GetLongData())
|
|
return 8, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
ia, ok := a.(*array.Int64)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ia.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(ia.Value(idx))
|
|
return 8, nil
|
|
}
|
|
case *array.Float32Builder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(defaultValue.GetFloatData())
|
|
return 4, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
fa, ok := a.(*array.Float32)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if fa.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(fa.Value(idx))
|
|
return 4, nil
|
|
}
|
|
case *array.Float64Builder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
b.Append(defaultValue.GetDoubleData())
|
|
return 8, nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
fa, ok := a.(*array.Float64)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if fa.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
b.Append(fa.Value(idx))
|
|
return 8, nil
|
|
}
|
|
case *array.StringBuilder:
|
|
if a == nil {
|
|
if defaultValue != nil {
|
|
val := defaultValue.GetStringData()
|
|
b.Append(val)
|
|
return uint64(len(val)), nil
|
|
} else {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
}
|
|
sa, ok := a.(*array.String)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if sa.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
val := sa.Value(idx)
|
|
b.Append(val)
|
|
return uint64(len(val)), nil
|
|
}
|
|
case *array.BinaryBuilder:
|
|
// array type, not support defaultValue now
|
|
if a == nil {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
ba, ok := a.(*array.Binary)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ba.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
val := ba.Value(idx)
|
|
b.Append(val)
|
|
return uint64(len(val)), nil
|
|
}
|
|
case *array.FixedSizeBinaryBuilder:
|
|
ba, ok := a.(*array.FixedSizeBinary)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if ba.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
} else {
|
|
val := ba.Value(idx)
|
|
b.Append(val)
|
|
return uint64(len(val)), nil
|
|
}
|
|
case *array.ListBuilder:
|
|
// Handle ListBuilder for ArrayOfVector type
|
|
if a == nil {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
la, ok := a.(*array.List)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
|
|
}
|
|
if la.IsNull(idx) {
|
|
b.AppendNull()
|
|
return 0, nil
|
|
}
|
|
|
|
start, end := la.ValueOffsets(idx)
|
|
b.Append(true)
|
|
|
|
valuesArray := la.ListValues()
|
|
var totalSize uint64 = 0
|
|
valueBuilder := b.ValueBuilder()
|
|
switch vb := valueBuilder.(type) {
|
|
case *array.FixedSizeBinaryBuilder:
|
|
fixedArray, ok := valuesArray.(*array.FixedSizeBinary)
|
|
if !ok {
|
|
return 0, fmt.Errorf("invalid value type %T, expect %T", valuesArray.DataType(), vb.Type())
|
|
}
|
|
for i := start; i < end; i++ {
|
|
val := fixedArray.Value(int(i))
|
|
vb.Append(val)
|
|
totalSize += uint64(len(val))
|
|
}
|
|
default:
|
|
return 0, fmt.Errorf("unsupported value builder type in ListBuilder: %T", valueBuilder)
|
|
}
|
|
|
|
return totalSize, nil
|
|
default:
|
|
return 0, fmt.Errorf("unsupported builder type: %T", builder)
|
|
}
|
|
}
|
|
|
|
// GenerateEmptyArrayFromSchema generate empty array from schema
|
|
// If schema has default value, the array will bef filled with it.
|
|
// Otherwise, null will be used instead.
|
|
// If input schema is not nullable, an error will be returned.
|
|
func GenerateEmptyArrayFromSchema(schema *schemapb.FieldSchema, numRows int) (arrow.Array, error) {
|
|
// if not nullable, return error
|
|
if !schema.GetNullable() {
|
|
return nil, merr.WrapErrServiceInternal(fmt.Sprintf("missing field data %s", schema.Name))
|
|
}
|
|
dim, _ := typeutil.GetDim(schema)
|
|
|
|
elementType := schemapb.DataType_None
|
|
if schema.GetDataType() == schemapb.DataType_ArrayOfVector {
|
|
elementType = schema.GetElementType()
|
|
}
|
|
builder := array.NewBuilder(memory.DefaultAllocator, serdeMap[schema.GetDataType()].arrowType(int(dim), elementType)) // serdeEntry[schema.GetDataType()].newBuilder()
|
|
if schema.GetDefaultValue() != nil {
|
|
switch schema.GetDataType() {
|
|
case schemapb.DataType_Bool:
|
|
bd := builder.(*array.BooleanBuilder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) bool { return schema.GetDefaultValue().GetBoolData() }),
|
|
nil)
|
|
case schemapb.DataType_Int8:
|
|
bd := builder.(*array.Int8Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) int8 { return int8(schema.GetDefaultValue().GetIntData()) }),
|
|
nil)
|
|
case schemapb.DataType_Int16:
|
|
bd := builder.(*array.Int16Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) int16 { return int16(schema.GetDefaultValue().GetIntData()) }),
|
|
nil)
|
|
case schemapb.DataType_Int32:
|
|
bd := builder.(*array.Int32Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) int32 { return schema.GetDefaultValue().GetIntData() }),
|
|
nil)
|
|
case schemapb.DataType_Int64:
|
|
bd := builder.(*array.Int64Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) int64 { return schema.GetDefaultValue().GetLongData() }),
|
|
nil)
|
|
case schemapb.DataType_Float:
|
|
bd := builder.(*array.Float32Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) float32 { return schema.GetDefaultValue().GetFloatData() }),
|
|
nil)
|
|
case schemapb.DataType_Double:
|
|
bd := builder.(*array.Float64Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) float64 { return schema.GetDefaultValue().GetDoubleData() }),
|
|
nil)
|
|
|
|
case schemapb.DataType_Timestamptz:
|
|
bd := builder.(*array.Int64Builder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) int64 { return schema.GetDefaultValue().GetTimestamptzData() }),
|
|
nil)
|
|
|
|
case schemapb.DataType_VarChar, schemapb.DataType_String:
|
|
bd := builder.(*array.StringBuilder)
|
|
bd.AppendValues(
|
|
lo.RepeatBy(numRows, func(_ int) string { return schema.GetDefaultValue().GetStringData() }),
|
|
nil)
|
|
default:
|
|
return nil, merr.WrapErrServiceInternal(fmt.Sprintf("Unexpected default value type: %s", schema.GetDataType().String()))
|
|
}
|
|
} else {
|
|
builder.AppendNulls(numRows)
|
|
}
|
|
|
|
return builder.NewArray(), nil
|
|
}
|
|
|
|
// RecordBuilder is a helper to build arrow record.
|
|
// Due to current arrow impl (v12), the write performance is largely dependent on the batch size,
|
|
// small batch size will cause write performance degradation. To work around this issue, we accumulate
|
|
// records and write them in batches. This requires additional memory copy.
|
|
type RecordBuilder struct {
|
|
fields []*schemapb.FieldSchema
|
|
builders []array.Builder
|
|
|
|
nRows int
|
|
size uint64
|
|
}
|
|
|
|
func (b *RecordBuilder) Append(rec Record, start, end int) error {
|
|
for offset := start; offset < end; offset++ {
|
|
for i, builder := range b.builders {
|
|
f := b.fields[i]
|
|
col := rec.Column(f.FieldID)
|
|
size, err := appendValueAt(builder, col, offset, f.GetDefaultValue())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to append value at offset %d for field %s: %w", offset, f.GetName(), err)
|
|
}
|
|
b.size += size
|
|
}
|
|
}
|
|
b.nRows += (end - start)
|
|
return nil
|
|
}
|
|
|
|
func (b *RecordBuilder) GetRowNum() int {
|
|
return b.nRows
|
|
}
|
|
|
|
func (b *RecordBuilder) GetSize() uint64 {
|
|
return b.size
|
|
}
|
|
|
|
func (b *RecordBuilder) Build() Record {
|
|
arrays := make([]arrow.Array, len(b.builders))
|
|
fields := make([]arrow.Field, len(b.builders))
|
|
field2Col := make(map[FieldID]int, len(b.builders))
|
|
for c, builder := range b.builders {
|
|
arrays[c] = builder.NewArray()
|
|
f := b.fields[c]
|
|
fid := f.FieldID
|
|
fields[c] = arrow.Field{
|
|
Name: f.GetName(),
|
|
Type: arrays[c].DataType(),
|
|
Nullable: f.Nullable,
|
|
}
|
|
field2Col[fid] = c
|
|
}
|
|
|
|
rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(b.nRows)), field2Col)
|
|
b.nRows = 0
|
|
b.size = 0
|
|
return rec
|
|
}
|
|
|
|
func NewRecordBuilder(schema *schemapb.CollectionSchema) *RecordBuilder {
|
|
// assumes 5 sub fields per StructArrayField
|
|
fields := make([]*schemapb.FieldSchema, 0, len(schema.Fields)+len(schema.StructArrayFields)*5)
|
|
fields = append(fields, schema.Fields...)
|
|
for _, sf := range schema.StructArrayFields {
|
|
fields = append(fields, sf.Fields...)
|
|
}
|
|
|
|
builders := make([]array.Builder, len(fields))
|
|
for i, field := range fields {
|
|
dim, _ := typeutil.GetDim(field)
|
|
|
|
elementType := schemapb.DataType_None
|
|
if field.DataType == schemapb.DataType_ArrayOfVector {
|
|
elementType = field.GetElementType()
|
|
}
|
|
arrowType := serdeMap[field.DataType].arrowType(int(dim), elementType)
|
|
builders[i] = array.NewBuilder(memory.DefaultAllocator, arrowType)
|
|
}
|
|
|
|
return &RecordBuilder{
|
|
fields: fields,
|
|
builders: builders,
|
|
}
|
|
}
|