From d5ecb63f534fe1ca03dcce1bc1439dc53bdcc008 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 17 Oct 2025 17:08:02 +0800 Subject: [PATCH] enhance: Support import geometry data by json/csv (#44826) issue: #44787 --------- Signed-off-by: Cai Zhang --- internal/storage/insert_data.go | 23 ++++++----- internal/util/importutilv2/csv/row_parser.go | 13 +++++++ .../util/importutilv2/csv/row_parser_test.go | 19 +++++++++- internal/util/importutilv2/json/row_parser.go | 19 ++++++++++ .../util/importutilv2/json/row_parser_test.go | 18 +++++++++ .../util/importutilv2/parquet/field_reader.go | 38 ++++++++++++++----- .../util/importutilv2/parquet/reader_test.go | 14 +++++++ internal/util/testutil/test_util.go | 8 ++-- 8 files changed, 129 insertions(+), 23 deletions(-) diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 3b281dd5cd..d56a2127f0 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -847,14 +847,14 @@ func (data *GeometryFieldData) AppendRow(row interface{}) error { data.ValidData = append(data.ValidData, false) return nil } - v, ok := row.([]byte) - if !ok { + switch v := row.(type) { + case []byte: + data.Data = append(data.Data, v) + case string: + data.Data = append(data.Data, []byte(v)) + default: return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type") } - if data.GetNullable() { - data.ValidData = append(data.ValidData, true) - } - data.Data = append(data.Data, v) return nil } @@ -1184,11 +1184,16 @@ func (data *JSONFieldData) AppendDataRows(rows interface{}) error { } func (data *GeometryFieldData) AppendDataRows(rows interface{}) error { - v, ok := rows.([][]byte) - if !ok { + switch v := rows.(type) { + case [][]byte: + data.Data = append(data.Data, v...) + case []string: + for _, row := range v { + data.Data = append(data.Data, []byte(row)) + } + default: return merr.WrapErrParameterInvalid("[][]byte", rows, "Wrong rows type") } - data.Data = append(data.Data, v...) return nil } diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index 1b4981b94b..fc07eb0e6a 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -22,6 +22,9 @@ import ( "strings" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" @@ -407,6 +410,16 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem return nil, err } return []byte(obj), nil + case schemapb.DataType_Geometry: + geomT, err := wkt.Unmarshal(obj) + if err != nil { + return nil, r.wrapTypeError(obj, field) + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, r.wrapTypeError(obj, field) + } + return wkbValue, nil case schemapb.DataType_FloatVector: var vec []float32 err := json.Unmarshal([]byte(obj), &vec) diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go index 10bba40b5d..619d86d065 100644 --- a/internal/util/importutilv2/csv/row_parser_test.go +++ b/internal/util/importutilv2/csv/row_parser_test.go @@ -23,6 +23,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -332,6 +335,12 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { DataType: schemapb.DataType_JSON, Nullable: suite.hasNullable, }, + { + FieldID: 110, + Name: "geometry", + DataType: schemapb.DataType_Geometry, + Nullable: suite.hasNullable, + }, }, StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray}, } @@ -380,7 +389,7 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal string rawContent["$meta"] = "{\"dynamic\": \"dummy\"}" rawContent["struct_array"] = "[{\"sub_float_vector\": \"[0.1, 0.2]\", \"sub_str\": \"hello1\"}, " + "{\"sub_float_vector\": \"[0.3, 0.4]\", \"sub_str\": \"hello2\"}]" - + rawContent["geometry"] = "POINT (30.123 -10.456)" rawContent[resetKey] = resetVal // reset a value for _, deleteKey := range deleteKeys { delete(rawContent, deleteKey) // delete a key @@ -446,6 +455,8 @@ func compareValues(t *testing.T, field *schemapb.FieldSchema, val any) { assert.Equal(t, field.GetDefaultValue().GetDoubleData(), val.(float64)) case schemapb.DataType_VarChar: assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string)) + case schemapb.DataType_Geometry: + assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string)) default: } } else if field.GetNullable() { @@ -614,6 +625,12 @@ func (suite *RowParserSuite) runValid(c *testCase) { suite.Equal(expectedFlat, vf.GetFloatVector().GetData()) } + case schemapb.DataType_Geometry: + geomT, err := wkt.Unmarshal(rawVal) + suite.NoError(err) + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + suite.NoError(err) + suite.Equal(wkbValue, val) default: continue } diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 8b038e2e5f..1896948de9 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -21,6 +21,9 @@ import ( "strconv" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" @@ -537,6 +540,22 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { } else { return nil, r.wrapTypeError(obj, fieldID) } + case schemapb.DataType_Geometry: + wktValue, ok := obj.(string) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + + geomT, err := wkt.Unmarshal(wktValue) + if err != nil { + return nil, r.wrapTypeError(wktValue, fieldID) + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, r.wrapTypeError(wktValue, fieldID) + } + + return wkbValue, nil case schemapb.DataType_Array: arr, ok := obj.([]interface{}) if !ok { diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go index 1a0526575f..e2d538c304 100644 --- a/internal/util/importutilv2/json/row_parser_test.go +++ b/internal/util/importutilv2/json/row_parser_test.go @@ -22,6 +22,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -323,6 +326,12 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { DataType: schemapb.DataType_JSON, Nullable: suite.hasNullable, }, + { + FieldID: 110, + Name: "geometry", + DataType: schemapb.DataType_Geometry, + Nullable: suite.hasNullable, + }, }, StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray}, } @@ -367,6 +376,7 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal any, d rawContent["double"] = 6.28 rawContent["varchar"] = "test" rawContent["json"] = map[string]any{"a": 1} + rawContent["geometry"] = "POINT (30.123 -10.456)" rawContent["x"] = 6 rawContent["$meta"] = map[string]any{"dynamic": "dummy"} rawContent["struct_array"] = []any{ @@ -434,6 +444,8 @@ func compareValues(t *testing.T, field *schemapb.FieldSchema, val any) { assert.Equal(t, field.GetDefaultValue().GetDoubleData(), val.(float64)) case schemapb.DataType_VarChar: assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string)) + case schemapb.DataType_Geometry: + assert.Equal(t, field.GetDefaultValue().GetStringData(), val.(string)) default: } } else if field.GetNullable() { @@ -572,6 +584,12 @@ func (suite *RowParserSuite) runValid(c *testCase) { default: continue } + case schemapb.DataType_Geometry: + geomT, err := wkt.Unmarshal(rawVal.(string)) + suite.NoError(err) + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + suite.NoError(err) + suite.Equal(wkbValue, val) default: continue } diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index 3bc875e191..a4301783de 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -25,6 +25,9 @@ import ( "github.com/apache/arrow/go/v17/parquet/pqarrow" "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "golang.org/x/exp/constraints" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -666,15 +669,23 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error if data == nil { return nil, nil, nil } - byteArr := make([][]byte, 0) - for i, str := range data.([]string) { + wkbValues := make([][]byte, 0) + for i, wktValue := range data.([]string) { if !validData[i] { - byteArr = append(byteArr, []byte(nil)) + wkbValues = append(wkbValues, []byte(nil)) continue } - byteArr = append(byteArr, []byte(str)) + geomT, err := wkt.Unmarshal(wktValue) + if err != nil { + return nil, nil, err + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, nil, err + } + wkbValues = append(wkbValues, wkbValue) } - return byteArr, validData, nil + return wkbValues, validData, nil } func ReadGeometryData(pcr *FieldReader, count int64) (any, error) { @@ -686,11 +697,20 @@ func ReadGeometryData(pcr *FieldReader, count int64) (any, error) { if data == nil { return nil, nil } - byteArr := make([][]byte, 0) - for _, str := range data.([]string) { - byteArr = append(byteArr, []byte(str)) + + wkbValues := make([][]byte, 0) + for _, wktValue := range data.([]string) { + geomT, err := wkt.Unmarshal(wktValue) + if err != nil { + return nil, err + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, err + } + wkbValues = append(wkbValues, wkbValue) } - return byteArr, nil + return wkbValues, nil } func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 0706e8fe4b..0ddddfdcff 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -32,6 +32,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "golang.org/x/exp/slices" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -233,6 +236,17 @@ func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType default: s.Fail("unsupported array element type") } + } else if fieldDataType == schemapb.DataType_Geometry && expect != nil { + expectData := expect.([]byte) + geomT, err := wkt.Unmarshal(string(expectData)) + if err != nil { + s.Fail("unmarshal wkt failed") + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + s.Fail("marshal wkb failed") + } + s.Equal(wkbValue, actual.([]byte)) } else { s.Equal(expect, actual) } diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 897522bfbc..6143b2e951 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -188,8 +188,8 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . case schemapb.DataType_JSON: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateJSONArray(rows)) case schemapb.DataType_Geometry: - // wkb bytes array - insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryArray(rows)) + // wkt array + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryWktArray(rows)) case schemapb.DataType_Array: switch f.GetElementType() { case schemapb.DataType_Bool: @@ -574,9 +574,9 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser columns = append(columns, builder.NewStringArray()) case schemapb.DataType_Geometry: builder := array.NewStringBuilder(mem) - wkbData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data + wktData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data validData := insertData.Data[fieldID].(*storage.GeometryFieldData).ValidData - builder.AppendValues(lo.Map(wkbData, func(bs []byte, _ int) string { + builder.AppendValues(lo.Map(wktData, func(bs []byte, _ int) string { return string(bs) }), validData) columns = append(columns, builder.NewStringArray())