From 91bb89925c027dc0b97d848117cdd10c0280ea06 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 28 Apr 2025 19:36:48 +0800 Subject: [PATCH] enhance: [GoSDK] support compact&sparse mode nullable column (#41576) Related to #41568 --------- Signed-off-by: Congqi Xia --- client/column/columns.go | 5 +- client/column/generic_base.go | 103 ++++++++++++++++++++++-- client/column/generic_base_test.go | 101 ++++++++++++++++++++++- client/column/nullable.go | 11 ++- client/column/nullable_test.go | 115 +++++++++++++++++++-------- client/milvusclient/write_options.go | 2 + 6 files changed, 290 insertions(+), 47 deletions(-) diff --git a/client/column/columns.go b/client/column/columns.go index e5135ef78f..32374b7701 100644 --- a/client/column/columns.go +++ b/client/column/columns.go @@ -45,6 +45,7 @@ type Column interface { Nullable() bool SetNullable(bool) ValidateNullable() error + CompactNullableValues() } var errFieldDataTypeNotMatch = errors.New("FieldData type not matched") @@ -90,14 +91,14 @@ func parseScalarData[T any, COL Column, NCOL Column]( start, end int, validData []bool, creator func(string, []T) COL, - nullableCreator func(string, []T, []bool) (NCOL, error), + nullableCreator func(string, []T, []bool, ...ColumnOption[T]) (NCOL, error), ) (Column, error) { if end < 0 { end = len(data) } data = data[start:end] if len(validData) > 0 { - ncol, err := nullableCreator(name, data, validData) + ncol, err := nullableCreator(name, data, validData, WithSparseNullableMode[T](true)) return ncol, err } diff --git a/client/column/generic_base.go b/client/column/generic_base.go index eeb8accccd..b85f10b24f 100644 --- a/client/column/generic_base.go +++ b/client/column/generic_base.go @@ -24,6 +24,11 @@ import ( "github.com/milvus-io/milvus/client/v2/entity" ) +type GColumn[T any] interface { + Value(idx int) T + AppendValue(v T) +} + var _ Column = (*genericColumnBase[any])(nil) // genericColumnBase implements `Column` interface @@ -37,6 +42,17 @@ type genericColumnBase[T 
any] struct { // note that nullable must be set to true explicitly nullable bool validData []bool + // a nullable column can be presented in two modes + // - compactMode, in which all valid data are compacted into one slice + // - sparseMode, in which valid data are located at their index positions + // while invalid ones are filled with zero values. + // for Milvus 2.5.x and before, insert requests shall be in compactMode while + // search & query results are formed in sparseMode + // this flag indicates which form the current column is in and performs validation + // or conversion logic based on it + sparseMode bool + // indexMapping stores the compact-sparse mapping + indexMapping []int } // Name returns column name. @@ -69,6 +85,7 @@ func (c *genericColumnBase[T]) AppendValue(a any) error { c.values = append(c.values, v) if c.nullable { c.validData = append(c.validData, true) + c.indexMapping = append(c.indexMapping, len(c.values)-1) } return nil } @@ -77,6 +94,7 @@ func (c *genericColumnBase[T]) Slice(start, end int) Column { return c.slice(start, end) } +// WARNING: this method works only for sparse mode columns func (c *genericColumnBase[T]) slice(start, end int) *genericColumnBase[T] { l := c.Len() if start > l { @@ -86,10 +104,11 @@ func (c *genericColumnBase[T]) slice(start, end int) *genericColumnBase[T] { end = l } result := &genericColumnBase[T]{ - name: c.name, - fieldType: c.fieldType, - values: c.values[start:end], - nullable: c.nullable, + name: c.name, + fieldType: c.fieldType, + values: c.values[start:end], + nullable: c.nullable, + sparseMode: c.sparseMode, } if c.nullable { result.validData = c.validData[start:end] @@ -115,6 +134,7 @@ func (c *genericColumnBase[T]) rangeCheck(idx int) error { } func (c *genericColumnBase[T]) Get(idx int) (any, error) { + idx = c.valueIndex(idx) if err := c.rangeCheck(idx); err != nil { return nil, err } @@ -122,6 +142,7 @@ func (c *genericColumnBase[T]) Get(idx int) (any, error) { } func (c *genericColumnBase[T]) 
GetAsInt64(idx int) (int64, error) { + idx = c.valueIndex(idx) if err := c.rangeCheck(idx); err != nil { return 0, err } @@ -129,6 +150,7 @@ func (c *genericColumnBase[T]) GetAsInt64(idx int) (int64, error) { } func (c *genericColumnBase[T]) GetAsString(idx int) (string, error) { + idx = c.valueIndex(idx) if err := c.rangeCheck(idx); err != nil { return "", err } @@ -136,6 +158,7 @@ func (c *genericColumnBase[T]) GetAsString(idx int) (string, error) { } func (c *genericColumnBase[T]) GetAsDouble(idx int) (float64, error) { + idx = c.valueIndex(idx) if err := c.rangeCheck(idx); err != nil { return 0, err } @@ -143,6 +166,7 @@ func (c *genericColumnBase[T]) GetAsDouble(idx int) (float64, error) { } func (c *genericColumnBase[T]) GetAsBool(idx int) (bool, error) { + idx = c.valueIndex(idx) if err := c.rangeCheck(idx); err != nil { return false, err } @@ -150,6 +174,7 @@ func (c *genericColumnBase[T]) GetAsBool(idx int) (bool, error) { } func (c *genericColumnBase[T]) Value(idx int) (T, error) { + idx = c.valueIndex(idx) var z T if err := c.rangeCheck(idx); err != nil { return z, err @@ -157,11 +182,19 @@ func (c *genericColumnBase[T]) Value(idx int) (T, error) { return c.values[idx], nil } +func (c *genericColumnBase[T]) valueIndex(idx int) int { + if !c.nullable || c.sparseMode { + return idx + } + return c.indexMapping[idx] +} + func (c *genericColumnBase[T]) Data() []T { return c.values } func (c *genericColumnBase[T]) MustValue(idx int) T { + idx = c.valueIndex(idx) if idx < 0 || idx > c.Len() { panic("index out of range") } @@ -174,6 +207,9 @@ func (c *genericColumnBase[T]) AppendNull() error { } c.validData = append(c.validData, false) + if !c.sparseMode { + c.indexMapping = append(c.indexMapping, -1) + } return nil } @@ -215,19 +251,72 @@ func (c *genericColumnBase[T]) ValidateNullable() error { return nil } + if c.sparseMode { + return c.validateNullableSparse() + } + return c.validateNullableCompact() +} + +func (c *genericColumnBase[T]) 
validateNullableCompact() error { // count valid entries - validCnt := lo.CountBy(c.validData, func(v bool) bool { - return v - }) + var validCnt int + c.indexMapping = make([]int, len(c.validData)) + for idx, v := range c.validData { + if v { + c.indexMapping[idx] = validCnt + validCnt++ + } else { + c.indexMapping[idx] = -1 + } + } if validCnt != len(c.values) { return errors.Newf("values number(%d) does not match valid count(%d)", len(c.values), validCnt) } return nil } +func (c *genericColumnBase[T]) validateNullableSparse() error { + if len(c.validData) != len(c.values) { + return errors.Newf("values number (%d) does not match valid data len(%d)", len(c.values), len(c.validData)) + } + return nil +} + +func (c *genericColumnBase[T]) CompactNullableValues() { + if !c.nullable || !c.sparseMode { + return + } + + c.indexMapping = make([]int, len(c.validData)) + var cnt int + for idx, valid := range c.validData { + if !valid { + c.indexMapping[idx] = -1 + continue + } + c.values[cnt] = c.values[idx] + c.indexMapping[idx] = cnt + cnt++ + } + c.values = c.values[0:cnt] +} + func (c *genericColumnBase[T]) withValidData(validData []bool) { if len(validData) > 0 { c.nullable = true c.validData = validData } } + +func (c *genericColumnBase[T]) base() *genericColumnBase[T] { + return c +} + +type ColumnOption[T any] func(*genericColumnBase[T]) + +// WithSparseNullableMode returns a ColumnOption that sets the sparse mode for the column. 
+func WithSparseNullableMode[T any](flag bool) ColumnOption[T] { + return func(c *genericColumnBase[T]) { + c.sparseMode = flag + } +} diff --git a/client/column/generic_base_test.go b/client/column/generic_base_test.go index 64cfa16032..7d07cd2538 100644 --- a/client/column/generic_base_test.go +++ b/client/column/generic_base_test.go @@ -89,6 +89,82 @@ func (s *GenericBaseSuite) TestIndexAccess() { }) } +func (s *GenericBaseSuite) TestIndexAccess_Nullable() { + name := fmt.Sprintf("test_%d", rand.Intn(10)) + + s.Run("compact_mode", func() { + values := []int64{1, 2, 3} + validData := []bool{true, false, true, false, true} + gb := &genericColumnBase[int64]{ + name: name, + fieldType: entity.FieldTypeInt64, + values: values, + nullable: true, + validData: validData, + sparseMode: false, + } + + err := gb.ValidateNullable() + s.NoError(err) + + for idx, valid := range validData { + if valid { + v, err := gb.Value(idx) + s.NoError(err) + s.Equal(values[gb.indexMapping[idx]], v) + + s.NotPanics(func() { + v = gb.MustValue(idx) + }) + s.Equal(values[gb.indexMapping[idx]], v) + } else { + result, err := gb.IsNull(idx) + s.NoError(err) + s.True(result) + + _, err = gb.Value(idx) + s.Error(err) + } + } + }) + + s.Run("sparse_mode", func() { + values := []int64{1, 0, 2, 0, 3} + validData := []bool{true, false, true, false, true} + gb := &genericColumnBase[int64]{ + name: name, + fieldType: entity.FieldTypeInt64, + values: values, + nullable: true, + validData: validData, + sparseMode: true, + } + + err := gb.ValidateNullable() + s.NoError(err) + + for idx, valid := range validData { + if valid { + v, err := gb.Value(idx) + s.NoError(err) + s.Equal(values[idx], v) + + s.NotPanics(func() { + v = gb.MustValue(idx) + }) + s.Equal(values[idx], v) + } else { + result, err := gb.IsNull(idx) + s.NoError(err) + s.True(result) + + _, err = gb.Value(idx) + s.NoError(err) + } + } + }) +} + func (s *GenericBaseSuite) TestSlice() { name := fmt.Sprintf("test_%d", rand.Intn(10)) values := 
[]int64{1, 2, 3} @@ -144,8 +220,21 @@ func (s *GenericBaseSuite) TestConversion() { _, err = gb.GetAsBool(0) s.Error(err) - _, err = gb.GetAsBool(0) + _, err = gb.GetAsString(0) s.Error(err) + + _, err = gb.GetAsDouble(0) + s.Error(err) + + strValues := []string{"1", "2", "3"} + strGb := &genericColumnBase[string]{ + name: name, + fieldType: entity.FieldTypeVarChar, + values: strValues, + } + sv, err := strGb.GetAsString(0) + s.NoError(err) + s.Equal("1", sv) } func (s *GenericBaseSuite) TestNullable() { @@ -170,6 +259,16 @@ func (s *GenericBaseSuite) TestNullable() { gb.SetNullable(false) s.NoError(gb.ValidateNullable()) s.EqualValues(0, gb.Len()) + + gb = &genericColumnBase[int64]{ + name: name, + fieldType: entity.FieldTypeInt64, + values: []int64{0}, + validData: []bool{true, false}, + sparseMode: true, + nullable: true, + } + s.Error(gb.ValidateNullable()) } func TestGenericBase(t *testing.T) { diff --git a/client/column/nullable.go b/client/column/nullable.go index 63346d369b..30803c80c6 100644 --- a/client/column/nullable.go +++ b/client/column/nullable.go @@ -42,18 +42,24 @@ var ( type NullableColumnCreateFunc[T any, Col interface { Column Data() []T -}] func(name string, values []T, validData []bool) (Col, error) +}] func(name string, values []T, validData []bool, opts ...ColumnOption[T]) (Col, error) type NullableColumnCreator[col interface { Column withValidData([]bool) + base() *genericColumnBase[T] }, T any] struct { base func(name string, values []T) col } -func (c NullableColumnCreator[col, T]) New(name string, values []T, validData []bool) (col, error) { +func (c NullableColumnCreator[col, T]) New(name string, values []T, validData []bool, opts ...ColumnOption[T]) (col, error) { result := c.base(name, values) result.withValidData(validData) + base := result.base() + + for _, opt := range opts { + opt(base) + } return result, result.ValidateNullable() } @@ -61,6 +67,7 @@ func (c NullableColumnCreator[col, T]) New(name string, values []T, validData [] 
func NewNullableColumnCreator[col interface { Column withValidData([]bool) + base() *genericColumnBase[T] }, T any](base func(name string, values []T) col) NullableColumnCreator[col, T] { return NullableColumnCreator[col, T]{ base: base, diff --git a/client/column/nullable_test.go b/client/column/nullable_test.go index 084e092946..255864c1ea 100644 --- a/client/column/nullable_test.go +++ b/client/column/nullable_test.go @@ -33,18 +33,27 @@ type NullableScalarSuite struct { func (s *NullableScalarSuite) TestBasic() { s.Run("nullable_bool", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []bool{false} + compactData := []bool{false} + sparseData := []bool{false, false} validData := []bool{true, false} - column, err := NewNullableColumnBool(name, data, validData) + // compact mode + column, err := NewNullableColumnBool(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeBool, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + s.NoError(column.AppendValue(true)) + s.NoError(column.AppendNull()) + + // sparse mode + column, err = NewNullableColumnBool(name, sparseData, validData, WithSparseNullableMode[bool](true)) + s.NoError(err) + s.Equal(sparseData, column.Data()) fd := column.FieldData() s.Equal(validData, fd.GetValidData()) @@ -53,29 +62,35 @@ func (s *NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnBool) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeBool, column.Type()) } - _, err = NewNullableColumnBool(name, data, []bool{false, false}) + _, err = NewNullableColumnBool(name, compactData, []bool{false, false}) s.Error(err) }) s.Run("nullable_int8", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []int8{1, 3} + compactData := 
[]int8{1, 3} + sparseData := []int8{1, 0, 3} validData := []bool{true, false, true} - column, err := NewNullableColumnInt8(name, data, validData) + // compact mode + column, err := NewNullableColumnInt8(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeInt8, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + // sparse mode + column, err = NewNullableColumnInt8(name, sparseData, validData, WithSparseNullableMode[int8](true)) + s.NoError(err) + fd := column.FieldData() s.Equal(validData, fd.GetValidData()) result, err := FieldDataColumn(fd, 0, -1) @@ -83,29 +98,35 @@ func (s *NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnInt8) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeInt8, column.Type()) } - _, err = NewNullableColumnInt8(name, data, []bool{false, false}) + _, err = NewNullableColumnInt8(name, compactData, []bool{false, false}) s.Error(err) }) s.Run("nullable_int16", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []int16{1, 3} + compactData := []int16{1, 3} + sparseData := []int16{1, 0, 3} validData := []bool{true, false, true} - column, err := NewNullableColumnInt16(name, data, validData) + // compact mode + column, err := NewNullableColumnInt16(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeInt16, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + // sparse mode + column, err = NewNullableColumnInt16(name, sparseData, validData, WithSparseNullableMode[int16](true)) + s.NoError(err) + fd := column.FieldData() s.Equal(validData, fd.GetValidData()) result, 
err := FieldDataColumn(fd, 0, -1) @@ -113,29 +134,35 @@ func (s *NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnInt16) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeInt16, column.Type()) } - _, err = NewNullableColumnInt16(name, data, []bool{false, false}) + _, err = NewNullableColumnInt16(name, compactData, []bool{false, false}) s.Error(err) }) s.Run("nullable_int32", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []int32{1, 3} + compactData := []int32{1, 3} + sparseData := []int32{1, 0, 3} validData := []bool{true, false, true} - column, err := NewNullableColumnInt32(name, data, validData) + // compact mode + column, err := NewNullableColumnInt32(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeInt32, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + // sparse mode + column, err = NewNullableColumnInt32(name, sparseData, validData, WithSparseNullableMode[int32](true)) + s.NoError(err) + fd := column.FieldData() s.Equal(validData, fd.GetValidData()) result, err := FieldDataColumn(fd, 0, -1) @@ -143,29 +170,35 @@ func (s *NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnInt32) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeInt32, column.Type()) } - _, err = NewNullableColumnInt32(name, data, []bool{false, false}) + _, err = NewNullableColumnInt32(name, compactData, []bool{false, false}) s.Error(err) }) s.Run("nullable_int64", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []int64{1, 3} + compactData := []int64{1, 3} + sparseData := []int64{1, 0, 3} validData := []bool{true, false, true} - column, err := 
NewNullableColumnInt64(name, data, validData) + // compact mode + column, err := NewNullableColumnInt64(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeInt64, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + // sparse mode + column, err = NewNullableColumnInt64(name, sparseData, validData, WithSparseNullableMode[int64](true)) + s.NoError(err) + fd := column.FieldData() s.Equal(validData, fd.GetValidData()) result, err := FieldDataColumn(fd, 0, -1) @@ -173,29 +206,35 @@ func (s *NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnInt64) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeInt64, column.Type()) } - _, err = NewNullableColumnInt64(name, data, []bool{false, false}) + _, err = NewNullableColumnInt64(name, compactData, []bool{false, false}) s.Error(err) }) s.Run("nullable_float", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []float32{0.1, 0.3} + compactData := []float32{0.1, 0.3} + sparseData := []float32{0.1, 0, 0.3} validData := []bool{true, false, true} - column, err := NewNullableColumnFloat(name, data, validData) + // compact mode + column, err := NewNullableColumnFloat(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeFloat, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + // sparse mode + column, err = NewNullableColumnFloat(name, sparseData, validData, WithSparseNullableMode[float32](true)) + s.NoError(err) + fd := column.FieldData() s.Equal(validData, fd.GetValidData()) result, err := FieldDataColumn(fd, 0, -1) @@ -203,29 +242,35 @@ func (s 
*NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnFloat) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeFloat, column.Type()) } - _, err = NewNullableColumnFloat(name, data, []bool{false, false}) + _, err = NewNullableColumnFloat(name, compactData, []bool{false, false}) s.Error(err) }) s.Run("nullable_double", func() { name := fmt.Sprintf("field_%d", rand.Intn(1000)) - data := []float64{0.1, 0.3} + compactData := []float64{0.1, 0.3} + sparseData := []float64{0.1, 0, 0.3} validData := []bool{true, false, true} - column, err := NewNullableColumnDouble(name, data, validData) + // compact data + column, err := NewNullableColumnDouble(name, compactData, validData) s.NoError(err) s.Equal(entity.FieldTypeDouble, column.Type()) s.Equal(name, column.Name()) - s.Equal(data, column.Data()) + s.Equal(compactData, column.Data()) for i := 0; i < len(validData); i++ { r, err := column.IsNull(i) s.NoError(err) s.Equal(validData[i], !r) } + // sparse data + column, err = NewNullableColumnDouble(name, sparseData, validData, WithSparseNullableMode[float64](true)) + s.NoError(err) + fd := column.FieldData() s.Equal(validData, fd.GetValidData()) result, err := FieldDataColumn(fd, 0, -1) @@ -233,11 +278,11 @@ func (s *NullableScalarSuite) TestBasic() { parsed, ok := result.(*ColumnDouble) if s.True(ok) { s.Equal(name, parsed.Name()) - s.Equal(data, parsed.Data()) + s.Equal(sparseData, parsed.Data()) s.Equal(entity.FieldTypeDouble, column.Type()) } - _, err = NewNullableColumnDouble(name, data, []bool{false, false}) + _, err = NewNullableColumnDouble(name, compactData, []bool{false, false}) s.Error(err) }) } diff --git a/client/milvusclient/write_options.go b/client/milvusclient/write_options.go index eadcc6c284..c57a073da5 100644 --- a/client/milvusclient/write_options.go +++ b/client/milvusclient/write_options.go @@ -130,6 +130,8 @@ func (opt *columnBasedDataOption) 
processInsertColumns(colSchema *entity.Schema, fieldsData := make([]*schemapb.FieldData, 0, len(mNameColumn)+1) for _, fixedColumn := range mNameColumn { + // make sure the field data is in compact mode + fixedColumn.CompactNullableValues() fieldsData = append(fieldsData, fixedColumn.FieldData()) } if len(dynamicColumns) > 0 {