diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 477127717a..04518c20ba 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -567,6 +567,33 @@ func ToCompressedFormatNullable(field *schemapb.FieldData) error { } sd.ArrayData.Data = ret } + case *schemapb.ScalarField_TimestamptzData: + validRowNum := getValidNumber(field.GetValidData()) + if validRowNum == 0 { + sd.TimestamptzData.Data = make([]int64, 0) + } else { + ret := make([]int64, 0, validRowNum) + for i, valid := range field.GetValidData() { + if valid { + ret = append(ret, sd.TimestamptzData.Data[i]) + } + } + sd.TimestamptzData.Data = ret + } + + case *schemapb.ScalarField_GeometryWktData: + validRowNum := getValidNumber(field.GetValidData()) + if validRowNum == 0 { + sd.GeometryWktData.Data = make([]string, 0) + } else { + ret := make([]string, 0, validRowNum) + for i, valid := range field.GetValidData() { + if valid { + ret = append(ret, sd.GeometryWktData.Data[i]) + } + } + sd.GeometryWktData.Data = ret + } case *schemapb.ScalarField_GeometryData: validRowNum := getValidNumber(field.GetValidData()) @@ -740,6 +767,42 @@ func GenNullableFieldData(field *schemapb.FieldSchema, upsertIDSize int) (*schem }, }, nil + case schemapb.DataType_Timestamptz: + return &schemapb.FieldData{ + FieldId: field.FieldID, + FieldName: field.Name, + Type: field.DataType, + IsDynamic: field.IsDynamic, + ValidData: make([]bool, upsertIDSize), + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_TimestamptzData{ + TimestamptzData: &schemapb.TimestamptzArray{ + Data: make([]int64, upsertIDSize), + }, + }, + }, + }, + }, nil + + // the intput data of geometry field is in wkt format + case schemapb.DataType_Geometry: + return &schemapb.FieldData{ + FieldId: field.FieldID, + FieldName: field.Name, + Type: field.DataType, + IsDynamic: field.IsDynamic, + ValidData: make([]bool, upsertIDSize), + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryWktData{ + GeometryWktData: &schemapb.GeometryWktArray{ + Data: make([]string, upsertIDSize), + }, + }, + }, + }, + }, nil default: return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("undefined scalar data type:%s", field.DataType.String())) } diff --git a/internal/proxy/task_upsert_test.go b/internal/proxy/task_upsert_test.go index c8c4e22351..bda86943f2 100644 --- a/internal/proxy/task_upsert_test.go +++ b/internal/proxy/task_upsert_test.go @@ -1356,3 +1356,111 @@ func TestUpsertTask_queryPreExecute_PureUpdate(t *testing.T) { assert.NotNil(t, valueField) assert.Equal(t, []int32{600, 700}, valueField.GetScalars().GetIntData().GetData()) } + +// Test ToCompressedFormatNullable for Geometry and Timestamptz types +func TestToCompressedFormatNullable_GeometryAndTimestamptz(t *testing.T) { + t.Run("timestamptz with null values", func(t *testing.T) { + field := &schemapb.FieldData{ + Type: schemapb.DataType_Timestamptz, + FieldName: "timestamp_field", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_TimestamptzData{ + TimestamptzData: &schemapb.TimestamptzArray{ + Data: []int64{1000, 0, 3000, 0}, + }, + }, + }, + }, + ValidData: []bool{true, false, true, false}, + } + + err := ToCompressedFormatNullable(field) + assert.NoError(t, err) + assert.Equal(t, []int64{1000, 3000}, field.GetScalars().GetTimestamptzData().GetData()) + assert.Equal(t, []bool{true, false, true, false}, field.ValidData) + }) + + t.Run("geometry WKT with null values", func(t *testing.T) { + field := &schemapb.FieldData{ + Type: schemapb.DataType_Geometry, + FieldName: "geometry_field", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryWktData{ + GeometryWktData: &schemapb.GeometryWktArray{ + Data: []string{"POINT (1 2)", "", "POINT (5 6)"}, + }, + }, + }, + }, + ValidData: []bool{true, false, true}, + } + + err := ToCompressedFormatNullable(field) + assert.NoError(t, err) + assert.Equal(t, []string{"POINT (1 2)", "POINT (5 6)"}, field.GetScalars().GetGeometryWktData().GetData()) + }) + + t.Run("geometry WKB with null values", func(t *testing.T) { + field := &schemapb.FieldData{ + Type: schemapb.DataType_Geometry, + FieldName: "geometry_field", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryData{ + GeometryData: &schemapb.GeometryArray{ + Data: [][]byte{{0x01, 0x02}, nil, {0x05, 0x06}}, + }, + }, + }, + }, + ValidData: []bool{true, false, true}, + } + + err := ToCompressedFormatNullable(field) + assert.NoError(t, err) + assert.Equal(t, [][]byte{{0x01, 0x02}, {0x05, 0x06}}, field.GetScalars().GetGeometryData().GetData()) + }) +} + +// Test GenNullableFieldData for Geometry and Timestamptz types +func TestGenNullableFieldData_GeometryAndTimestamptz(t *testing.T) { + t.Run("generate timestamptz nullable field", func(t *testing.T) { + field := &schemapb.FieldSchema{ + FieldID: 100, + Name: "timestamp_field", + DataType: schemapb.DataType_Timestamptz, + IsDynamic: false, + } + + upsertIDSize := 5 + fieldData, err := GenNullableFieldData(field, upsertIDSize) + + assert.NoError(t, err) + assert.NotNil(t, fieldData) + assert.Equal(t, int64(100), fieldData.FieldId) + assert.Equal(t, "timestamp_field", fieldData.FieldName) + assert.Len(t, fieldData.ValidData, upsertIDSize) + assert.Len(t, fieldData.GetScalars().GetTimestamptzData().GetData(), upsertIDSize) + }) + + t.Run("generate geometry nullable field", func(t *testing.T) { + field := &schemapb.FieldSchema{ + FieldID: 101, + Name: "geometry_field", + DataType: schemapb.DataType_Geometry, + IsDynamic: false, + } + + upsertIDSize := 3 + fieldData, err := GenNullableFieldData(field, upsertIDSize) + + assert.NoError(t, err) + assert.NotNil(t, fieldData) + assert.Equal(t, int64(101), fieldData.FieldId) + assert.Equal(t, "geometry_field", fieldData.FieldName) + assert.Len(t, fieldData.ValidData, upsertIDSize) + assert.Len(t, fieldData.GetScalars().GetGeometryWktData().GetData(), upsertIDSize) + }) +} diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index 8537c33344..2be52b9132 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -484,11 +484,15 @@ func FillWithNullValue(field *schemapb.FieldData, fieldSchema *schemapb.FieldSch } case *schemapb.ScalarField_GeometryData: - if fieldSchema.GetNullable() { - sd.GeometryData.Data, err = fillWithNullValueImpl(sd.GeometryData.Data, field.GetValidData()) - if err != nil { - return err - } + sd.GeometryData.Data, err = fillWithNullValueImpl(sd.GeometryData.Data, field.GetValidData()) + if err != nil { + return err + } + + case *schemapb.ScalarField_GeometryWktData: + sd.GeometryWktData.Data, err = fillWithNullValueImpl(sd.GeometryWktData.Data, field.GetValidData()) + if err != nil { + return err } default: return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("undefined data type:%s", field.Type.String())) diff --git a/internal/proxy/validate_util_test.go b/internal/proxy/validate_util_test.go index 64b4923c2d..e70e6ff50c 100644 --- a/internal/proxy/validate_util_test.go +++ b/internal/proxy/validate_util_test.go @@ -7188,3 +7188,72 @@ func Test_validateUtil_checkAligned_ArrayOfVector(t *testing.T) { assert.Error(t, err) }) } + +// Test FillWithNullValue for Geometry fields +func TestFillWithNullValue_Geometry(t *testing.T) { + t.Run("geometry WKT with null values", func(t *testing.T) { + field := &schemapb.FieldData{ + Type: schemapb.DataType_Geometry, + FieldName: "geo_field", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryWktData{ + GeometryWktData: &schemapb.GeometryWktArray{ + Data: []string{"POINT (1 2)", "POINT (3 4)"}, + }, + }, + }, + }, + ValidData: []bool{true, false, true, false}, + } + + fieldSchema := &schemapb.FieldSchema{ + Name: "geo_field", + DataType: schemapb.DataType_Geometry, + Nullable: true, + } + + numRows := 4 + err := FillWithNullValue(field, fieldSchema, numRows) + + assert.NoError(t, err) + assert.Len(t, field.GetScalars().GetGeometryWktData().GetData(), numRows) + assert.Equal(t, "POINT (1 2)", field.GetScalars().GetGeometryWktData().GetData()[0]) + assert.Equal(t, "", field.GetScalars().GetGeometryWktData().GetData()[1]) + assert.Equal(t, "POINT (3 4)", field.GetScalars().GetGeometryWktData().GetData()[2]) + assert.Equal(t, "", field.GetScalars().GetGeometryWktData().GetData()[3]) + }) + + t.Run("geometry WKB with null values", func(t *testing.T) { + field := &schemapb.FieldData{ + Type: schemapb.DataType_Geometry, + FieldName: "geo_field", + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryData{ + GeometryData: &schemapb.GeometryArray{ + Data: [][]byte{{0x01, 0x02}, {0x03, 0x04}}, + }, + }, + }, + }, + ValidData: []bool{true, false, true, false}, + } + + fieldSchema := &schemapb.FieldSchema{ + Name: "geo_field", + DataType: schemapb.DataType_Geometry, + Nullable: true, + } + + numRows := 4 + err := FillWithNullValue(field, fieldSchema, numRows) + + assert.NoError(t, err) + assert.Len(t, field.GetScalars().GetGeometryData().GetData(), numRows) + assert.Equal(t, []byte{0x01, 0x02}, field.GetScalars().GetGeometryData().GetData()[0]) + assert.Nil(t, field.GetScalars().GetGeometryData().GetData()[1]) + assert.Equal(t, []byte{0x03, 0x04}, field.GetScalars().GetGeometryData().GetData()[2]) + assert.Nil(t, field.GetScalars().GetGeometryData().GetData()[3]) + }) +} diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 6d9cfebb5a..a7309c45d9 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -1273,6 +1273,27 @@ func UpdateFieldData(base, update []*schemapb.FieldData, baseIdx, updateIdx int6 baseScalar.GetJsonData().Data[baseIdx] = updateData.Data[updateIdx] } } + case *schemapb.ScalarField_TimestamptzData: + updateData := updateScalar.GetTimestamptzData() + baseData := baseScalar.GetTimestamptzData() + if updateData != nil && baseData != nil && + int(updateIdx) < len(updateData.Data) && int(baseIdx) < len(baseData.Data) { + baseData.Data[baseIdx] = updateData.Data[updateIdx] + } + case *schemapb.ScalarField_GeometryData: + updateData := updateScalar.GetGeometryData() + baseData := baseScalar.GetGeometryData() + if updateData != nil && baseData != nil && + int(updateIdx) < len(updateData.Data) && int(baseIdx) < len(baseData.Data) { + baseData.Data[baseIdx] = updateData.Data[updateIdx] + } + case *schemapb.ScalarField_GeometryWktData: + updateData := updateScalar.GetGeometryWktData() + baseData := baseScalar.GetGeometryWktData() + if updateData != nil && baseData != nil && + int(updateIdx) < len(updateData.Data) && int(baseIdx) < len(baseData.Data) { + baseData.Data[baseIdx] = updateData.Data[updateIdx] + } default: log.Error("Not supported scalar field type", zap.String("field type", baseFieldData.Type.String())) return fmt.Errorf("unsupported scalar field type: %s", baseFieldData.Type.String()) diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index 62d9c1f697..5bba8ba06d 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -4589,3 +4589,138 @@ func TestGetNeedProcessFunctions(t *testing.T) { assert.Equal(t, f[0].Name, "test_func") } } + +// Test UpdateFieldData for Geometry and Timestamptz types +func TestUpdateFieldData_GeometryAndTimestamptz(t *testing.T) { + t.Run("update timestamptz field", func(t *testing.T) { + baseData := []*schemapb.FieldData{ + { + Type: schemapb.DataType_Timestamptz, + FieldName: "timestamp_field", + FieldId: 100, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_TimestamptzData{ + TimestamptzData: &schemapb.TimestamptzArray{ + Data: []int64{1000, 2000, 3000}, + }, + }, + }, + }, + }, + } + + updateData := []*schemapb.FieldData{ + { + Type: schemapb.DataType_Timestamptz, + FieldName: "timestamp_field", + FieldId: 100, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_TimestamptzData{ + TimestamptzData: &schemapb.TimestamptzArray{ + Data: []int64{9999, 8888, 7777}, + }, + }, + }, + }, + }, + } + + err := UpdateFieldData(baseData, updateData, 1, 1) + assert.NoError(t, err) + + timestampData := baseData[0].GetScalars().GetTimestamptzData().GetData() + assert.Equal(t, int64(1000), timestampData[0]) + assert.Equal(t, int64(8888), timestampData[1]) + assert.Equal(t, int64(3000), timestampData[2]) + }) + + t.Run("update geometry WKT field", func(t *testing.T) { + baseData := []*schemapb.FieldData{ + { + Type: schemapb.DataType_Geometry, + FieldName: "geometry_field", + FieldId: 101, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryWktData{ + GeometryWktData: &schemapb.GeometryWktArray{ + Data: []string{"POINT (0 0)", "POINT (1 1)", "POINT (2 2)"}, + }, + }, + }, + }, + }, + } + + updateData := []*schemapb.FieldData{ + { + Type: schemapb.DataType_Geometry, + FieldName: "geometry_field", + FieldId: 101, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryWktData{ + GeometryWktData: &schemapb.GeometryWktArray{ + Data: []string{"POINT (10 10)", "POINT (11 11)", "POINT (12 12)"}, + }, + }, + }, + }, + }, + } + + err := UpdateFieldData(baseData, updateData, 1, 1) + assert.NoError(t, err) + + geoData := baseData[0].GetScalars().GetGeometryWktData().GetData() + assert.Equal(t, "POINT (0 0)", geoData[0]) + assert.Equal(t, "POINT (11 11)", geoData[1]) + assert.Equal(t, "POINT (2 2)", geoData[2]) + }) + + t.Run("update geometry WKB field", func(t *testing.T) { + baseData := []*schemapb.FieldData{ + { + Type: schemapb.DataType_Geometry, + FieldName: "geometry_field", + FieldId: 102, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryData{ + GeometryData: &schemapb.GeometryArray{ + Data: [][]byte{{0x01, 0x02}, {0x03, 0x04}, {0x05, 0x06}}, + }, + }, + }, + }, + }, + } + + updateData := []*schemapb.FieldData{ + { + Type: schemapb.DataType_Geometry, + FieldName: "geometry_field", + FieldId: 102, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_GeometryData{ + GeometryData: &schemapb.GeometryArray{ + Data: [][]byte{{0xFF, 0xFE}, {0xFD, 0xFC}, {0xFB, 0xFA}}, + }, + }, + }, + }, + }, + } + + err := UpdateFieldData(baseData, updateData, 2, 2) + assert.NoError(t, err) + + geoData := baseData[0].GetScalars().GetGeometryData().GetData() + assert.Equal(t, []byte{0x01, 0x02}, geoData[0]) + assert.Equal(t, []byte{0x03, 0x04}, geoData[1]) + assert.Equal(t, []byte{0xFB, 0xFA}, geoData[2]) + }) +}