mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: Support import geometry data by json/csv (#44826)
issue: #44787 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
b7935557e1
commit
d5ecb63f53
@ -847,14 +847,14 @@ func (data *GeometryFieldData) AppendRow(row interface{}) error {
|
||||
data.ValidData = append(data.ValidData, false)
|
||||
return nil
|
||||
}
|
||||
v, ok := row.([]byte)
|
||||
if !ok {
|
||||
switch v := row.(type) {
|
||||
case []byte:
|
||||
data.Data = append(data.Data, v)
|
||||
case string:
|
||||
data.Data = append(data.Data, []byte(v))
|
||||
default:
|
||||
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
|
||||
}
|
||||
if data.GetNullable() {
|
||||
data.ValidData = append(data.ValidData, true)
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1184,11 +1184,16 @@ func (data *JSONFieldData) AppendDataRows(rows interface{}) error {
|
||||
}
|
||||
|
||||
func (data *GeometryFieldData) AppendDataRows(rows interface{}) error {
|
||||
v, ok := rows.([][]byte)
|
||||
if !ok {
|
||||
switch v := rows.(type) {
|
||||
case [][]byte:
|
||||
data.Data = append(data.Data, v...)
|
||||
case []string:
|
||||
for _, row := range v {
|
||||
data.Data = append(data.Data, []byte(row))
|
||||
}
|
||||
default:
|
||||
return merr.WrapErrParameterInvalid("[][]byte", rows, "Wrong rows type")
|
||||
}
|
||||
data.Data = append(data.Data, v...)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -22,6 +22,9 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/json"
|
||||
@ -407,6 +410,16 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem
|
||||
return nil, err
|
||||
}
|
||||
return []byte(obj), nil
|
||||
case schemapb.DataType_Geometry:
|
||||
geomT, err := wkt.Unmarshal(obj)
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(obj, field)
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(obj, field)
|
||||
}
|
||||
return wkbValue, nil
|
||||
case schemapb.DataType_FloatVector:
|
||||
var vec []float32
|
||||
err := json.Unmarshal([]byte(obj), &vec)
|
||||
|
||||
@ -23,6 +23,9 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
@ -332,6 +335,12 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
|
||||
DataType: schemapb.DataType_JSON,
|
||||
Nullable: suite.hasNullable,
|
||||
},
|
||||
{
|
||||
FieldID: 110,
|
||||
Name: "geometry",
|
||||
DataType: schemapb.DataType_Geometry,
|
||||
Nullable: suite.hasNullable,
|
||||
},
|
||||
},
|
||||
StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray},
|
||||
}
|
||||
@ -380,7 +389,7 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal string
|
||||
rawContent["$meta"] = "{\"dynamic\": \"dummy\"}"
|
||||
rawContent["struct_array"] = "[{\"sub_float_vector\": \"[0.1, 0.2]\", \"sub_str\": \"hello1\"}, " +
|
||||
"{\"sub_float_vector\": \"[0.3, 0.4]\", \"sub_str\": \"hello2\"}]"
|
||||
|
||||
rawContent["geometry"] = "POINT (30.123 -10.456)"
|
||||
rawContent[resetKey] = resetVal // reset a value
|
||||
for _, deleteKey := range deleteKeys {
|
||||
delete(rawContent, deleteKey) // delete a key
|
||||
@ -446,6 +455,8 @@ func compareValues(t *testing.T, field *schemapb.FieldSchema, val any) {
|
||||
assert.Equal(t, field.GetDefaultValue().GetDoubleData(), val.(float64))
|
||||
case schemapb.DataType_VarChar:
|
||||
assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string))
|
||||
case schemapb.DataType_Geometry:
|
||||
assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string))
|
||||
default:
|
||||
}
|
||||
} else if field.GetNullable() {
|
||||
@ -614,6 +625,12 @@ func (suite *RowParserSuite) runValid(c *testCase) {
|
||||
|
||||
suite.Equal(expectedFlat, vf.GetFloatVector().GetData())
|
||||
}
|
||||
case schemapb.DataType_Geometry:
|
||||
geomT, err := wkt.Unmarshal(rawVal)
|
||||
suite.NoError(err)
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
suite.NoError(err)
|
||||
suite.Equal(wkbValue, val)
|
||||
default:
|
||||
continue
|
||||
}
|
||||
|
||||
@ -21,6 +21,9 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/json"
|
||||
@ -537,6 +540,22 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
|
||||
} else {
|
||||
return nil, r.wrapTypeError(obj, fieldID)
|
||||
}
|
||||
case schemapb.DataType_Geometry:
|
||||
wktValue, ok := obj.(string)
|
||||
if !ok {
|
||||
return nil, r.wrapTypeError(obj, fieldID)
|
||||
}
|
||||
|
||||
geomT, err := wkt.Unmarshal(wktValue)
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(wktValue, fieldID)
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, r.wrapTypeError(wktValue, fieldID)
|
||||
}
|
||||
|
||||
return wkbValue, nil
|
||||
case schemapb.DataType_Array:
|
||||
arr, ok := obj.([]interface{})
|
||||
if !ok {
|
||||
|
||||
@ -22,6 +22,9 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
@ -323,6 +326,12 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
|
||||
DataType: schemapb.DataType_JSON,
|
||||
Nullable: suite.hasNullable,
|
||||
},
|
||||
{
|
||||
FieldID: 110,
|
||||
Name: "geometry",
|
||||
DataType: schemapb.DataType_Geometry,
|
||||
Nullable: suite.hasNullable,
|
||||
},
|
||||
},
|
||||
StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray},
|
||||
}
|
||||
@ -367,6 +376,7 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal any, d
|
||||
rawContent["double"] = 6.28
|
||||
rawContent["varchar"] = "test"
|
||||
rawContent["json"] = map[string]any{"a": 1}
|
||||
rawContent["geometry"] = "POINT (30.123 -10.456)"
|
||||
rawContent["x"] = 6
|
||||
rawContent["$meta"] = map[string]any{"dynamic": "dummy"}
|
||||
rawContent["struct_array"] = []any{
|
||||
@ -434,6 +444,8 @@ func compareValues(t *testing.T, field *schemapb.FieldSchema, val any) {
|
||||
assert.Equal(t, field.GetDefaultValue().GetDoubleData(), val.(float64))
|
||||
case schemapb.DataType_VarChar:
|
||||
assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string))
|
||||
case schemapb.DataType_Geometry:
|
||||
assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string))
|
||||
default:
|
||||
}
|
||||
} else if field.GetNullable() {
|
||||
@ -572,6 +584,12 @@ func (suite *RowParserSuite) runValid(c *testCase) {
|
||||
default:
|
||||
continue
|
||||
}
|
||||
case schemapb.DataType_Geometry:
|
||||
geomT, err := wkt.Unmarshal(rawVal.(string))
|
||||
suite.NoError(err)
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
suite.NoError(err)
|
||||
suite.Equal(wkbValue, val)
|
||||
default:
|
||||
continue
|
||||
}
|
||||
|
||||
@ -25,6 +25,9 @@ import (
|
||||
"github.com/apache/arrow/go/v17/parquet/pqarrow"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
"golang.org/x/exp/constraints"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
@ -666,15 +669,23 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error
|
||||
if data == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
byteArr := make([][]byte, 0)
|
||||
for i, str := range data.([]string) {
|
||||
wkbValues := make([][]byte, 0)
|
||||
for i, wktValue := range data.([]string) {
|
||||
if !validData[i] {
|
||||
byteArr = append(byteArr, []byte(nil))
|
||||
wkbValues = append(wkbValues, []byte(nil))
|
||||
continue
|
||||
}
|
||||
byteArr = append(byteArr, []byte(str))
|
||||
geomT, err := wkt.Unmarshal(wktValue)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
wkbValues = append(wkbValues, wkbValue)
|
||||
}
|
||||
return byteArr, validData, nil
|
||||
return wkbValues, validData, nil
|
||||
}
|
||||
|
||||
func ReadGeometryData(pcr *FieldReader, count int64) (any, error) {
|
||||
@ -686,11 +697,20 @@ func ReadGeometryData(pcr *FieldReader, count int64) (any, error) {
|
||||
if data == nil {
|
||||
return nil, nil
|
||||
}
|
||||
byteArr := make([][]byte, 0)
|
||||
for _, str := range data.([]string) {
|
||||
byteArr = append(byteArr, []byte(str))
|
||||
|
||||
wkbValues := make([][]byte, 0)
|
||||
for _, wktValue := range data.([]string) {
|
||||
geomT, err := wkt.Unmarshal(wktValue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wkbValues = append(wkbValues, wkbValue)
|
||||
}
|
||||
return byteArr, nil
|
||||
return wkbValues, nil
|
||||
}
|
||||
|
||||
func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
|
||||
|
||||
@ -32,6 +32,9 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -233,6 +236,17 @@ func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType
|
||||
default:
|
||||
s.Fail("unsupported array element type")
|
||||
}
|
||||
} else if fieldDataType == schemapb.DataType_Geometry && expect != nil {
|
||||
expectData := expect.([]byte)
|
||||
geomT, err := wkt.Unmarshal(string(expectData))
|
||||
if err != nil {
|
||||
s.Fail("unmarshal wkt failed")
|
||||
}
|
||||
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
s.Fail("marshal wkb failed")
|
||||
}
|
||||
s.Equal(wkbValue, actual.([]byte))
|
||||
} else {
|
||||
s.Equal(expect, actual)
|
||||
}
|
||||
|
||||
@ -188,8 +188,8 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
|
||||
case schemapb.DataType_JSON:
|
||||
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateJSONArray(rows))
|
||||
case schemapb.DataType_Geometry:
|
||||
// wkb bytes array
|
||||
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryArray(rows))
|
||||
// wkt array
|
||||
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryWktArray(rows))
|
||||
case schemapb.DataType_Array:
|
||||
switch f.GetElementType() {
|
||||
case schemapb.DataType_Bool:
|
||||
@ -574,9 +574,9 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
|
||||
columns = append(columns, builder.NewStringArray())
|
||||
case schemapb.DataType_Geometry:
|
||||
builder := array.NewStringBuilder(mem)
|
||||
wkbData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data
|
||||
wktData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data
|
||||
validData := insertData.Data[fieldID].(*storage.GeometryFieldData).ValidData
|
||||
builder.AppendValues(lo.Map(wkbData, func(bs []byte, _ int) string {
|
||||
builder.AppendValues(lo.Map(wktData, func(bs []byte, _ int) string {
|
||||
return string(bs)
|
||||
}), validData)
|
||||
columns = append(columns, builder.NewStringArray())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user