diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index e0cbdb9611..33bc9c2b63 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -296,7 +297,7 @@ func (v *validateUtil) fillWithValue(data []*schemapb.FieldData, schema *typeuti } func (v *validateUtil) fillWithNullValue(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema, numRows int) error { - err := checkValidData(field, fieldSchema, numRows) + err := nullutil.CheckValidData(field.GetValidData(), fieldSchema, numRows) if err != nil { return err } @@ -491,7 +492,7 @@ func (v *validateUtil) fillWithDefaultValue(field *schemapb.FieldData, fieldSche } } - err = checkValidData(field, fieldSchema, numRows) + err = nullutil.CheckValidData(field.GetValidData(), fieldSchema, numRows) if err != nil { return err } @@ -499,19 +500,6 @@ func (v *validateUtil) fillWithDefaultValue(field *schemapb.FieldData, fieldSche return nil } -func checkValidData(data *schemapb.FieldData, schema *schemapb.FieldSchema, numRows int) error { - expectedNum := 0 - // if nullable, the length of ValidData is numRows - if schema.GetNullable() { - expectedNum = numRows - } - if len(data.GetValidData()) != expectedNum { - msg := fmt.Sprintf("the length of valid_data of field(%s) is wrong", data.GetFieldName()) - return merr.WrapErrParameterInvalid(expectedNum, len(data.GetValidData()), msg) - } - return nil -} - func fillWithNullValueImpl[T any](array []T, validData []bool) ([]T, error) { n := getValidNumber(validData) if len(array) != n { diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index db2257ab65..eebf518cec 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -160,6 +161,84 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data checkFn(res, 0, suite.numRows) } +func (suite *ReaderSuite) runWithDefaultValue(dataType schemapb.DataType, elemType schemapb.DataType) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: suite.pkDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "128", + }, + }, + }, + { + FieldID: 101, + Name: "vec", + DataType: suite.vecDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "8", + }, + }, + }, + }, + } + // here always set nullable==true just for test, insertData will store validData only nullable==true + // jsonBytes Marshal from rows, if expectInsertData is nulls and set default value + // actualData will be default_value + fieldSchema, err := testutil.CreateFieldWithDefaultValue(dataType, 102, true) + suite.NoError(err) + schema.Fields = append(schema.Fields, fieldSchema) + + insertData, err := testutil.CreateInsertData(schema, suite.numRows) + suite.NoError(err) + + rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData) + suite.NoError(err) + + jsonBytes, err := json.Marshal(rows) + suite.NoError(err) + + type mockReader struct { + io.Reader + io.Closer + io.ReaderAt + io.Seeker + } + cm := mocks.NewChunkManager(suite.T()) + cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { + r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} + return r, nil + }) + reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt) + suite.NoError(err) + + checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) { + expectInsertData := insertData + for fieldID, data := range actualInsertData.Data { + suite.Equal(expectRows, data.RowNum()) + for i := 0; i < expectRows; i++ { + expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin) + actual := data.GetRow(i) + if expect == nil { + expect, err = nullutil.GetDefaultValue(fieldSchema) + suite.NoError(err) + } + suite.Equal(expect, actual) + } + } + } + res, err := reader.Read() + suite.NoError(err) + checkFn(res, 0, suite.numRows) +} + func (suite *ReaderSuite) TestReadScalarFields() { suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false) suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false) @@ -202,6 +281,18 @@ func (suite *ReaderSuite) TestReadScalarFields() { suite.run(schemapb.DataType_Array, schemapb.DataType_String, true) } +func (suite *ReaderSuite) TestReadScalarFieldsWithDefaultValue() { + suite.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_Double, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None) + suite.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None) +} + func (suite *ReaderSuite) TestStringPK() { suite.pkDataType = schemapb.DataType_VarChar suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index cb1e5b29f9..dde341aa7f 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/util/importutilv2/common" + "github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/parameterutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -195,6 +196,9 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]any, row Row) err } func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { + if r.id2Field[fieldID].GetDefaultValue() != nil && obj == nil { + return nullutil.GetDefaultValue(r.id2Field[fieldID]) + } if r.id2Field[fieldID].GetNullable() { return r.parseNullableEntity(fieldID, obj) } diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index e854522305..e291ba16d4 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2/common" + "github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/parameterutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -69,37 +70,37 @@ func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex func (c *FieldReader) Next(count int64) (any, any, error) { switch c.field.GetDataType() { case schemapb.DataType_Bool: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableBoolData(c, count) } data, err := ReadBoolData(c, count) return data, nil, err case schemapb.DataType_Int8: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableIntegerOrFloatData[int8](c, count) } data, err := ReadIntegerOrFloatData[int8](c, count) return data, nil, err case schemapb.DataType_Int16: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableIntegerOrFloatData[int16](c, count) } data, err := ReadIntegerOrFloatData[int16](c, count) return data, nil, err case schemapb.DataType_Int32: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableIntegerOrFloatData[int32](c, count) } data, err := ReadIntegerOrFloatData[int32](c, count) return data, nil, err case schemapb.DataType_Int64: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableIntegerOrFloatData[int64](c, count) } data, err := ReadIntegerOrFloatData[int64](c, count) return data, nil, err case schemapb.DataType_Float: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { data, validData, err := ReadNullableIntegerOrFloatData[float32](c, count) if err != nil { return nil, nil, err @@ -118,7 +119,7 @@ func (c *FieldReader) Next(count int64) (any, any, error) { } return data, nil, typeutil.VerifyFloats32(data.([]float32)) case schemapb.DataType_Double: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { data, validData, err := ReadNullableIntegerOrFloatData[float64](c, count) if err != nil { return nil, nil, err @@ -137,18 +138,20 @@ func (c *FieldReader) Next(count int64) (any, any, error) { } return data, nil, typeutil.VerifyFloats64(data.([]float64)) case schemapb.DataType_VarChar, schemapb.DataType_String: - if c.field.GetNullable() { + if c.field.GetNullable() || c.field.GetDefaultValue() != nil { return ReadNullableVarcharData(c, count) } data, err := ReadVarcharData(c, count) return data, nil, err case schemapb.DataType_JSON: + // json has not support default_value if c.field.GetNullable() { return ReadNullableJSONData(c, count) } data, err := ReadJSONData(c, count) return data, nil, err case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + // vector not support default_value if c.field.GetNullable() { return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") } @@ -174,6 +177,7 @@ func (c *FieldReader) Next(count int64) (any, any, error) { data, err := ReadSparseFloatVectorData(c, count) return data, nil, err case schemapb.DataType_Array: + // array has not support default_value if c.field.GetNullable() { return ReadNullableArrayData(c, count) } @@ -212,6 +216,30 @@ func ReadBoolData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func fillWithDefaultValueImpl[T any](array []T, value T, validData []bool, field *schemapb.FieldSchema) (any, []bool, error) { + rowNum := len(validData) + for i, v := range validData { + if !v { + array[i] = value + } + } + if !typeutil.IsVectorType(field.GetDataType()) { + if field.GetNullable() { + for i := range validData { + validData[i] = true + } + } else { + validData = []bool{} + } + } + + err := nullutil.CheckValidData(validData, field, rowNum) + if err != nil { + return nil, nil, err + } + return array, validData, nil +} + func ReadNullableBoolData(pcr *FieldReader, count int64) (any, []bool, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -237,11 +265,15 @@ func ReadNullableBoolData(pcr *FieldReader, count int64) (any, []bool, error) { } } } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } if len(data) == 0 { return nil, nil, nil } - if len(data) != len(validData) { - return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + if pcr.field.GetDefaultValue() != nil { + defaultValue := pcr.field.GetDefaultValue().GetBoolData() + return fillWithDefaultValueImpl(data, defaultValue, validData, pcr.field) } return data, validData, nil } @@ -367,11 +399,19 @@ func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](p return nil, nil, WrapTypeErr("integer|float|null", chunk.DataType().Name(), pcr.field) } } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } if len(data) == 0 { return nil, nil, nil } - if len(data) != len(validData) { - return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + if pcr.field.GetDefaultValue() != nil { + defaultValue, err := nullutil.GetDefaultValue(pcr.field) + if err != nil { + // won't happen + return nil, nil, err + } + return fillWithDefaultValueImpl(data, defaultValue.(T), validData, pcr.field) } return data, validData, nil } @@ -430,12 +470,12 @@ func ReadNullableStringData(pcr *FieldReader, count int64) (any, []bool, error) } } } - if len(data) == 0 { - return nil, nil, nil - } if len(data) != len(validData) { return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") } + if len(data) == 0 { + return nil, nil, nil + } return data, validData, nil } @@ -507,11 +547,15 @@ func ReadNullableVarcharData(pcr *FieldReader, count int64) (any, []bool, error) } } } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } if len(data) == 0 { return nil, nil, nil } - if len(data) != len(validData) { - return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + if pcr.field.GetDefaultValue() != nil { + defaultValue := pcr.field.GetDefaultValue().GetStringData() + return fillWithDefaultValueImpl(data, defaultValue, validData, pcr.field) } return data, validData, nil } @@ -739,12 +783,12 @@ func ReadNullableBoolArrayData(pcr *FieldReader, count int64) (any, []bool, erro } } } - if len(data) == 0 { - return nil, nil, nil - } if len(data) != len(validData) { return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") } + if len(data) == 0 { + return nil, nil, nil + } return data, validData, nil } @@ -898,12 +942,12 @@ func ReadNullableIntegerOrFloatArrayData[T constraints.Integer | constraints.Flo } } } - if len(data) == 0 { - return nil, nil, nil - } if len(data) != len(validData) { return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") } + if len(data) == 0 { + return nil, nil, nil + } return data, validData, nil } @@ -977,12 +1021,12 @@ func ReadNullableStringArrayData(pcr *FieldReader, count int64) (any, []bool, er } } } - if len(data) == 0 { - return nil, nil, nil - } if len(data) != len(validData) { return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") } + if len(data) == 0 { + return nil, nil, nil + } return data, validData, nil } diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index fac4fc0ac7..30ec59d19e 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -285,6 +286,135 @@ func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { s.Error(err) } +func (s *ReaderSuite) runWithDefaultValue(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: s.pkDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "max_length", + Value: "256", + }, + }, + }, + { + FieldID: 101, + Name: "vec", + DataType: s.vecDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "8", + }, + }, + }, + }, + } + // here always set nullable==true just for test, insertData will store validData only nullable==true + // if expectInsertData is nulls and set default value + // actualData will be default_value + fieldSchema, err := testutil.CreateFieldWithDefaultValue(dataType, 102, true) + s.NoError(err) + schema.Fields = append(schema.Fields, fieldSchema) + + filePath := fmt.Sprintf("/tmp/test_%d_reader.parquet", rand.Int()) + defer os.Remove(filePath) + wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666) + assert.NoError(s.T(), err) + insertData, err := writeParquet(wf, schema, s.numRows, nullPercent) + assert.NoError(s.T(), err) + + ctx := context.Background() + f := storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus_test/test_parquet_reader/")) + cm, err := f.NewPersistentStorageChunkManager(ctx) + assert.NoError(s.T(), err) + schema.Fields[2].Nullable = nullable + reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024) + s.NoError(err) + + checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) { + expectInsertData := insertData + for fieldID, data := range actualInsertData.Data { + s.Equal(expectRows, data.RowNum()) + for i := 0; i < expectRows; i++ { + expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin) + actual := data.GetRow(i) + if expect == nil { + expect, err = nullutil.GetDefaultValue(fieldSchema) + s.NoError(err) + } + s.Equal(expect, actual) + } + } + } + + res, err := reader.Read() + s.NoError(err) + checkFn(res, 0, s.numRows) +} + +func (s *ReaderSuite) TestReadScalarFieldsWithDefaultValue() { + s.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_Double, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None, true, 0) + s.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None, true, 0) + + s.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None, true, 50) + s.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None, true, 50) + + s.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None, true, 100) + s.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None, true, 100) + + s.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_Double, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None, false, 0) + s.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None, false, 0) + + s.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None, false, 50) + s.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None, false, 50) + + s.runWithDefaultValue(schemapb.DataType_Bool, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_Int8, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_Int16, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_Int32, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_Int64, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_Float, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_String, schemapb.DataType_None, false, 100) + s.runWithDefaultValue(schemapb.DataType_VarChar, schemapb.DataType_None, false, 100) +} + func (s *ReaderSuite) TestReadScalarFields() { s.run(schemapb.DataType_Bool, schemapb.DataType_None, false, 0) s.run(schemapb.DataType_Int8, schemapb.DataType_None, false, 0) diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index fd9c237158..659f6fd5b5 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -115,7 +115,7 @@ func isArrowArithmeticType(dataType arrow.Type) bool { return isArrowIntegerType(dataType) || isArrowFloatingType(dataType) } -func isArrowDataTypeConvertible(src arrow.DataType, dst arrow.DataType, nullable bool) bool { +func isArrowDataTypeConvertible(src arrow.DataType, dst arrow.DataType, field *schemapb.FieldSchema) bool { srcType := src.ID() dstType := dst.ID() switch srcType { @@ -142,9 +142,10 @@ func isArrowDataTypeConvertible(src arrow.DataType, dst arrow.DataType, nullable case arrow.BINARY: return dstType == arrow.LIST && dst.(*arrow.ListType).Elem().ID() == arrow.UINT8 case arrow.LIST: - return dstType == arrow.LIST && isArrowDataTypeConvertible(src.(*arrow.ListType).Elem(), dst.(*arrow.ListType).Elem(), nullable) + return dstType == arrow.LIST && isArrowDataTypeConvertible(src.(*arrow.ListType).Elem(), dst.(*arrow.ListType).Elem(), field) case arrow.NULL: - return nullable + // if nullable==true or has set default_value, can use null type + return field.GetNullable() || field.GetDefaultValue() != nil default: return false } @@ -218,13 +219,18 @@ func ConvertToArrowSchema(schema *schemapb.CollectionSchema, useNullType bool) ( if err != nil { return nil, err } + nullable := field.GetNullable() if field.GetNullable() && useNullType { arrDataType = arrow.Null } + if field.GetDefaultValue() != nil && useNullType { + arrDataType = arrow.Null + nullable = true + } arrFields = append(arrFields, arrow.Field{ Name: field.GetName(), Type: arrDataType, - Nullable: field.GetNullable(), + Nullable: nullable, Metadata: arrow.Metadata{}, }) } @@ -250,7 +256,7 @@ func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) e if err != nil { return err } - if !isArrowDataTypeConvertible(arrField.Type, toArrDataType, field.GetNullable()) { + if !isArrowDataTypeConvertible(arrField.Type, toArrDataType, field) { return merr.WrapErrImportFailed(fmt.Sprintf("field '%s' type mis-match, milvus data type '%s', arrow data type get '%s'", field.Name, field.DataType.String(), arrField.Type.String())) } diff --git a/internal/util/nullutil/nullutil.go b/internal/util/nullutil/nullutil.go new file mode 100644 index 0000000000..ed29d6af76 --- /dev/null +++ b/internal/util/nullutil/nullutil.go @@ -0,0 +1,64 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nullutil + +import ( + "fmt" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func CheckValidData(validData []bool, schema *schemapb.FieldSchema, numRows int) error { + expectedNum := 0 + // if nullable, the length of ValidData is numRows + if schema.GetNullable() { + expectedNum = numRows + } + if len(validData) != expectedNum { + msg := fmt.Sprintf("the length of valid_data of field(%s) is wrong", schema.GetName()) + return merr.WrapErrParameterInvalid(expectedNum, len(validData), msg) + } + return nil +} + +func GetDefaultValue(field *schemapb.FieldSchema) (any, error) { + if field.GetDefaultValue() != nil { + switch field.GetDataType() { + case schemapb.DataType_Bool: + return field.GetDefaultValue().GetBoolData(), nil + case schemapb.DataType_Int8: + return int8(field.GetDefaultValue().GetIntData()), nil + case schemapb.DataType_Int16: + return int16(field.GetDefaultValue().GetIntData()), nil + case schemapb.DataType_Int32: + return field.GetDefaultValue().GetIntData(), nil + case schemapb.DataType_Int64: + return field.GetDefaultValue().GetLongData(), nil + case schemapb.DataType_Float: + return field.GetDefaultValue().GetFloatData(), nil + case schemapb.DataType_Double: + return field.GetDefaultValue().GetDoubleData(), nil + case schemapb.DataType_String, schemapb.DataType_VarChar: + return field.GetDefaultValue().GetStringData(), nil + default: + msg := fmt.Sprintf("type (%s) not support default_value", field.GetDataType().String()) + return nil, merr.WrapErrParameterInvalidMsg(msg) + } + } + return nil, nil +} diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 900d0784b6..5182a331dd 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -201,14 +201,82 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . insertData.Data[f.FieldID].AppendValidDataRows(testutils.GenerateBoolArray(rows)) } else if len(nullPercent) == 1 && nullPercent[0] == 100 { insertData.Data[f.FieldID].AppendValidDataRows(make([]bool, rows)) + } else if len(nullPercent) == 1 && nullPercent[0] == 0 { + validData := make([]bool, rows) + for i := range validData { + validData[i] = true + } + insertData.Data[f.FieldID].AppendValidDataRows(validData) } else { - return nil, merr.WrapErrParameterInvalidMsg("not support the number of nullPercent") + return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent)) } } } return insertData, nil } +func CreateFieldWithDefaultValue(dataType schemapb.DataType, id int64, nullable bool) (*schemapb.FieldSchema, error) { + field := &schemapb.FieldSchema{ + FieldID: 102, + Name: dataType.String(), + DataType: dataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "128", + }, + { + Key: common.MaxCapacityKey, + Value: "128", + }, + }, + Nullable: nullable, + } + + switch field.GetDataType() { + case schemapb.DataType_Bool: + field.DefaultValue = &schemapb.ValueField{ + Data: &schemapb.ValueField_BoolData{ + BoolData: ([]bool{true, false})[rand.Intn(2)], + }, + } + case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: + field.DefaultValue = &schemapb.ValueField{ + Data: &schemapb.ValueField_IntData{ + IntData: ([]int32{1, 10, 100, 1000})[rand.Intn(4)], + }, + } + case schemapb.DataType_Int64: + field.DefaultValue = &schemapb.ValueField{ + Data: &schemapb.ValueField_LongData{ + LongData: rand.Int63(), + }, + } + case schemapb.DataType_Float: + field.DefaultValue = &schemapb.ValueField{ + Data: &schemapb.ValueField_FloatData{ + FloatData: rand.Float32(), + }, + } + case schemapb.DataType_Double: + field.DefaultValue = &schemapb.ValueField{ + Data: &schemapb.ValueField_DoubleData{ + DoubleData: rand.Float64(), + }, + } + case schemapb.DataType_String, schemapb.DataType_VarChar: + field.DefaultValue = &schemapb.ValueField{ + Data: &schemapb.ValueField_StringData{ + StringData: randomString(10), + }, + } + default: + msg := fmt.Sprintf("type (%s) not support default_value", field.GetDataType().String()) + return nil, merr.WrapErrParameterInvalidMsg(msg) + } + return field, nil +} + func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.InsertData, useNullType bool) ([]arrow.Array, error) { mem := memory.NewGoAllocator() columns := make([]arrow.Array, 0, len(schema.Fields))