From f7184101e1cb770f0ea1b7fffad287d14ed907da Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 1 Aug 2024 16:08:13 +0800
Subject: [PATCH] fix: [cherry-pick] Fix data race for clustering buffer
 writer (#35146)

issue: #34495
master pr: #35145

Signed-off-by: Cai Zhang
---
 .../compaction/clustering_compactor.go | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index c6ea167ea5..f4202a1cb8 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -456,14 +456,6 @@ func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
 	return totalBufferSize
 }
 
-func (t *clusteringCompactionTask) getCurrentBufferWrittenMemorySize() int64 {
-	var totalBufferSize int64 = 0
-	for _, buffer := range t.clusterBuffers {
-		totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize())
-	}
-	return totalBufferSize
-}
-
 // read insert log of one segment, mappingSegment into buckets according to clusteringKey. flush data to file when necessary
 func (t *clusteringCompactionTask) mappingSegment(
 	ctx context.Context,
@@ -594,9 +586,12 @@ func (t *clusteringCompactionTask) mappingSegment(
 			if (remained+1)%100 == 0 {
 				currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
 				// trigger flushBinlog
+
+				t.clusterBufferLocks.RLock(clusterBuffer.id)
+				currentBufferWriterFull := clusterBuffer.writer.IsFull()
+				t.clusterBufferLocks.RUnlock(clusterBuffer.id)
 				currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
-				if currentSegmentNumRows > t.plan.GetMaxSegmentRows() ||
-					clusterBuffer.writer.IsFull() {
+				if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || currentBufferWriterFull {
 					// reach segment/binlog max size
 					t.clusterBufferLocks.Lock(clusterBuffer.id)
 					writer := clusterBuffer.writer
@@ -746,12 +741,15 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers")
 	defer span.End()
 	bufferIDs := make([]int, 0)
+	bufferRowNums := make([]int64, 0)
 	for _, buffer := range t.clusterBuffers {
 		bufferIDs = append(bufferIDs, buffer.id)
+		t.clusterBufferLocks.RLock(buffer.id)
+		bufferRowNums = append(bufferRowNums, buffer.writer.GetRowNum())
+		t.clusterBufferLocks.RUnlock(buffer.id)
 	}
 	sort.Slice(bufferIDs, func(i, j int) bool {
-		return t.clusterBuffers[i].writer.GetRowNum() >
-			t.clusterBuffers[j].writer.GetRowNum()
+		return bufferRowNums[i] > bufferRowNums[j]
 	})
 	log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))
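
Both hunks apply the same pattern: never read mutable writer state (IsFull(), GetRowNum()) outside the per-buffer lock, and when ordering buffers for flushing, snapshot the row counts under each buffer's read lock first so the sort comparator only touches local copies. The Go sketch below is a minimal, self-contained illustration of that pattern; lockTable, buffer, and pickFlushOrder are hypothetical stand-ins for clusterBufferLocks, ClusterBuffer, and flushLargestBuffers, not the Milvus implementation.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// lockTable keeps one RWMutex per buffer ID, analogous to clusterBufferLocks.
type lockTable struct {
	mu    sync.Mutex
	locks map[int]*sync.RWMutex
}

func (lt *lockTable) get(id int) *sync.RWMutex {
	lt.mu.Lock()
	defer lt.mu.Unlock()
	if lt.locks == nil {
		lt.locks = make(map[int]*sync.RWMutex)
	}
	if _, ok := lt.locks[id]; !ok {
		lt.locks[id] = &sync.RWMutex{}
	}
	return lt.locks[id]
}

func (lt *lockTable) RLock(id int)   { lt.get(id).RLock() }
func (lt *lockTable) RUnlock(id int) { lt.get(id).RUnlock() }
func (lt *lockTable) Lock(id int)    { lt.get(id).Lock() }
func (lt *lockTable) Unlock(id int)  { lt.get(id).Unlock() }

// buffer stands in for a cluster buffer whose writer state is mutated
// concurrently by the mapping goroutines.
type buffer struct {
	id     int
	rowNum int64 // only accessed under this buffer's lock
}

// pickFlushOrder snapshots each buffer's row count under its read lock,
// then sorts the IDs by the snapshot. The comparator touches only the
// local map, so it can never race with a concurrent writer — the same
// idea as the flushLargestBuffers change above.
func pickFlushOrder(buffers []*buffer, locks *lockTable) []int {
	ids := make([]int, 0, len(buffers))
	rows := make(map[int]int64, len(buffers))
	for _, b := range buffers {
		ids = append(ids, b.id)
		locks.RLock(b.id)
		rows[b.id] = b.rowNum
		locks.RUnlock(b.id)
	}
	sort.Slice(ids, func(i, j int) bool {
		return rows[ids[i]] > rows[ids[j]]
	})
	return ids
}

func main() {
	locks := &lockTable{}
	buffers := []*buffer{{id: 0, rowNum: 10}, {id: 1, rowNum: 30}, {id: 2, rowNum: 20}}

	// A concurrent writer mutating one buffer under its write lock,
	// standing in for a mapping goroutine appending rows.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		locks.Lock(2)
		buffers[2].rowNum += 5
		locks.Unlock(2)
	}()

	order := pickFlushOrder(buffers, locks)
	wg.Wait()
	fmt.Println("flush order (largest first):", order) // [1 2 0]
}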
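
One design choice worth noting in the sketch: the snapshot is keyed by buffer ID and the comparator reads rows[ids[i]], so each key stays paired with its element however sort.Slice permutes ids. Run under `go run -race` the program stays quiet, because every access to rowNum is guarded by the same per-ID RWMutex; dropping the RLock/RUnlock pair in pickFlushOrder would typically make the race detector flag the read, which mirrors the race this patch removes from the original comparator that called writer.GetRowNum() on live buffers from inside sort.Slice.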