diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index 7d21692f47..c7f30b17af 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -611,12 +611,7 @@ func FillWithDefaultValue(field *schemapb.FieldData, fieldSchema *schemapb.Field return merr.WrapErrParameterInvalid(numRows, len(field.GetValidData()), msg) } defaultValue := fieldSchema.GetDefaultValue().GetStringData() - geomT, err := wkt.Unmarshal(defaultValue) - if err != nil { - log.Warn("invalid default value for geometry field", zap.Error(err)) - return merr.WrapErrParameterInvalidMsg("invalid default value for geometry field") - } - defaultValueWkbBytes, err := wkb.Marshal(geomT, wkb.NDR) + defaultValueWkbBytes, err := common.ConvertWKTToWKB(defaultValue) if err != nil { log.Warn("invalid default value for geometry field", zap.Error(err)) return merr.WrapErrParameterInvalidMsg("invalid default value for geometry field") diff --git a/internal/storage/utils.go b/internal/storage/utils.go index 1ca3ed4651..1b2754a620 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -1584,6 +1584,11 @@ func GetDefaultValue(fieldSchema *schemapb.FieldSchema) interface{} { return fieldSchema.GetDefaultValue().GetTimestamptzData() case schemapb.DataType_JSON: return fieldSchema.GetDefaultValue().GetBytesData() + case schemapb.DataType_Geometry: + // ignore err because the default value has been checked when create collection. + wkbValue, _ := common.ConvertWKTToWKB(fieldSchema.GetDefaultValue().GetStringData()) + return wkbValue + default: // won't happen panic(fmt.Sprintf("undefined data type:%s", fieldSchema.DataType.String())) diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index e2db1d65b4..6f438ba17a 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -22,9 +22,6 @@ import ( "strings" "github.com/samber/lo" - "github.com/twpayne/go-geom/encoding/wkb" - "github.com/twpayne/go-geom/encoding/wkbcommon" - "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" @@ -412,11 +409,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem } return []byte(obj), nil case schemapb.DataType_Geometry: - geomT, err := wkt.Unmarshal(obj) - if err != nil { - return nil, r.wrapTypeError(obj, field) - } - wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + wkbValue, err := pkgcommon.ConvertWKTToWKB(obj) if err != nil { return nil, r.wrapTypeError(obj, field) } diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 0ce150353c..f18fd6f273 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -21,9 +21,6 @@ import ( "strconv" "github.com/samber/lo" - "github.com/twpayne/go-geom/encoding/wkb" - "github.com/twpayne/go-geom/encoding/wkbcommon" - "github.com/twpayne/go-geom/encoding/wkt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" @@ -560,11 +557,7 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { return nil, r.wrapTypeError(obj, fieldID) } - geomT, err := wkt.Unmarshal(wktValue) - if err != nil { - return nil, r.wrapTypeError(wktValue, fieldID) - } - wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue) if err != nil { return nil, r.wrapTypeError(wktValue, fieldID) } diff --git a/internal/util/importutilv2/numpy/field_reader.go b/internal/util/importutilv2/numpy/field_reader.go index ae00730b20..7cc31cba39 100644 --- a/internal/util/importutilv2/numpy/field_reader.go +++ b/internal/util/importutilv2/numpy/field_reader.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/util/importutilv2/common" + pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -189,6 +190,22 @@ func (c *FieldReader) Next(count int64) (any, any, error) { if err != nil { return nil, nil, err } + case schemapb.DataType_Geometry: + var strs []string + strs, err = c.ReadString(readCount) + if err != nil { + return nil, nil, err + } + byteArr := make([][]byte, 0) + for _, wktValue := range strs { + wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue) + if err != nil { + return nil, nil, err + } + byteArr = append(byteArr, wkbValue) + } + data = byteArr + c.readPosition += int(readCount) case schemapb.DataType_JSON: var strs []string strs, err = c.ReadString(readCount) diff --git a/internal/util/importutilv2/numpy/field_reader_test.go b/internal/util/importutilv2/numpy/field_reader_test.go index 0dd2c304c9..f273c73ced 100644 --- a/internal/util/importutilv2/numpy/field_reader_test.go +++ b/internal/util/importutilv2/numpy/field_reader_test.go @@ -296,6 +296,14 @@ func TestNumpyFieldReaderError(t *testing.T) { DataType: schemapb.DataType_JSON, }, }, + { + name: "read geometry error", + fieldSchema: &schemapb.FieldSchema{ + FieldID: 100, + Name: "geometry", + DataType: schemapb.DataType_Geometry, + }, + }, } for _, tt := range tests { diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index 0e3ac2eaad..4c6657b40a 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -89,6 +89,13 @@ func createReader(fieldData storage.FieldData, dataType schemapb.DataType) (io.R jsonStrs = append(jsonStrs, string(row.([]byte))) } data = jsonStrs + case schemapb.DataType_Geometry: + geoStrs := make([]string, 0, rowNum) + for i := 0; i < rowNum; i++ { + row := fieldData.GetRow(i) + geoStrs = append(geoStrs, string(row.([]byte))) + } + data = geoStrs case schemapb.DataType_BinaryVector: rows := fieldData.GetDataRows().([]byte) const rowBytes = dim / 8 diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go index 7667e1f0c7..466ff3ade4 100644 --- a/internal/util/importutilv2/numpy/util.go +++ b/internal/util/importutilv2/numpy/util.go @@ -321,7 +321,7 @@ func validateHeader(npyReader *npy.Reader, field *schemapb.FieldSchema, dim int) if shape[1] != dim { return wrapDimError(shape[1], dim, field) } - case schemapb.DataType_VarChar, schemapb.DataType_JSON: + case schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Geometry: if elementType != schemapb.DataType_VarChar { return wrapElementTypeError(elementType, field) } diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index fcdffc0a64..442337e4d7 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -25,9 +25,6 @@ import ( "github.com/apache/arrow/go/v17/parquet/pqarrow" "github.com/cockroachdb/errors" "github.com/samber/lo" - "github.com/twpayne/go-geom/encoding/wkb" - "github.com/twpayne/go-geom/encoding/wkbcommon" - "github.com/twpayne/go-geom/encoding/wkt" "golang.org/x/exp/constraints" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -35,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/nullutil" + pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -601,7 +599,7 @@ func ReadNullableStringData(pcr *FieldReader, count int64, isVarcharField bool) if len(data) == 0 { return nil, nil, nil } - if isVarcharField && pcr.field.GetDefaultValue() != nil { + if pcr.field.GetDefaultValue() != nil { defaultValue := pcr.field.GetDefaultValue().GetStringData() return fillWithDefaultValueImpl(data, defaultValue, validData, pcr.field) } @@ -683,11 +681,7 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error wkbValues = append(wkbValues, []byte(nil)) continue } - geomT, err := wkt.Unmarshal(wktValue) - if err != nil { - return nil, nil, err - } - wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue) if err != nil { return nil, nil, err } @@ -708,11 +702,7 @@ func ReadGeometryData(pcr *FieldReader, count int64) (any, error) { wkbValues := make([][]byte, 0) for _, wktValue := range data.([]string) { - geomT, err := wkt.Unmarshal(wktValue) - if err != nil { - return nil, err - } - wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) + wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue) if err != nil { return nil, err } diff --git a/internal/util/nullutil/nullutil.go b/internal/util/nullutil/nullutil.go index 772865a452..4d04c0a491 100644 --- a/internal/util/nullutil/nullutil.go +++ b/internal/util/nullutil/nullutil.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/merr" ) @@ -57,6 +58,8 @@ func GetDefaultValue(field *schemapb.FieldSchema) (any, error) { return field.GetDefaultValue().GetTimestamptzData(), nil case schemapb.DataType_String, schemapb.DataType_VarChar: return field.GetDefaultValue().GetStringData(), nil + case schemapb.DataType_Geometry: + return common.ConvertWKTToWKB(field.GetDefaultValue().GetStringData()) default: msg := fmt.Sprintf("type (%s) not support default_value", field.GetDataType().String()) return nil, merr.WrapErrParameterInvalidMsg(msg) diff --git a/pkg/common/common.go b/pkg/common/common.go index d7e413a323..28719e3b2f 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -25,6 +25,9 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" + "github.com/twpayne/go-geom/encoding/wkt" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -585,3 +588,11 @@ func CheckNamespace(schema *schemapb.CollectionSchema, namespace *string) error } return nil } + +func ConvertWKTToWKB(wktStr string) ([]byte, error) { + geomT, err := wkt.Unmarshal(wktStr) + if err != nil { + return nil, err + } + return wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) +}