From 490a618c30d3bf8a6160b66a9bb145b0f9f4c4d4 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Wed, 5 Nov 2025 15:05:33 +0800 Subject: [PATCH] fix: Handle timestamptz import errors (#45287) issue: https://github.com/milvus-io/milvus/issues/44585 Signed-off-by: zhenshan.cao --- internal/rootcoord/create_collection_task.go | 2 +- internal/util/importutilv2/common/util.go | 18 +++ internal/util/importutilv2/csv/row_parser.go | 12 +- internal/util/importutilv2/json/row_parser.go | 37 +++-- .../util/importutilv2/numpy/field_reader.go | 20 ++- .../importutilv2/numpy/field_reader_test.go | 25 +++- internal/util/importutilv2/numpy/reader.go | 4 +- .../util/importutilv2/parquet/field_reader.go | 127 ++++++++++++++++-- .../importutilv2/parquet/field_reader_test.go | 3 +- internal/util/importutilv2/parquet/util.go | 11 +- pkg/common/common.go | 6 +- 11 files changed, 211 insertions(+), 54 deletions(-) diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 9855daa493..25243ca8b5 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -553,7 +553,7 @@ func (t *createCollectionTask) Prepare(ctx context.Context) error { if !ok2 { dbTz = common.DefaultTimezone } - timezoneKV := &commonpb.KeyValuePair{Key: common.CollectionDefaultTimezone, Value: dbTz} + timezoneKV := &commonpb.KeyValuePair{Key: common.TimezoneKey, Value: dbTz} t.Req.Properties = append(properties, timezoneKV) } diff --git a/internal/util/importutilv2/common/util.go b/internal/util/importutilv2/common/util.go index 5edac6aa2c..02e395ff59 100644 --- a/internal/util/importutilv2/common/util.go +++ b/internal/util/importutilv2/common/util.go @@ -22,6 +22,8 @@ import ( "unicode/utf8" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -105,3 +107,19 @@ func CheckValidString(s string, maxLength int64, field *schemapb.FieldSchema) er } return nil } + +// GetSchemaTimezone retrieves the timezone string from the CollectionSchema's properties. +// It falls back to common.DefaultTimezone if the key is not found or the value is empty. +func GetSchemaTimezone(schema *schemapb.CollectionSchema) string { + // 1. Attempt to retrieve the timezone value from the schema's properties. + // We assume funcutil.TryGetAttrByKeyFromRepeatedKV returns the value and a boolean indicating existence. + // If the key is not found, the returned timezone string will be the zero value (""). + timezone, _ := funcutil.TryGetAttrByKeyFromRepeatedKV(common.TimezoneKey, schema.GetProperties()) + + // 2. If the retrieved value is empty, use the system default timezone. 
+ if timezone == "" { + timezone = common.DefaultTimezone + } + + return timezone +} diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index 6f438ba17a..fcb4550561 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/nullutil" pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -46,6 +47,8 @@ type rowParser struct { pkField *schemapb.FieldSchema dynamicField *schemapb.FieldSchema allowInsertAutoID bool + + timezone string } func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) { @@ -140,6 +143,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st pkField: pkField, dynamicField: dynamicField, allowInsertAutoID: allowInsertAutoID, + timezone: common.GetSchemaTimezone(schema), }, nil } @@ -364,7 +368,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem return 0, r.wrapTypeError(obj, field) } return int32(num), nil - case schemapb.DataType_Int64, schemapb.DataType_Timestamptz: + case schemapb.DataType_Int64: num, err := strconv.ParseInt(obj, 10, 64) if err != nil { return 0, r.wrapTypeError(obj, field) @@ -414,6 +418,12 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem return nil, r.wrapTypeError(obj, field) } return wkbValue, nil + case schemapb.DataType_Timestamptz: + tz, err := funcutil.ValidateAndReturnUnixMicroTz(obj, r.timezone) + if err != nil { + return nil, err + } + return tz, nil case schemapb.DataType_FloatVector: var vec []float32 err := json.Unmarshal([]byte(obj), &vec) diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index f18fd6f273..b5585372b2 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/nullutil" pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -46,6 +47,8 @@ type rowParser struct { structArrays map[string]interface{} allowInsertAutoID bool + + timezone string } func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) { @@ -102,6 +105,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) { functionOutputFields: functionOutputFields, structArrays: sturctArrays, allowInsertAutoID: allowInsertAutoID, + timezone: common.GetSchemaTimezone(schema), }, nil } @@ -371,7 +375,7 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { return nil, err } return int32(num), nil - case schemapb.DataType_Int64, schemapb.DataType_Timestamptz: + case schemapb.DataType_Int64: value, ok := obj.(json.Number) if !ok { return nil, r.wrapTypeError(obj, fieldID) @@ -561,8 +565,17 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { if err != nil { return nil, 
r.wrapTypeError(wktValue, fieldID) } - return wkbValue, nil + case schemapb.DataType_Timestamptz: + strValue, ok := obj.(string) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + tz, err := funcutil.ValidateAndReturnUnixMicroTz(strValue, r.timezone) + if err != nil { + return nil, err + } + return tz, nil case schemapb.DataType_Array: arr, ok := obj.([]interface{}) if !ok { @@ -710,26 +723,6 @@ func (r *rowParser) arrayToFieldData(arr []interface{}, field *schemapb.FieldSch }, }, }, nil - case schemapb.DataType_Timestamptz: - values := make([]int64, len(arr)) - for i, v := range arr { - value, ok := v.(json.Number) - if !ok { - return nil, r.wrapArrayValueTypeError(arr, eleType) - } - num, err := strconv.ParseInt(value.String(), 0, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse int64: %w", err) - } - values[i] = num - } - return &schemapb.ScalarField{ - Data: &schemapb.ScalarField_TimestamptzData{ - TimestamptzData: &schemapb.TimestamptzArray{ - Data: values, - }, - }, - }, nil case schemapb.DataType_VarChar, schemapb.DataType_String: values := make([]string, len(arr)) for i, v := range arr { diff --git a/internal/util/importutilv2/numpy/field_reader.go b/internal/util/importutilv2/numpy/field_reader.go index 7cc31cba39..61285931c1 100644 --- a/internal/util/importutilv2/numpy/field_reader.go +++ b/internal/util/importutilv2/numpy/field_reader.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/util/importutilv2/common" pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -44,10 +45,13 @@ type FieldReader struct { dim int64 field *schemapb.FieldSchema + // timezone is the collection's default timezone + timezone string + readPosition int } -func NewFieldReader(reader io.Reader, field *schemapb.FieldSchema) (*FieldReader, error) { +func NewFieldReader(reader io.Reader, field *schemapb.FieldSchema, timezone string) (*FieldReader, error) { r, err := npyio.NewReader(reader) if err != nil { return nil, err @@ -72,6 +76,7 @@ func NewFieldReader(reader io.Reader, field *schemapb.FieldSchema) (*FieldReader npyReader: r, dim: dim, field: field, + timezone: timezone, } cr.setByteOrder() return cr, nil @@ -180,10 +185,21 @@ func (c *FieldReader) Next(count int64) (any, any, error) { } c.readPosition += int(readCount) case schemapb.DataType_Timestamptz: - data, err = ReadN[int64](c.reader, c.order, readCount) + var strs []string + strs, err = c.ReadString(readCount) if err != nil { return nil, nil, err } + int64Ts := make([]int64, 0, len(strs)) + for _, strValue := range strs { + tz, err := funcutil.ValidateAndReturnUnixMicroTz(strValue, c.timezone) + if err != nil { + return nil, nil, err + } + int64Ts = append(int64Ts, tz) + } + data = int64Ts + c.readPosition += int(readCount) case schemapb.DataType_VarChar: data, err = c.ReadString(readCount) c.readPosition += int(readCount) diff --git a/internal/util/importutilv2/numpy/field_reader_test.go b/internal/util/importutilv2/numpy/field_reader_test.go index f273c73ced..c8077d05b8 100644 --- a/internal/util/importutilv2/numpy/field_reader_test.go +++ b/internal/util/importutilv2/numpy/field_reader_test.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" 
"github.com/milvus-io/milvus/internal/util/testutil" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/merr" ) @@ -110,7 +111,7 @@ func TestInvalidUTF8(t *testing.T) { } reader := WriteNonUTF8Npy() - fr, err := NewFieldReader(reader, fieldSchema) + fr, err := NewFieldReader(reader, fieldSchema, common.DefaultTimezone) assert.NoError(t, err) _, _, err = fr.Next(int64(6)) @@ -170,7 +171,7 @@ func TestNormalRead(t *testing.T) { } reader := WriteNormalNpy() - fr, err := NewFieldReader(reader, fieldSchema) + fr, err := NewFieldReader(reader, fieldSchema, common.DefaultTimezone) assert.NoError(t, err) data, validData, err := fr.Next(int64(6)) @@ -215,13 +216,14 @@ func TestNumpyFieldReaderError(t *testing.T) { Name: "str", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "256"}}, - }) + }, common.DefaultTimezone) assert.Error(t, err) // read values error tests := []struct { name string fieldSchema *schemapb.FieldSchema + timezone string }{ { name: "read bool error", @@ -304,6 +306,15 @@ func TestNumpyFieldReaderError(t *testing.T) { DataType: schemapb.DataType_Geometry, }, }, + { + name: "read geometry error", + fieldSchema: &schemapb.FieldSchema{ + FieldID: 100, + Name: "geometry", + DataType: schemapb.DataType_Geometry, + }, + timezone: "Asia/Shanghai", + }, } for _, tt := range tests { @@ -316,7 +327,11 @@ func TestNumpyFieldReaderError(t *testing.T) { fieldData := insertData.Data[tt.fieldSchema.FieldID] reader, err := createReader(fieldData, tt.fieldSchema.DataType) assert.NoError(t, err) - fieldReader, err := NewFieldReader(reader, tt.fieldSchema) + tz := common.DefaultTimezone + if len(tt.timezone) > 0 { + tz = tt.timezone + } + fieldReader, err := NewFieldReader(reader, tt.fieldSchema, tz) assert.NoError(t, err) fieldReader.reader = errReader @@ -457,7 +472,7 @@ func TestNumpyValidateHeaderError(t *testing.T) { schema.Fields[0].DataType = tt.fieldType schema.Fields[0].TypeParams[1].Value = "32" - fieldReader, err := NewFieldReader(reader, schema.Fields[0]) + fieldReader, err := NewFieldReader(reader, schema.Fields[0], common.DefaultTimezone) assert.Error(t, err) assert.Nil(t, fieldReader) }) diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go index 5ef4052616..1b4151190d 100644 --- a/internal/util/importutilv2/numpy/reader.go +++ b/internal/util/importutilv2/numpy/reader.go @@ -54,9 +54,9 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, err } - + timezone := common.GetSchemaTimezone(schema) for fieldID, r := range readers { - cr, err := NewFieldReader(r, fields[fieldID]) + cr, err := NewFieldReader(r, fields[fieldID], timezone) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index a956b98b49..0228596aa6 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/nullutil" pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/parameterutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -46,11 +47,14 @@ type FieldReader struct { field 
*schemapb.FieldSchema sparseIsString bool + // timezone is the collection's default timezone + timezone string + // structReader is non-nil when Struct Array field exists structReader *StructFieldReader } -func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex int, field *schemapb.FieldSchema) (*FieldReader, error) { +func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex int, field *schemapb.FieldSchema, timezone string) (*FieldReader, error) { columnReader, err := reader.GetColumn(ctx, columnIndex) if err != nil { return nil, err @@ -77,6 +81,7 @@ func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex dim: int(dim), field: field, sparseIsString: sparseIsString, + timezone: timezone, } return cr, nil } @@ -112,7 +117,7 @@ func (c *FieldReader) Next(count int64) (any, any, error) { } data, err := ReadIntegerOrFloatData[int32](c, count) return data, nil, err - case schemapb.DataType_Int64, schemapb.DataType_Timestamptz: + case schemapb.DataType_Int64: if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableIntegerOrFloatData[int64](c, count) } @@ -158,7 +163,7 @@ func (c *FieldReader) Next(count int64) (any, any, error) { return data, nil, typeutil.VerifyFloats64(data.([]float64)) case schemapb.DataType_VarChar, schemapb.DataType_String: if c.field.GetNullable() || c.field.GetDefaultValue() != nil { - return ReadNullableStringData(c, count, true) + return ReadNullableStringData(c, count) } data, err := ReadStringData(c, count, true) return data, nil, err @@ -170,11 +175,17 @@ func (c *FieldReader) Next(count int64) (any, any, error) { data, err := ReadJSONData(c, count) return data, nil, err case schemapb.DataType_Geometry: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableGeometryData(c, count) } data, err := ReadGeometryData(c, count) return data, nil, err + case schemapb.DataType_Timestamptz: + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { + return ReadNullableTimestamptzData(c, count) + } + data, err := ReadTimestamptzData(c, count) + return data, nil, err case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: // vector not support default_value if c.field.GetNullable() { @@ -551,14 +562,19 @@ func ReadStringData(pcr *FieldReader, count int64, isVarcharField bool) (any, er return data, nil } -func ReadNullableStringData(pcr *FieldReader, count int64, isVarcharField bool) (any, []bool, error) { +// readRawStringDataFromParquet handles the low-level logic of reading string chunks +// from the Parquet column, extracting data, validity mask, and performing VARCHAR length checks. +// It returns the raw string data and the corresponding validity mask. 
+func readRawStringDataFromParquet(pcr *FieldReader, count int64) ([]string, []bool, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { return nil, nil, err } + dataType := pcr.field.GetDataType() data := make([]string, 0, count) validData := make([]bool, 0, count) var maxLength int64 + isVarcharField := typeutil.IsStringType(dataType) if isVarcharField { maxLength, err = parameterutil.GetMaxLength(pcr.field) if err != nil { @@ -599,10 +615,26 @@ func ReadNullableStringData(pcr *FieldReader, count int64, isVarcharField bool) if len(data) == 0 { return nil, nil, nil } + return data, validData, nil +} + +func ReadNullableStringData(pcr *FieldReader, count int64) (any, []bool, error) { + // Delegate I/O, Arrow iteration, and VARCHAR validation to the helper function. + data, validData, err := readRawStringDataFromParquet(pcr, count) + if err != nil { + return nil, nil, err + } + if data == nil { + return nil, nil, nil + } if pcr.field.GetDefaultValue() != nil { + // Fill default values for standard string fields (VARCHAR, String, Geometry). defaultValue := pcr.field.GetDefaultValue().GetStringData() + // Assuming fillWithDefaultValueImpl is available return fillWithDefaultValueImpl(data, defaultValue, validData, pcr.field) } + + // Return raw data for convertible types or non-defaulted fields. return data, validData, nil } @@ -636,7 +668,7 @@ func ReadJSONData(pcr *FieldReader, count int64) (any, error) { func ReadNullableJSONData(pcr *FieldReader, count int64) (any, []bool, error) { // JSON field read data from string array Parquet - data, validData, err := ReadNullableStringData(pcr, count, false) + data, validData, err := readRawStringDataFromParquet(pcr, count) if err != nil { return nil, nil, err } @@ -644,9 +676,10 @@ func ReadNullableJSONData(pcr *FieldReader, count int64) (any, []bool, error) { return nil, nil, nil } byteArr := make([][]byte, 0) - for i, str := range data.([]string) { + defaultValue := []byte(nil) + for i, str := range data { if !validData[i] { - byteArr = append(byteArr, []byte(nil)) + byteArr = append(byteArr, defaultValue) continue } var dummy interface{} @@ -666,9 +699,76 @@ func ReadNullableJSONData(pcr *FieldReader, count int64) (any, []bool, error) { return byteArr, validData, nil } +// ReadNullableTimestamptzData reads Timestamptz data from the Parquet column, +// handling nullability by parsing the time string and converting it to the internal int64 format. +func ReadNullableTimestamptzData(pcr *FieldReader, count int64) (any, []bool, error) { + // 1. Read the raw data as strings from the underlying Parquet column. + // This is because Timestamptz data is initially stored as strings in the insertion layer + // or represented as strings in the Parquet file for ease of parsing/validation. + data, validData, err := readRawStringDataFromParquet(pcr, count) + if err != nil { + return nil, nil, err + } + // If no data was read (e.g., end of file), return nil. + if data == nil { + return nil, nil, nil + } + + // 2. Initialize the target array for internal int64 timestamps (UTC microseconds). + int64Ts := make([]int64, 0, len(data)) + defaultValue := pcr.field.GetDefaultValue().GetTimestamptzData() + + // 3. Iterate over the string array and convert each timestamp. + for i, strValue := range data { + // Check the validity mask: If it's null, append the zero value (0) and continue. + if !validData[i] { + int64Ts = append(int64Ts, defaultValue) + continue + } + + // Convert the ISO 8601 string to int64 (UTC microseconds). 
+ // The pcr.timezone is used as the default timezone if the string (strValue) + // does not contain an explicit UTC offset (e.g., "+08:00"). + tz, err := funcutil.ValidateAndReturnUnixMicroTz(strValue, pcr.timezone) + if err != nil { + return nil, nil, err + } + int64Ts = append(int64Ts, tz) + } + return int64Ts, validData, nil +} + +// ReadTimestamptzData reads non-nullable Timestamptz data from the Parquet column. +// It assumes all values are present (non-null) and converts them to the internal int64 format. +func ReadTimestamptzData(pcr *FieldReader, count int64) (any, error) { + // Read the raw data as strings. Since this is a non-nullable field, we use ReadStringData. + data, err := ReadStringData(pcr, count, false) + if err != nil { + return nil, err + } + // If no data was read (e.g., end of file), return nil. + if data == nil { + return nil, nil + } + + int64Ts := make([]int64, 0, len(data.([]string))) + for _, strValue := range data.([]string) { + // Convert the ISO 8601 string to int64 (UTC microseconds). + // The pcr.timezone is used as the default if the string lacks an explicit offset. + tz, err := funcutil.ValidateAndReturnUnixMicroTz(strValue, pcr.timezone) + if err != nil { + return nil, err + } + int64Ts = append(int64Ts, tz) + } + + // Return the converted int64 array. + return int64Ts, nil +} + func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error) { // Geometry field read data from string array Parquet - data, validData, err := ReadNullableStringData(pcr, count, false) + data, validData, err := readRawStringDataFromParquet(pcr, count) if err != nil { return nil, nil, err } @@ -676,9 +776,14 @@ func ReadNullableGeometryData(pcr *FieldReader, count int64) (any, []bool, error return nil, nil, nil } wkbValues := make([][]byte, 0) - for i, wktValue := range data.([]string) { + defaultValueStr := pcr.field.GetDefaultValue().GetStringData() + defaultValue := []byte(nil) + if defaultValueStr != "" { + defaultValue, _ = pkgcommon.ConvertWKTToWKB(defaultValueStr) + } + for i, wktValue := range data { if !validData[i] { - wkbValues = append(wkbValues, []byte(nil)) + wkbValues = append(wkbValues, defaultValue) continue } wkbValue, err := pkgcommon.ConvertWKTToWKB(wktValue) diff --git a/internal/util/importutilv2/parquet/field_reader_test.go b/internal/util/importutilv2/parquet/field_reader_test.go index 6316b4a9c9..d518ab2e3f 100644 --- a/internal/util/importutilv2/parquet/field_reader_test.go +++ b/internal/util/importutilv2/parquet/field_reader_test.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/testutil" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/objectstorage" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -578,7 +579,7 @@ func TestTypeMismatch(t *testing.T) { } fileReader, err := pqarrow.NewFileReader(reader, readProps, memory.DefaultAllocator) assert.NoError(t, err) - columnReader, err := NewFieldReader(ctx, fileReader, 0, schema.Fields[0]) + columnReader, err := NewFieldReader(ctx, fileReader, 0, schema.Fields[0], common.DefaultTimezone) assert.NoError(t, err) _, _, err = columnReader.Next(int64(rowCount)) diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index 3ebe082b5f..5dae2cdd6e 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -26,6 +26,7 @@ import ( 
"go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + common2 "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -135,18 +136,18 @@ func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, sch continue } - // auto-id field must not provided + // auto-id field must not be provided if typeutil.IsAutoPKField(field) && !allowInsertAutoID { return nil, merr.WrapErrImportFailed( fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", field.GetName())) } - // function output field must not provided + // function output field must not be provided if field.GetIsFunctionOutput() { return nil, merr.WrapErrImportFailed( fmt.Sprintf("the field '%s' is output by function, no need to provide", field.GetName())) } - cr, err := NewFieldReader(ctx, fileReader, i, field) + cr, err := NewFieldReader(ctx, fileReader, i, field, common2.GetSchemaTimezone(schema)) if err != nil { return nil, err } @@ -352,13 +353,13 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da return &arrow.Int16Type{}, nil case schemapb.DataType_Int32: return &arrow.Int32Type{}, nil - case schemapb.DataType_Int64, schemapb.DataType_Timestamptz: + case schemapb.DataType_Int64: return &arrow.Int64Type{}, nil case schemapb.DataType_Float: return &arrow.Float32Type{}, nil case schemapb.DataType_Double: return &arrow.Float64Type{}, nil - case schemapb.DataType_VarChar, schemapb.DataType_String: + case schemapb.DataType_VarChar, schemapb.DataType_String, schemapb.DataType_Timestamptz: return &arrow.StringType{}, nil case schemapb.DataType_JSON: return &arrow.StringType{}, nil diff --git a/pkg/common/common.go b/pkg/common/common.go index 3b0db81c96..827aedcccc 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -248,10 +248,8 @@ const ( NamespaceEnabledKey = "namespace.enabled" // timezone releated - TimezoneKey = "timezone" - DatabaseDefaultTimezone = "database.timezone" - CollectionDefaultTimezone = "collection.timezone" - AllowInsertAutoIDKey = "allow_insert_auto_id" + TimezoneKey = "timezone" + AllowInsertAutoIDKey = "allow_insert_auto_id" ) const (