enhance: Enable binlog deserialize reader in datanode compaction (#31036)

See #30863

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
This commit is contained in:
Ted Xu 2024-03-08 18:25:02 +08:00 committed by GitHub
parent ddd918ba04
commit 987d9023a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 76 additions and 24 deletions

View File

@ -55,7 +55,7 @@ func downloadBlobs(ctx context.Context, b io.BinlogIO, paths []string) ([]*Blob,
return resp, nil
}
for i := range bytes {
resp[i] = &Blob{Value: bytes[i]}
resp[i] = &Blob{Key: paths[i], Value: bytes[i]}
}
return resp, nil
}

View File

@ -19,6 +19,7 @@ package datanode
import (
"context"
"fmt"
sio "io"
"sync"
"time"
@ -333,20 +334,23 @@ func (t *compactionTask) merge(
}
downloadTimeCost += time.Since(downloadStart)
iter, err := storage.NewInsertBinlogIterator(data, pkID, pkType)
iter, err := storage.NewBinlogDeserializeReader(data, pkID)
if err != nil {
log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
log.Warn("new insert binlogs reader wrong", zap.Strings("path", path), zap.Error(err))
return nil, nil, 0, err
}
for iter.HasNext() {
vInter, _ := iter.Next()
v, ok := vInter.(*storage.Value)
if !ok {
log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
return nil, nil, 0, errors.New("unexpected error")
for {
err := iter.Next()
if err != nil {
if err == sio.EOF {
break
} else {
log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
return nil, nil, 0, errors.New("unexpected error")
}
}
v := iter.Value()
if isDeletedValue(v) {
continue
}

View File

@ -61,6 +61,8 @@ type InsertBinlogIterator struct {
}
// NewInsertBinlogIterator creates a new iterator
//
// Deprecated: use storage.NewBinlogDeserializeReader instead
func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID, pkType schemapb.DataType) (*InsertBinlogIterator, error) {
// TODO: load part of file to read records other than loading all content
reader := NewInsertCodecWithSchema(nil)

View File

@ -27,7 +27,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
)
func generateTestData(t *testing.T, num int) []*Blob {
func generateTestData(num int) ([]*Blob, error) {
schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
@ -144,8 +144,7 @@ func generateTestData(t *testing.T, num int) []*Blob {
}}
blobs, err := insertCodec.Serialize(1, 1, data)
assert.NoError(t, err)
return blobs
return blobs, err
}
// Verify value of index i (1-based numbering) in data generated by generateTestData
@ -199,7 +198,8 @@ func TestInsertlogIterator(t *testing.T) {
})
t.Run("test dispose", func(t *testing.T) {
blobs := generateTestData(t, 1)
blobs, err := generateTestData(1)
assert.NoError(t, err)
itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
assert.NoError(t, err)
@ -210,7 +210,8 @@ func TestInsertlogIterator(t *testing.T) {
})
t.Run("not empty iterator", func(t *testing.T) {
blobs := generateTestData(t, 3)
blobs, err := generateTestData(3)
assert.NoError(t, err)
itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
assert.NoError(t, err)
@ -243,7 +244,8 @@ func TestMergeIterator(t *testing.T) {
})
t.Run("empty and non-empty iterators", func(t *testing.T) {
blobs := generateTestData(t, 3)
blobs, err := generateTestData(3)
assert.NoError(t, err)
insertItr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
assert.NoError(t, err)
iterators := []Iterator{
@ -266,7 +268,8 @@ func TestMergeIterator(t *testing.T) {
})
t.Run("non-empty iterators", func(t *testing.T) {
blobs := generateTestData(t, 3)
blobs, err := generateTestData(3)
assert.NoError(t, err)
itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
assert.NoError(t, err)
itr2, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
@ -290,7 +293,8 @@ func TestMergeIterator(t *testing.T) {
})
t.Run("test dispose", func(t *testing.T) {
blobs := generateTestData(t, 3)
blobs, err := generateTestData(3)
assert.NoError(t, err)
itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
assert.NoError(t, err)
itr := NewMergeIterator([]Iterator{itr1})

View File

@ -237,9 +237,6 @@ func (deser *DeserializeReader[T]) Next() error {
return err
}
deser.pos = 0
if deser.rec != nil {
deser.rec.Release()
}
deser.rec = deser.rr.Record()
if deser.values == nil {
@ -382,19 +379,24 @@ func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*Deserialize
value := v[i]
if value == nil {
value = &Value{}
m := make(map[FieldID]interface{}, len(r.Schema()))
value.Value = m
v[i] = value
}
m := make(map[FieldID]interface{})
m := value.Value.(map[FieldID]interface{})
for j, dt := range r.Schema() {
d, ok := deserializeCell(r.Column(j), dt, i)
if ok {
m[j] = d
m[j] = d // TODO: avoid memory copy here.
} else {
return errors.New(fmt.Sprintf("unexpected type %s", dt))
}
}
if _, ok := m[common.RowIDField]; !ok {
panic("no row id column found")
}
value.ID = m[common.RowIDField].(int64)
value.Timestamp = m[common.TimeStampField].(int64)

View File

@ -47,7 +47,8 @@ func TestBinlogDeserializeReader(t *testing.T) {
t.Run("test deserialize", func(t *testing.T) {
len := 3
blobs := generateTestData(t, len)
blobs, err := generateTestData(len)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
assert.NoError(t, err)
defer reader.Close()
@ -173,3 +174,42 @@ func Test_deserializeCell(t *testing.T) {
})
}
}
// BenchmarkDeserializeReader measures the cost of reading back every row of a
// serialized insert binlog through NewBinlogDeserializeReader, for comparison
// against the legacy InsertBinlogIterator benchmark below.
func BenchmarkDeserializeReader(b *testing.B) {
	// numRows avoids shadowing the builtin len, which the original did.
	const numRows = 1000000
	blobs, err := generateTestData(numRows)
	assert.NoError(b, err)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
		assert.NoError(b, err)
		for j := 0; j < numRows; j++ {
			err = reader.Next()
			_ = reader.Value()
			assert.NoError(b, err)
		}
		// After consuming all rows the reader must report EOF.
		err = reader.Next()
		assert.Equal(b, io.EOF, err)
		// Close per iteration instead of defer-in-loop: a defer here would
		// only run when the benchmark function returns, keeping every
		// reader (and its record buffers) alive across all b.N iterations.
		reader.Close()
	}
}
// BenchmarkBinlogIterator measures the cost of reading back every row of a
// serialized insert binlog through the deprecated InsertBinlogIterator, as the
// baseline for BenchmarkDeserializeReader above.
func BenchmarkBinlogIterator(b *testing.B) {
	// numRows avoids shadowing the builtin len, which the original did.
	const numRows = 1000000
	blobs, err := generateTestData(numRows)
	assert.NoError(b, err)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		itr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
		assert.NoError(b, err)
		for j := 0; j < numRows; j++ {
			assert.True(b, itr.HasNext())
			_, err = itr.Next()
			assert.NoError(b, err)
		}
		// The iterator must be exhausted once all rows are consumed.
		assert.False(b, itr.HasNext())
		// Dispose per iteration instead of defer-in-loop: a defer here would
		// only run when the benchmark function returns, keeping every
		// iterator's decoded data alive across all b.N iterations.
		itr.Dispose()
	}
}