From 643b9d521ca01e5a742fedf7eae7f2603d523b76 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Tue, 23 Jul 2024 10:11:49 +0800
Subject: [PATCH] fix: Fix the issue of concurrent packing of the same segment
 (#34840)

issue: #34703

Signed-off-by: Cai Zhang
---
 .../compaction/clustering_compactor.go | 48 ++++++++++++++++---
 1 file changed, 41 insertions(+), 7 deletions(-)

diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index e5352d500c..aa8d82755c 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -307,7 +307,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
 		}
-		if _, err = t.refreshBufferWriter(buffer); err != nil {
+		if _, err = t.refreshBufferWriterWithPack(buffer); err != nil {
 			return err
 		}
 		t.clusterBuffers = append(t.clusterBuffers, buffer)
@@ -361,7 +361,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
 		}
-		if _, err = t.refreshBufferWriter(clusterBuffer); err != nil {
+		if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil {
 			return err
 		}
 		t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
@@ -414,6 +414,10 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 		return nil, nil, err
 	}
 
+	if err := t.checkBuffersAfterCompaction(); err != nil {
+		return nil, nil, err
+	}
+
 	resultSegments := make([]*datapb.CompactionSegment, 0)
 	resultPartitionStats := &storage.PartitionStatsSnapshot{
 		SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats),
@@ -603,7 +607,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 			// reach segment/binlog max size
 			t.clusterBufferLocks.Lock(clusterBuffer.id)
 			writer := clusterBuffer.writer
-			pack, _ := t.refreshBufferWriter(clusterBuffer)
+			pack, _ := t.refreshBufferWriterWithPack(clusterBuffer)
 			log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
 				zap.Bool("pack", pack),
 				zap.Int64("current segment", writer.GetSegmentID()),
@@ -763,7 +767,10 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 		buffer := t.clusterBuffers[bufferId]
 		writer := buffer.writer
 		currentMemorySize -= int64(writer.WrittenMemorySize())
-		pack, _ := t.refreshBufferWriter(buffer)
+		if err := t.refreshBufferWriter(buffer); err != nil {
+			t.clusterBufferLocks.Unlock(bufferId)
+			return err
+		}
 		t.clusterBufferLocks.Unlock(bufferId)
 
 		log.Info("currentMemorySize after flush buffer binlog",
@@ -772,7 +779,7 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 			zap.Uint64("WrittenMemorySize()", writer.WrittenMemorySize()),
 			zap.Int64("RowNum", writer.GetRowNum()))
 		future := t.flushPool.Submit(func() (any, error) {
-			err := t.flushBinlog(ctx, buffer, writer, pack)
+			err := t.flushBinlog(ctx, buffer, writer, false)
 			if err != nil {
 				return nil, err
 			}
@@ -856,7 +863,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 		zap.Int64("row num", seg.GetNumOfRows()))
 
 	// clear segment binlogs cache
-	buffer.flushedBinlogs[writer.GetSegmentID()] = nil
+	delete(buffer.flushedBinlogs, writer.GetSegmentID())
 	//set old writer nil
 	writer = nil
 	return nil
@@ -1153,7 +1160,7 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
 	return buckets
 }
 
-func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) {
+func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
 	var segmentID int64
 	var err error
 	var pack bool
@@ -1179,6 +1186,33 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b
 	return pack, nil
 }
 
+func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error {
+	var segmentID int64
+	var err error
+	segmentID = buffer.writer.GetSegmentID()
+	buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize()))
+
+	writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)
+	if err != nil {
+		return err
+	}
+
+	buffer.writer = writer
+	return nil
+}
+
 func (t *clusteringCompactionTask) GetSlotUsage() int64 {
 	return t.plan.GetSlotUsage()
 }
+
+func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error {
+	for _, buffer := range t.clusterBuffers {
+		if len(buffer.flushedBinlogs) != 0 {
+			log.Warn("there are some binlogs have leaked, please check", zap.Int("buffer id", buffer.id),
+				zap.Int64s("leak segments", lo.Keys(buffer.flushedBinlogs)))
+			log.Debug("leak binlogs", zap.Any("buffer flushedBinlogs", buffer.flushedBinlogs))
+			return fmt.Errorf("there are some binlogs have leaked")
+		}
+	}
+	return nil
+}
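
Below is a minimal, self-contained Go sketch (not the Milvus code itself) of the pattern this patch moves to: only the mapping path may pack, i.e. rotate a buffer to a new segment, while the memory-pressure flush path only renews the writer for the same segment, and a post-compaction check catches flushed binlogs that were never packed. Every name in the sketch (bufferState, refreshWithPack, refresh, pack, checkAfterCompaction, nextSegmentID) is an illustrative assumption, not an identifier from clustering_compactor.go.

package main

import (
	"fmt"
	"sync"
)

// bufferState is a stand-in for a cluster buffer: one mutex, one active segment,
// and a map of flushed-but-not-yet-packed binlogs keyed by segment ID.
// (Hypothetical type; the real ClusterBuffer looks different.)
type bufferState struct {
	mu             sync.Mutex
	segmentID      int64
	writtenRows    int64
	maxRows        int64
	flushedBinlogs map[int64][]string
	nextSegmentID  func() int64
}

// refreshWithPack is the only place a buffer may rotate to a new segment.
// It reports true when the current segment is full and must be sealed.
func (b *bufferState) refreshWithPack() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.writtenRows < b.maxRows {
		return false
	}
	b.segmentID = b.nextSegmentID() // subsequent writes go to a fresh segment
	b.writtenRows = 0
	return true
}

// refresh renews the writer for the same segment without packing, so a
// concurrent memory-pressure flush can never seal a segment that the mapping
// goroutine is still writing to.
func (b *bufferState) refresh() {
	b.mu.Lock()
	defer b.mu.Unlock()
	// keep b.segmentID unchanged; only writer bookkeeping would be reset here
}

// pack seals a segment exactly once and removes its entry with delete(), so
// the leak check below (which relies on len()) stays accurate; assigning nil
// instead would leave the key behind and report a false leak.
func (b *bufferState) pack(segmentID int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.flushedBinlogs, segmentID)
}

// checkAfterCompaction mirrors the new post-compaction guard: any segment that
// still has flushed binlogs was never packed, i.e. its data would be dropped.
func (b *bufferState) checkAfterCompaction() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.flushedBinlogs) != 0 {
		return fmt.Errorf("binlogs leaked for %d segment(s)", len(b.flushedBinlogs))
	}
	return nil
}

func main() {
	next := int64(100)
	b := &bufferState{
		segmentID:      next,
		maxRows:        1000,
		flushedBinlogs: map[int64][]string{100: {"binlog-1"}},
		nextSegmentID:  func() int64 { next++; return next },
	}
	b.writtenRows = 1000

	if b.refreshWithPack() { // mapping path: segment 100 is full, rotate and seal it
		b.pack(100)
	}
	b.refresh() // flush path: renew the writer, never pack

	fmt.Println("leak check:", b.checkAfterCompaction()) // prints: leak check: <nil>
}

Splitting the two refresh paths this way is what removes the race the PR title describes: the flush path can no longer observe a pack decision for a segment that the mapping path is about to pack as well, which matches flushBinlog now being invoked with pack=false from flushLargestBuffers, and delete() (rather than assigning nil) keeps the len()-based leak check in checkBuffersAfterCompaction meaningful.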