diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go
index b71da427b7..ebb9f9acaf 100644
--- a/internal/datanode/binlog_io_test.go
+++ b/internal/datanode/binlog_io_test.go
@@ -116,7 +116,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			alloc := allocator.NewMockAllocator(t)
 			alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
 			b := binlogIO{cm, alloc}
-			_, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(), genTestStat(meta), 10, meta)
+			_, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(2), genTestStat(meta), 10, meta)
 			assert.Error(t, err)
 		})
 	})
@@ -140,7 +140,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
 
-			_, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(), meta)
+			_, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(2), meta)
 			assert.Error(t, err)
 		})
 
@@ -154,7 +154,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 			defer cancel()
 
-			_, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(), meta)
+			_, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(2), meta)
 			assert.Error(t, err)
 		})
 	})
@@ -260,7 +260,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		iCodec := storage.NewInsertCodecWithSchema(meta)
 
 		kvs := make(map[string][]byte)
-		pin, err := b.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs)
+		pin, err := b.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
 
 		assert.NoError(t, err)
 		assert.Equal(t, 12, len(pin))
@@ -301,7 +301,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		bin := &binlogIO{cm, alloc}
 		kvs := make(map[string][]byte)
 
-		pin, err := bin.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs)
+		pin, err := bin.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
 
 		assert.Error(t, err)
 		assert.Empty(t, kvs)
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 3e25a432d6..93483c6f8c 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -176,31 +176,10 @@ func (t *compactionTask) uploadRemainLog(
 	meta *etcdpb.CollectionMeta,
 	stats *storage.PrimaryKeyStats,
 	totRows int64,
-	fID2Content map[UniqueID][]interface{},
+	writeBuffer *storage.InsertData,
 	fID2Type map[UniqueID]schemapb.DataType,
 ) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
-	var iData *InsertData
-
-	// remain insert data
-	if len(fID2Content) != 0 {
-		iData = &InsertData{Data: make(map[storage.FieldID]storage.FieldData)}
-		for fID, content := range fID2Content {
-			tp, ok := fID2Type[fID]
-			if !ok {
-				log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
-				return nil, nil, errors.New("Unexpected error")
-			}
-
-			fData, err := interface2FieldData(tp, content, int64(len(content)))
-			if err != nil {
-				log.Warn("transfer interface to FieldData wrong", zap.Error(err))
-				return nil, nil, err
-			}
-			iData.Data[fID] = fData
-		}
-	}
-
-	inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, iData, stats, totRows, meta)
+	inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, writeBuffer, stats, totRows, meta)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -213,29 +192,10 @@ func (t *compactionTask) uploadSingleInsertLog(
 	targetSegID UniqueID,
 	partID UniqueID,
 	meta *etcdpb.CollectionMeta,
-	fID2Content map[UniqueID][]interface{},
+	writeBuffer *storage.InsertData,
 	fID2Type map[UniqueID]schemapb.DataType,
 ) (map[UniqueID]*datapb.FieldBinlog, error) {
-	iData := &InsertData{
-		Data: make(map[storage.FieldID]storage.FieldData),
-	}
-
-	for fID, content := range fID2Content {
-		tp, ok := fID2Type[fID]
-		if !ok {
-			log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
-			return nil, errors.New("Unexpected error")
-		}
-
-		fData, err := interface2FieldData(tp, content, int64(len(content)))
-		if err != nil {
-			log.Warn("transfer interface to FieldData wrong", zap.Error(err))
-			return nil, err
-		}
-		iData.Data[fID] = fData
-	}
-
-	inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta)
+	inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, writeBuffer, meta)
 	if err != nil {
 		return nil, err
 	}
@@ -255,13 +215,11 @@ func (t *compactionTask) merge(
 	mergeStart := time.Now()
 
 	var (
-		maxRowsPerBinlog int   // maximum rows populating one binlog
-		numBinlogs       int   // binlog number
-		numRows          int64 // the number of rows uploaded
-		expired          int64 // the number of expired entity
+		numBinlogs int   // binlog number
+		numRows    int64 // the number of rows uploaded
+		expired    int64 // the number of expired entities
 
-		fID2Type    = make(map[UniqueID]schemapb.DataType)
-		fID2Content = make(map[UniqueID][]interface{})
+		fID2Type = make(map[UniqueID]schemapb.DataType)
 
 		insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
 		insertPaths      = make([]*datapb.FieldBinlog, 0)
@@ -269,6 +227,10 @@ func (t *compactionTask) merge(
 		statField2Path = make(map[UniqueID]*datapb.FieldBinlog)
 		statPaths      = make([]*datapb.FieldBinlog, 0)
 	)
+	writeBuffer, err := storage.NewInsertData(meta.GetSchema())
+	if err != nil {
+		return nil, nil, -1, err
+	}
 
 	isDeletedValue := func(v *storage.Value) bool {
 		ts, ok := delta[v.PK.GetValue()]
@@ -326,19 +288,6 @@ func (t *compactionTask) merge(
 	pkID := pkField.GetFieldID()
 	pkType := pkField.GetDataType()
 
-	// estimate Rows per binlog
-	// TODO should not convert size to row because we already know the size, this is especially important on varchar types.
-	size, err := typeutil.EstimateSizePerRecord(meta.GetSchema())
-	if err != nil {
-		log.Warn("failed to estimate size per record", zap.Error(err))
-		return nil, nil, 0, err
-	}
-
-	maxRowsPerBinlog = int(Params.DataNodeCfg.BinLogMaxSize.GetAsInt64() / int64(size))
-	if Params.DataNodeCfg.BinLogMaxSize.GetAsInt64()%int64(size) != 0 {
-		maxRowsPerBinlog++
-	}
-
 	expired = 0
 	numRows = 0
 	numBinlogs = 0
@@ -410,19 +359,19 @@ func (t *compactionTask) merge(
 				return nil, nil, 0, errors.New("unexpected error")
 			}
 
-			for fID, vInter := range row {
-				if _, ok := fID2Content[fID]; !ok {
-					fID2Content[fID] = make([]interface{}, 0)
-				}
-				fID2Content[fID] = append(fID2Content[fID], vInter)
+			err = writeBuffer.Append(row)
+			if err != nil {
+				return nil, nil, 0, err
 			}
-			// update pk to new stats log
-			stats.Update(v.PK)
 
 			currentRows++
-			if currentRows >= maxRowsPerBinlog {
+			stats.Update(v.PK)
+
+			// check size every 100 rows to avoid too many `GetMemorySize` calls
+			if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
+				numRows += int64(writeBuffer.GetRowNum())
 				uploadInsertStart := time.Now()
-				inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
+				inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, writeBuffer, fID2Type)
 				if err != nil {
 					log.Warn("failed to upload single insert log", zap.Error(err))
 					return nil, nil, 0, err
 				}
@@ -432,19 +381,19 @@ func (t *compactionTask) merge(
 				timestampFrom = -1
 				timestampTo = -1
 
-				fID2Content = make(map[int64][]interface{})
+				writeBuffer, _ = storage.NewInsertData(meta.GetSchema())
 				currentRows = 0
-				numRows += int64(maxRowsPerBinlog)
 				numBinlogs++
 			}
 		}
 	}
 
 	// upload stats log and remain insert rows
-	if numRows != 0 || currentRows != 0 {
+	if writeBuffer.GetRowNum() > 0 || numRows > 0 {
+		numRows += int64(writeBuffer.GetRowNum())
 		uploadStart := time.Now()
 		inPaths, statsPaths, err := t.uploadRemainLog(ctxTimeout, targetSegID, partID, meta,
-			stats, numRows+int64(currentRows), fID2Content, fID2Type)
+			stats, numRows+int64(currentRows), writeBuffer, fID2Type)
 		if err != nil {
 			return nil, nil, 0, err
 		}
@@ -452,7 +401,6 @@ func (t *compactionTask) merge(
 		uploadInsertTimeCost += time.Since(uploadStart)
 		addInsertFieldPath(inPaths, timestampFrom, timestampTo)
 		addStatFieldPath(statsPaths)
-		numRows += int64(currentRows)
 		numBinlogs += len(inPaths)
 	}
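Note on the approach: `merge` previously sized binlogs up front by dividing `BinLogMaxSize` by `EstimateSizePerRecord`, which the removed TODO already flagged as unreliable for variable-length types such as varchar. The patched loop instead appends each surviving row to a `storage.InsertData` write buffer and flushes once the buffer's measured footprint exceeds `BinLogMaxSize`, probing the size only every 100 rows. The sketch below is a minimal standalone rendering of that flush schedule; `insertBuffer` and the 64-byte row cost are illustrative stand-ins, not the Milvus API (the real code calls `GetMemorySize` and `GetRowNum` on `storage.InsertData`).

```go
package main

import "fmt"

// insertBuffer is a hypothetical stand-in for storage.InsertData: it
// accumulates rows and reports its current in-memory size.
type insertBuffer struct {
	rows int
	size int
}

func (b *insertBuffer) Append()            { b.rows++; b.size += 64 } // pretend each row costs 64 bytes
func (b *insertBuffer) GetRowNum() int     { return b.rows }
func (b *insertBuffer) GetMemorySize() int { return b.size }

func main() {
	const binLogMaxSize = 1024 // stand-in for DataNodeCfg.BinLogMaxSize
	buf := &insertBuffer{}
	numRows, numBinlogs, currentRows := 0, 0, 0

	for i := 0; i < 250; i++ { // 250 surviving rows to merge
		buf.Append()
		currentRows++

		// Probe the buffer only every 100 rows: measuring memory walks all
		// field data, so checking on every row would be needlessly expensive.
		if (currentRows+1)%100 == 0 && buf.GetMemorySize() > binLogMaxSize {
			numRows += buf.GetRowNum()
			// ...upload buf as one binlog here (uploadSingleInsertLog)...
			buf = &insertBuffer{} // start a fresh buffer
			currentRows = 0
			numBinlogs++
		}
	}

	// Whatever is left goes out with the stats log (uploadRemainLog).
	if buf.GetRowNum() > 0 {
		numRows += buf.GetRowNum()
		numBinlogs++
	}
	fmt.Println(numRows, numBinlogs) // 250 3
}
```

Because the probe only fires when `(currentRows+1)%100 == 0`, a buffer can overshoot the size limit by up to 99 rows between checks; the trade-off is far fewer `GetMemorySize` scans per merged row.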
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index a77cf11f85..aa965093c4 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -367,7 +367,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			defer func() {
 				Params.Save(Params.DataNodeCfg.BinLogMaxSize.Key, BinLogMaxSize)
 			}()
-			paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "128")
+			paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "64")
 			iData := genInsertDataWithExpiredTS()
 			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 
@@ -402,14 +402,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
 			assert.NoError(t, err)
 			assert.Equal(t, int64(2), numOfRow)
-			assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
+			assert.Equal(t, 1, len(inPaths[0].GetBinlogs()))
 			assert.Equal(t, 1, len(statsPaths))
 			assert.Equal(t, 1, len(statsPaths[0].GetBinlogs()))
 			assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
 			assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
 		})
 		// set Params.DataNodeCfg.BinLogMaxSize.Key = 1 to generate multi binlogs, each has only one row
-		t.Run("Merge without expiration3", func(t *testing.T) {
+		t.Run("merge_with_more_than_100rows", func(t *testing.T) {
 			mockbIO := &binlogIO{cm, alloc}
 			paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
 			BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetAsInt()
@@ -417,7 +417,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 				paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, fmt.Sprintf("%d", BinLogMaxSize))
 			}()
 			paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "1")
-			iData := genInsertDataWithExpiredTS()
+			iData := genInsertData(101)
 
 			var allPaths [][]string
 			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
@@ -451,14 +451,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			}
 			inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
 			assert.NoError(t, err)
-			assert.Equal(t, int64(2), numOfRow)
+			assert.Equal(t, int64(101), numOfRow)
 			assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
 			assert.Equal(t, 1, len(statsPaths))
 			for _, inpath := range inPaths {
 				assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampFrom())
 				assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampTo())
-				// as only one row for each binlog, timestampTo == timestampFrom
-				assert.Equal(t, inpath.GetBinlogs()[0].GetTimestampTo(), inpath.GetBinlogs()[0].GetTimestampFrom())
 			}
 		})
 
@@ -742,36 +740,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 	t.Run("Test uploadRemainLog error", func(t *testing.T) {
 		f := &MetaFactory{}
 
-		t.Run("field not in field to type", func(t *testing.T) {
-			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-			defer cancel()
-
-			ct := &compactionTask{
-				done: make(chan struct{}, 1),
-			}
-			meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
-			fid2C := make(map[int64][]interface{})
-			fid2T := make(map[int64]schemapb.DataType)
-			fid2C[1] = nil
-			_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T)
-			assert.Error(t, err)
-		})
-
-		t.Run("transfer interface wrong", func(t *testing.T) {
-			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-			defer cancel()
-
-			ct := &compactionTask{
-				done: make(chan struct{}, 1),
-			}
-			meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
-			fid2C := make(map[int64][]interface{})
-			fid2T := make(map[int64]schemapb.DataType)
-			fid2C[1] = nil
-			_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T)
-			assert.Error(t, err)
-		})
-
 		t.Run("upload failed", func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 			defer cancel()
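Why the assertions moved: with `BinLogMaxSize` saved as "1", any non-empty buffer is over the limit, so the flush cadence is dictated entirely by the every-100-rows probe. Feeding `genInsertData(101)` therefore produces one 99-row binlog from the in-loop flush plus a 2-row binlog from `uploadRemainLog`, giving a `numOfRow` of 101 across two binlogs; likewise the "64"-byte case above never reaches 100 buffered rows, so its two rows land in a single remainder binlog. A quick sanity check of that arithmetic, assuming the flush schedule from compactor.go:

```go
package main

import "fmt"

func main() {
	// merge_with_more_than_100rows feeds 101 rows with BinLogMaxSize = "1",
	// so any non-empty buffer exceeds the limit; only the every-100-rows
	// probe gates the flush.
	totalRows, currentRows, numBinlogs, numRows := 101, 0, 0, 0
	for i := 0; i < totalRows; i++ {
		currentRows++
		if (currentRows+1)%100 == 0 { // the size check always passes here
			numRows += currentRows // one binlog with 99 rows
			currentRows = 0
			numBinlogs++
		}
	}
	if currentRows > 0 { // remainder flushed by uploadRemainLog
		numRows += currentRows // a second binlog with the last 2 rows
		numBinlogs++
	}
	fmt.Println(numRows, numBinlogs) // 101 2, matching the updated assertions
}
```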
diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go
index 211310f599..854c593149 100644
--- a/internal/datanode/mock_test.go
+++ b/internal/datanode/mock_test.go
@@ -22,9 +22,11 @@ import (
 	"encoding/binary"
 	"fmt"
 	"math"
+	"math/rand"
 	"time"
 
 	"github.com/cockroachdb/errors"
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/mock"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -1089,7 +1091,7 @@ func (f *FailMessageStreamFactory) NewTtMsgStream(ctx context.Context) (msgstrea
 }
 
 func genInsertDataWithPKs(PKs [2]storage.PrimaryKey, dataType schemapb.DataType) *InsertData {
-	iD := genInsertData()
+	iD := genInsertData(2)
 	switch dataType {
 	case schemapb.DataType_Int64:
 		values := make([]int64, len(PKs))
@@ -1121,46 +1123,46 @@ func genTestStat(meta *etcdpb.CollectionMeta) *storage.PrimaryKeyStats {
 	return stats
 }
 
-func genInsertData() *InsertData {
+func genInsertData(rowNum int) *InsertData {
 	return &InsertData{
 		Data: map[int64]storage.FieldData{
 			0: &storage.Int64FieldData{
-				Data: []int64{1, 2},
+				Data: lo.RepeatBy(rowNum, func(i int) int64 { return int64(i + 1) }),
 			},
 			1: &storage.Int64FieldData{
-				Data: []int64{3, 4},
+				Data: lo.RepeatBy(rowNum, func(i int) int64 { return int64(i + 3) }),
 			},
 			100: &storage.FloatVectorFieldData{
-				Data: []float32{1.0, 6.0, 7.0, 8.0},
+				Data: lo.RepeatBy(rowNum*2, func(i int) float32 { return rand.Float32() }),
 				Dim:  2,
 			},
 			101: &storage.BinaryVectorFieldData{
-				Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
+				Data: lo.RepeatBy(rowNum*4, func(i int) byte { return byte(rand.Intn(256)) }),
 				Dim:  32,
 			},
 			102: &storage.BoolFieldData{
-				Data: []bool{true, false},
+				Data: lo.RepeatBy(rowNum, func(i int) bool { return i%2 == 0 }),
 			},
 			103: &storage.Int8FieldData{
-				Data: []int8{5, 6},
+				Data: lo.RepeatBy(rowNum, func(i int) int8 { return int8(i) }),
 			},
 			104: &storage.Int16FieldData{
-				Data: []int16{7, 8},
+				Data: lo.RepeatBy(rowNum, func(i int) int16 { return int16(i) }),
 			},
 			105: &storage.Int32FieldData{
-				Data: []int32{9, 10},
+				Data: lo.RepeatBy(rowNum, func(i int) int32 { return int32(i) }),
 			},
 			106: &storage.Int64FieldData{
-				Data: []int64{1, 2},
+				Data: lo.RepeatBy(rowNum, func(i int) int64 { return int64(i) }),
 			},
 			107: &storage.FloatFieldData{
-				Data: []float32{2.333, 2.334},
+				Data: lo.RepeatBy(rowNum, func(i int) float32 { return rand.Float32() }),
 			},
 			108: &storage.DoubleFieldData{
-				Data: []float64{3.333, 3.334},
+				Data: lo.RepeatBy(rowNum, func(i int) float64 { return rand.Float64() }),
 			},
 			109: &storage.StringFieldData{
-				Data: []string{"test1", "test2"},
+				Data: lo.RepeatBy(rowNum, func(i int) string { return fmt.Sprintf("test%d", i) }),
 			},
 		},
 	}
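For reference, `lo.RepeatBy` from the newly imported `github.com/samber/lo` builds a slice of length `n` by calling the generator on each index, which is what lets `genInsertData` scale every field to `rowNum` entries. The vector fields scale with dimension: a dim-2 float vector needs `rowNum*2` float32 values, and a dim-32 binary vector needs `rowNum*4` bytes (32 bits is 4 bytes per row). A small usage sketch; the values here are arbitrary examples:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// lo.RepeatBy(n, fn) returns a []T of length n with fn applied to 0..n-1.
	pks := lo.RepeatBy(5, func(i int) int64 { return int64(i + 1) })
	names := lo.RepeatBy(3, func(i int) string { return fmt.Sprintf("test%d", i) })
	vec := lo.RepeatBy(4*2, func(i int) float32 { return float32(i) }) // 4 rows of a dim-2 float vector

	fmt.Println(pks)      // [1 2 3 4 5]
	fmt.Println(names)    // [test0 test1 test2]
	fmt.Println(len(vec)) // 8
}
```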