enhance: Add mutex and range check preventing concurrent del (#44128)

This PR adds a mutex prevent concurrent applying delete on same segment
and check latestDeltaTimestamp to skip overlapping delete range

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-09-01 14:29:52 +08:00 committed by GitHub
parent fc876639cf
commit 7721edf32a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -30,6 +30,7 @@ import "C"
import (
"context"
"fmt"
"sync"
"time"
"unsafe"
@ -292,6 +293,7 @@ type LocalSegment struct {
rowNum *atomic.Int64
insertCount *atomic.Int64
deltaMut sync.Mutex
lastDeltaTimestamp *atomic.Uint64
fields *typeutil.ConcurrentMap[int64, *FieldInfo]
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] // indexID -> IndexedFieldInfo
@ -702,6 +704,16 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe
}
defer s.ptrLock.Unpin()
s.deltaMut.Lock()
defer s.deltaMut.Unlock()
if s.lastDeltaTimestamp.Load() >= timestamps[len(timestamps)-1] {
log.Info("skip delete due to delete record before lastDeltaTimestamp",
zap.Int64("segmentID", s.ID()),
zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load()))
return nil
}
var err error
GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
@ -887,6 +899,15 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
zap.Int64("segmentID", s.ID()),
)
s.deltaMut.Lock()
defer s.deltaMut.Unlock()
if s.lastDeltaTimestamp.Load() >= tss[len(tss)-1] {
log.Info("skip load delta data due to delete record before lastDeltaTimestamp",
zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load()))
return nil
}
ids, err := storage.ParsePrimaryKeysBatch2IDs(pks)
if err != nil {
return err