mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: [2.5]Support import geometry data by json/csv (#44828)
issue: #44787 master pr: #44826 2.6 pr: #44827 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
e4b72977dd
commit
f27dfa4490
@ -744,14 +744,14 @@ func (data *GeometryFieldData) AppendRow(row interface{}) error {
|
||||
data.ValidData = append(data.ValidData, false)
|
||||
return nil
|
||||
}
|
||||
v, ok := row.([]byte)
|
||||
if !ok {
|
||||
switch v := row.(type) {
|
||||
case []byte:
|
||||
data.Data = append(data.Data, v)
|
||||
case string:
|
||||
data.Data = append(data.Data, []byte(v))
|
||||
default:
|
||||
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
|
||||
}
|
||||
if data.GetNullable() {
|
||||
data.ValidData = append(data.ValidData, true)
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1030,11 +1030,16 @@ func (data *JSONFieldData) AppendDataRows(rows interface{}) error {
|
||||
}
|
||||
|
||||
func (data *GeometryFieldData) AppendDataRows(rows interface{}) error {
|
||||
v, ok := rows.([][]byte)
|
||||
if !ok {
|
||||
switch v := rows.(type) {
|
||||
case [][]byte:
|
||||
data.Data = append(data.Data, v...)
|
||||
case []string:
|
||||
for _, row := range v {
|
||||
data.Data = append(data.Data, []byte(row))
|
||||
}
|
||||
default:
|
||||
return merr.WrapErrParameterInvalid("[][]byte", rows, "Wrong rows type")
|
||||
}
|
||||
data.Data = append(data.Data, v...)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,9 @@ import (
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/json"
|
||||
@ -285,6 +288,16 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
|
||||
return nil, err
|
||||
}
|
||||
return []byte(obj), nil
|
||||
case schemapb.DataType_Geometry:
|
||||
geomT, err := wkt.Unmarshal(obj)
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(obj, field)
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(obj, field)
|
||||
}
|
||||
return wkbValue, nil
|
||||
case schemapb.DataType_FloatVector:
|
||||
if nullable && obj == r.nullkey {
|
||||
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
|
||||
|
||||
@ -22,6 +22,9 @@ import (
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/json"
|
||||
@ -419,6 +422,22 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
|
||||
} else {
|
||||
return nil, r.wrapTypeError(obj, fieldID)
|
||||
}
|
||||
case schemapb.DataType_Geometry:
|
||||
wktValue, ok := obj.(string)
|
||||
if !ok {
|
||||
return nil, r.wrapTypeError(obj, fieldID)
|
||||
}
|
||||
|
||||
geomT, err := wkt.Unmarshal(wktValue)
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(wktValue, fieldID)
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(wktValue, fieldID)
|
||||
}
|
||||
|
||||
return wkbValue, nil
|
||||
case schemapb.DataType_Array:
|
||||
arr, ok := obj.([]interface{})
|
||||
if !ok {
|
||||
|
||||
@ -25,6 +25,9 @@ import (
|
||||
"github.com/apache/arrow/go/v17/parquet/pqarrow"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
"golang.org/x/exp/constraints"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
@ -719,15 +722,23 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error
|
||||
if data == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
byteArr := make([][]byte, 0)
|
||||
for i, str := range data.([]string) {
|
||||
wkbValues := make([][]byte, 0)
|
||||
for i, wktValue := range data.([]string) {
|
||||
if !validData[i] {
|
||||
byteArr = append(byteArr, []byte(nil))
|
||||
wkbValues = append(wkbValues, []byte(nil))
|
||||
continue
|
||||
}
|
||||
byteArr = append(byteArr, []byte(str))
|
||||
geomT, err := wkt.Unmarshal(wktValue)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
wkbValues = append(wkbValues, wkbValue)
|
||||
}
|
||||
return byteArr, validData, nil
|
||||
return wkbValues, validData, nil
|
||||
}
|
||||
|
||||
func ReadGeometryData(pcr *FieldReader, count int64) (any, error) {
|
||||
@ -739,11 +750,20 @@ func ReadGeometryData(pcr *FieldReader, count int64) (any, error) {
|
||||
if data == nil {
|
||||
return nil, nil
|
||||
}
|
||||
byteArr := make([][]byte, 0)
|
||||
for _, str := range data.([]string) {
|
||||
byteArr = append(byteArr, []byte(str))
|
||||
|
||||
wkbValues := make([][]byte, 0)
|
||||
for _, wktValue := range data.([]string) {
|
||||
geomT, err := wkt.Unmarshal(wktValue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wkbValues = append(wkbValues, wkbValue)
|
||||
}
|
||||
return byteArr, nil
|
||||
return wkbValues, nil
|
||||
}
|
||||
|
||||
func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
|
||||
|
||||
@ -31,6 +31,9 @@ import (
|
||||
"github.com/apache/arrow/go/v17/parquet/pqarrow"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -216,6 +219,17 @@ func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType
|
||||
default:
|
||||
s.Fail("unsupported array element type")
|
||||
}
|
||||
} else if fieldDataType == schemapb.DataType_Geometry && expect != nil {
|
||||
expectData := expect.([]byte)
|
||||
geomT, err := wkt.Unmarshal(string(expectData))
|
||||
if err != nil {
|
||||
s.Fail("unmarshal wkt failed")
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
s.Fail("marshal wkb failed")
|
||||
}
|
||||
s.Equal(wkbValue, actual.([]byte))
|
||||
} else {
|
||||
s.Equal(expect, actual)
|
||||
}
|
||||
|
||||
@ -177,8 +177,8 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
|
||||
case schemapb.DataType_JSON:
|
||||
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateJSONArray(rows))
|
||||
case schemapb.DataType_Geometry:
|
||||
// wkb bytes array
|
||||
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryArray(rows))
|
||||
// wkt array
|
||||
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryWktArray(rows))
|
||||
case schemapb.DataType_Array:
|
||||
switch f.GetElementType() {
|
||||
case schemapb.DataType_Bool:
|
||||
@ -533,9 +533,9 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
|
||||
columns = append(columns, builder.NewStringArray())
|
||||
case schemapb.DataType_Geometry:
|
||||
builder := array.NewStringBuilder(mem)
|
||||
wkbData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data
|
||||
wktData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data
|
||||
validData := insertData.Data[fieldID].(*storage.GeometryFieldData).ValidData
|
||||
builder.AppendValues(lo.Map(wkbData, func(bs []byte, _ int) string {
|
||||
builder.AppendValues(lo.Map(wktData, func(bs []byte, _ int) string {
|
||||
return string(bs)
|
||||
}), validData)
|
||||
columns = append(columns, builder.NewStringArray())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user