diff --git a/internal/datanode/channel/channel_manager_test.go b/internal/datanode/channel/channel_manager_test.go index 7ce5218df8..307b5be026 100644 --- a/internal/datanode/channel/channel_manager_test.go +++ b/internal/datanode/channel/channel_manager_test.go @@ -181,9 +181,7 @@ func (s *ChannelManagerSuite) TearDownTest() { } func (s *ChannelManagerSuite) TestReleaseStuck() { - var ( - channel = "by-dev-rootcoord-dml-2" - ) + channel := "by-dev-rootcoord-dml-2" s.manager.releaseFunc = func(channel string) { time.Sleep(1 * time.Second) } diff --git a/internal/flushcommon/writebuffer/bf_write_buffer_test.go b/internal/flushcommon/writebuffer/bf_write_buffer_test.go index bea8856efa..c5d182617b 100644 --- a/internal/flushcommon/writebuffer/bf_write_buffer_test.go +++ b/internal/flushcommon/writebuffer/bf_write_buffer_test.go @@ -213,12 +213,12 @@ func (s *BFWriteBufferSuite) TestBufferData() { value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection())) s.NoError(err) - s.MetricsEqual(value, 5604) + s.MetricsEqual(value, 5607) delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) s.NoError(err) - s.MetricsEqual(value, 5844) + s.MetricsEqual(value, 5847) }) s.Run("normal_run_varchar", func() { @@ -240,7 +240,7 @@ func (s *BFWriteBufferSuite) TestBufferData() { value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection())) s.NoError(err) - s.MetricsEqual(value, 7224) + s.MetricsEqual(value, 7227) }) s.Run("int_pk_type_not_match", func() { diff --git a/internal/flushcommon/writebuffer/insert_buffer_test.go b/internal/flushcommon/writebuffer/insert_buffer_test.go 
index bcb54aebc9..c7ac20d215 100644 --- a/internal/flushcommon/writebuffer/insert_buffer_test.go +++ b/internal/flushcommon/writebuffer/insert_buffer_test.go @@ -142,7 +142,7 @@ func (s *InsertBufferSuite) TestBuffer() { memSize := insertBuffer.Buffer(groups[0], &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) s.EqualValues(100, insertBuffer.MinTimestamp()) - s.EqualValues(5364, memSize) + s.EqualValues(5367, memSize) } func (s *InsertBufferSuite) TestYield() { diff --git a/internal/flushcommon/writebuffer/l0_write_buffer_test.go b/internal/flushcommon/writebuffer/l0_write_buffer_test.go index ebb4985598..53608754ef 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer_test.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer_test.go @@ -188,12 +188,12 @@ func (s *L0WriteBufferSuite) TestBufferData() { value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection())) s.NoError(err) - s.MetricsEqual(value, 5604) + s.MetricsEqual(value, 5607) delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200}) s.NoError(err) - s.MetricsEqual(value, 5844) + s.MetricsEqual(value, 5847) }) s.Run("pk_type_not_match", func() { diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index acd679583e..fdbc683cb0 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -491,11 +491,11 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]* return nil, merr.WrapErrServiceInternal("timestamp column row num not match") } - timestamps := tsFieldData.GetRows().([]int64) + timestamps := 
tsFieldData.GetDataRows().([]int64) switch wb.pkField.GetDataType() { case schemapb.DataType_Int64: - pks := pkFieldData.GetRows().([]int64) + pks := pkFieldData.GetDataRows().([]int64) for idx, pk := range pks { ts, ok := inData.intPKTs[pk] if !ok || timestamps[idx] < ts { @@ -503,7 +503,7 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]* } } case schemapb.DataType_VarChar: - pks := pkFieldData.GetRows().([]string) + pks := pkFieldData.GetDataRows().([]string) for idx, pk := range pks { ts, ok := inData.strPKTs[pk] if !ok || timestamps[idx] < ts { diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index c2dd57743b..37b0cf77db 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -246,8 +246,8 @@ func TestInsertCodecFailed(t *testing.T) { insertCodec := NewInsertCodecWithSchema(schema) insertDataEmpty := &InsertData{ Data: map[int64]FieldData{ - RowIDField: &Int64FieldData{[]int64{}, nil}, - TimestampField: &Int64FieldData{[]int64{}, nil}, + RowIDField: &Int64FieldData{[]int64{}, nil, false}, + TimestampField: &Int64FieldData{[]int64{}, nil, false}, }, } _, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) @@ -430,16 +430,16 @@ func TestInsertCodec(t *testing.T) { insertDataEmpty := &InsertData{ Data: map[int64]FieldData{ - RowIDField: &Int64FieldData{[]int64{}, nil}, - TimestampField: &Int64FieldData{[]int64{}, nil}, - BoolField: &BoolFieldData{[]bool{}, nil}, - Int8Field: &Int8FieldData{[]int8{}, nil}, - Int16Field: &Int16FieldData{[]int16{}, nil}, - Int32Field: &Int32FieldData{[]int32{}, nil}, - Int64Field: &Int64FieldData{[]int64{}, nil}, - FloatField: &FloatFieldData{[]float32{}, nil}, - DoubleField: &DoubleFieldData{[]float64{}, nil}, - StringField: &StringFieldData{[]string{}, schemapb.DataType_VarChar, nil}, + RowIDField: &Int64FieldData{[]int64{}, nil, false}, + TimestampField: &Int64FieldData{[]int64{}, nil, false}, + 
BoolField: &BoolFieldData{[]bool{}, nil, false}, + Int8Field: &Int8FieldData{[]int8{}, nil, false}, + Int16Field: &Int16FieldData{[]int16{}, nil, false}, + Int32Field: &Int32FieldData{[]int32{}, nil, false}, + Int64Field: &Int64FieldData{[]int64{}, nil, false}, + FloatField: &FloatFieldData{[]float32{}, nil, false}, + DoubleField: &DoubleFieldData{[]float64{}, nil, false}, + StringField: &StringFieldData{[]string{}, schemapb.DataType_VarChar, nil, false}, BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8}, FloatVectorField: &FloatVectorFieldData{[]float32{}, 4}, Float16VectorField: &Float16VectorFieldData{[]byte{}, 4}, @@ -450,8 +450,8 @@ func TestInsertCodec(t *testing.T) { Contents: [][]byte{}, }, }, - ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}, nil}, - JSONField: &JSONFieldData{[][]byte{}, nil}, + ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}, nil, false}, + JSONField: &JSONFieldData{[][]byte{}, nil, false}, }, } b, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) @@ -828,20 +828,20 @@ func TestMemorySize(t *testing.T) { }, }, } - assert.Equal(t, insertData1.Data[RowIDField].GetMemorySize(), 8) - assert.Equal(t, insertData1.Data[TimestampField].GetMemorySize(), 8) - assert.Equal(t, insertData1.Data[BoolField].GetMemorySize(), 1) - assert.Equal(t, insertData1.Data[Int8Field].GetMemorySize(), 1) - assert.Equal(t, insertData1.Data[Int16Field].GetMemorySize(), 2) - assert.Equal(t, insertData1.Data[Int32Field].GetMemorySize(), 4) - assert.Equal(t, insertData1.Data[Int64Field].GetMemorySize(), 8) - assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 4) - assert.Equal(t, insertData1.Data[DoubleField].GetMemorySize(), 8) - assert.Equal(t, insertData1.Data[StringField].GetMemorySize(), 17) + assert.Equal(t, insertData1.Data[RowIDField].GetMemorySize(), 9) + assert.Equal(t, insertData1.Data[TimestampField].GetMemorySize(), 9) + assert.Equal(t, 
insertData1.Data[BoolField].GetMemorySize(), 2) + assert.Equal(t, insertData1.Data[Int8Field].GetMemorySize(), 2) + assert.Equal(t, insertData1.Data[Int16Field].GetMemorySize(), 3) + assert.Equal(t, insertData1.Data[Int32Field].GetMemorySize(), 5) + assert.Equal(t, insertData1.Data[Int64Field].GetMemorySize(), 9) + assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 5) + assert.Equal(t, insertData1.Data[DoubleField].GetMemorySize(), 9) + assert.Equal(t, insertData1.Data[StringField].GetMemorySize(), 18) assert.Equal(t, insertData1.Data[BinaryVectorField].GetMemorySize(), 5) - assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 4) - assert.Equal(t, insertData1.Data[ArrayField].GetMemorySize(), 3*4) - assert.Equal(t, insertData1.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16) + assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 5) + assert.Equal(t, insertData1.Data[ArrayField].GetMemorySize(), 3*4+1) + assert.Equal(t, insertData1.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16+1) insertData2 := &InsertData{ Data: map[int64]FieldData{ @@ -886,46 +886,46 @@ func TestMemorySize(t *testing.T) { }, } - assert.Equal(t, insertData2.Data[RowIDField].GetMemorySize(), 16) - assert.Equal(t, insertData2.Data[TimestampField].GetMemorySize(), 16) - assert.Equal(t, insertData2.Data[BoolField].GetMemorySize(), 2) - assert.Equal(t, insertData2.Data[Int8Field].GetMemorySize(), 2) - assert.Equal(t, insertData2.Data[Int16Field].GetMemorySize(), 4) - assert.Equal(t, insertData2.Data[Int32Field].GetMemorySize(), 8) - assert.Equal(t, insertData2.Data[Int64Field].GetMemorySize(), 16) - assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 8) - assert.Equal(t, insertData2.Data[DoubleField].GetMemorySize(), 16) - assert.Equal(t, insertData2.Data[StringField].GetMemorySize(), 35) + assert.Equal(t, insertData2.Data[RowIDField].GetMemorySize(), 17) + assert.Equal(t, insertData2.Data[TimestampField].GetMemorySize(), 17) + 
assert.Equal(t, insertData2.Data[BoolField].GetMemorySize(), 3) + assert.Equal(t, insertData2.Data[Int8Field].GetMemorySize(), 3) + assert.Equal(t, insertData2.Data[Int16Field].GetMemorySize(), 5) + assert.Equal(t, insertData2.Data[Int32Field].GetMemorySize(), 9) + assert.Equal(t, insertData2.Data[Int64Field].GetMemorySize(), 17) + assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 9) + assert.Equal(t, insertData2.Data[DoubleField].GetMemorySize(), 17) + assert.Equal(t, insertData2.Data[StringField].GetMemorySize(), 36) assert.Equal(t, insertData2.Data[BinaryVectorField].GetMemorySize(), 6) - assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 8) + assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 9) insertDataEmpty := &InsertData{ Data: map[int64]FieldData{ - RowIDField: &Int64FieldData{[]int64{}, nil}, - TimestampField: &Int64FieldData{[]int64{}, nil}, - BoolField: &BoolFieldData{[]bool{}, nil}, - Int8Field: &Int8FieldData{[]int8{}, nil}, - Int16Field: &Int16FieldData{[]int16{}, nil}, - Int32Field: &Int32FieldData{[]int32{}, nil}, - Int64Field: &Int64FieldData{[]int64{}, nil}, - FloatField: &FloatFieldData{[]float32{}, nil}, - DoubleField: &DoubleFieldData{[]float64{}, nil}, - StringField: &StringFieldData{[]string{}, schemapb.DataType_VarChar, nil}, + RowIDField: &Int64FieldData{[]int64{}, nil, false}, + TimestampField: &Int64FieldData{[]int64{}, nil, false}, + BoolField: &BoolFieldData{[]bool{}, nil, false}, + Int8Field: &Int8FieldData{[]int8{}, nil, false}, + Int16Field: &Int16FieldData{[]int16{}, nil, false}, + Int32Field: &Int32FieldData{[]int32{}, nil, false}, + Int64Field: &Int64FieldData{[]int64{}, nil, false}, + FloatField: &FloatFieldData{[]float32{}, nil, false}, + DoubleField: &DoubleFieldData{[]float64{}, nil, false}, + StringField: &StringFieldData{[]string{}, schemapb.DataType_VarChar, nil, false}, BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8}, FloatVectorField: &FloatVectorFieldData{[]float32{}, 4}, }, } - 
assert.Equal(t, insertDataEmpty.Data[RowIDField].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[TimestampField].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[BoolField].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[Int8Field].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[Int16Field].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[Int32Field].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[Int64Field].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[FloatField].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[DoubleField].GetMemorySize(), 0) - assert.Equal(t, insertDataEmpty.Data[StringField].GetMemorySize(), 0) + assert.Equal(t, insertDataEmpty.Data[RowIDField].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[TimestampField].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[BoolField].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[Int8Field].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[Int16Field].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[Int32Field].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[Int64Field].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[FloatField].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[DoubleField].GetMemorySize(), 1) + assert.Equal(t, insertDataEmpty.Data[StringField].GetMemorySize(), 1) assert.Equal(t, insertDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4) assert.Equal(t, insertDataEmpty.Data[FloatVectorField].GetMemorySize(), 4) } @@ -979,21 +979,21 @@ func TestAddFieldDataToPayload(t *testing.T) { w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false) e, _ := w.NextInsertEventWriter() var err error - err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}, nil, false}) assert.Error(t, err) - err = 
AddFieldDataToPayload(e, schemapb.DataType_Int8, &Int8FieldData{[]int8{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Int8, &Int8FieldData{[]int8{}, nil, false}) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_Int16, &Int16FieldData{[]int16{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Int16, &Int16FieldData{[]int16{}, nil, false}) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_Int32, &Int32FieldData{[]int32{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Int32, &Int32FieldData{[]int32{}, nil, false}) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_Int64, &Int64FieldData{[]int64{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Int64, &Int64FieldData{[]int64{}, nil, false}) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_Float, &FloatFieldData{[]float32{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Float, &FloatFieldData{[]float32{}, nil, false}) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_Double, &DoubleFieldData{[]float64{}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_Double, &DoubleFieldData{[]float64{}, nil, false}) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_String, &StringFieldData{[]string{"test"}, schemapb.DataType_VarChar, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_String, &StringFieldData{[]string{"test"}, schemapb.DataType_VarChar, nil, false}) assert.Error(t, err) err = AddFieldDataToPayload(e, schemapb.DataType_Array, &ArrayFieldData{ ElementType: schemapb.DataType_VarChar, @@ -1004,7 +1004,7 @@ func TestAddFieldDataToPayload(t *testing.T) { }}, }) assert.Error(t, err) - err = AddFieldDataToPayload(e, schemapb.DataType_JSON, &JSONFieldData{[][]byte{[]byte(`"batch":2}`)}, nil}) + err = AddFieldDataToPayload(e, schemapb.DataType_JSON, &JSONFieldData{[][]byte{[]byte(`"batch":2}`)}, nil, false}) assert.Error(t, err) 
err = AddFieldDataToPayload(e, schemapb.DataType_BinaryVector, &BinaryVectorFieldData{[]byte{}, 8}) assert.Error(t, err) diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 95fc96c5d9..98b3eb3d55 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -62,7 +62,13 @@ func NewInsertDataWithCap(schema *schemapb.CollectionSchema, cap int) (*InsertDa Data: make(map[FieldID]FieldData), } - for _, field := range schema.GetFields() { + for _, field := range schema.Fields { + if field.IsPrimaryKey && field.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("primary key field not support nullable") + } + if field.IsPartitionKey && field.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("partition key field not support nullable") + } fieldData, err := NewFieldData(field.DataType, field, cap) if err != nil { return nil, err @@ -145,9 +151,12 @@ type FieldData interface { RowNum() int GetRow(i int) any GetRowSize(i int) int - GetRows() any + GetDataRows() any + // GetValidDataRows() any AppendRow(row interface{}) error - AppendRows(rows interface{}) error + AppendRows(dataRows interface{}, validDataRows interface{}) error + AppendDataRows(rows interface{}) error + AppendValidDataRows(rows interface{}) error GetDataType() schemapb.DataType GetNullable() bool } @@ -156,6 +165,9 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, typeParams := fieldSchema.GetTypeParams() switch dataType { case schemapb.DataType_Float16Vector: + if fieldSchema.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("vector not support null") + } dim, err := GetDimFromParams(typeParams) if err != nil { return nil, err @@ -165,6 +177,9 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, Dim: dim, }, nil case schemapb.DataType_BFloat16Vector: + if fieldSchema.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("vector not support null") 
+ } dim, err := GetDimFromParams(typeParams) if err != nil { return nil, err @@ -174,6 +189,9 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, Dim: dim, }, nil case schemapb.DataType_FloatVector: + if fieldSchema.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("vector not support null") + } dim, err := GetDimFromParams(typeParams) if err != nil { return nil, err @@ -183,6 +201,9 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, Dim: dim, }, nil case schemapb.DataType_BinaryVector: + if fieldSchema.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("vector not support null") + } dim, err := GetDimFromParams(typeParams) if err != nil { return nil, err @@ -192,10 +213,14 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, Dim: dim, }, nil case schemapb.DataType_SparseFloatVector: + if fieldSchema.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg("vector not support null") + } return &SparseFloatVectorFieldData{}, nil case schemapb.DataType_Bool: data := &BoolFieldData{ - Data: make([]bool, 0, cap), + Data: make([]bool, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) @@ -204,7 +229,8 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, case schemapb.DataType_Int8: data := &Int8FieldData{ - Data: make([]int8, 0, cap), + Data: make([]int8, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) @@ -213,7 +239,8 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, case schemapb.DataType_Int16: data := &Int16FieldData{ - Data: make([]int16, 0, cap), + Data: make([]int16, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) @@ -222,7 +249,8 @@ func NewFieldData(dataType 
schemapb.DataType, fieldSchema *schemapb.FieldSchema, case schemapb.DataType_Int32: data := &Int32FieldData{ - Data: make([]int32, 0, cap), + Data: make([]int32, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) @@ -231,16 +259,17 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, case schemapb.DataType_Int64: data := &Int64FieldData{ - Data: make([]int64, 0, cap), + Data: make([]int64, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) } return data, nil - case schemapb.DataType_Float: data := &FloatFieldData{ - Data: make([]float32, 0, cap), + Data: make([]float32, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) @@ -249,36 +278,37 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, case schemapb.DataType_Double: data := &DoubleFieldData{ - Data: make([]float64, 0, cap), + Data: make([]float64, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) } return data, nil - case schemapb.DataType_JSON: data := &JSONFieldData{ - Data: make([][]byte, 0, cap), + Data: make([][]byte, 0, cap), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) } return data, nil - case schemapb.DataType_Array: data := &ArrayFieldData{ Data: make([]*schemapb.ScalarField, 0, cap), ElementType: fieldSchema.GetElementType(), + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) } return data, nil - case schemapb.DataType_String, schemapb.DataType_VarChar: data := &StringFieldData{ Data: make([]string, 0, cap), DataType: dataType, + Nullable: fieldSchema.GetNullable(), } if fieldSchema.GetNullable() { data.ValidData = make([]bool, 0, cap) @@ -292,44 
+322,54 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, type BoolFieldData struct { Data []bool ValidData []bool + Nullable bool } type Int8FieldData struct { Data []int8 ValidData []bool + Nullable bool } type Int16FieldData struct { Data []int16 ValidData []bool + Nullable bool } type Int32FieldData struct { Data []int32 ValidData []bool + Nullable bool } type Int64FieldData struct { Data []int64 ValidData []bool + Nullable bool } type FloatFieldData struct { Data []float32 ValidData []bool + Nullable bool } type DoubleFieldData struct { Data []float64 ValidData []bool + Nullable bool } type StringFieldData struct { Data []string DataType schemapb.DataType ValidData []bool + Nullable bool } type ArrayFieldData struct { ElementType schemapb.DataType Data []*schemapb.ScalarField ValidData []bool + Nullable bool } type JSONFieldData struct { Data [][]byte ValidData []bool + Nullable bool } type BinaryVectorFieldData struct { Data []byte @@ -382,16 +422,76 @@ func (data *BFloat16VectorFieldData) RowNum() int { func (data *SparseFloatVectorFieldData) RowNum() int { return len(data.Contents) } // GetRow implements FieldData.GetRow -func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int8FieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int16FieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int32FieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int64FieldData) GetRow(i int) any { return data.Data[i] } -func (data *FloatFieldData) GetRow(i int) any { return data.Data[i] } -func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] } -func (data *StringFieldData) GetRow(i int) any { return data.Data[i] } -func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] } -func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] } +func (data *BoolFieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil 
+ } + return data.Data[i] +} + +func (data *Int8FieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *Int16FieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *Int32FieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *Int64FieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *FloatFieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *DoubleFieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *StringFieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *ArrayFieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + +func (data *JSONFieldData) GetRow(i int) any { + if data.GetNullable() && !data.ValidData[i] { + return nil + } + return data.Data[i] +} + func (data *BinaryVectorFieldData) GetRow(i int) any { return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8] } @@ -412,109 +512,189 @@ func (data *BFloat16VectorFieldData) GetRow(i int) interface{} { return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2] } -func (data *BoolFieldData) GetRows() any { return data.Data } -func (data *Int8FieldData) GetRows() any { return data.Data } -func (data *Int16FieldData) GetRows() any { return data.Data } -func (data *Int32FieldData) GetRows() any { return data.Data } -func (data *Int64FieldData) GetRows() any { return data.Data } -func (data *FloatFieldData) GetRows() any { return data.Data } -func (data *DoubleFieldData) GetRows() any { return data.Data } 
-func (data *StringFieldData) GetRows() any { return data.Data } -func (data *ArrayFieldData) GetRows() any { return data.Data } -func (data *JSONFieldData) GetRows() any { return data.Data } -func (data *BinaryVectorFieldData) GetRows() any { return data.Data } -func (data *FloatVectorFieldData) GetRows() any { return data.Data } -func (data *Float16VectorFieldData) GetRows() any { return data.Data } -func (data *BFloat16VectorFieldData) GetRows() any { return data.Data } -func (data *SparseFloatVectorFieldData) GetRows() any { return data.Contents } +func (data *BoolFieldData) GetDataRows() any { return data.Data } +func (data *Int8FieldData) GetDataRows() any { return data.Data } +func (data *Int16FieldData) GetDataRows() any { return data.Data } +func (data *Int32FieldData) GetDataRows() any { return data.Data } +func (data *Int64FieldData) GetDataRows() any { return data.Data } +func (data *FloatFieldData) GetDataRows() any { return data.Data } +func (data *DoubleFieldData) GetDataRows() any { return data.Data } +func (data *StringFieldData) GetDataRows() any { return data.Data } +func (data *ArrayFieldData) GetDataRows() any { return data.Data } +func (data *JSONFieldData) GetDataRows() any { return data.Data } +func (data *BinaryVectorFieldData) GetDataRows() any { return data.Data } +func (data *FloatVectorFieldData) GetDataRows() any { return data.Data } +func (data *Float16VectorFieldData) GetDataRows() any { return data.Data } +func (data *BFloat16VectorFieldData) GetDataRows() any { return data.Data } +func (data *SparseFloatVectorFieldData) GetDataRows() any { return data.Contents } // AppendRow implements FieldData.AppendRow func (data *BoolFieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]bool, 1)...) 
+ data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(bool) if !ok { return merr.WrapErrParameterInvalid("bool", row, "Wrong row type") } data.Data = append(data.Data, v) + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } return nil } func (data *Int8FieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]int8, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(int8) if !ok { return merr.WrapErrParameterInvalid("int8", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *Int16FieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]int16, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(int16) if !ok { return merr.WrapErrParameterInvalid("int16", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *Int32FieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]int32, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(int32) if !ok { return merr.WrapErrParameterInvalid("int32", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *Int64FieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]int64, 1)...) 
+ data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(int64) if !ok { return merr.WrapErrParameterInvalid("int64", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *FloatFieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]float32, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(float32) if !ok { return merr.WrapErrParameterInvalid("float32", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *DoubleFieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]float64, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(float64) if !ok { return merr.WrapErrParameterInvalid("float64", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *StringFieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]string, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(string) if !ok { return merr.WrapErrParameterInvalid("string", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *ArrayFieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([]*schemapb.ScalarField, 1)...) 
+ data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.(*schemapb.ScalarField) if !ok { return merr.WrapErrParameterInvalid("*schemapb.ScalarField", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } func (data *JSONFieldData) AppendRow(row interface{}) error { + if data.GetNullable() && row == nil { + data.Data = append(data.Data, make([][]byte, 1)...) + data.ValidData = append(data.ValidData, false) + return nil + } v, ok := row.([]byte) if !ok { return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type") } + if data.GetNullable() { + data.ValidData = append(data.ValidData, true) + } data.Data = append(data.Data, v) return nil } @@ -571,7 +751,131 @@ func (data *SparseFloatVectorFieldData) AppendRow(row interface{}) error { return nil } -func (data *BoolFieldData) AppendRows(rows interface{}) error { +func (data *BoolFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *Int8FieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *Int16FieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *Int32FieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *Int64FieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + 
return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *FloatFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *DoubleFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *StringFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *ArrayFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *JSONFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +// AppendDataRows appends FLATTEN vectors to field data. +func (data *BinaryVectorFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +// AppendDataRows appends FLATTEN vectors to field data. +func (data *FloatVectorFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +// AppendDataRows appends FLATTEN vectors to field data. 
+func (data *Float16VectorFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +// AppendDataRows appends FLATTEN vectors to field data. +func (data *BFloat16VectorFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *SparseFloatVectorFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error { + err := data.AppendDataRows(dataRows) + if err != nil { + return err + } + return data.AppendValidDataRows(validDataRows) +} + +func (data *BoolFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]bool) if !ok { return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") @@ -580,7 +884,7 @@ func (data *BoolFieldData) AppendRows(rows interface{}) error { return nil } -func (data *Int8FieldData) AppendRows(rows interface{}) error { +func (data *Int8FieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]int8) if !ok { return merr.WrapErrParameterInvalid("[]int8", rows, "Wrong rows type") @@ -589,7 +893,7 @@ func (data *Int8FieldData) AppendRows(rows interface{}) error { return nil } -func (data *Int16FieldData) AppendRows(rows interface{}) error { +func (data *Int16FieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]int16) if !ok { return merr.WrapErrParameterInvalid("[]int16", rows, "Wrong rows type") @@ -598,7 +902,7 @@ func (data *Int16FieldData) AppendRows(rows interface{}) error { return nil } -func (data *Int32FieldData) AppendRows(rows interface{}) error { +func (data *Int32FieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]int32) if !ok { return merr.WrapErrParameterInvalid("[]int32", rows, "Wrong rows type") @@ -607,7 +911,7 @@ func (data *Int32FieldData) 
AppendRows(rows interface{}) error { return nil } -func (data *Int64FieldData) AppendRows(rows interface{}) error { +func (data *Int64FieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]int64) if !ok { return merr.WrapErrParameterInvalid("[]int64", rows, "Wrong rows type") @@ -616,7 +920,7 @@ func (data *Int64FieldData) AppendRows(rows interface{}) error { return nil } -func (data *FloatFieldData) AppendRows(rows interface{}) error { +func (data *FloatFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]float32) if !ok { return merr.WrapErrParameterInvalid("[]float32", rows, "Wrong rows type") @@ -625,7 +929,7 @@ func (data *FloatFieldData) AppendRows(rows interface{}) error { return nil } -func (data *DoubleFieldData) AppendRows(rows interface{}) error { +func (data *DoubleFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]float64) if !ok { return merr.WrapErrParameterInvalid("[]float64", rows, "Wrong rows type") @@ -634,7 +938,7 @@ func (data *DoubleFieldData) AppendRows(rows interface{}) error { return nil } -func (data *StringFieldData) AppendRows(rows interface{}) error { +func (data *StringFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]string) if !ok { return merr.WrapErrParameterInvalid("[]string", rows, "Wrong rows type") @@ -643,7 +947,7 @@ func (data *StringFieldData) AppendRows(rows interface{}) error { return nil } -func (data *ArrayFieldData) AppendRows(rows interface{}) error { +func (data *ArrayFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]*schemapb.ScalarField) if !ok { return merr.WrapErrParameterInvalid("[]*schemapb.ScalarField", rows, "Wrong rows type") @@ -652,7 +956,7 @@ func (data *ArrayFieldData) AppendRows(rows interface{}) error { return nil } -func (data *JSONFieldData) AppendRows(rows interface{}) error { +func (data *JSONFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([][]byte) if !ok { return 
merr.WrapErrParameterInvalid("[][]byte", rows, "Wrong rows type") @@ -661,8 +965,8 @@ func (data *JSONFieldData) AppendRows(rows interface{}) error { return nil } -// AppendRows appends FLATTEN vectors to field data. -func (data *BinaryVectorFieldData) AppendRows(rows interface{}) error { +// AppendDataRows appends FLATTEN vectors to field data. +func (data *BinaryVectorFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]byte) if !ok { return merr.WrapErrParameterInvalid("[]byte", rows, "Wrong rows type") @@ -674,8 +978,8 @@ func (data *BinaryVectorFieldData) AppendRows(rows interface{}) error { return nil } -// AppendRows appends FLATTEN vectors to field data. -func (data *FloatVectorFieldData) AppendRows(rows interface{}) error { +// AppendDataRows appends FLATTEN vectors to field data. +func (data *FloatVectorFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]float32) if !ok { return merr.WrapErrParameterInvalid("[]float32", rows, "Wrong rows type") @@ -687,8 +991,8 @@ func (data *FloatVectorFieldData) AppendRows(rows interface{}) error { return nil } -// AppendRows appends FLATTEN vectors to field data. -func (data *Float16VectorFieldData) AppendRows(rows interface{}) error { +// AppendDataRows appends FLATTEN vectors to field data. +func (data *Float16VectorFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]byte) if !ok { return merr.WrapErrParameterInvalid("[]byte", rows, "Wrong rows type") @@ -700,8 +1004,8 @@ func (data *Float16VectorFieldData) AppendRows(rows interface{}) error { return nil } -// AppendRows appends FLATTEN vectors to field data. -func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error { +// AppendDataRows appends FLATTEN vectors to field data. 
+func (data *BFloat16VectorFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.([]byte) if !ok { return merr.WrapErrParameterInvalid("[]byte", rows, "Wrong rows type") @@ -713,7 +1017,7 @@ func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error { return nil } -func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error { +func (data *SparseFloatVectorFieldData) AppendDataRows(rows interface{}) error { v, ok := rows.(*SparseFloatVectorFieldData) if !ok { return merr.WrapErrParameterInvalid("SparseFloatVectorFieldData", rows, "Wrong rows type") @@ -725,33 +1029,192 @@ func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error { return nil } +func (data *BoolFieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *Int8FieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *Int16FieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *Int32FieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) 
+ return nil +} + +func (data *Int64FieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *FloatFieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *DoubleFieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *StringFieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *ArrayFieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +func (data *JSONFieldData) AppendValidDataRows(rows interface{}) error { + if rows == nil { + return nil + } + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + data.ValidData = append(data.ValidData, v...) + return nil +} + +// AppendValidDataRows appends FLATTEN vectors to field data. 
+func (data *BinaryVectorFieldData) AppendValidDataRows(rows interface{}) error { + if rows != nil { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } + return nil +} + +// AppendValidDataRows appends FLATTEN vectors to field data. +func (data *FloatVectorFieldData) AppendValidDataRows(rows interface{}) error { + if rows != nil { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } + return nil +} + +// AppendValidDataRows appends FLATTEN vectors to field data. +func (data *Float16VectorFieldData) AppendValidDataRows(rows interface{}) error { + if rows != nil { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } + return nil +} + +// AppendValidDataRows appends FLATTEN vectors to field data. +func (data *BFloat16VectorFieldData) AppendValidDataRows(rows interface{}) error { + if rows != nil { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } + return nil +} + +func (data *SparseFloatVectorFieldData) AppendValidDataRows(rows interface{}) error { + if rows != nil { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } + return nil +} + // GetMemorySize implements FieldData.GetMemorySize func (data *BoolFieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *Int8FieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *Int16FieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *Int32FieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + 
binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *Int64FieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *FloatFieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *DoubleFieldData) GetMemorySize() int { - return binary.Size(data.Data) + binary.Size(data.ValidData) + return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *BinaryVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } func (data *FloatVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } @@ -803,7 +1266,7 @@ func (data *StringFieldData) GetMemorySize() int { for _, val := range data.Data { size += len(val) + 16 } - return size + return size + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *ArrayFieldData) GetMemorySize() int { @@ -828,7 +1291,7 @@ func (data *ArrayFieldData) GetMemorySize() int { size += (&StringFieldData{Data: val.GetStringData().GetData()}).GetMemorySize() } } - return size + return size + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *JSONFieldData) GetMemorySize() int { @@ -836,7 +1299,7 @@ func (data *JSONFieldData) GetMemorySize() int { for _, val := range data.Data { size += len(val) + 16 } - return size + return size + binary.Size(data.ValidData) + binary.Size(data.Nullable) } func (data *BoolFieldData) GetRowSize(i int) int { return 1 } @@ -879,31 +1342,31 @@ func (data *SparseFloatVectorFieldData) GetRowSize(i int) int { } func (data *BoolFieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *Int8FieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } 
func (data *Int16FieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *Int32FieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *Int64FieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *FloatFieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *DoubleFieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *BFloat16VectorFieldData) GetNullable() bool { @@ -927,13 +1390,13 @@ func (data *Float16VectorFieldData) GetNullable() bool { } func (data *StringFieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *ArrayFieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } func (data *JSONFieldData) GetNullable() bool { - return len(data.ValidData) != 0 + return data.Nullable } diff --git a/internal/storage/insert_data_test.go b/internal/storage/insert_data_test.go index a941150039..286f5f5ba2 100644 --- a/internal/storage/insert_data_test.go +++ b/internal/storage/insert_data_test.go @@ -114,15 +114,15 @@ func (s *InsertDataSuite) TestInsertData() { s.Run("init by New", func() { s.True(s.iDataEmpty.IsEmpty()) s.Equal(0, s.iDataEmpty.GetRowNum()) - s.Equal(16, s.iDataEmpty.GetMemorySize()) + s.Equal(28, s.iDataEmpty.GetMemorySize()) s.False(s.iDataOneRow.IsEmpty()) s.Equal(1, s.iDataOneRow.GetRowNum()) - s.Equal(179, s.iDataOneRow.GetMemorySize()) + s.Equal(191, s.iDataOneRow.GetMemorySize()) s.False(s.iDataTwoRows.IsEmpty()) s.Equal(2, s.iDataTwoRows.GetRowNum()) - s.Equal(340, s.iDataTwoRows.GetMemorySize()) + s.Equal(352, s.iDataTwoRows.GetMemorySize()) for _, field := range s.iDataTwoRows.Data { s.Equal(2, field.RowNum()) @@ -135,52 +135,52 @@ func (s *InsertDataSuite) TestInsertData() { } func (s *InsertDataSuite) TestMemorySize() { - 
s.Equal(s.iDataEmpty.Data[RowIDField].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[TimestampField].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[BoolField].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[Int8Field].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[Int16Field].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[Int32Field].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[Int64Field].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[FloatField].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[DoubleField].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[StringField].GetMemorySize(), 0) - s.Equal(s.iDataEmpty.Data[ArrayField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[RowIDField].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[TimestampField].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[BoolField].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[Int8Field].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[Int16Field].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[Int32Field].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[Int64Field].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[FloatField].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[DoubleField].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[StringField].GetMemorySize(), 1) + s.Equal(s.iDataEmpty.Data[ArrayField].GetMemorySize(), 1) s.Equal(s.iDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4) s.Equal(s.iDataEmpty.Data[FloatVectorField].GetMemorySize(), 4) s.Equal(s.iDataEmpty.Data[Float16VectorField].GetMemorySize(), 4) s.Equal(s.iDataEmpty.Data[BFloat16VectorField].GetMemorySize(), 4) s.Equal(s.iDataEmpty.Data[SparseFloatVectorField].GetMemorySize(), 0) - s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 8) - s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 8) - s.Equal(s.iDataOneRow.Data[BoolField].GetMemorySize(), 1) - s.Equal(s.iDataOneRow.Data[Int8Field].GetMemorySize(), 1) - s.Equal(s.iDataOneRow.Data[Int16Field].GetMemorySize(), 2) - 
s.Equal(s.iDataOneRow.Data[Int32Field].GetMemorySize(), 4) - s.Equal(s.iDataOneRow.Data[Int64Field].GetMemorySize(), 8) - s.Equal(s.iDataOneRow.Data[FloatField].GetMemorySize(), 4) - s.Equal(s.iDataOneRow.Data[DoubleField].GetMemorySize(), 8) - s.Equal(s.iDataOneRow.Data[StringField].GetMemorySize(), 19) - s.Equal(s.iDataOneRow.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16) - s.Equal(s.iDataOneRow.Data[ArrayField].GetMemorySize(), 3*4) + s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 9) + s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 9) + s.Equal(s.iDataOneRow.Data[BoolField].GetMemorySize(), 2) + s.Equal(s.iDataOneRow.Data[Int8Field].GetMemorySize(), 2) + s.Equal(s.iDataOneRow.Data[Int16Field].GetMemorySize(), 3) + s.Equal(s.iDataOneRow.Data[Int32Field].GetMemorySize(), 5) + s.Equal(s.iDataOneRow.Data[Int64Field].GetMemorySize(), 9) + s.Equal(s.iDataOneRow.Data[FloatField].GetMemorySize(), 5) + s.Equal(s.iDataOneRow.Data[DoubleField].GetMemorySize(), 9) + s.Equal(s.iDataOneRow.Data[StringField].GetMemorySize(), 20) + s.Equal(s.iDataOneRow.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16+1) + s.Equal(s.iDataOneRow.Data[ArrayField].GetMemorySize(), 3*4+1) s.Equal(s.iDataOneRow.Data[BinaryVectorField].GetMemorySize(), 5) s.Equal(s.iDataOneRow.Data[FloatVectorField].GetMemorySize(), 20) s.Equal(s.iDataOneRow.Data[Float16VectorField].GetMemorySize(), 12) s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetMemorySize(), 12) s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetMemorySize(), 28) - s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 16) - s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 16) - s.Equal(s.iDataTwoRows.Data[BoolField].GetMemorySize(), 2) - s.Equal(s.iDataTwoRows.Data[Int8Field].GetMemorySize(), 2) - s.Equal(s.iDataTwoRows.Data[Int16Field].GetMemorySize(), 4) - s.Equal(s.iDataTwoRows.Data[Int32Field].GetMemorySize(), 8) - 
s.Equal(s.iDataTwoRows.Data[Int64Field].GetMemorySize(), 16) - s.Equal(s.iDataTwoRows.Data[FloatField].GetMemorySize(), 8) - s.Equal(s.iDataTwoRows.Data[DoubleField].GetMemorySize(), 16) - s.Equal(s.iDataTwoRows.Data[StringField].GetMemorySize(), 38) - s.Equal(s.iDataTwoRows.Data[ArrayField].GetMemorySize(), 24) + s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 17) + s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 17) + s.Equal(s.iDataTwoRows.Data[BoolField].GetMemorySize(), 3) + s.Equal(s.iDataTwoRows.Data[Int8Field].GetMemorySize(), 3) + s.Equal(s.iDataTwoRows.Data[Int16Field].GetMemorySize(), 5) + s.Equal(s.iDataTwoRows.Data[Int32Field].GetMemorySize(), 9) + s.Equal(s.iDataTwoRows.Data[Int64Field].GetMemorySize(), 17) + s.Equal(s.iDataTwoRows.Data[FloatField].GetMemorySize(), 9) + s.Equal(s.iDataTwoRows.Data[DoubleField].GetMemorySize(), 17) + s.Equal(s.iDataTwoRows.Data[StringField].GetMemorySize(), 39) + s.Equal(s.iDataTwoRows.Data[ArrayField].GetMemorySize(), 25) s.Equal(s.iDataTwoRows.Data[BinaryVectorField].GetMemorySize(), 6) s.Equal(s.iDataTwoRows.Data[FloatVectorField].GetMemorySize(), 36) s.Equal(s.iDataTwoRows.Data[Float16VectorField].GetMemorySize(), 20) @@ -230,7 +230,7 @@ func (s *InsertDataSuite) SetupTest() { s.Require().NoError(err) s.True(s.iDataEmpty.IsEmpty()) s.Equal(0, s.iDataEmpty.GetRowNum()) - s.Equal(16, s.iDataEmpty.GetMemorySize()) + s.Equal(28, s.iDataEmpty.GetMemorySize()) row1 := map[FieldID]interface{}{ RowIDField: int64(3), @@ -343,7 +343,7 @@ func (s *ArrayFieldDataSuite) TestArrayFieldData() { s.NoError(err) s.Equal(0, insertData.GetRowNum()) - s.Equal(0, insertData.GetMemorySize()) + s.Equal(11, insertData.GetMemorySize()) s.True(insertData.IsEmpty()) fieldIDToData := map[int64]interface{}{ @@ -395,7 +395,7 @@ func (s *ArrayFieldDataSuite) TestArrayFieldData() { err = insertData.Append(fieldIDToData) s.NoError(err) s.Equal(1, insertData.GetRowNum()) - s.Equal(114, insertData.GetMemorySize()) + s.Equal(126, 
insertData.GetMemorySize()) s.False(insertData.IsEmpty()) - s.Equal(114, insertData.GetRowSize(0)) + s.Equal(115, insertData.GetRowSize(0)) } diff --git a/internal/util/importutilv2/binlog/field_reader.go b/internal/util/importutilv2/binlog/field_reader.go index 324e249a6c..002ed47c03 100644 --- a/internal/util/importutilv2/binlog/field_reader.go +++ b/internal/util/importutilv2/binlog/field_reader.go @@ -48,8 +48,9 @@ func (r *fieldReader) Next() (storage.FieldData, error) { if err != nil { return nil, err } + // need append nulls for _, rows := range rowsSet { - err = fieldData.AppendRows(rows) + err = fieldData.AppendRows(rows, nil) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index 5ebef90b55..0077686252 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "golang.org/x/exp/slices" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -35,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ReaderSuite struct { @@ -57,7 +55,7 @@ func (suite *ReaderSuite) SetupTest() { suite.vecDataType = schemapb.DataType_FloatVector } -func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -98,6 +96,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Value: "128", }, }, + Nullable: nullable, }, }, } @@ -129,15 +128,10 @@ func (suite *ReaderSuite) run(dataType 
schemapb.DataType, elemType schemapb.Data expectInsertData := insertData for fieldID, data := range actualInsertData.Data { suite.Equal(expectRows, data.RowNum()) - fieldDataType := typeutil.GetField(schema, fieldID).GetDataType() for i := 0; i < expectRows; i++ { expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin) actual := data.GetRow(i) - if fieldDataType == schemapb.DataType_Array { - suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData())) - } else { - suite.Equal(expect, actual) - } + suite.Equal(expect, actual) } } } @@ -148,43 +142,63 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data } func (suite *ReaderSuite) TestReadScalarFields() { - suite.run(schemapb.DataType_Bool, schemapb.DataType_None) - suite.run(schemapb.DataType_Int8, schemapb.DataType_None) - suite.run(schemapb.DataType_Int16, schemapb.DataType_None) - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) - suite.run(schemapb.DataType_Int64, schemapb.DataType_None) - suite.run(schemapb.DataType_Float, schemapb.DataType_None) - suite.run(schemapb.DataType_Double, schemapb.DataType_None) - suite.run(schemapb.DataType_String, schemapb.DataType_None) - suite.run(schemapb.DataType_VarChar, schemapb.DataType_None) - suite.run(schemapb.DataType_JSON, schemapb.DataType_None) + suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Float, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Double, schemapb.DataType_None, false) + suite.run(schemapb.DataType_String, schemapb.DataType_None, false) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, 
false) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Bool) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int8) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int16) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int32) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int64) - suite.run(schemapb.DataType_Array, schemapb.DataType_Float) - suite.run(schemapb.DataType_Array, schemapb.DataType_Double) - suite.run(schemapb.DataType_Array, schemapb.DataType_String) + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_String, false) + + suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Float, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Double, schemapb.DataType_None, true) + suite.run(schemapb.DataType_String, schemapb.DataType_None, true) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true) 
+ suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_String, true) } func (suite *ReaderSuite) TestStringPK() { suite.pkDataType = schemapb.DataType_VarChar - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func (suite *ReaderSuite) TestVector() { suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 634c464eb2..f94aec2fe0 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -199,6 +199,9 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]any, row Row) err } func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { + if 
r.id2Field[fieldID].GetNullable() { + return r.parseNullableEntity(fieldID, obj) + } switch r.id2Field[fieldID].GetDataType() { case schemapb.DataType_Bool: b, ok := obj.(bool) @@ -418,6 +421,147 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { } } +func (r *rowParser) parseNullableEntity(fieldID int64, obj any) (any, error) { + switch r.id2Field[fieldID].GetDataType() { + case schemapb.DataType_Bool: + if obj == nil { + return nil, nil + } + value, ok := obj.(bool) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + return value, nil + case schemapb.DataType_Int8: + if obj == nil { + return nil, nil + } + value, ok := obj.(json.Number) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + num, err := strconv.ParseInt(value.String(), 0, 8) + if err != nil { + return nil, err + } + return int8(num), nil + case schemapb.DataType_Int16: + if obj == nil { + return nil, nil + } + value, ok := obj.(json.Number) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + num, err := strconv.ParseInt(value.String(), 0, 16) + if err != nil { + return nil, err + } + return int16(num), nil + case schemapb.DataType_Int32: + if obj == nil { + return nil, nil + } + value, ok := obj.(json.Number) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + num, err := strconv.ParseInt(value.String(), 0, 32) + if err != nil { + return nil, err + } + return int32(num), nil + case schemapb.DataType_Int64: + if obj == nil { + return nil, nil + } + value, ok := obj.(json.Number) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + num, err := strconv.ParseInt(value.String(), 0, 64) + if err != nil { + return nil, err + } + return num, nil + case schemapb.DataType_Float: + if obj == nil { + return nil, nil + } + value, ok := obj.(json.Number) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + num, err := strconv.ParseFloat(value.String(), 32) + if err != nil { + return nil, err + } + return float32(num), nil + case 
schemapb.DataType_Double: + if obj == nil { + return nil, nil + } + value, ok := obj.(json.Number) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + num, err := strconv.ParseFloat(value.String(), 64) + if err != nil { + return nil, err + } + return num, nil + case schemapb.DataType_BinaryVector, schemapb.DataType_FloatVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + case schemapb.DataType_String, schemapb.DataType_VarChar: + if obj == nil { + return nil, nil + } + value, ok := obj.(string) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + return value, nil + case schemapb.DataType_JSON: + if obj == nil { + return nil, nil + } + // for JSON data, we accept two kinds input: string and map[string]interface + // user can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}} + if value, ok := obj.(string); ok { + var dummy interface{} + err := json.Unmarshal([]byte(value), &dummy) + if err != nil { + return nil, err + } + return []byte(value), nil + } else if mp, ok := obj.(map[string]interface{}); ok { + bs, err := json.Marshal(mp) + if err != nil { + return nil, err + } + return bs, nil + } else { + return nil, r.wrapTypeError(obj, fieldID) + } + case schemapb.DataType_Array: + if obj == nil { + return nil, nil + } + arr, ok := obj.([]interface{}) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + scalarFieldData, err := r.arrayToFieldData(arr, r.id2Field[fieldID].GetElementType()) + if err != nil { + return nil, err + } + return scalarFieldData, nil + default: + return nil, merr.WrapErrImportFailed(fmt.Sprintf("parse json failed, unsupport data type: %s", + r.id2Field[fieldID].GetDataType().String())) + } +} + func (r *rowParser) arrayToFieldData(arr []interface{}, eleType schemapb.DataType) (*schemapb.ScalarField, error) { switch eleType { case schemapb.DataType_Bool: 
diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go index ff4b6ec4cb..77f79aee3e 100644 --- a/internal/util/importutilv2/json/row_parser_test.go +++ b/internal/util/importutilv2/json/row_parser_test.go @@ -98,6 +98,7 @@ func TestRowParser_Parse_Valid(t *testing.T) { var mp map[string]interface{} desc := json.NewDecoder(strings.NewReader(c.name)) + desc.UseNumber() err = desc.Decode(&mp) assert.NoError(t, err) diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go index acd69e05c2..b1af6ef338 100644 --- a/internal/util/importutilv2/numpy/reader.go +++ b/internal/util/importutilv2/numpy/reader.go @@ -89,7 +89,7 @@ func (r *reader) Read() (*storage.InsertData, error) { if data == nil { return nil, io.EOF } - err = insertData.Data[fieldID].AppendRows(data) + err = insertData.Data[fieldID].AppendRows(data, nil) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index d43baad0d3..b90c05bb67 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -142,7 +142,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } data = jsonStrs case schemapb.DataType_BinaryVector: - rows := fieldData.GetRows().([]byte) + rows := fieldData.GetDataRows().([]byte) const rowBytes = dim / 8 chunked := lo.Chunk(rows, rowBytes) chunkedRows := make([][rowBytes]byte, len(chunked)) @@ -151,7 +151,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } data = chunkedRows case schemapb.DataType_FloatVector: - rows := fieldData.GetRows().([]float32) + rows := fieldData.GetDataRows().([]float32) chunked := lo.Chunk(rows, dim) chunkedRows := make([][dim]float32, len(chunked)) for i, innerSlice := range chunked { @@ -159,7 +159,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } data = chunkedRows case 
schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - rows := fieldData.GetRows().([]byte) + rows := fieldData.GetDataRows().([]byte) const rowBytes = dim * 2 chunked := lo.Chunk(rows, rowBytes) chunkedRows := make([][rowBytes]byte, len(chunked)) @@ -168,7 +168,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } data = chunkedRows default: - data = fieldData.GetRows() + data = fieldData.GetDataRows() } reader, err := CreateReader(data) @@ -276,7 +276,7 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } data = jsonStrs case schemapb.DataType_BinaryVector: - rows := fieldData.GetRows().([]byte) + rows := fieldData.GetDataRows().([]byte) const rowBytes = dim / 8 chunked := lo.Chunk(rows, rowBytes) chunkedRows := make([][rowBytes]byte, len(chunked)) @@ -285,7 +285,7 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } data = chunkedRows case schemapb.DataType_FloatVector: - rows := fieldData.GetRows().([]float32) + rows := fieldData.GetDataRows().([]float32) chunked := lo.Chunk(rows, dim) chunkedRows := make([][dim]float32, len(chunked)) for i, innerSlice := range chunked { @@ -293,7 +293,7 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } data = chunkedRows case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - rows := fieldData.GetRows().([]byte) + rows := fieldData.GetDataRows().([]byte) const rowBytes = dim * 2 chunked := lo.Chunk(rows, rowBytes) chunkedRows := make([][rowBytes]byte, len(chunked)) @@ -302,7 +302,7 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } data = chunkedRows default: - data = fieldData.GetRows() + data = fieldData.GetDataRows() } reader, err := CreateReader(data) diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index 106b20eb3c..5fb60ba458 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ 
b/internal/util/importutilv2/parquet/field_reader.go @@ -66,58 +66,121 @@ func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex return cr, nil } -func (c *FieldReader) Next(count int64) (any, error) { +func (c *FieldReader) Next(count int64) (any, any, error) { switch c.field.GetDataType() { case schemapb.DataType_Bool: - return ReadBoolData(c, count) + if c.field.GetNullable() { + return ReadNullableBoolData(c, count) + } + data, err := ReadBoolData(c, count) + return data, nil, err case schemapb.DataType_Int8: - return ReadIntegerOrFloatData[int8](c, count) + if c.field.GetNullable() { + return ReadNullableIntegerOrFloatData[int8](c, count) + } + data, err := ReadIntegerOrFloatData[int8](c, count) + return data, nil, err case schemapb.DataType_Int16: - return ReadIntegerOrFloatData[int16](c, count) + if c.field.GetNullable() { + return ReadNullableIntegerOrFloatData[int16](c, count) + } + data, err := ReadIntegerOrFloatData[int16](c, count) + return data, nil, err case schemapb.DataType_Int32: - return ReadIntegerOrFloatData[int32](c, count) + if c.field.GetNullable() { + return ReadNullableIntegerOrFloatData[int32](c, count) + } + data, err := ReadIntegerOrFloatData[int32](c, count) + return data, nil, err case schemapb.DataType_Int64: - return ReadIntegerOrFloatData[int64](c, count) + if c.field.GetNullable() { + return ReadNullableIntegerOrFloatData[int64](c, count) + } + data, err := ReadIntegerOrFloatData[int64](c, count) + return data, nil, err case schemapb.DataType_Float: + if c.field.GetNullable() { + data, validData, err := ReadNullableIntegerOrFloatData[float32](c, count) + if err != nil { + return nil, nil, err + } + if data == nil { + return nil, nil, nil + } + return data, validData, typeutil.VerifyFloats32(data.([]float32)) + } data, err := ReadIntegerOrFloatData[float32](c, count) if err != nil { - return nil, err + return nil, nil, err } if data == nil { - return nil, nil + return nil, nil, nil } - return data, 
typeutil.VerifyFloats32(data.([]float32)) + return data, nil, typeutil.VerifyFloats32(data.([]float32)) case schemapb.DataType_Double: + if c.field.GetNullable() { + data, validData, err := ReadNullableIntegerOrFloatData[float64](c, count) + if err != nil { + return nil, nil, err + } + if data == nil { + return nil, nil, nil + } + return data, validData, typeutil.VerifyFloats64(data.([]float64)) + } data, err := ReadIntegerOrFloatData[float64](c, count) if err != nil { - return nil, err + return nil, nil, err } if data == nil { - return nil, nil + return nil, nil, nil } - return data, typeutil.VerifyFloats64(data.([]float64)) + return data, nil, typeutil.VerifyFloats64(data.([]float64)) case schemapb.DataType_VarChar, schemapb.DataType_String: - return ReadVarcharData(c, count) + if c.field.GetNullable() { + return ReadNullableVarcharData(c, count) + } + data, err := ReadVarcharData(c, count) + return data, nil, err case schemapb.DataType_JSON: - return ReadJSONData(c, count) + if c.field.GetNullable() { + return ReadNullableJSONData(c, count) + } + data, err := ReadJSONData(c, count) + return data, nil, err case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - return ReadBinaryData(c, count) + if c.field.GetNullable() { + return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } + data, err := ReadBinaryData(c, count) + return data, nil, err case schemapb.DataType_FloatVector: + if c.field.GetNullable() { + return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } arrayData, err := ReadIntegerOrFloatArrayData[float32](c, count) if err != nil { - return nil, err + return nil, nil, err } if arrayData == nil { - return nil, nil + return nil, nil, nil } vectors := lo.Flatten(arrayData.([][]float32)) - return vectors, nil + return vectors, nil, nil case schemapb.DataType_SparseFloatVector: - return ReadSparseFloatVectorData(c, count) + if c.field.GetNullable() 
{ + return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } + data, err := ReadSparseFloatVectorData(c, count) + return data, nil, err case schemapb.DataType_Array: - return ReadArrayData(c, count) + if c.field.GetNullable() { + return ReadNullableArrayData(c, count) + } + data, err := ReadArrayData(c, count) + return data, nil, err default: - return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for field '%s'", + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for field '%s'", c.field.GetDataType().String(), c.field.GetName())) } } @@ -133,6 +196,9 @@ func ReadBoolData(pcr *FieldReader, count int64) (any, error) { for _, chunk := range chunked.Chunks() { dataNums := chunk.Data().Len() boolReader, ok := chunk.(*array.Boolean) if !ok { return nil, WrapTypeErr("bool", chunk.DataType().Name(), pcr.field) } + if boolReader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } @@ -146,6 +212,34 @@ func ReadBoolData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadNullableBoolData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]bool, 0, count) + validData := make([]bool, 0, count) + for _, chunk := range chunked.Chunks() { + dataNums := chunk.Data().Len() + boolReader, ok := chunk.(*array.Boolean) + if !ok { + return nil, nil, WrapTypeErr("bool", chunk.DataType().Name(), pcr.field) + } + validData = append(validData, bytesToBoolArray(dataNums, boolReader.NullBitmapBytes())...)
+ + for i := 0; i < dataNums; i++ { + data = append(data, boolReader.Value(i)) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func ReadIntegerOrFloatData[T constraints.Integer | constraints.Float](pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -157,31 +251,49 @@ func ReadIntegerOrFloatData[T constraints.Integer | constraints.Float](pcr *Fiel switch chunk.DataType().ID() { case arrow.INT8: int8Reader := chunk.(*array.Int8) + if int8Reader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } for i := 0; i < dataNums; i++ { data = append(data, T(int8Reader.Value(i))) } case arrow.INT16: int16Reader := chunk.(*array.Int16) + if int16Reader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } for i := 0; i < dataNums; i++ { data = append(data, T(int16Reader.Value(i))) } case arrow.INT32: int32Reader := chunk.(*array.Int32) + if int32Reader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } for i := 0; i < dataNums; i++ { data = append(data, T(int32Reader.Value(i))) } case arrow.INT64: int64Reader := chunk.(*array.Int64) + if int64Reader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } for i := 0; i < dataNums; i++ { data = append(data, T(int64Reader.Value(i))) } case arrow.FLOAT32: float32Reader := chunk.(*array.Float32) + if float32Reader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } for i := 0; i < dataNums; i++ { data = append(data, T(float32Reader.Value(i))) } case arrow.FLOAT64: float64Reader := chunk.(*array.Float64) + if float64Reader.NullN() 
> 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } for i := 0; i < dataNums; i++ { data = append(data, T(float64Reader.Value(i))) } @@ -195,6 +307,65 @@ func ReadIntegerOrFloatData[T constraints.Integer | constraints.Float](pcr *Fiel return data, nil } +func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]T, 0, count) + validData := make([]bool, 0, count) + for _, chunk := range chunked.Chunks() { + dataNums := chunk.Data().Len() + switch chunk.DataType().ID() { + case arrow.INT8: + int8Reader := chunk.(*array.Int8) + validData = append(validData, bytesToBoolArray(dataNums, int8Reader.NullBitmapBytes())...) + for i := 0; i < dataNums; i++ { + data = append(data, T(int8Reader.Value(i))) + } + case arrow.INT16: + int16Reader := chunk.(*array.Int16) + validData = append(validData, bytesToBoolArray(dataNums, int16Reader.NullBitmapBytes())...) + for i := 0; i < dataNums; i++ { + data = append(data, T(int16Reader.Value(i))) + } + case arrow.INT32: + int32Reader := chunk.(*array.Int32) + validData = append(validData, bytesToBoolArray(dataNums, int32Reader.NullBitmapBytes())...) + for i := 0; i < dataNums; i++ { + data = append(data, T(int32Reader.Value(i))) + } + case arrow.INT64: + int64Reader := chunk.(*array.Int64) + validData = append(validData, bytesToBoolArray(dataNums, int64Reader.NullBitmapBytes())...) + for i := 0; i < dataNums; i++ { + data = append(data, T(int64Reader.Value(i))) + } + case arrow.FLOAT32: + float32Reader := chunk.(*array.Float32) + validData = append(validData, bytesToBoolArray(dataNums, float32Reader.NullBitmapBytes())...) 
+ for i := 0; i < dataNums; i++ { + data = append(data, T(float32Reader.Value(i))) + } + case arrow.FLOAT64: + float64Reader := chunk.(*array.Float64) + validData = append(validData, bytesToBoolArray(dataNums, float64Reader.NullBitmapBytes())...) + for i := 0; i < dataNums; i++ { + data = append(data, T(float64Reader.Value(i))) + } + default: + return nil, nil, WrapTypeErr("integer|float", chunk.DataType().Name(), pcr.field) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func ReadStringData(pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -204,6 +375,9 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) { for _, chunk := range chunked.Chunks() { dataNums := chunk.Data().Len() stringReader, ok := chunk.(*array.String) if !ok { return nil, WrapTypeErr("string", chunk.DataType().Name(), pcr.field) } + if stringReader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } @@ -217,6 +391,37 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadNullableStringData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]string, 0, count) + validData := make([]bool, 0, count) + for _, chunk := range chunked.Chunks() { + dataNums := chunk.Data().Len() + stringReader, ok := chunk.(*array.String) + if !ok { + return nil, nil, WrapTypeErr("string", chunk.DataType().Name(), pcr.field) + } + validData = append(validData, bytesToBoolArray(dataNums, stringReader.NullBitmapBytes())...)
+ for i := 0; i < dataNums; i++ { + if stringReader.IsNull(i) { + data = append(data, "") + continue + } + data = append(data, stringReader.ValueStr(i)) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func ReadVarcharData(pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -230,6 +435,9 @@ func ReadVarcharData(pcr *FieldReader, count int64) (any, error) { for _, chunk := range chunked.Chunks() { dataNums := chunk.Data().Len() stringReader, ok := chunk.(*array.String) if !ok { return nil, WrapTypeErr("string", chunk.DataType().Name(), pcr.field) } + if stringReader.NullN() > 0 { + return nil, merr.WrapErrParameterInvalidMsg("not nullable, but has null value") + } @@ -246,6 +454,44 @@ func ReadVarcharData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadNullableVarcharData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]string, 0, count) + maxLength, err := parameterutil.GetMaxLength(pcr.field) + if err != nil { + return nil, nil, err + } + validData := make([]bool, 0, count) + for _, chunk := range chunked.Chunks() { + dataNums := chunk.Data().Len() + stringReader, ok := chunk.(*array.String) + if !ok { + return nil, nil, WrapTypeErr("string", chunk.DataType().Name(), pcr.field) + } + validData = append(validData, bytesToBoolArray(dataNums, stringReader.NullBitmapBytes())...)
+ for i := 0; i < dataNums; i++ { + if stringReader.IsNull(i) { + data = append(data, "") + continue + } + if err = common.CheckVarcharLength(stringReader.Value(i), maxLength); err != nil { + return nil, nil, err + } + data = append(data, stringReader.ValueStr(i)) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func ReadJSONData(pcr *FieldReader, count int64) (any, error) { // JSON field read data from string array Parquet data, err := ReadStringData(pcr, count) @@ -274,6 +520,38 @@ func ReadJSONData(pcr *FieldReader, count int64) (any, error) { return byteArr, nil } +func ReadNullableJSONData(pcr *FieldReader, count int64) (any, []bool, error) { + // JSON field read data from string array Parquet + data, validData, err := ReadNullableStringData(pcr, count) + if err != nil { + return nil, nil, err + } + if data == nil { + return nil, nil, nil + } + byteArr := make([][]byte, 0) + for i, str := range data.([]string) { + if !validData[i] { + byteArr = append(byteArr, []byte(nil)) + continue + } + var dummy interface{} + err = json.Unmarshal([]byte(str), &dummy) + if err != nil { + return nil, nil, err + } + if pcr.field.GetIsDynamic() { + var dummy2 map[string]interface{} + err = json.Unmarshal([]byte(str), &dummy2) + if err != nil { + return nil, nil, err + } + } + byteArr = append(byteArr, []byte(str)) + } + return byteArr, validData, nil +} + func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { dataType := pcr.field.GetDataType() chunked, err := pcr.columnReader.NextBatch(count) @@ -398,6 +676,46 @@ func ReadBoolArrayData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadNullableBoolArrayData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + 
return nil, nil, err + } + data := make([][]bool, 0, count) + validData := make([]bool, 0, count) + for _, chunk := range chunked.Chunks() { + listReader, ok := chunk.(*array.List) + if !ok { + return nil, nil, WrapTypeErr("list", chunk.DataType().Name(), pcr.field) + } + boolReader, ok := listReader.ListValues().(*array.Boolean) + if !ok { + return nil, nil, WrapTypeErr("boolArray", chunk.DataType().Name(), pcr.field) + } + offsets := listReader.Offsets() + for i := 1; i < len(offsets); i++ { + start, end := offsets[i-1], offsets[i] + elementData := make([]bool, 0, end-start) + for j := start; j < end; j++ { + elementData = append(elementData, boolReader.Value(int(j))) + } + data = append(data, elementData) + elementDataValid := true + if start == end { + elementDataValid = false + } + validData = append(validData, elementDataValid) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func ReadIntegerOrFloatArrayData[T constraints.Integer | constraints.Float](pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -469,6 +787,86 @@ func ReadIntegerOrFloatArrayData[T constraints.Integer | constraints.Float](pcr return data, nil } +func ReadNullableIntegerOrFloatArrayData[T constraints.Integer | constraints.Float](pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([][]T, 0, count) + validData := make([]bool, 0, count) + + getDataFunc := func(offsets []int32, getValue func(int) T) { + for i := 1; i < len(offsets); i++ { + start, end := offsets[i-1], offsets[i] + elementData := make([]T, 0, end-start) + for j := start; j < end; j++ { + elementData = append(elementData, getValue(int(j))) + } + 
data = append(data, elementData) + elementDataValid := true + if start == end { + elementDataValid = false + } + validData = append(validData, elementDataValid) + } + } + for _, chunk := range chunked.Chunks() { + listReader, ok := chunk.(*array.List) + if !ok { + return nil, nil, WrapTypeErr("list", chunk.DataType().Name(), pcr.field) + } + offsets := listReader.Offsets() + dataType := pcr.field.GetDataType() + if typeutil.IsVectorType(dataType) { + if err = checkVectorAligned(offsets, pcr.dim, dataType); err != nil { + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String())) + } + } + valueReader := listReader.ListValues() + switch valueReader.DataType().ID() { + case arrow.INT8: + int8Reader := valueReader.(*array.Int8) + getDataFunc(offsets, func(i int) T { + return T(int8Reader.Value(i)) + }) + case arrow.INT16: + int16Reader := valueReader.(*array.Int16) + getDataFunc(offsets, func(i int) T { + return T(int16Reader.Value(i)) + }) + case arrow.INT32: + int32Reader := valueReader.(*array.Int32) + getDataFunc(offsets, func(i int) T { + return T(int32Reader.Value(i)) + }) + case arrow.INT64: + int64Reader := valueReader.(*array.Int64) + getDataFunc(offsets, func(i int) T { + return T(int64Reader.Value(i)) + }) + case arrow.FLOAT32: + float32Reader := valueReader.(*array.Float32) + getDataFunc(offsets, func(i int) T { + return T(float32Reader.Value(i)) + }) + case arrow.FLOAT64: + float64Reader := valueReader.(*array.Float64) + getDataFunc(offsets, func(i int) T { + return T(float64Reader.Value(i)) + }) + default: + return nil, nil, WrapTypeErr("integerArray|floatArray", chunk.DataType().Name(), pcr.field) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func 
ReadStringArrayData(pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -500,6 +898,46 @@ func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadNullableStringArrayData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([][]string, 0, count) + validData := make([]bool, 0, count) + for _, chunk := range chunked.Chunks() { + listReader, ok := chunk.(*array.List) + if !ok { + return nil, nil, WrapTypeErr("list", chunk.DataType().Name(), pcr.field) + } + stringReader, ok := listReader.ListValues().(*array.String) + if !ok { + return nil, nil, WrapTypeErr("stringArray", chunk.DataType().Name(), pcr.field) + } + offsets := listReader.Offsets() + for i := 1; i < len(offsets); i++ { + start, end := offsets[i-1], offsets[i] + elementData := make([]string, 0, end-start) + for j := start; j < end; j++ { + elementData = append(elementData, stringReader.Value(int(j))) + } + data = append(data, elementData) + elementDataValid := true + if start == end { + elementDataValid = false + } + validData = append(validData, elementDataValid) + } + } + if len(data) == 0 { + return nil, nil, nil + } + if len(data) != len(validData) { + return nil, nil, merr.WrapErrParameterInvalid(len(data), len(validData), "length of data is not equal to length of valid_data") + } + return data, validData, nil +} + func ReadArrayData(pcr *FieldReader, count int64) (any, error) { data := make([]*schemapb.ScalarField, 0, count) maxCapacity, err := parameterutil.GetMaxCapacity(pcr.field) @@ -674,3 +1112,185 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) { } return data, nil } + +func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) { + data := make([]*schemapb.ScalarField, 0, count) + maxCapacity, err := parameterutil.GetMaxCapacity(pcr.field) + 
if err != nil { + return nil, nil, err + } + elementType := pcr.field.GetElementType() + switch elementType { + case schemapb.DataType_Bool: + boolArray, validData, err := ReadNullableBoolArrayData(pcr, count) + if err != nil { + return nil, nil, err + } + if boolArray == nil { + return nil, nil, nil + } + for _, elementArray := range boolArray.([][]bool) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_Int8: + int8Array, validData, err := ReadNullableIntegerOrFloatArrayData[int32](pcr, count) + if err != nil { + return nil, nil, err + } + if int8Array == nil { + return nil, nil, nil + } + for _, elementArray := range int8Array.([][]int32) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_Int16: + int16Array, validData, err := ReadNullableIntegerOrFloatArrayData[int32](pcr, count) + if err != nil { + return nil, nil, err + } + if int16Array == nil { + return nil, nil, nil + } + for _, elementArray := range int16Array.([][]int32) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_Int32: + int32Array, validData, err := ReadNullableIntegerOrFloatArrayData[int32](pcr, count) + if err != nil { + return nil, nil, err + } + if int32Array == nil { + return nil, nil, 
nil + } + for _, elementArray := range int32Array.([][]int32) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_Int64: + int64Array, validData, err := ReadNullableIntegerOrFloatArrayData[int64](pcr, count) + if err != nil { + return nil, nil, err + } + if int64Array == nil { + return nil, nil, nil + } + for _, elementArray := range int64Array.([][]int64) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_Float: + float32Array, validData, err := ReadNullableIntegerOrFloatArrayData[float32](pcr, count) + if err != nil { + return nil, nil, err + } + if float32Array == nil { + return nil, nil, nil + } + for _, elementArray := range float32Array.([][]float32) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_Double: + float64Array, validData, err := ReadNullableIntegerOrFloatArrayData[float64](pcr, count) + if err != nil { + return nil, nil, err + } + if float64Array == nil { + return nil, nil, nil + } + for _, elementArray := range float64Array.([][]float64) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + 
DoubleData: &schemapb.DoubleArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + case schemapb.DataType_VarChar, schemapb.DataType_String: + stringArray, validData, err := ReadNullableStringArrayData(pcr, count) + if err != nil { + return nil, nil, err + } + if stringArray == nil { + return nil, nil, nil + } + for _, elementArray := range stringArray.([][]string) { + if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil { + return nil, nil, err + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: elementArray, + }, + }, + }) + } + return data, validData, nil + default: + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for array field '%s'", + elementType.String(), pcr.field.GetName())) + } +} diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index 4a29e344dd..8038c383ee 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -99,14 +99,14 @@ func (r *reader) Read() (*storage.InsertData, error) { OUTER: for { for fieldID, cr := range r.frs { - data, err := cr.Next(r.count) + data, validData, err := cr.Next(r.count) if err != nil { return nil, err } if data == nil { break OUTER } - err = insertData.Data[fieldID].AppendRows(data) + err = insertData.Data[fieldID].AppendRows(data, validData) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index ea4737efec..6db2b2668e 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -98,7 +98,7 @@ func writeParquet(w io.Writer, schema *schemapb.CollectionSchema, numRows int) ( return insertData, nil } -func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { +func 
(s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -139,6 +139,7 @@ func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType Value: "256", }, }, + Nullable: nullable, }, }, } @@ -166,7 +167,7 @@ func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType for i := 0; i < expectRows; i++ { expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin) actual := data.GetRow(i) - if fieldDataType == schemapb.DataType_Array { + if fieldDataType == schemapb.DataType_Array && expect != nil { switch elementType { case schemapb.DataType_Bool: actualArray := actual.(*schemapb.ScalarField).GetBoolData().GetData() @@ -264,45 +265,66 @@ func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } func (s *ReaderSuite) TestReadScalarFields() { - s.run(schemapb.DataType_Bool, schemapb.DataType_None) - s.run(schemapb.DataType_Int8, schemapb.DataType_None) - s.run(schemapb.DataType_Int16, schemapb.DataType_None) - s.run(schemapb.DataType_Int32, schemapb.DataType_None) - s.run(schemapb.DataType_Int64, schemapb.DataType_None) - s.run(schemapb.DataType_Float, schemapb.DataType_None) - s.run(schemapb.DataType_Double, schemapb.DataType_None) - s.run(schemapb.DataType_String, schemapb.DataType_None) - s.run(schemapb.DataType_VarChar, schemapb.DataType_None) - s.run(schemapb.DataType_JSON, schemapb.DataType_None) + s.run(schemapb.DataType_Bool, schemapb.DataType_None, false) + s.run(schemapb.DataType_Int8, schemapb.DataType_None, false) + s.run(schemapb.DataType_Int16, schemapb.DataType_None, false) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + s.run(schemapb.DataType_Int64, schemapb.DataType_None, false) + s.run(schemapb.DataType_Float, schemapb.DataType_None, false) + s.run(schemapb.DataType_Double, schemapb.DataType_None, false) + s.run(schemapb.DataType_String, schemapb.DataType_None, false) 
+ s.run(schemapb.DataType_VarChar, schemapb.DataType_None, false) + s.run(schemapb.DataType_JSON, schemapb.DataType_None, false) - s.run(schemapb.DataType_Array, schemapb.DataType_Bool) - s.run(schemapb.DataType_Array, schemapb.DataType_Int8) - s.run(schemapb.DataType_Array, schemapb.DataType_Int16) - s.run(schemapb.DataType_Array, schemapb.DataType_Int32) - s.run(schemapb.DataType_Array, schemapb.DataType_Int64) - s.run(schemapb.DataType_Array, schemapb.DataType_Float) - s.run(schemapb.DataType_Array, schemapb.DataType_Double) - s.run(schemapb.DataType_Array, schemapb.DataType_String) + s.run(schemapb.DataType_Array, schemapb.DataType_Bool, false) + s.run(schemapb.DataType_Array, schemapb.DataType_Int8, false) + s.run(schemapb.DataType_Array, schemapb.DataType_Int16, false) + s.run(schemapb.DataType_Array, schemapb.DataType_Int32, false) + s.run(schemapb.DataType_Array, schemapb.DataType_Int64, false) + s.run(schemapb.DataType_Array, schemapb.DataType_Float, false) + s.run(schemapb.DataType_Array, schemapb.DataType_Double, false) + s.run(schemapb.DataType_Array, schemapb.DataType_String, false) + + s.run(schemapb.DataType_Bool, schemapb.DataType_None, true) + s.run(schemapb.DataType_Int8, schemapb.DataType_None, true) + s.run(schemapb.DataType_Int16, schemapb.DataType_None, true) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, true) + s.run(schemapb.DataType_Int64, schemapb.DataType_None, true) + s.run(schemapb.DataType_Float, schemapb.DataType_None, true) + s.run(schemapb.DataType_Double, schemapb.DataType_None, true) + s.run(schemapb.DataType_String, schemapb.DataType_None, true) + s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true) + s.run(schemapb.DataType_JSON, schemapb.DataType_None, true) + + s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true) + s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true) + s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true) + s.run(schemapb.DataType_Array, schemapb.DataType_Int32, 
true) + s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true) + s.run(schemapb.DataType_Array, schemapb.DataType_Float, true) + s.run(schemapb.DataType_Array, schemapb.DataType_Double, true) + s.run(schemapb.DataType_Array, schemapb.DataType_String, true) s.failRun(schemapb.DataType_JSON, true) } func (s *ReaderSuite) TestStringPK() { s.pkDataType = schemapb.DataType_VarChar - s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, true) } func (s *ReaderSuite) TestVector() { s.vecDataType = schemapb.DataType_BinaryVector - s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) s.vecDataType = schemapb.DataType_FloatVector - s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) s.vecDataType = schemapb.DataType_Float16Vector - s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) s.vecDataType = schemapb.DataType_BFloat16Vector - s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) s.vecDataType = schemapb.DataType_SparseFloatVector - s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index d74b293474..451436c34c 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -261,3 +261,20 @@ func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema } return int64(bufferSize) / int64(sizePerRecord), nil } + +// todo(smellthemoon): use byte to store valid_data +func bytesToBoolArray(length int, bytes []byte) []bool { + 
bools := make([]bool, 0, length) + + for i := 0; i < length; i++ { + bit := (bytes[uint(i)/8] & BitMask[byte(i)%8]) != 0 + bools = append(bools, bit) + } + + return bools +} + +var ( + BitMask = [8]byte{1, 2, 4, 8, 16, 32, 64, 128} + FlippedBitMask = [8]byte{254, 253, 251, 247, 239, 223, 191, 127} +) diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 698530b361..a1a19b03ca 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -113,33 +113,19 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int) (*storage.Ins } switch f.GetDataType() { case schemapb.DataType_Bool: - insertData.Data[f.FieldID] = &storage.BoolFieldData{ - Data: testutils.GenerateBoolArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateBoolArray(rows)) case schemapb.DataType_Int8: - insertData.Data[f.FieldID] = &storage.Int8FieldData{ - Data: testutils.GenerateInt8Array(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateInt8Array(rows)) case schemapb.DataType_Int16: - insertData.Data[f.FieldID] = &storage.Int16FieldData{ - Data: testutils.GenerateInt16Array(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateInt16Array(rows)) case schemapb.DataType_Int32: - insertData.Data[f.FieldID] = &storage.Int32FieldData{ - Data: testutils.GenerateInt32Array(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateInt32Array(rows)) case schemapb.DataType_Int64: - insertData.Data[f.FieldID] = &storage.Int64FieldData{ - Data: testutils.GenerateInt64Array(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateInt64Array(rows)) case schemapb.DataType_Float: - insertData.Data[f.FieldID] = &storage.FloatFieldData{ - Data: testutils.GenerateFloat32Array(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateFloat32Array(rows)) case schemapb.DataType_Double: - insertData.Data[f.FieldID] = 
&storage.DoubleFieldData{ - Data: testutils.GenerateFloat64Array(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateFloat64Array(rows)) case schemapb.DataType_BinaryVector: dim, err := typeutil.GetDim(f) if err != nil { @@ -185,43 +171,30 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int) (*storage.Ins }, } case schemapb.DataType_String, schemapb.DataType_VarChar: - insertData.Data[f.FieldID] = &storage.StringFieldData{ - Data: testutils.GenerateStringArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateStringArray(rows)) case schemapb.DataType_JSON: - insertData.Data[f.FieldID] = &storage.JSONFieldData{ - Data: testutils.GenerateJSONArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateJSONArray(rows)) case schemapb.DataType_Array: switch f.GetElementType() { case schemapb.DataType_Bool: - insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: testutils.GenerateArrayOfBoolArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfBoolArray(rows)) case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: - insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: testutils.GenerateArrayOfIntArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfIntArray(rows)) case schemapb.DataType_Int64: - insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: testutils.GenerateArrayOfLongArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfLongArray(rows)) case schemapb.DataType_Float: - insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: testutils.GenerateArrayOfFloatArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfFloatArray(rows)) case schemapb.DataType_Double: - insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: testutils.GenerateArrayOfDoubleArray(rows), - } + 
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfDoubleArray(rows)) case schemapb.DataType_String, schemapb.DataType_VarChar: - insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: testutils.GenerateArrayOfStringArray(rows), - } + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfStringArray(rows)) } default: panic(fmt.Sprintf("unsupported data type: %s", f.GetDataType().String())) } + if f.GetNullable() { + insertData.Data[f.FieldID].AppendValidDataRows(testutils.GenerateBoolArray(rows)) + } } return insertData, nil } @@ -240,42 +213,51 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser case schemapb.DataType_Bool: builder := array.NewBooleanBuilder(mem) boolData := insertData.Data[fieldID].(*storage.BoolFieldData).Data - builder.AppendValues(boolData, nil) + validData := insertData.Data[fieldID].(*storage.BoolFieldData).ValidData + builder.AppendValues(boolData, validData) + columns = append(columns, builder.NewBooleanArray()) case schemapb.DataType_Int8: builder := array.NewInt8Builder(mem) int8Data := insertData.Data[fieldID].(*storage.Int8FieldData).Data - builder.AppendValues(int8Data, nil) + validData := insertData.Data[fieldID].(*storage.Int8FieldData).ValidData + builder.AppendValues(int8Data, validData) columns = append(columns, builder.NewInt8Array()) case schemapb.DataType_Int16: builder := array.NewInt16Builder(mem) int16Data := insertData.Data[fieldID].(*storage.Int16FieldData).Data - builder.AppendValues(int16Data, nil) + validData := insertData.Data[fieldID].(*storage.Int16FieldData).ValidData + builder.AppendValues(int16Data, validData) columns = append(columns, builder.NewInt16Array()) case schemapb.DataType_Int32: builder := array.NewInt32Builder(mem) int32Data := insertData.Data[fieldID].(*storage.Int32FieldData).Data - builder.AppendValues(int32Data, nil) + validData := insertData.Data[fieldID].(*storage.Int32FieldData).ValidData + builder.AppendValues(int32Data, 
validData) columns = append(columns, builder.NewInt32Array()) case schemapb.DataType_Int64: builder := array.NewInt64Builder(mem) int64Data := insertData.Data[fieldID].(*storage.Int64FieldData).Data - builder.AppendValues(int64Data, nil) + validData := insertData.Data[fieldID].(*storage.Int64FieldData).ValidData + builder.AppendValues(int64Data, validData) columns = append(columns, builder.NewInt64Array()) case schemapb.DataType_Float: builder := array.NewFloat32Builder(mem) floatData := insertData.Data[fieldID].(*storage.FloatFieldData).Data - builder.AppendValues(floatData, nil) + validData := insertData.Data[fieldID].(*storage.FloatFieldData).ValidData + builder.AppendValues(floatData, validData) columns = append(columns, builder.NewFloat32Array()) case schemapb.DataType_Double: builder := array.NewFloat64Builder(mem) doubleData := insertData.Data[fieldID].(*storage.DoubleFieldData).Data - builder.AppendValues(doubleData, nil) + validData := insertData.Data[fieldID].(*storage.DoubleFieldData).ValidData + builder.AppendValues(doubleData, validData) columns = append(columns, builder.NewFloat64Array()) case schemapb.DataType_String, schemapb.DataType_VarChar: builder := array.NewStringBuilder(mem) stringData := insertData.Data[fieldID].(*storage.StringFieldData).Data - builder.AppendValues(stringData, nil) + validData := insertData.Data[fieldID].(*storage.StringFieldData).ValidData + builder.AppendValues(stringData, validData) columns = append(columns, builder.NewStringArray()) case schemapb.DataType_BinaryVector: builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) @@ -358,12 +340,14 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser case schemapb.DataType_JSON: builder := array.NewStringBuilder(mem) jsonData := insertData.Data[fieldID].(*storage.JSONFieldData).Data + validData := insertData.Data[fieldID].(*storage.JSONFieldData).ValidData builder.AppendValues(lo.Map(jsonData, func(bs []byte, _ int) string { return string(bs) - 
}), nil) + }), validData) columns = append(columns, builder.NewStringArray()) case schemapb.DataType_Array: data := insertData.Data[fieldID].(*storage.ArrayFieldData).Data + validData := insertData.Data[fieldID].(*storage.ArrayFieldData).ValidData rows := len(data) offsets := make([]int32, 0, rows) valid := make([]bool, 0, rows) @@ -374,12 +358,16 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.BooleanType{}) valueBuilder := builder.ValueBuilder().(*array.BooleanBuilder) for i := 0; i < rows; i++ { - boolData := data[i].Data.(*schemapb.ScalarField_BoolData).BoolData.GetData() - valueBuilder.AppendValues(boolData, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + int32(len(boolData)) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + boolData := data[i].Data.(*schemapb.ScalarField_BoolData).BoolData.GetData() + valueBuilder.AppendValues(boolData, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(boolData)) + valid = append(valid, true) + } } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -387,16 +375,20 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.Int8Type{}) valueBuilder := builder.ValueBuilder().(*array.Int8Builder) for i := 0; i < rows; i++ { - intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() - int8Data := make([]int8, 0) - for j := 0; j < len(intData); j++ { - int8Data = append(int8Data, int8(intData[j])) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() + int8Data := make([]int8, 0) + for j := 0; j < len(intData); j++ { + 
int8Data = append(int8Data, int8(intData[j])) + } + valueBuilder.AppendValues(int8Data, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(int8Data)) + valid = append(valid, true) } - valueBuilder.AppendValues(int8Data, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + int32(len(int8Data)) } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -404,16 +396,20 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.Int16Type{}) valueBuilder := builder.ValueBuilder().(*array.Int16Builder) for i := 0; i < rows; i++ { - intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() - int16Data := make([]int16, 0) - for j := 0; j < len(intData); j++ { - int16Data = append(int16Data, int16(intData[j])) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() + int16Data := make([]int16, 0) + for j := 0; j < len(intData); j++ { + int16Data = append(int16Data, int16(intData[j])) + } + valueBuilder.AppendValues(int16Data, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(int16Data)) + valid = append(valid, true) } - valueBuilder.AppendValues(int16Data, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + int32(len(int16Data)) } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -421,12 +417,16 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.Int32Type{}) valueBuilder := builder.ValueBuilder().(*array.Int32Builder) for i := 0; i < rows; i++ { - intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() - 
valueBuilder.AppendValues(intData, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + int32(len(intData)) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() + valueBuilder.AppendValues(intData, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(intData)) + valid = append(valid, true) + } } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -434,12 +434,16 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.Int64Type{}) valueBuilder := builder.ValueBuilder().(*array.Int64Builder) for i := 0; i < rows; i++ { - longData := data[i].Data.(*schemapb.ScalarField_LongData).LongData.GetData() - valueBuilder.AppendValues(longData, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + int32(len(longData)) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + longData := data[i].Data.(*schemapb.ScalarField_LongData).LongData.GetData() + valueBuilder.AppendValues(longData, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(longData)) + valid = append(valid, true) + } } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -447,12 +451,16 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.Float32Type{}) valueBuilder := builder.ValueBuilder().(*array.Float32Builder) for i := 0; i < rows; i++ { - floatData := data[i].Data.(*schemapb.ScalarField_FloatData).FloatData.GetData() - valueBuilder.AppendValues(floatData, nil) - - offsets = append(offsets, currOffset) - valid = 
append(valid, true) - currOffset = currOffset + int32(len(floatData)) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + floatData := data[i].Data.(*schemapb.ScalarField_FloatData).FloatData.GetData() + valueBuilder.AppendValues(floatData, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(floatData)) + valid = append(valid, true) + } } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -460,12 +468,16 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.Float64Type{}) valueBuilder := builder.ValueBuilder().(*array.Float64Builder) for i := 0; i < rows; i++ { - doubleData := data[i].Data.(*schemapb.ScalarField_DoubleData).DoubleData.GetData() - valueBuilder.AppendValues(doubleData, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + int32(len(doubleData)) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + doubleData := data[i].Data.(*schemapb.ScalarField_DoubleData).DoubleData.GetData() + valueBuilder.AppendValues(doubleData, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(doubleData)) + valid = append(valid, true) + } } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -473,12 +485,16 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder := array.NewListBuilder(mem, &arrow.StringType{}) valueBuilder := builder.ValueBuilder().(*array.StringBuilder) for i := 0; i < rows; i++ { - stringData := data[i].Data.(*schemapb.ScalarField_StringData).StringData.GetData() - valueBuilder.AppendValues(stringData, nil) - - offsets = append(offsets, currOffset) - valid = append(valid, true) - currOffset = currOffset + 
int32(len(stringData)) + if field.GetNullable() && !validData[i] { + offsets = append(offsets, currOffset) + valid = append(valid, false) + } else { + stringData := data[i].Data.(*schemapb.ScalarField_StringData).StringData.GetData() + valueBuilder.AppendValues(stringData, nil) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(stringData)) + valid = append(valid, true) + } } builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) @@ -504,6 +520,10 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData * if field.GetAutoID() { continue } + if v.GetRow(i) == nil { + data[fieldID] = nil + continue + } switch dataType { case schemapb.DataType_Array: switch elemType { diff --git a/tests/integration/import/partition_key_test.go b/tests/integration/import/partition_key_test.go index 68424f3c41..9a6d6db930 100644 --- a/tests/integration/import/partition_key_test.go +++ b/tests/integration/import/partition_key_test.go @@ -156,7 +156,7 @@ func (s *BulkInsertSuite) TestImportWithPartitionKey() { // query partition key, TermExpr queryNum := 10 - partitionKeyData := insertData.Data[int64(102)].GetRows().([]string) + partitionKeyData := insertData.Data[int64(102)].GetDataRows().([]string) queryData := partitionKeyData[:queryNum] strs := lo.Map(queryData, func(str string, _ int) string { return fmt.Sprintf("\"%s\"", str) diff --git a/tests/integration/import/util_test.go b/tests/integration/import/util_test.go index 74d4a89cb4..c9901ba2ad 100644 --- a/tests/integration/import/util_test.go +++ b/tests/integration/import/util_test.go @@ -125,7 +125,7 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche dType := field.GetDataType() switch dType { case schemapb.DataType_BinaryVector: - rows := fieldData.GetRows().([]byte) + rows := fieldData.GetDataRows().([]byte) if dim != fieldData.(*storage.BinaryVectorFieldData).Dim { panic(fmt.Sprintf("dim mis-match: %d, %d", dim, 
fieldData.(*storage.BinaryVectorFieldData).Dim)) } @@ -137,7 +137,7 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche } data = chunkedRows case schemapb.DataType_FloatVector: - rows := fieldData.GetRows().([]float32) + rows := fieldData.GetDataRows().([]float32) if dim != fieldData.(*storage.FloatVectorFieldData).Dim { panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.FloatVectorFieldData).Dim)) } @@ -148,7 +148,7 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche } data = chunkedRows case schemapb.DataType_Float16Vector: - rows := insertData.Data[fieldID].GetRows().([]byte) + rows := insertData.Data[fieldID].GetDataRows().([]byte) if dim != fieldData.(*storage.Float16VectorFieldData).Dim { panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.Float16VectorFieldData).Dim)) } @@ -160,7 +160,7 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche } data = chunkedRows case schemapb.DataType_BFloat16Vector: - rows := insertData.Data[fieldID].GetRows().([]byte) + rows := insertData.Data[fieldID].GetDataRows().([]byte) if dim != fieldData.(*storage.BFloat16VectorFieldData).Dim { panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BFloat16VectorFieldData).Dim)) } @@ -174,7 +174,7 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche case schemapb.DataType_SparseFloatVector: data = insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents() default: - data = insertData.Data[fieldID].GetRows() + data = insertData.Data[fieldID].GetDataRows() } err := writeFn(path, data)