From 540456041fa3637530ddb99766655903ff1ace85 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 24 Apr 2025 12:04:38 +0800 Subject: [PATCH] enhance: Remove not inuse binlog iterator (#41359) See also: #41466 Signed-off-by: yangxuan --- internal/storage/binlog_iterator.go | 128 ------ internal/storage/binlog_iterator_test.go | 478 ----------------------- internal/storage/data_codec.go | 9 + internal/storage/data_codec_test.go | 406 +++++++++++++++++++ internal/storage/serde_test.go | 20 - 5 files changed, 415 insertions(+), 626 deletions(-) delete mode 100644 internal/storage/binlog_iterator.go delete mode 100644 internal/storage/binlog_iterator_test.go diff --git a/internal/storage/binlog_iterator.go b/internal/storage/binlog_iterator.go deleted file mode 100644 index 40ba73d4c7..0000000000 --- a/internal/storage/binlog_iterator.go +++ /dev/null @@ -1,128 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "sync/atomic" - - "github.com/cockroachdb/errors" - - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/pkg/v2/common" -) - -var ( - // ErrNoMoreRecord is the error that the iterator does not have next record. 
- ErrNoMoreRecord = errors.New("no more record") - // ErrDisposed is the error that the iterator is disposed. - ErrDisposed = errors.New("iterator is disposed") -) - -// Iterator is the iterator interface. -type Iterator interface { - // HasNext returns true if the iterator have unread record - HasNext() bool - // Next returns the next record - Next() (interface{}, error) - // Dispose disposes the iterator - Dispose() -} - -// Value is the return value of Next -type Value struct { - ID int64 - PK PrimaryKey - Timestamp int64 - IsDeleted bool - Value interface{} -} - -// InsertBinlogIterator is the iterator of binlog -type InsertBinlogIterator struct { - dispose int32 // 0: false, 1: true - data *InsertData - PKfieldID int64 - PkType schemapb.DataType - pos int -} - -// NewInsertBinlogIterator creates a new iterator -// -// Deprecated: use storage.NewBinlogDeserializeReader instead -func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID, pkType schemapb.DataType) (*InsertBinlogIterator, error) { - // TODO: load part of file to read records other than loading all content - reader := NewInsertCodecWithSchema(nil) - - _, _, serData, err := reader.Deserialize(blobs) - if err != nil { - return nil, err - } - - return &InsertBinlogIterator{data: serData, PKfieldID: PKfieldID, PkType: pkType}, nil -} - -// HasNext returns true if the iterator have unread record -func (itr *InsertBinlogIterator) HasNext() bool { - return !itr.isDisposed() && itr.hasNext() -} - -// Next returns the next record -func (itr *InsertBinlogIterator) Next() (interface{}, error) { - if itr.isDisposed() { - return nil, ErrDisposed - } - - if !itr.hasNext() { - return nil, ErrNoMoreRecord - } - - m := make(map[FieldID]interface{}) - for fieldID, fieldData := range itr.data.Data { - m[fieldID] = fieldData.GetRow(itr.pos) - } - pk, err := GenPrimaryKeyByRawData(itr.data.Data[itr.PKfieldID].GetRow(itr.pos), itr.PkType) - if err != nil { - return nil, err - } - - v := &Value{ - ID: 
itr.data.Data[common.RowIDField].GetRow(itr.pos).(int64), - Timestamp: itr.data.Data[common.TimeStampField].GetRow(itr.pos).(int64), - PK: pk, - IsDeleted: false, - Value: m, - } - itr.pos++ - return v, nil -} - -// Dispose disposes the iterator -func (itr *InsertBinlogIterator) Dispose() { - atomic.CompareAndSwapInt32(&itr.dispose, 0, 1) -} - -func (itr *InsertBinlogIterator) hasNext() bool { - _, ok := itr.data.Data[common.RowIDField] - if !ok { - return false - } - return itr.pos < itr.data.Data[common.RowIDField].RowNum() -} - -func (itr *InsertBinlogIterator) isDisposed() bool { - return atomic.LoadInt32(&itr.dispose) == 1 -} diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go deleted file mode 100644 index a9f464d94d..0000000000 --- a/internal/storage/binlog_iterator_test.go +++ /dev/null @@ -1,478 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package storage - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" -) - -func generateTestSchema() *schemapb.CollectionSchema { - schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ - {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, - {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, - {FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool}, - {FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8}, - {FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16}, - {FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64}, - {FieldID: 14, Name: "float", DataType: schemapb.DataType_Float}, - {FieldID: 15, Name: "double", DataType: schemapb.DataType_Double}, - {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar}, - {FieldID: 17, Name: "string", DataType: schemapb.DataType_String}, - {FieldID: 18, Name: "array", DataType: schemapb.DataType_Array}, - {FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON}, - {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32}, - {FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 105, Name: "bf16Vector", DataType: 
schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "28433"}, - }}, - }} - - return schema -} - -func generateTestAddedFieldSchema() *schemapb.CollectionSchema { - schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ - {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, - {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, - {FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool}, - {FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8}, - {FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16}, - {FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64}, - {FieldID: 14, Name: "float", DataType: schemapb.DataType_Float}, - {FieldID: 15, Name: "double", DataType: schemapb.DataType_Double}, - {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar}, - {FieldID: 17, Name: "string", DataType: schemapb.DataType_String}, - {FieldID: 18, Name: "array", DataType: schemapb.DataType_Array}, - {FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON}, - {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32}, - {FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "8"}, - }}, - {FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{ - {Key: 
common.DimKey, Value: "8"}, - }}, - {FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "28433"}, - }}, - {FieldID: 107, Name: "bool_null", Nullable: true, DataType: schemapb.DataType_Bool}, - {FieldID: 108, Name: "int8_null", Nullable: true, DataType: schemapb.DataType_Int8}, - {FieldID: 109, Name: "int16_null", Nullable: true, DataType: schemapb.DataType_Int16}, - {FieldID: 110, Name: "int64_null", Nullable: true, DataType: schemapb.DataType_Int64}, - {FieldID: 111, Name: "float_null", Nullable: true, DataType: schemapb.DataType_Float}, - {FieldID: 112, Name: "double_null", Nullable: true, DataType: schemapb.DataType_Double}, - {FieldID: 113, Name: "varchar_null", Nullable: true, DataType: schemapb.DataType_VarChar}, - {FieldID: 114, Name: "string_null", Nullable: true, DataType: schemapb.DataType_String}, - {FieldID: 115, Name: "array_null", Nullable: true, DataType: schemapb.DataType_Array}, - {FieldID: 116, Name: "json_null", Nullable: true, DataType: schemapb.DataType_JSON}, - { - FieldID: 117, Name: "bool_with_default_value", Nullable: true, DataType: schemapb.DataType_Bool, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_BoolData{ - BoolData: true, - }, - }, - }, - { - FieldID: 118, Name: "int8_with_default_value", Nullable: true, DataType: schemapb.DataType_Int8, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_IntData{ - IntData: 10, - }, - }, - }, - { - FieldID: 119, Name: "int16_with_default_value", Nullable: true, DataType: schemapb.DataType_Int16, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_IntData{ - IntData: 10, - }, - }, - }, - { - FieldID: 120, Name: "int64_with_default_value", Nullable: true, DataType: schemapb.DataType_Int64, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_LongData{ - LongData: 10, - }, - }, - }, - { - FieldID: 121, Name: 
"float_with_default_value", Nullable: true, DataType: schemapb.DataType_Float, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_FloatData{ - FloatData: 10, - }, - }, - }, - { - FieldID: 122, Name: "double_with_default_value", Nullable: true, DataType: schemapb.DataType_Double, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_DoubleData{ - DoubleData: 10, - }, - }, - }, - { - FieldID: 123, Name: "varchar_with_default_value", Nullable: true, DataType: schemapb.DataType_VarChar, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_StringData{ - StringData: "a", - }, - }, - }, - { - FieldID: 124, Name: "string_with_default_value", Nullable: true, DataType: schemapb.DataType_String, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_StringData{ - StringData: "a", - }, - }, - }, - {FieldID: 125, Name: "int32_null", Nullable: true, DataType: schemapb.DataType_Int32}, - { - FieldID: 126, Name: "int32_with_default_value", Nullable: true, DataType: schemapb.DataType_Int32, - DefaultValue: &schemapb.ValueField{ - Data: &schemapb.ValueField_IntData{ - IntData: 10, - }, - }, - }, - }} - - return schema -} - -func generateTestData(num int) ([]*Blob, error) { - return generateTestDataWithSeed(1, num) -} - -func generateTestDataWithSeed(seed, num int) ([]*Blob, error) { - insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: generateTestSchema()}) - - var ( - field0 []int64 - field1 []int64 - - field10 []bool - field11 []int8 - field12 []int16 - field13 []int64 - field14 []float32 - field15 []float64 - field16 []string - field17 []string - field18 []*schemapb.ScalarField - field19 [][]byte - - field101 []int32 - field102 []float32 - field103 []byte - - field104 []byte - field105 []byte - field106 [][]byte - ) - - for i := seed; i < seed+num; i++ { - field0 = append(field0, int64(i)) - field1 = append(field1, int64(i)) - field10 = append(field10, true) - field11 = append(field11, int8(i)) - 
field12 = append(field12, int16(i)) - field13 = append(field13, int64(i)) - field14 = append(field14, float32(i)) - field15 = append(field15, float64(i)) - field16 = append(field16, fmt.Sprint(i)) - field17 = append(field17, fmt.Sprint(i)) - - arr := &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}}, - }, - } - field18 = append(field18, arr) - - field19 = append(field19, []byte{byte(i)}) - field101 = append(field101, int32(i)) - - f102 := make([]float32, 8) - for j := range f102 { - f102[j] = float32(i) - } - - field102 = append(field102, f102...) - field103 = append(field103, 0xff) - - f104 := make([]byte, 16) - for j := range f104 { - f104[j] = byte(i) - } - field104 = append(field104, f104...) - field105 = append(field105, f104...) - - field106 = append(field106, typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})) - } - - data := &InsertData{Data: map[FieldID]FieldData{ - common.RowIDField: &Int64FieldData{Data: field0}, - common.TimeStampField: &Int64FieldData{Data: field1}, - - 10: &BoolFieldData{Data: field10}, - 11: &Int8FieldData{Data: field11}, - 12: &Int16FieldData{Data: field12}, - 13: &Int64FieldData{Data: field13}, - 14: &FloatFieldData{Data: field14}, - 15: &DoubleFieldData{Data: field15}, - 16: &StringFieldData{Data: field16}, - 17: &StringFieldData{Data: field17}, - 18: &ArrayFieldData{Data: field18}, - 19: &JSONFieldData{Data: field19}, - 101: &Int32FieldData{Data: field101}, - 102: &FloatVectorFieldData{ - Data: field102, - Dim: 8, - }, - 103: &BinaryVectorFieldData{ - Data: field103, - Dim: 8, - }, - 104: &Float16VectorFieldData{ - Data: field104, - Dim: 8, - }, - 105: &BFloat16VectorFieldData{ - Data: field105, - Dim: 8, - }, - 106: &SparseFloatVectorFieldData{ - SparseFloatArray: schemapb.SparseFloatArray{ - Dim: 28433, - Contents: field106, - }, - }, - }} - - blobs, err := insertCodec.Serialize(1, 1, data) - 
return blobs, err -} - -func assertTestData(t *testing.T, i int, value *Value) { - assertTestDataInternal(t, i, value, true) -} - -// Verify value of index i (1-based numbering) in data generated by generateTestData -func assertTestDataInternal(t *testing.T, i int, value *Value, lazy bool) { - getf18 := func() any { - f18 := &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i), int32(i)}, - }, - }, - } - if lazy { - f18b, err := proto.Marshal(f18) - assert.Nil(t, err) - return f18b - } - return f18 - } - - f102 := make([]float32, 8) - for j := range f102 { - f102[j] = float32(i) - } - - f104 := make([]byte, 16) - for j := range f104 { - f104[j] = byte(i) - } - - f106 := typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}) - - assert.EqualExportedValues(t, &Value{ - int64(i), - &Int64PrimaryKey{Value: int64(i)}, - int64(i), - false, - map[FieldID]interface{}{ - common.TimeStampField: int64(i), - common.RowIDField: int64(i), - - 10: true, - 11: int8(i), - 12: int16(i), - 13: int64(i), - 14: float32(i), - 15: float64(i), - 16: fmt.Sprint(i), - 17: fmt.Sprint(i), - 18: getf18(), - 19: []byte{byte(i)}, - 101: int32(i), - 102: f102, - 103: []byte{0xff}, - 104: f104, - 105: f104, - 106: f106, - }, - }, value) -} - -func assertTestAddedFieldData(t *testing.T, i int, value *Value) { - getf18 := func() any { - f18 := &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i), int32(i)}, - }, - }, - } - f18b, err := proto.Marshal(f18) - assert.Nil(t, err) - return f18b - } - - f102 := make([]float32, 8) - for j := range f102 { - f102[j] = float32(i) - } - - f104 := make([]byte, 16) - for j := range f104 { - f104[j] = byte(i) - } - - f106 := typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}) - - assert.EqualExportedValues(t, &Value{ - 
int64(i), - &Int64PrimaryKey{Value: int64(i)}, - int64(i), - false, - map[FieldID]interface{}{ - common.TimeStampField: int64(i), - common.RowIDField: int64(i), - - 10: true, - 11: int8(i), - 12: int16(i), - 13: int64(i), - 14: float32(i), - 15: float64(i), - 16: fmt.Sprint(i), - 17: fmt.Sprint(i), - 18: getf18(), - 19: []byte{byte(i)}, - 101: int32(i), - 102: f102, - 103: []byte{0xff}, - 104: f104, - 105: f104, - 106: f106, - 107: nil, - 108: nil, - 109: nil, - 110: nil, - 111: nil, - 112: nil, - 113: nil, - 114: nil, - 115: nil, - 116: nil, - 117: true, - 118: int8(10), - 119: int16(10), - 120: int64(10), - 121: float32(10), - 122: float64(10), - 123: "a", - 124: "a", - 125: nil, - 126: int32(10), - }, - }, value) -} - -func TestInsertlogIterator(t *testing.T) { - t.Run("empty iterator", func(t *testing.T) { - itr := &InsertBinlogIterator{ - data: &InsertData{}, - } - assert.False(t, itr.HasNext()) - _, err := itr.Next() - assert.Equal(t, ErrNoMoreRecord, err) - }) - - t.Run("test dispose", func(t *testing.T) { - blobs, err := generateTestData(1) - assert.NoError(t, err) - itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(t, err) - - itr.Dispose() - assert.False(t, itr.HasNext()) - _, err = itr.Next() - assert.Equal(t, ErrDisposed, err) - }) - - t.Run("not empty iterator", func(t *testing.T) { - blobs, err := generateTestData(3) - assert.NoError(t, err) - itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(t, err) - - for i := 1; i <= 3; i++ { - assert.True(t, itr.HasNext()) - v, err := itr.Next() - assert.NoError(t, err) - value := v.(*Value) - assertTestDataInternal(t, i, value, false) - } - - assert.False(t, itr.HasNext()) - _, err = itr.Next() - assert.Equal(t, ErrNoMoreRecord, err) - }) -} diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 109ef161be..b77f45cd37 100644 --- a/internal/storage/data_codec.go +++ 
b/internal/storage/data_codec.go @@ -65,6 +65,15 @@ type ( // InvalidUniqueID is used when the UniqueID is not set (like in return with err) const InvalidUniqueID = UniqueID(-1) +// Value holds one row: its row ID, primary key, timestamp, deletion flag, and the row's data. +type Value struct { + ID int64 + PK PrimaryKey + Timestamp int64 + IsDeleted bool + Value interface{} +} + // Blob is a pack of key&value type Blob struct { Key string diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 7d594b1147..9819153184 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -57,6 +58,411 @@ const ( Int8VectorField = 115 ) +func assertTestData(t *testing.T, i int, value *Value) { + assertTestDataInternal(t, i, value, true) +} + +// Verify value of index i (1-based numbering) in data generated by generateTestData +func assertTestDataInternal(t *testing.T, i int, value *Value, lazy bool) { + getf18 := func() any { + f18 := &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{int32(i), int32(i), int32(i)}, + }, + }, + } + if lazy { + f18b, err := proto.Marshal(f18) + assert.Nil(t, err) + return f18b + } + return f18 + } + + f102 := make([]float32, 8) + for j := range f102 { + f102[j] = float32(i) + } + + f104 := make([]byte, 16) + for j := range f104 { + f104[j] = byte(i) + } + + f106 := typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}) + + assert.EqualExportedValues(t, &Value{ + int64(i), + &Int64PrimaryKey{Value: int64(i)}, + int64(i), + false, + map[FieldID]interface{}{ + common.TimeStampField: int64(i), + common.RowIDField: int64(i), + + 10: true, + 11: int8(i), + 12: int16(i), + 13: 
int64(i), + 14: float32(i), + 15: float64(i), + 16: fmt.Sprint(i), + 17: fmt.Sprint(i), + 18: getf18(), + 19: []byte{byte(i)}, + 101: int32(i), + 102: f102, + 103: []byte{0xff}, + 104: f104, + 105: f104, + 106: f106, + }, + }, value) +} + +func assertTestAddedFieldData(t *testing.T, i int, value *Value) { + getf18 := func() any { + f18 := &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{int32(i), int32(i), int32(i)}, + }, + }, + } + f18b, err := proto.Marshal(f18) + assert.Nil(t, err) + return f18b + } + + f102 := make([]float32, 8) + for j := range f102 { + f102[j] = float32(i) + } + + f104 := make([]byte, 16) + for j := range f104 { + f104[j] = byte(i) + } + + f106 := typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}) + + assert.EqualExportedValues(t, &Value{ + int64(i), + &Int64PrimaryKey{Value: int64(i)}, + int64(i), + false, + map[FieldID]interface{}{ + common.TimeStampField: int64(i), + common.RowIDField: int64(i), + + 10: true, + 11: int8(i), + 12: int16(i), + 13: int64(i), + 14: float32(i), + 15: float64(i), + 16: fmt.Sprint(i), + 17: fmt.Sprint(i), + 18: getf18(), + 19: []byte{byte(i)}, + 101: int32(i), + 102: f102, + 103: []byte{0xff}, + 104: f104, + 105: f104, + 106: f106, + 107: nil, + 108: nil, + 109: nil, + 110: nil, + 111: nil, + 112: nil, + 113: nil, + 114: nil, + 115: nil, + 116: nil, + 117: true, + 118: int8(10), + 119: int16(10), + 120: int64(10), + 121: float32(10), + 122: float64(10), + 123: "a", + 124: "a", + 125: nil, + 126: int32(10), + }, + }, value) +} + +func generateTestData(num int) ([]*Blob, error) { + return generateTestDataWithSeed(1, num) +} + +func generateTestDataWithSeed(seed, num int) ([]*Blob, error) { + insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: generateTestSchema()}) + + var ( + field0 []int64 + field1 []int64 + + field10 []bool + field11 []int8 + field12 []int16 + field13 []int64 + 
field14 []float32 + field15 []float64 + field16 []string + field17 []string + field18 []*schemapb.ScalarField + field19 [][]byte + + field101 []int32 + field102 []float32 + field103 []byte + + field104 []byte + field105 []byte + field106 [][]byte + ) + + for i := seed; i < seed+num; i++ { + field0 = append(field0, int64(i)) + field1 = append(field1, int64(i)) + field10 = append(field10, true) + field11 = append(field11, int8(i)) + field12 = append(field12, int16(i)) + field13 = append(field13, int64(i)) + field14 = append(field14, float32(i)) + field15 = append(field15, float64(i)) + field16 = append(field16, fmt.Sprint(i)) + field17 = append(field17, fmt.Sprint(i)) + + arr := &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}}, + }, + } + field18 = append(field18, arr) + + field19 = append(field19, []byte{byte(i)}) + field101 = append(field101, int32(i)) + + f102 := make([]float32, 8) + for j := range f102 { + f102[j] = float32(i) + } + + field102 = append(field102, f102...) + field103 = append(field103, 0xff) + + f104 := make([]byte, 16) + for j := range f104 { + f104[j] = byte(i) + } + field104 = append(field104, f104...) + field105 = append(field105, f104...) 
+ + field106 = append(field106, typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})) + } + + data := &InsertData{Data: map[FieldID]FieldData{ + common.RowIDField: &Int64FieldData{Data: field0}, + common.TimeStampField: &Int64FieldData{Data: field1}, + + 10: &BoolFieldData{Data: field10}, + 11: &Int8FieldData{Data: field11}, + 12: &Int16FieldData{Data: field12}, + 13: &Int64FieldData{Data: field13}, + 14: &FloatFieldData{Data: field14}, + 15: &DoubleFieldData{Data: field15}, + 16: &StringFieldData{Data: field16}, + 17: &StringFieldData{Data: field17}, + 18: &ArrayFieldData{Data: field18}, + 19: &JSONFieldData{Data: field19}, + 101: &Int32FieldData{Data: field101}, + 102: &FloatVectorFieldData{ + Data: field102, + Dim: 8, + }, + 103: &BinaryVectorFieldData{ + Data: field103, + Dim: 8, + }, + 104: &Float16VectorFieldData{ + Data: field104, + Dim: 8, + }, + 105: &BFloat16VectorFieldData{ + Data: field105, + Dim: 8, + }, + 106: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 28433, + Contents: field106, + }, + }, + }} + + blobs, err := insertCodec.Serialize(1, 1, data) + return blobs, err +} + +func generateTestAddedFieldSchema() *schemapb.CollectionSchema { + schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ + {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, + {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, + {FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool}, + {FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8}, + {FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16}, + {FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64}, + {FieldID: 14, Name: "float", DataType: schemapb.DataType_Float}, + {FieldID: 15, Name: "double", DataType: schemapb.DataType_Double}, + {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar}, + {FieldID: 
17, Name: "string", DataType: schemapb.DataType_String}, + {FieldID: 18, Name: "array", DataType: schemapb.DataType_Array}, + {FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON}, + {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32}, + {FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "28433"}, + }}, + {FieldID: 107, Name: "bool_null", Nullable: true, DataType: schemapb.DataType_Bool}, + {FieldID: 108, Name: "int8_null", Nullable: true, DataType: schemapb.DataType_Int8}, + {FieldID: 109, Name: "int16_null", Nullable: true, DataType: schemapb.DataType_Int16}, + {FieldID: 110, Name: "int64_null", Nullable: true, DataType: schemapb.DataType_Int64}, + {FieldID: 111, Name: "float_null", Nullable: true, DataType: schemapb.DataType_Float}, + {FieldID: 112, Name: "double_null", Nullable: true, DataType: schemapb.DataType_Double}, + {FieldID: 113, Name: "varchar_null", Nullable: true, DataType: schemapb.DataType_VarChar}, + {FieldID: 114, Name: "string_null", Nullable: true, DataType: schemapb.DataType_String}, + {FieldID: 115, Name: "array_null", Nullable: true, DataType: schemapb.DataType_Array}, + {FieldID: 116, Name: "json_null", Nullable: true, DataType: schemapb.DataType_JSON}, + { + FieldID: 117, Name: 
"bool_with_default_value", Nullable: true, DataType: schemapb.DataType_Bool, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_BoolData{ + BoolData: true, + }, + }, + }, + { + FieldID: 118, Name: "int8_with_default_value", Nullable: true, DataType: schemapb.DataType_Int8, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_IntData{ + IntData: 10, + }, + }, + }, + { + FieldID: 119, Name: "int16_with_default_value", Nullable: true, DataType: schemapb.DataType_Int16, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_IntData{ + IntData: 10, + }, + }, + }, + { + FieldID: 120, Name: "int64_with_default_value", Nullable: true, DataType: schemapb.DataType_Int64, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_LongData{ + LongData: 10, + }, + }, + }, + { + FieldID: 121, Name: "float_with_default_value", Nullable: true, DataType: schemapb.DataType_Float, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_FloatData{ + FloatData: 10, + }, + }, + }, + { + FieldID: 122, Name: "double_with_default_value", Nullable: true, DataType: schemapb.DataType_Double, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_DoubleData{ + DoubleData: 10, + }, + }, + }, + { + FieldID: 123, Name: "varchar_with_default_value", Nullable: true, DataType: schemapb.DataType_VarChar, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_StringData{ + StringData: "a", + }, + }, + }, + { + FieldID: 124, Name: "string_with_default_value", Nullable: true, DataType: schemapb.DataType_String, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_StringData{ + StringData: "a", + }, + }, + }, + {FieldID: 125, Name: "int32_null", Nullable: true, DataType: schemapb.DataType_Int32}, + { + FieldID: 126, Name: "int32_with_default_value", Nullable: true, DataType: schemapb.DataType_Int32, + DefaultValue: &schemapb.ValueField{ + Data: &schemapb.ValueField_IntData{ + IntData: 10, + }, + }, + }, + 
}} + + return schema +} + +func generateTestSchema() *schemapb.CollectionSchema { + schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ + {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, + {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, + {FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool}, + {FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8}, + {FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16}, + {FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64}, + {FieldID: 14, Name: "float", DataType: schemapb.DataType_Float}, + {FieldID: 15, Name: "double", DataType: schemapb.DataType_Double}, + {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar}, + {FieldID: 17, Name: "string", DataType: schemapb.DataType_String}, + {FieldID: 18, Name: "array", DataType: schemapb.DataType_Array}, + {FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON}, + {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32}, + {FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "8"}, + }}, + {FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "28433"}, + }}, + }} + + return schema +} + func genTestCollectionMeta() *etcdpb.CollectionMeta { return 
&etcdpb.CollectionMeta{ ID: CollectionID, diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go index 24b5e82c94..5fc50759b8 100644 --- a/internal/storage/serde_test.go +++ b/internal/storage/serde_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/pkg/v2/common" ) type MockRecordWriter struct { @@ -139,25 +138,6 @@ func BenchmarkDeserializeReader(b *testing.B) { } } -func BenchmarkBinlogIterator(b *testing.B) { - len := 1000000 - blobs, err := generateTestData(len) - assert.NoError(b, err) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64) - assert.NoError(b, err) - defer itr.Dispose() - for i := 0; i < len; i++ { - assert.True(b, itr.HasNext()) - _, err = itr.Next() - assert.NoError(b, err) - } - assert.False(b, itr.HasNext()) - } -} - func TestCalculateArraySize(t *testing.T) { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0)