From f27dfa449080f01626d82fbf8f4ef4caba3f170a Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 17 Oct 2025 17:14:23 +0800 Subject: [PATCH] enhance: [2.5]Support import geometry data by json/csv (#44828) issue: #44787 master pr: #44826 2.6 pr: #44827 --------- Signed-off-by: Cai Zhang --- internal/storage/insert_data.go | 23 ++++++----- internal/util/importutilv2/csv/row_parser.go | 13 +++++++ internal/util/importutilv2/json/row_parser.go | 19 ++++++++++ .../util/importutilv2/parquet/field_reader.go | 38 ++++++++++++++----- .../util/importutilv2/parquet/reader_test.go | 14 +++++++ internal/util/testutil/test_util.go | 8 ++-- 6 files changed, 93 insertions(+), 22 deletions(-) diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 5298680626..908ff8bb6c 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -744,14 +744,14 @@ func (data *GeometryFieldData) AppendRow(row interface{}) error { data.ValidData = append(data.ValidData, false) return nil } - v, ok := row.([]byte) - if !ok { + switch v := row.(type) { + case []byte: + data.Data = append(data.Data, v) + case string: + data.Data = append(data.Data, []byte(v)) + default: return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type") } - if data.GetNullable() { - data.ValidData = append(data.ValidData, true) - } - data.Data = append(data.Data, v) return nil } @@ -1030,11 +1030,16 @@ func (data *JSONFieldData) AppendDataRows(rows interface{}) error { } func (data *GeometryFieldData) AppendDataRows(rows interface{}) error { - v, ok := rows.([][]byte) - if !ok { + switch v := rows.(type) { + case [][]byte: + data.Data = append(data.Data, v...) + case []string: + for _, row := range v { + data.Data = append(data.Data, []byte(row)) + } + default: return merr.WrapErrParameterInvalid("[][]byte", rows, "Wrong rows type") } - data.Data = append(data.Data, v...) return nil } diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index dbbb7dd435..cb566e060f 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -23,6 +23,9 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" @@ -285,6 +288,16 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e return nil, err } return []byte(obj), nil + case schemapb.DataType_Geometry: + geomT, err := wkt.Unmarshal(obj) + if err != nil { + return nil, r.wrapTypeError(obj, field) + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, r.wrapTypeError(obj, field) + } + return wkbValue, nil case schemapb.DataType_FloatVector: if nullable && obj == r.nullkey { return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 1be7929a06..8de4b0045b 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -22,6 +22,9 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" @@ -419,6 +422,22 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { } else { return nil, r.wrapTypeError(obj, fieldID) } + case schemapb.DataType_Geometry: + wktValue, ok := obj.(string) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + + geomT, err := wkt.Unmarshal(wktValue) + if err != nil { + return nil, r.wrapTypeError(wktValue, fieldID) + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, r.wrapTypeError(wktValue, fieldID) + } + + return wkbValue, nil case schemapb.DataType_Array: arr, ok := obj.([]interface{}) if !ok { diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index d8fee7db01..d309c66cae 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -25,6 +25,9 @@ import ( "github.com/apache/arrow/go/v17/parquet/pqarrow" "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "golang.org/x/exp/constraints" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -719,15 +722,23 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error if data == nil { return nil, nil, nil } - byteArr := make([][]byte, 0) - for i, str := range data.([]string) { + wkbValues := make([][]byte, 0) + for i, wktValue := range data.([]string) { if !validData[i] { - byteArr = append(byteArr, []byte(nil)) + wkbValues = append(wkbValues, []byte(nil)) continue } - byteArr = append(byteArr, []byte(str)) + geomT, err := wkt.Unmarshal(wktValue) + if err != nil { + return nil, nil, err + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, nil, err + } + wkbValues = append(wkbValues, wkbValue) } - return byteArr, validData, nil + return wkbValues, validData, nil } func ReadGeometryData(pcr *FieldReader, count int64) (any, error) { @@ -739,11 +750,20 @@ func ReadGeometryData(pcr *FieldReader, count int64) (any, error) { if data == nil { return nil, nil } - byteArr := make([][]byte, 0) - for _, str := range data.([]string) { - byteArr = append(byteArr, []byte(str)) + + wkbValues := make([][]byte, 0) + for _, wktValue := range data.([]string) { + geomT, err := wkt.Unmarshal(wktValue) + if err != nil { + return nil, err + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + return nil, err + } + wkbValues = append(wkbValues, wkbValue) } - return byteArr, nil + return wkbValues, nil } func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 657eb03822..9975e262c2 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -31,6 +31,9 @@ import ( "github.com/apache/arrow/go/v17/parquet/pqarrow" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "golang.org/x/exp/slices" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -216,6 +219,17 @@ func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType default: s.Fail("unsupported array element type") } + } else if fieldDataType == schemapb.DataType_Geometry && expect != nil { + expectData := expect.([]byte) + geomT, err := wkt.Unmarshal(string(expectData)) + if err != nil { + s.Fail("unmarshal wkt failed") + } + wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + if err != nil { + s.Fail("marshal wkb failed") + } + s.Equal(wkbValue, actual.([]byte)) } else { s.Equal(expect, actual) } diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 5eb55bac58..f8016dd601 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -177,8 +177,8 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . case schemapb.DataType_JSON: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateJSONArray(rows)) case schemapb.DataType_Geometry: - // wkb bytes array - insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryArray(rows)) + // wkt array + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateGeometryWktArray(rows)) case schemapb.DataType_Array: switch f.GetElementType() { case schemapb.DataType_Bool: @@ -533,9 +533,9 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser columns = append(columns, builder.NewStringArray()) case schemapb.DataType_Geometry: builder := array.NewStringBuilder(mem) - wkbData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data + wktData := insertData.Data[fieldID].(*storage.GeometryFieldData).Data validData := insertData.Data[fieldID].(*storage.GeometryFieldData).ValidData - builder.AppendValues(lo.Map(wkbData, func(bs []byte, _ int) string { + builder.AppendValues(lo.Map(wktData, func(bs []byte, _ int) string { return string(bs) }), validData) columns = append(columns, builder.NewStringArray())