fix: [cherry-pick] Fix the issue of concurrent packing of the same segment (#34838)

issue: #34703 

master pr: #34840

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
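Context for the fix: before this change, both the mapping path (mappingSegment, when a buffer reaches segment/binlog max size) and the memory-pressure path (flushLargestBuffers) called the same refreshBufferWriter, and each could come away with pack == true for the same buffer, so two goroutines could pack (seal) the same segment concurrently. This commit splits the helper: refreshBufferWriterWithPack keeps the pack decision and is called only on the mapping path, while the new refreshBufferWriter just swaps in a fresh writer for the same segment, and flushLargestBuffers now always calls flushBinlog with pack=false. The cherry-pick also replaces nil-ing a flushedBinlogs entry with delete, and adds a checkBuffersAfterCompaction guard that fails the compaction if any flushed binlogs were never packed into a result segment.

Below is a minimal, runnable sketch of the duplicate-pack shape the fix removes (illustrative names only, not the Milvus API):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// packCount records how many times each segment ID gets packed.
	packCount := map[int64]int{}
	var mu sync.Mutex
	pack := func(segmentID int64) {
		mu.Lock()
		defer mu.Unlock()
		packCount[segmentID]++
	}

	// Pre-fix shape: two independent code paths each decide pack == true
	// for the same full buffer, then race to seal its current segment.
	var wg sync.WaitGroup
	for _, path := range []string{"mappingSegment", "flushLargestBuffers"} {
		wg.Add(1)
		go func(path string) {
			defer wg.Done()
			const segmentID = 100 // both paths observed the same writer
			pack(segmentID)
			fmt.Println(path, "packed segment", segmentID)
		}(path)
	}
	wg.Wait()
	fmt.Println("segment 100 packed", packCount[100], "times") // 2: duplicate pack
}
```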
Author: cai.zhang <cai.zhang@zilliz.com>, committed by GitHub
Date: 2024-07-19 18:27:40 +08:00
Parent: 07bc1b6717
Commit: 323dee2fbc


@@ -300,7 +300,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
 		}
-		t.refreshBufferWriter(buffer)
+		t.refreshBufferWriterWithPack(buffer)
 		t.clusterBuffers = append(t.clusterBuffers, buffer)
 		for _, key := range bucket {
 			scalarToClusterBufferMap[key] = buffer
@@ -352,7 +352,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
 		}
-		t.refreshBufferWriter(clusterBuffer)
+		t.refreshBufferWriterWithPack(clusterBuffer)
 		t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
 	}
 	t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
@@ -403,6 +403,10 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 		return nil, nil, err
 	}
 
+	if err := t.checkBuffersAfterCompaction(); err != nil {
+		return nil, nil, err
+	}
+
 	resultSegments := make([]*datapb.CompactionSegment, 0)
 	resultPartitionStats := &storage.PartitionStatsSnapshot{
 		SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats),
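This is the guard added by the fix: once all mapping work has finished, any buffer that still holds flushed-but-unpacked binlogs fails the whole compaction loudly instead of silently dropping those binlogs. The checkBuffersAfterCompaction helper itself is added in the last hunk of this diff.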
@@ -592,7 +596,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 				// reach segment/binlog max size
 				t.clusterBufferLocks.Lock(clusterBuffer.id)
 				writer := clusterBuffer.writer
-				pack, _ := t.refreshBufferWriter(clusterBuffer)
+				pack, _ := t.refreshBufferWriterWithPack(clusterBuffer)
 				log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
 					zap.Bool("pack", pack),
 					zap.Int64("current segment", writer.GetSegmentID()),
@@ -752,7 +756,10 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 		buffer := t.clusterBuffers[bufferId]
 		writer := buffer.writer
 		currentMemorySize -= int64(writer.WrittenMemorySize())
-		pack, _ := t.refreshBufferWriter(buffer)
+		if err := t.refreshBufferWriter(buffer); err != nil {
+			t.clusterBufferLocks.Unlock(bufferId)
+			return err
+		}
 		t.clusterBufferLocks.Unlock(bufferId)
 		log.Info("currentMemorySize after flush buffer binlog",
@@ -761,7 +768,7 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 			zap.Uint64("WrittenMemorySize()", writer.WrittenMemorySize()),
 			zap.Int64("RowNum", writer.GetRowNum()))
 		future := t.flushPool.Submit(func() (any, error) {
-			err := t.flushBinlog(ctx, buffer, writer, pack)
+			err := t.flushBinlog(ctx, buffer, writer, false)
 			if err != nil {
 				return nil, err
 			}
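Note that these two changes work as a pair: the spill path above now rotates the writer without a pack decision, and hard-codes pack=false when flushing binlogs, so flushLargestBuffers can relieve memory pressure but can no longer seal a segment. Sealing is left exclusively to the mapping path via refreshBufferWriterWithPack, which closes the window where both paths packed the same segment (see the sketch after the last hunk).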
@@ -844,7 +851,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 		zap.Int64("row num", seg.GetNumOfRows()))
 	// clear segment binlogs cache
-	buffer.flushedBinlogs[writer.GetSegmentID()] = nil
+	delete(buffer.flushedBinlogs, writer.GetSegmentID())
 	//set old writer nil
 	writer = nil
 	return nil
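The delete here matters for the new leak check: assigning nil leaves the key in the map, so len(buffer.flushedBinlogs) stays non-zero and checkBuffersAfterCompaction would flag an already-packed segment as leaked. A standalone illustration (the real map values are binlog metadata, not strings):

```go
package main

import "fmt"

func main() {
	flushedBinlogs := map[int64][]string{
		100: {"binlog-a", "binlog-b"},
	}

	// Pre-fix: the value is nil-ed, but the key survives.
	flushedBinlogs[100] = nil
	fmt.Println(len(flushedBinlogs)) // 1: the leak check would still fire

	// Post-fix: delete removes the key itself.
	delete(flushedBinlogs, 100)
	fmt.Println(len(flushedBinlogs)) // 0: cache is really clear
}
```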
@@ -1141,7 +1148,7 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
 	return buckets
 }
 
-func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) {
+func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
 	var segmentID int64
 	var err error
 	var pack bool
@@ -1167,6 +1174,33 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b
 	return pack, nil
 }
 
+func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error {
+	var segmentID int64
+	var err error
+	segmentID = buffer.writer.GetSegmentID()
+	buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize()))
+
+	writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)
+	if err != nil {
+		return err
+	}
+
+	buffer.writer = writer
+	return nil
+}
+
 func (t *clusteringCompactionTask) GetSlotUsage() int64 {
 	return t.plan.GetSlotUsage()
 }
+
+func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error {
+	for _, buffer := range t.clusterBuffers {
+		if len(buffer.flushedBinlogs) != 0 {
+			log.Warn("there are some binlogs have leaked, please check", zap.Int("buffer id", buffer.id),
+				zap.Int64s("leak segments", lo.Keys(buffer.flushedBinlogs)))
+			log.Debug("leak binlogs", zap.Any("buffer flushedBinlogs", buffer.flushedBinlogs))
+			return fmt.Errorf("there are some binlogs have leaked")
+		}
+	}
+	return nil
+}
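Taken together, the two helpers split a responsibility that used to be shared: only refreshBufferWriterWithPack may move a buffer onto a new segment, while refreshBufferWriter always reuses the current segment ID. A simplified, self-contained sketch of that contract (illustrative types; in the real code the new segment ID comes from an allocator rather than an increment):

```go
package main

import "fmt"

type segmentWriter struct{ segmentID int64 }

type clusterBuffer struct{ writer *segmentWriter }

// refreshKeepSegment mirrors the new refreshBufferWriter: fresh writer,
// same segment ID, never a pack decision. Safe for the spill path.
func refreshKeepSegment(b *clusterBuffer) {
	b.writer = &segmentWriter{segmentID: b.writer.segmentID}
}

// refreshWithPack mirrors refreshBufferWriterWithPack: the only place a
// buffer may be moved to a new segment, i.e. the old one sealed.
func refreshWithPack(b *clusterBuffer, full bool) (pack bool) {
	if !full {
		refreshKeepSegment(b)
		return false
	}
	b.writer = &segmentWriter{segmentID: b.writer.segmentID + 1} // hypothetical ID allocation
	return true
}

func main() {
	b := &clusterBuffer{writer: &segmentWriter{segmentID: 100}}

	refreshKeepSegment(b)           // spill path: segment unchanged
	fmt.Println(b.writer.segmentID) // 100

	fmt.Println(refreshWithPack(b, true)) // true: old segment 100 is packed
	fmt.Println(b.writer.segmentID)       // 101
}
```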