fix: Fix bug for importing Geometry data (#45089)

issue: #44787 , #45012

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-10-27 20:34:11 +08:00 committed by GitHub
parent a38610cd5d
commit c33d221536
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 59 additions and 37 deletions

View File

@ -611,12 +611,7 @@ func FillWithDefaultValue(field *schemapb.FieldData, fieldSchema *schemapb.Field
return merr.WrapErrParameterInvalid(numRows, len(field.GetValidData()), msg) return merr.WrapErrParameterInvalid(numRows, len(field.GetValidData()), msg)
} }
defaultValue := fieldSchema.GetDefaultValue().GetStringData() defaultValue := fieldSchema.GetDefaultValue().GetStringData()
geomT, err := wkt.Unmarshal(defaultValue) defaultValueWkbBytes, err := common.ConvertWKTToWKB(defaultValue)
if err != nil {
log.Warn("invalid default value for geometry field", zap.Error(err))
return merr.WrapErrParameterInvalidMsg("invalid default value for geometry field")
}
defaultValueWkbBytes, err := wkb.Marshal(geomT, wkb.NDR)
if err != nil { if err != nil {
log.Warn("invalid default value for geometry field", zap.Error(err)) log.Warn("invalid default value for geometry field", zap.Error(err))
return merr.WrapErrParameterInvalidMsg("invalid default value for geometry field") return merr.WrapErrParameterInvalidMsg("invalid default value for geometry field")

View File

@ -1584,6 +1584,11 @@ func GetDefaultValue(fieldSchema *schemapb.FieldSchema) interface{} {
return fieldSchema.GetDefaultValue().GetTimestamptzData() return fieldSchema.GetDefaultValue().GetTimestamptzData()
case schemapb.DataType_JSON: case schemapb.DataType_JSON:
return fieldSchema.GetDefaultValue().GetBytesData() return fieldSchema.GetDefaultValue().GetBytesData()
case schemapb.DataType_Geometry:
// ignore err because the default value has been checked when create collection.
wkbValue, _ := common.ConvertWKTToWKB(fieldSchema.GetDefaultValue().GetStringData())
return wkbValue
default: default:
// won't happen // won't happen
panic(fmt.Sprintf("undefined data type:%s", fieldSchema.DataType.String())) panic(fmt.Sprintf("undefined data type:%s", fieldSchema.DataType.String()))

View File

@ -22,9 +22,6 @@ import (
"strings" "strings"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/twpayne/go-geom/encoding/wkb"
"github.com/twpayne/go-geom/encoding/wkbcommon"
"github.com/twpayne/go-geom/encoding/wkt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/json"
@ -412,11 +409,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem
} }
return []byte(obj), nil return []byte(obj), nil
case schemapb.DataType_Geometry: case schemapb.DataType_Geometry:
geomT, err := wkt.Unmarshal(obj) wkbValue, err := pkgcommon.ConvertWKTToWKB(obj)
if err != nil {
return nil, r.wrapTypeError(obj, field)
}
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
if err != nil { if err != nil {
return nil, r.wrapTypeError(obj, field) return nil, r.wrapTypeError(obj, field)
} }

View File

@ -21,9 +21,6 @@ import (
"strconv" "strconv"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/twpayne/go-geom/encoding/wkb"
"github.com/twpayne/go-geom/encoding/wkbcommon"
"github.com/twpayne/go-geom/encoding/wkt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/json"
@ -560,11 +557,7 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
return nil, r.wrapTypeError(obj, fieldID) return nil, r.wrapTypeError(obj, fieldID)
} }
geomT, err := wkt.Unmarshal(wktValue) wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue)
if err != nil {
return nil, r.wrapTypeError(wktValue, fieldID)
}
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
if err != nil { if err != nil {
return nil, r.wrapTypeError(wktValue, fieldID) return nil, r.wrapTypeError(wktValue, fieldID)
} }

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/importutilv2/common"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -189,6 +190,22 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
case schemapb.DataType_Geometry:
var strs []string
strs, err = c.ReadString(readCount)
if err != nil {
return nil, nil, err
}
byteArr := make([][]byte, 0)
for _, wktValue := range strs {
wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue)
if err != nil {
return nil, nil, err
}
byteArr = append(byteArr, wkbValue)
}
data = byteArr
c.readPosition += int(readCount)
case schemapb.DataType_JSON: case schemapb.DataType_JSON:
var strs []string var strs []string
strs, err = c.ReadString(readCount) strs, err = c.ReadString(readCount)

View File

@ -296,6 +296,14 @@ func TestNumpyFieldReaderError(t *testing.T) {
DataType: schemapb.DataType_JSON, DataType: schemapb.DataType_JSON,
}, },
}, },
{
name: "read geometry error",
fieldSchema: &schemapb.FieldSchema{
FieldID: 100,
Name: "geometry",
DataType: schemapb.DataType_Geometry,
},
},
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -89,6 +89,13 @@ func createReader(fieldData storage.FieldData, dataType schemapb.DataType) (io.R
jsonStrs = append(jsonStrs, string(row.([]byte))) jsonStrs = append(jsonStrs, string(row.([]byte)))
} }
data = jsonStrs data = jsonStrs
case schemapb.DataType_Geometry:
geoStrs := make([]string, 0, rowNum)
for i := 0; i < rowNum; i++ {
row := fieldData.GetRow(i)
geoStrs = append(geoStrs, string(row.([]byte)))
}
data = geoStrs
case schemapb.DataType_BinaryVector: case schemapb.DataType_BinaryVector:
rows := fieldData.GetDataRows().([]byte) rows := fieldData.GetDataRows().([]byte)
const rowBytes = dim / 8 const rowBytes = dim / 8

View File

@ -321,7 +321,7 @@ func validateHeader(npyReader *npy.Reader, field *schemapb.FieldSchema, dim int)
if shape[1] != dim { if shape[1] != dim {
return wrapDimError(shape[1], dim, field) return wrapDimError(shape[1], dim, field)
} }
case schemapb.DataType_VarChar, schemapb.DataType_JSON: case schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Geometry:
if elementType != schemapb.DataType_VarChar { if elementType != schemapb.DataType_VarChar {
return wrapElementTypeError(elementType, field) return wrapElementTypeError(elementType, field)
} }

View File

@ -25,9 +25,6 @@ import (
"github.com/apache/arrow/go/v17/parquet/pqarrow" "github.com/apache/arrow/go/v17/parquet/pqarrow"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/twpayne/go-geom/encoding/wkb"
"github.com/twpayne/go-geom/encoding/wkbcommon"
"github.com/twpayne/go-geom/encoding/wkt"
"golang.org/x/exp/constraints" "golang.org/x/exp/constraints"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -35,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/importutilv2/common"
"github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/internal/util/nullutil"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -601,7 +599,7 @@ func ReadNullableStringData(pcr *FieldReader, count int64, isVarcharField bool)
if len(data) == 0 { if len(data) == 0 {
return nil, nil, nil return nil, nil, nil
} }
if isVarcharField && pcr.field.GetDefaultValue() != nil { if pcr.field.GetDefaultValue() != nil {
defaultValue := pcr.field.GetDefaultValue().GetStringData() defaultValue := pcr.field.GetDefaultValue().GetStringData()
return fillWithDefaultValueImpl(data, defaultValue, validData, pcr.field) return fillWithDefaultValueImpl(data, defaultValue, validData, pcr.field)
} }
@ -683,11 +681,7 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error
wkbValues = append(wkbValues, []byte(nil)) wkbValues = append(wkbValues, []byte(nil))
continue continue
} }
geomT, err := wkt.Unmarshal(wktValue) wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue)
if err != nil {
return nil, nil, err
}
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -708,11 +702,7 @@ func ReadGeometryData(pcr *FieldReader, count int64) (any, error) {
wkbValues := make([][]byte, 0) wkbValues := make([][]byte, 0)
for _, wktValue := range data.([]string) { for _, wktValue := range data.([]string) {
geomT, err := wkt.Unmarshal(wktValue) wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue)
if err != nil {
return nil, err
}
wkbValue, err := wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
) )
@ -57,6 +58,8 @@ func GetDefaultValue(field *schemapb.FieldSchema) (any, error) {
return field.GetDefaultValue().GetTimestamptzData(), nil return field.GetDefaultValue().GetTimestamptzData(), nil
case schemapb.DataType_String, schemapb.DataType_VarChar: case schemapb.DataType_String, schemapb.DataType_VarChar:
return field.GetDefaultValue().GetStringData(), nil return field.GetDefaultValue().GetStringData(), nil
case schemapb.DataType_Geometry:
return common.ConvertWKTToWKB(field.GetDefaultValue().GetStringData())
default: default:
msg := fmt.Sprintf("type (%s) not support default_value", field.GetDataType().String()) msg := fmt.Sprintf("type (%s) not support default_value", field.GetDataType().String())
return nil, merr.WrapErrParameterInvalidMsg(msg) return nil, merr.WrapErrParameterInvalidMsg(msg)

View File

@ -25,6 +25,9 @@ import (
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/twpayne/go-geom/encoding/wkb"
"github.com/twpayne/go-geom/encoding/wkbcommon"
"github.com/twpayne/go-geom/encoding/wkt"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -585,3 +588,11 @@ func CheckNamespace(schema *schemapb.CollectionSchema, namespace *string) error
} }
return nil return nil
} }
// ConvertWKTToWKB parses wktStr with wkt.Unmarshal and re-encodes the
// resulting geometry with wkb.Marshal.
//
// The encoding uses the wkb.NDR byte order and passes
// wkbcommon.EmptyPointHandlingNaN as the empty-point handling option.
//
// If wktStr cannot be parsed, the returned slice is nil and the error from
// wkt.Unmarshal is returned unchanged. Otherwise the bytes and error produced
// by wkb.Marshal are returned as-is.
func ConvertWKTToWKB(wktStr string) ([]byte, error) {
	geometry, parseErr := wkt.Unmarshal(wktStr)
	if parseErr != nil {
		return nil, parseErr
	}

	// Name the option so the marshal call below stays readable.
	emptyPointOpt := wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)
	return wkb.Marshal(geometry, wkb.NDR, emptyPointOpt)
}