enhance: [GoSDK] support compact&sparse mode nullable column (#41576)

Related to #41568

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-04-28 19:36:48 +08:00 committed by GitHub
parent 2c55aae384
commit 91bb89925c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 290 additions and 47 deletions

View File

@ -45,6 +45,7 @@ type Column interface {
Nullable() bool
SetNullable(bool)
ValidateNullable() error
CompactNullableValues()
}
var errFieldDataTypeNotMatch = errors.New("FieldData type not matched")
@ -90,14 +91,14 @@ func parseScalarData[T any, COL Column, NCOL Column](
start, end int,
validData []bool,
creator func(string, []T) COL,
nullableCreator func(string, []T, []bool) (NCOL, error),
nullableCreator func(string, []T, []bool, ...ColumnOption[T]) (NCOL, error),
) (Column, error) {
if end < 0 {
end = len(data)
}
data = data[start:end]
if len(validData) > 0 {
ncol, err := nullableCreator(name, data, validData)
ncol, err := nullableCreator(name, data, validData, WithSparseNullableMode[T](true))
return ncol, err
}

View File

@ -24,6 +24,11 @@ import (
"github.com/milvus-io/milvus/client/v2/entity"
)
type GColumn[T any] interface {
Value(idx int) T
AppendValue(v T)
}
var _ Column = (*genericColumnBase[any])(nil)
// genericColumnBase implements `Column` interface
@ -37,6 +42,17 @@ type genericColumnBase[T any] struct {
// note that nullable must be set to true explicitly
nullable bool
validData []bool
// nullable column could be presented in two modes
// - compactMode, in which all valid data are compacted into one slice
// - sparseMode, in which valid data are located in its index position
// while invalid one are filled with zero value.
// for Milvus 2.5.x and before, insert request shall be in compactMode while
// search & query results are formed in sparseMode
// this flag indicates which form current column are in and peform validation
// or conversion logical based on it
sparseMode bool
// indexMapping stores the compact-sparse mapping
indexMapping []int
}
// Name returns column name.
@ -69,6 +85,7 @@ func (c *genericColumnBase[T]) AppendValue(a any) error {
c.values = append(c.values, v)
if c.nullable {
c.validData = append(c.validData, true)
c.indexMapping = append(c.indexMapping, len(c.values)-1)
}
return nil
}
@ -77,6 +94,7 @@ func (c *genericColumnBase[T]) Slice(start, end int) Column {
return c.slice(start, end)
}
// WARNING: this methods works only for sparse mode column
func (c *genericColumnBase[T]) slice(start, end int) *genericColumnBase[T] {
l := c.Len()
if start > l {
@ -86,10 +104,11 @@ func (c *genericColumnBase[T]) slice(start, end int) *genericColumnBase[T] {
end = l
}
result := &genericColumnBase[T]{
name: c.name,
fieldType: c.fieldType,
values: c.values[start:end],
nullable: c.nullable,
name: c.name,
fieldType: c.fieldType,
values: c.values[start:end],
nullable: c.nullable,
sparseMode: c.sparseMode,
}
if c.nullable {
result.validData = c.validData[start:end]
@ -115,6 +134,7 @@ func (c *genericColumnBase[T]) rangeCheck(idx int) error {
}
func (c *genericColumnBase[T]) Get(idx int) (any, error) {
idx = c.valueIndex(idx)
if err := c.rangeCheck(idx); err != nil {
return nil, err
}
@ -122,6 +142,7 @@ func (c *genericColumnBase[T]) Get(idx int) (any, error) {
}
func (c *genericColumnBase[T]) GetAsInt64(idx int) (int64, error) {
idx = c.valueIndex(idx)
if err := c.rangeCheck(idx); err != nil {
return 0, err
}
@ -129,6 +150,7 @@ func (c *genericColumnBase[T]) GetAsInt64(idx int) (int64, error) {
}
func (c *genericColumnBase[T]) GetAsString(idx int) (string, error) {
idx = c.valueIndex(idx)
if err := c.rangeCheck(idx); err != nil {
return "", err
}
@ -136,6 +158,7 @@ func (c *genericColumnBase[T]) GetAsString(idx int) (string, error) {
}
func (c *genericColumnBase[T]) GetAsDouble(idx int) (float64, error) {
idx = c.valueIndex(idx)
if err := c.rangeCheck(idx); err != nil {
return 0, err
}
@ -143,6 +166,7 @@ func (c *genericColumnBase[T]) GetAsDouble(idx int) (float64, error) {
}
func (c *genericColumnBase[T]) GetAsBool(idx int) (bool, error) {
idx = c.valueIndex(idx)
if err := c.rangeCheck(idx); err != nil {
return false, err
}
@ -150,6 +174,7 @@ func (c *genericColumnBase[T]) GetAsBool(idx int) (bool, error) {
}
func (c *genericColumnBase[T]) Value(idx int) (T, error) {
idx = c.valueIndex(idx)
var z T
if err := c.rangeCheck(idx); err != nil {
return z, err
@ -157,11 +182,19 @@ func (c *genericColumnBase[T]) Value(idx int) (T, error) {
return c.values[idx], nil
}
func (c *genericColumnBase[T]) valueIndex(idx int) int {
if !c.nullable || c.sparseMode {
return idx
}
return c.indexMapping[idx]
}
func (c *genericColumnBase[T]) Data() []T {
return c.values
}
func (c *genericColumnBase[T]) MustValue(idx int) T {
idx = c.valueIndex(idx)
if idx < 0 || idx > c.Len() {
panic("index out of range")
}
@ -174,6 +207,9 @@ func (c *genericColumnBase[T]) AppendNull() error {
}
c.validData = append(c.validData, false)
if !c.sparseMode {
c.indexMapping = append(c.indexMapping, -1)
}
return nil
}
@ -215,19 +251,72 @@ func (c *genericColumnBase[T]) ValidateNullable() error {
return nil
}
if c.sparseMode {
return c.validateNullableSparse()
}
return c.validateNullableCompact()
}
func (c *genericColumnBase[T]) validateNullableCompact() error {
// count valid entries
validCnt := lo.CountBy(c.validData, func(v bool) bool {
return v
})
var validCnt int
c.indexMapping = make([]int, len(c.validData))
for idx, v := range c.validData {
if v {
c.indexMapping[idx] = validCnt
validCnt++
} else {
c.indexMapping[idx] = -1
}
}
if validCnt != len(c.values) {
return errors.Newf("values number(%d) does not match valid count(%d)", len(c.values), validCnt)
}
return nil
}
func (c *genericColumnBase[T]) validateNullableSparse() error {
if len(c.validData) != len(c.values) {
return errors.Newf("values number (%d) does not match valid data len(%d)", len(c.values), len(c.validData))
}
return nil
}
func (c *genericColumnBase[T]) CompactNullableValues() {
if !c.nullable || !c.sparseMode {
return
}
c.indexMapping = make([]int, len(c.validData))
var cnt int
for idx, valid := range c.validData {
if !valid {
c.indexMapping[idx] = -1
continue
}
c.values[cnt] = c.values[idx]
c.indexMapping[idx] = cnt
cnt++
}
c.values = c.values[0:cnt]
}
func (c *genericColumnBase[T]) withValidData(validData []bool) {
if len(validData) > 0 {
c.nullable = true
c.validData = validData
}
}
func (c *genericColumnBase[T]) base() *genericColumnBase[T] {
return c
}
type ColumnOption[T any] func(*genericColumnBase[T])
// WithSparseNullableMode returns a ColumnOption that sets the sparse mode for the column.
func WithSparseNullableMode[T any](flag bool) ColumnOption[T] {
return func(c *genericColumnBase[T]) {
c.sparseMode = flag
}
}

View File

@ -89,6 +89,82 @@ func (s *GenericBaseSuite) TestIndexAccess() {
})
}
func (s *GenericBaseSuite) TestIndexAccess_Nullable() {
name := fmt.Sprintf("test_%d", rand.Intn(10))
s.Run("compact_mode", func() {
values := []int64{1, 2, 3}
validData := []bool{true, false, true, false, true}
gb := &genericColumnBase[int64]{
name: name,
fieldType: entity.FieldTypeInt64,
values: values,
nullable: true,
validData: validData,
sparseMode: false,
}
err := gb.ValidateNullable()
s.NoError(err)
for idx, valid := range validData {
if valid {
v, err := gb.Value(idx)
s.NoError(err)
s.Equal(values[gb.indexMapping[idx]], v)
s.NotPanics(func() {
v = gb.MustValue(idx)
})
s.Equal(values[gb.indexMapping[idx]], v)
} else {
result, err := gb.IsNull(idx)
s.NoError(err)
s.True(result)
_, err = gb.Value(idx)
s.Error(err)
}
}
})
s.Run("sparse_mode", func() {
values := []int64{1, 0, 2, 0, 3}
validData := []bool{true, false, true, false, true}
gb := &genericColumnBase[int64]{
name: name,
fieldType: entity.FieldTypeInt64,
values: values,
nullable: true,
validData: validData,
sparseMode: true,
}
err := gb.ValidateNullable()
s.NoError(err)
for idx, valid := range validData {
if valid {
v, err := gb.Value(idx)
s.NoError(err)
s.Equal(values[idx], v)
s.NotPanics(func() {
v = gb.MustValue(idx)
})
s.Equal(values[idx], v)
} else {
result, err := gb.IsNull(idx)
s.NoError(err)
s.True(result)
_, err = gb.Value(idx)
s.NoError(err)
}
}
})
}
func (s *GenericBaseSuite) TestSlice() {
name := fmt.Sprintf("test_%d", rand.Intn(10))
values := []int64{1, 2, 3}
@ -144,8 +220,21 @@ func (s *GenericBaseSuite) TestConversion() {
_, err = gb.GetAsBool(0)
s.Error(err)
_, err = gb.GetAsBool(0)
_, err = gb.GetAsString(0)
s.Error(err)
_, err = gb.GetAsDouble(0)
s.Error(err)
strValues := []string{"1", "2", "3"}
strGb := &genericColumnBase[string]{
name: name,
fieldType: entity.FieldTypeVarChar,
values: strValues,
}
sv, err := strGb.GetAsString(0)
s.NoError(err)
s.Equal("1", sv)
}
func (s *GenericBaseSuite) TestNullable() {
@ -170,6 +259,16 @@ func (s *GenericBaseSuite) TestNullable() {
gb.SetNullable(false)
s.NoError(gb.ValidateNullable())
s.EqualValues(0, gb.Len())
gb = &genericColumnBase[int64]{
name: name,
fieldType: entity.FieldTypeInt64,
values: []int64{0},
validData: []bool{true, false},
sparseMode: true,
nullable: true,
}
s.Error(gb.ValidateNullable())
}
func TestGenericBase(t *testing.T) {

View File

@ -42,18 +42,24 @@ var (
type NullableColumnCreateFunc[T any, Col interface {
Column
Data() []T
}] func(name string, values []T, validData []bool) (Col, error)
}] func(name string, values []T, validData []bool, opts ...ColumnOption[T]) (Col, error)
type NullableColumnCreator[col interface {
Column
withValidData([]bool)
base() *genericColumnBase[T]
}, T any] struct {
base func(name string, values []T) col
}
func (c NullableColumnCreator[col, T]) New(name string, values []T, validData []bool) (col, error) {
func (c NullableColumnCreator[col, T]) New(name string, values []T, validData []bool, opts ...ColumnOption[T]) (col, error) {
result := c.base(name, values)
result.withValidData(validData)
base := result.base()
for _, opt := range opts {
opt(base)
}
return result, result.ValidateNullable()
}
@ -61,6 +67,7 @@ func (c NullableColumnCreator[col, T]) New(name string, values []T, validData []
func NewNullableColumnCreator[col interface {
Column
withValidData([]bool)
base() *genericColumnBase[T]
}, T any](base func(name string, values []T) col) NullableColumnCreator[col, T] {
return NullableColumnCreator[col, T]{
base: base,

View File

@ -33,18 +33,27 @@ type NullableScalarSuite struct {
func (s *NullableScalarSuite) TestBasic() {
s.Run("nullable_bool", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []bool{false}
compactData := []bool{false}
sparseData := []bool{false, false}
validData := []bool{true, false}
column, err := NewNullableColumnBool(name, data, validData)
// compact mode
column, err := NewNullableColumnBool(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeBool, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
s.NoError(column.AppendValue(true))
s.NoError(column.AppendNull())
// sparse mode
column, err = NewNullableColumnBool(name, sparseData, validData, WithSparseNullableMode[bool](true))
s.NoError(err)
s.Equal(sparseData, column.Data())
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
@ -53,29 +62,35 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnBool)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeBool, column.Type())
}
_, err = NewNullableColumnBool(name, data, []bool{false, false})
_, err = NewNullableColumnBool(name, compactData, []bool{false, false})
s.Error(err)
})
s.Run("nullable_int8", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []int8{1, 3}
compactData := []int8{1, 3}
sparseData := []int8{1, 0, 3}
validData := []bool{true, false, true}
column, err := NewNullableColumnInt8(name, data, validData)
// compact mode
column, err := NewNullableColumnInt8(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeInt8, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
// sparse mode
column, err = NewNullableColumnInt8(name, sparseData, validData, WithSparseNullableMode[int8](true))
s.NoError(err)
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
result, err := FieldDataColumn(fd, 0, -1)
@ -83,29 +98,35 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnInt8)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeInt8, column.Type())
}
_, err = NewNullableColumnInt8(name, data, []bool{false, false})
_, err = NewNullableColumnInt8(name, compactData, []bool{false, false})
s.Error(err)
})
s.Run("nullable_int16", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []int16{1, 3}
compactData := []int16{1, 3}
sparseData := []int16{1, 0, 3}
validData := []bool{true, false, true}
column, err := NewNullableColumnInt16(name, data, validData)
// compact mode
column, err := NewNullableColumnInt16(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeInt16, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
// compact mode
column, err = NewNullableColumnInt16(name, sparseData, validData, WithSparseNullableMode[int16](true))
s.NoError(err)
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
result, err := FieldDataColumn(fd, 0, -1)
@ -113,29 +134,35 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnInt16)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeInt16, column.Type())
}
_, err = NewNullableColumnInt16(name, data, []bool{false, false})
_, err = NewNullableColumnInt16(name, compactData, []bool{false, false})
s.Error(err)
})
s.Run("nullable_int32", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []int32{1, 3}
compactData := []int32{1, 3}
sparseData := []int32{1, 0, 3}
validData := []bool{true, false, true}
column, err := NewNullableColumnInt32(name, data, validData)
// compact mode
column, err := NewNullableColumnInt32(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeInt32, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
// compact mode
column, err = NewNullableColumnInt32(name, sparseData, validData, WithSparseNullableMode[int32](true))
s.NoError(err)
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
result, err := FieldDataColumn(fd, 0, -1)
@ -143,29 +170,35 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnInt32)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeInt32, column.Type())
}
_, err = NewNullableColumnInt32(name, data, []bool{false, false})
_, err = NewNullableColumnInt32(name, compactData, []bool{false, false})
s.Error(err)
})
s.Run("nullable_int64", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []int64{1, 3}
compactData := []int64{1, 3}
sparseData := []int64{1, 0, 3}
validData := []bool{true, false, true}
column, err := NewNullableColumnInt64(name, data, validData)
// compact mode
column, err := NewNullableColumnInt64(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeInt64, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
// compact mode
column, err = NewNullableColumnInt64(name, sparseData, validData, WithSparseNullableMode[int64](true))
s.NoError(err)
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
result, err := FieldDataColumn(fd, 0, -1)
@ -173,29 +206,35 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnInt64)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeInt64, column.Type())
}
_, err = NewNullableColumnInt64(name, data, []bool{false, false})
_, err = NewNullableColumnInt64(name, compactData, []bool{false, false})
s.Error(err)
})
s.Run("nullable_float", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []float32{0.1, 0.3}
compactData := []float32{0.1, 0.3}
sparseData := []float32{0.1, 0, 0.3}
validData := []bool{true, false, true}
column, err := NewNullableColumnFloat(name, data, validData)
// compact mode
column, err := NewNullableColumnFloat(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeFloat, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
// sparse mode
column, err = NewNullableColumnFloat(name, sparseData, validData, WithSparseNullableMode[float32](true))
s.NoError(err)
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
result, err := FieldDataColumn(fd, 0, -1)
@ -203,29 +242,35 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnFloat)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeFloat, column.Type())
}
_, err = NewNullableColumnFloat(name, data, []bool{false, false})
_, err = NewNullableColumnFloat(name, compactData, []bool{false, false})
s.Error(err)
})
s.Run("nullable_double", func() {
name := fmt.Sprintf("field_%d", rand.Intn(1000))
data := []float64{0.1, 0.3}
compactData := []float64{0.1, 0.3}
sparseData := []float64{0.1, 0, 0.3}
validData := []bool{true, false, true}
column, err := NewNullableColumnDouble(name, data, validData)
// compact data
column, err := NewNullableColumnDouble(name, compactData, validData)
s.NoError(err)
s.Equal(entity.FieldTypeDouble, column.Type())
s.Equal(name, column.Name())
s.Equal(data, column.Data())
s.Equal(compactData, column.Data())
for i := 0; i < len(validData); i++ {
r, err := column.IsNull(i)
s.NoError(err)
s.Equal(validData[i], !r)
}
// sparse data
column, err = NewNullableColumnDouble(name, sparseData, validData, WithSparseNullableMode[float64](true))
s.NoError(err)
fd := column.FieldData()
s.Equal(validData, fd.GetValidData())
result, err := FieldDataColumn(fd, 0, -1)
@ -233,11 +278,11 @@ func (s *NullableScalarSuite) TestBasic() {
parsed, ok := result.(*ColumnDouble)
if s.True(ok) {
s.Equal(name, parsed.Name())
s.Equal(data, parsed.Data())
s.Equal(sparseData, parsed.Data())
s.Equal(entity.FieldTypeDouble, column.Type())
}
_, err = NewNullableColumnDouble(name, data, []bool{false, false})
_, err = NewNullableColumnDouble(name, compactData, []bool{false, false})
s.Error(err)
})
}

View File

@ -130,6 +130,8 @@ func (opt *columnBasedDataOption) processInsertColumns(colSchema *entity.Schema,
fieldsData := make([]*schemapb.FieldData, 0, len(mNameColumn)+1)
for _, fixedColumn := range mNameColumn {
// make sure the field data in compact mode
fixedColumn.CompactNullableValues()
fieldsData = append(fieldsData, fixedColumn.FieldData())
}
if len(dynamicColumns) > 0 {