diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index ef2209ade8..1a20260dec 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -107,8 +107,9 @@ type ClusterBuffer struct { bufferMemorySize atomic.Int64 - flushedRowNum atomic.Int64 - flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog + flushedRowNum atomic.Int64 + currentSegmentRowNum atomic.Int64 + flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog uploadedSegments []*datapb.CompactionSegment uploadedSegmentStats map[typeutil.UniqueID]storage.SegmentStats @@ -580,14 +581,15 @@ func (t *clusteringCompactionTask) mappingSegment( // trigger flushBinlog currentBufferNum := clusterBuffer.writer.GetRowNum() - if clusterBuffer.flushedRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() || + if clusterBuffer.currentSegmentRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { // reach segment/binlog max size t.clusterBufferLocks.Lock(clusterBuffer.id) writer := clusterBuffer.writer pack, _ := t.refreshBufferWriter(clusterBuffer) log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id), - zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum)) + zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum), + zap.Int64("clusterBuffer.flushedRowNum.Load()", clusterBuffer.flushedRowNum.Load())) t.clusterBufferLocks.Unlock(clusterBuffer.id) t.flushChan <- FlushSignal{ @@ -653,6 +655,7 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf return err } t.writtenRowNum.Inc() + clusterBuffer.currentSegmentRowNum.Inc() return nil } @@ -1131,12 +1134,13 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b segmentID = buffer.writer.GetSegmentID() buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize())) } - if buffer.writer == nil || buffer.flushedRowNum.Load()+buffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() { + if buffer.writer == nil || buffer.currentSegmentRowNum.Load()+buffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() { pack = true segmentID, err = t.allocator.AllocOne() if err != nil { return pack, err } + buffer.currentSegmentRowNum.Store(0) } writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)