From 531b4da24930fd88a9cace67800a215d4b4e2818 Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Wed, 9 Mar 2022 10:07:59 +0800 Subject: [PATCH] Fix passing empty binlog content in compaction (#15909) (#15932) Signed-off-by: Letian Jiang --- internal/datanode/compactor.go | 27 ++++++++++++++----------- internal/datanode/compactor_test.go | 31 +++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 1612cbe8b5..58a09fccd9 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -195,11 +195,11 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, mergeStart := time.Now() var ( - dim int // dimension of vector field - num int // numOfRows in each binlog - n int // binlog number - expired int64 // the number of expired entity - err error + dim int // dimension of float/binary vector field + maxRowsPerBinlog int // maximum rows populating one binlog + numBinlogs int // binlog number + expired int64 // the number of expired entity + err error iDatas = make([]*InsertData, 0) fID2Type = make(map[UniqueID]schemapb.DataType) @@ -261,10 +261,13 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, // calculate numRows from rowID field, fieldID 0 numRows := int64(len(fID2Content[0])) - num = int(Params.DataNodeCfg.FlushInsertBufferSize / (int64(dim) * 4)) - n = int(numRows)/num + 1 + maxRowsPerBinlog = int(Params.DataNodeCfg.FlushInsertBufferSize / (int64(dim) * 4)) + numBinlogs = int(numRows) / maxRowsPerBinlog + if int(numRows)%maxRowsPerBinlog != 0 { + numBinlogs++ + } - for i := 0; i < n; i++ { + for i := 0; i < numBinlogs; i++ { iDatas = append(iDatas, &InsertData{Data: make(map[storage.FieldID]storage.FieldData)}) } @@ -275,13 +278,13 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, return nil, 0, errors.New("Unexpected error") } - for i := 0; i < n; i++ { + for i := 0; i < numBinlogs; i++ { var c []interface{} - if i == n-1 { - c = content[i*num:] + if i == numBinlogs-1 { + c = content[i*maxRowsPerBinlog:] } else { - c = content[i*num : i*num+num] + c = content[i*maxRowsPerBinlog : i*maxRowsPerBinlog+maxRowsPerBinlog] } fData, err := interface2FieldData(tp, c, int64(len(c))) diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index eba9b6faea..23f766b247 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -260,6 +260,34 @@ func TestCompactionTaskInnerMethods(t *testing.T) { assert.Equal(t, 1, len(idata)) assert.NotEmpty(t, idata[0].Data) }) + t.Run("Merge without expiration2", func(t *testing.T) { + Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64 + flushInsertBufferSize := Params.DataNodeCfg.FlushInsertBufferSize + defer func() { + Params.DataNodeCfg.FlushInsertBufferSize = flushInsertBufferSize + }() + Params.DataNodeCfg.FlushInsertBufferSize = 128 + iData := genInsertDataWithExpiredTS() + meta := NewMetaFactory().GetCollectionMeta(1, "test") + + iblobs, err := getInsertBlobs(100, iData, meta) + require.NoError(t, err) + + iitr, err := storage.NewInsertBinlogIterator(iblobs, 106) + require.NoError(t, err) + + mitr := storage.NewMergeIterator([]iterator{iitr}) + + dm := map[UniqueID]Timestamp{} + + ct := &compactionTask{} + idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime()) + assert.NoError(t, err) + assert.Equal(t, int64(2), numOfRow) + assert.Equal(t, 2, len(idata)) + assert.NotEmpty(t, idata[0].Data) + }) + t.Run("Merge with expiration", func(t *testing.T) { Params.DataCoordCfg.CompactionEntityExpiration = 864000 // 10 days in seconds iData := genInsertDataWithExpiredTS() @@ -281,8 +309,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp()) assert.NoError(t, err) assert.Equal(t, int64(0), numOfRow) - assert.Equal(t, 1, len(idata)) - assert.Empty(t, idata[0].Data) + assert.Equal(t, 0, len(idata)) }) })