From 987d9023a5f2e4d6be1eaee52a84adeb22eb3c48 Mon Sep 17 00:00:00 2001
From: Ted Xu
Date: Fri, 8 Mar 2024 18:25:02 +0800
Subject: [PATCH] enhance: Enable binlog deserialize reader in datanode
 compaction (#31036)

See #30863

Signed-off-by: Ted Xu
---
 internal/datanode/binlog_io.go           |  2 +-
 internal/datanode/compactor.go           | 22 ++++++++-----
 internal/storage/binlog_iterator.go      |  2 ++
 internal/storage/binlog_iterator_test.go | 20 ++++++-----
 internal/storage/serde.go                | 12 ++++---
 internal/storage/serde_test.go           | 42 +++++++++++++++++++++++-
 6 files changed, 76 insertions(+), 24 deletions(-)

diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go
index 146d3d63f0..a351f0f260 100644
--- a/internal/datanode/binlog_io.go
+++ b/internal/datanode/binlog_io.go
@@ -55,7 +55,7 @@ func downloadBlobs(ctx context.Context, b io.BinlogIO, paths []string) ([]*Blob,
 		return resp, nil
 	}
 	for i := range bytes {
-		resp[i] = &Blob{Value: bytes[i]}
+		resp[i] = &Blob{Key: paths[i], Value: bytes[i]}
 	}
 	return resp, nil
 }
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 60bc3fe554..ab8dae57af 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -19,6 +19,7 @@ package datanode
 import (
 	"context"
 	"fmt"
+	sio "io"
 	"sync"
 	"time"
 
@@ -333,20 +334,23 @@ func (t *compactionTask) merge(
 		}
 		downloadTimeCost += time.Since(downloadStart)
 
-		iter, err := storage.NewInsertBinlogIterator(data, pkID, pkType)
+		iter, err := storage.NewBinlogDeserializeReader(data, pkID)
 		if err != nil {
-			log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
+			log.Warn("new insert binlogs reader wrong", zap.Strings("path", path), zap.Error(err))
 			return nil, nil, 0, err
 		}
 
-		for iter.HasNext() {
-			vInter, _ := iter.Next()
-			v, ok := vInter.(*storage.Value)
-			if !ok {
-				log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
-				return nil, nil, 0, errors.New("unexpected error")
+		for {
+			err := iter.Next()
+			if err != nil {
+				if err == sio.EOF {
+					break
+				} else {
+					log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
+					return nil, nil, 0, errors.New("unexpected error")
+				}
 			}
-
+			v := iter.Value()
 			if isDeletedValue(v) {
 				continue
 			}
diff --git a/internal/storage/binlog_iterator.go b/internal/storage/binlog_iterator.go
index fad450b8ad..f620483982 100644
--- a/internal/storage/binlog_iterator.go
+++ b/internal/storage/binlog_iterator.go
@@ -61,6 +61,8 @@ type InsertBinlogIterator struct {
 }
 
 // NewInsertBinlogIterator creates a new iterator
+//
+// Deprecated: use storage.NewBinlogDeserializeReader instead
 func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID, pkType schemapb.DataType) (*InsertBinlogIterator, error) {
 	// TODO: load part of file to read records other than loading all content
 	reader := NewInsertCodecWithSchema(nil)
diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go
index b7932bdc09..b2e545bebe 100644
--- a/internal/storage/binlog_iterator_test.go
+++ b/internal/storage/binlog_iterator_test.go
@@ -27,7 +27,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/common"
 )
 
-func generateTestData(t *testing.T, num int) []*Blob {
+func generateTestData(num int) ([]*Blob, error) {
 	schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
 		{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
 		{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
@@ -144,8 +144,7 @@ func generateTestData(t *testing.T, num int) []*Blob {
 	}}
 
 	blobs, err := insertCodec.Serialize(1, 1, data)
-	assert.NoError(t, err)
-	return blobs
+	return blobs, err
 }
 
 // Verify value of index i (1-based numbering) in data generated by generateTestData
@@ -199,7 +198,8 @@ func TestInsertlogIterator(t *testing.T) {
 	})
 
 	t.Run("test dispose", func(t *testing.T) {
-		blobs := generateTestData(t, 1)
+		blobs, err := generateTestData(1)
+		assert.NoError(t, err)
 		itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
 		assert.NoError(t, err)
 
@@ -210,7 +210,8 @@ func TestInsertlogIterator(t *testing.T) {
 	})
 
 	t.Run("not empty iterator", func(t *testing.T) {
-		blobs := generateTestData(t, 3)
+		blobs, err := generateTestData(3)
+		assert.NoError(t, err)
 		itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
 		assert.NoError(t, err)
 
@@ -243,7 +244,8 @@ func TestMergeIterator(t *testing.T) {
 	})
 
 	t.Run("empty and non-empty iterators", func(t *testing.T) {
-		blobs := generateTestData(t, 3)
+		blobs, err := generateTestData(3)
+		assert.NoError(t, err)
 		insertItr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
 		assert.NoError(t, err)
 		iterators := []Iterator{
@@ -266,7 +268,8 @@ func TestMergeIterator(t *testing.T) {
 	})
 
 	t.Run("non-empty iterators", func(t *testing.T) {
-		blobs := generateTestData(t, 3)
+		blobs, err := generateTestData(3)
+		assert.NoError(t, err)
 		itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
 		assert.NoError(t, err)
 		itr2, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
@@ -290,7 +293,8 @@ func TestMergeIterator(t *testing.T) {
 	})
 
 	t.Run("test dispose", func(t *testing.T) {
-		blobs := generateTestData(t, 3)
+		blobs, err := generateTestData(3)
+		assert.NoError(t, err)
 		itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
 		assert.NoError(t, err)
 		itr := NewMergeIterator([]Iterator{itr1})
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 6fdc26761d..330dd81882 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -237,9 +237,6 @@ func (deser *DeserializeReader[T]) Next() error {
 			return err
 		}
 		deser.pos = 0
-		if deser.rec != nil {
-			deser.rec.Release()
-		}
 		deser.rec = deser.rr.Record()
 
 		if deser.values == nil {
@@ -382,19 +379,24 @@ func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*Deserialize
 			value := v[i]
 			if value == nil {
 				value = &Value{}
+				m := make(map[FieldID]interface{}, len(r.Schema()))
+				value.Value = m
 				v[i] = value
 			}
 
-			m := make(map[FieldID]interface{})
+			m := value.Value.(map[FieldID]interface{})
 			for j, dt := range r.Schema() {
 				d, ok := deserializeCell(r.Column(j), dt, i)
 				if ok {
-					m[j] = d
+					m[j] = d // TODO: avoid memory copy here.
 				} else {
 					return errors.New(fmt.Sprintf("unexpected type %s", dt))
 				}
 			}
 
+			if _, ok := m[common.RowIDField]; !ok {
+				panic("no row id column found")
+			}
 			value.ID = m[common.RowIDField].(int64)
 			value.Timestamp = m[common.TimeStampField].(int64)
 
diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go
index b04d00bd7f..f309ae41ec 100644
--- a/internal/storage/serde_test.go
+++ b/internal/storage/serde_test.go
@@ -47,7 +47,8 @@ func TestBinlogDeserializeReader(t *testing.T) {
 
 	t.Run("test deserialize", func(t *testing.T) {
 		len := 3
-		blobs := generateTestData(t, len)
+		blobs, err := generateTestData(len)
+		assert.NoError(t, err)
 		reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
 		assert.NoError(t, err)
 		defer reader.Close()
@@ -173,3 +174,42 @@ func Test_deserializeCell(t *testing.T) {
 		})
 	}
 }
+
+func BenchmarkDeserializeReader(b *testing.B) {
+	len := 1000000
+	blobs, err := generateTestData(len)
+	assert.NoError(b, err)
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
+		assert.NoError(b, err)
+		defer reader.Close()
+		for i := 0; i < len; i++ {
+			err = reader.Next()
+			_ = reader.Value()
+			assert.NoError(b, err)
+		}
+		err = reader.Next()
+		assert.Equal(b, io.EOF, err)
+	}
+}
+
+func BenchmarkBinlogIterator(b *testing.B) {
+	len := 1000000
+	blobs, err := generateTestData(len)
+	assert.NoError(b, err)
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
+		assert.NoError(b, err)
+		defer itr.Dispose()
+		for i := 0; i < len; i++ {
+			assert.True(b, itr.HasNext())
+			_, err = itr.Next()
+			assert.NoError(b, err)
+		}
+		assert.False(b, itr.HasNext())
+	}
+}
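
For reference, the read loop this patch switches the compactor to (storage.NewBinlogDeserializeReader with Next/Value/Close and an io.EOF sentinel, no longer taking the primary-key data type that NewInsertBinlogIterator required) is sketched below as a standalone helper. This is a minimal illustration of the pattern exercised in the diff above, not part of the commit; the package name example and the function readAllValues are hypothetical.

package example

import (
	"io"

	"github.com/milvus-io/milvus/internal/storage"
)

// readAllValues is a hypothetical helper sketching the reader contract used in
// the compactor change above: Next() advances the reader and returns io.EOF
// once the binlogs are exhausted; Value() returns the row decoded by the most
// recent successful Next(); Close() releases reader resources.
func readAllValues(blobs []*storage.Blob, pkFieldID storage.UniqueID) ([]*storage.Value, error) {
	reader, err := storage.NewBinlogDeserializeReader(blobs, pkFieldID)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	var values []*storage.Value
	for {
		if err := reader.Next(); err != nil {
			if err == io.EOF {
				break // all rows consumed
			}
			return nil, err
		}
		values = append(values, reader.Value())
	}
	return values, nil
}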