From 323dee2fbc4053dd646c415f20a50410ca51b260 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 19 Jul 2024 18:27:40 +0800 Subject: [PATCH] fix: [cherry-pick] Fix the issue of concurrent packing of the same segment (#34838) issue: #34703 master pr: #34840 Signed-off-by: Cai Zhang --- .../compaction/clustering_compactor.go | 48 ++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index f583162f17..81f0920b3f 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -300,7 +300,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), clusteringKeyFieldStats: fieldStats, } - t.refreshBufferWriter(buffer) + t.refreshBufferWriterWithPack(buffer) t.clusterBuffers = append(t.clusterBuffers, buffer) for _, key := range bucket { scalarToClusterBufferMap[key] = buffer @@ -352,7 +352,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), clusteringKeyFieldStats: fieldStats, } - t.refreshBufferWriter(clusterBuffer) + t.refreshBufferWriterWithPack(clusterBuffer) t.clusterBuffers = append(t.clusterBuffers, clusterBuffer) } t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer { @@ -403,6 +403,10 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return nil, nil, err } + if err := t.checkBuffersAfterCompaction(); err != nil { + return nil, nil, err + } + resultSegments := make([]*datapb.CompactionSegment, 0) resultPartitionStats := &storage.PartitionStatsSnapshot{ SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats), @@ -592,7 +596,7 @@ func (t *clusteringCompactionTask) mappingSegment( // reach segment/binlog max size t.clusterBufferLocks.Lock(clusterBuffer.id) writer := clusterBuffer.writer - pack, _ := t.refreshBufferWriter(clusterBuffer) + pack, _ := t.refreshBufferWriterWithPack(clusterBuffer) log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id), zap.Bool("pack", pack), zap.Int64("current segment", writer.GetSegmentID()), @@ -752,7 +756,10 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro buffer := t.clusterBuffers[bufferId] writer := buffer.writer currentMemorySize -= int64(writer.WrittenMemorySize()) - pack, _ := t.refreshBufferWriter(buffer) + if err := t.refreshBufferWriter(buffer); err != nil { + t.clusterBufferLocks.Unlock(bufferId) + return err + } t.clusterBufferLocks.Unlock(bufferId) log.Info("currentMemorySize after flush buffer binlog", @@ -761,7 +768,7 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro zap.Uint64("WrittenMemorySize()", writer.WrittenMemorySize()), zap.Int64("RowNum", writer.GetRowNum())) future := t.flushPool.Submit(func() (any, error) { - err := t.flushBinlog(ctx, buffer, writer, pack) + err := t.flushBinlog(ctx, buffer, writer, false) if err != nil { return nil, err } @@ -844,7 +851,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff zap.Int64("row num", seg.GetNumOfRows())) // clear segment binlogs cache - buffer.flushedBinlogs[writer.GetSegmentID()] = nil + delete(buffer.flushedBinlogs, writer.GetSegmentID()) //set old writer nil writer = nil return nil @@ -1141,7 +1148,7 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in return buckets } -func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) { +func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) { var segmentID int64 var err error var pack bool @@ -1167,6 +1174,33 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b return pack, nil } +func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error { + var segmentID int64 + var err error + segmentID = buffer.writer.GetSegmentID() + buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize())) + + writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID) + if err != nil { + return err + } + + buffer.writer = writer + return nil +} + func (t *clusteringCompactionTask) GetSlotUsage() int64 { return t.plan.GetSlotUsage() } + +func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error { + for _, buffer := range t.clusterBuffers { + if len(buffer.flushedBinlogs) != 0 { + log.Warn("there are some binlogs have leaked, please check", zap.Int("buffer id", buffer.id), + zap.Int64s("leak segments", lo.Keys(buffer.flushedBinlogs))) + log.Debug("leak binlogs", zap.Any("buffer flushedBinlogs", buffer.flushedBinlogs)) + return fmt.Errorf("there are some binlogs have leaked") + } + } + return nil +}