enhance: [2.5] Add mutex and range check preventing concurrent del (#44128) (#44202)

Cherry-pick from master
pr: #44128
This PR adds a mutex prevent concurrent applying delete on same segment
and check latestDeltaTimestamp to skip overlapping delete range

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-09-04 10:35:54 +08:00 committed by GitHub
parent c17ce3cf90
commit 7514eece4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -293,6 +293,7 @@ type LocalSegment struct {
rowNum *atomic.Int64
insertCount *atomic.Int64
deltaMut sync.Mutex
lastDeltaTimestamp *atomic.Uint64
fields *typeutil.ConcurrentMap[int64, *FieldInfo]
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] // indexID -> IndexedFieldInfo
@ -716,6 +717,16 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe
}
defer s.ptrLock.Unpin()
s.deltaMut.Lock()
defer s.deltaMut.Unlock()
if s.lastDeltaTimestamp.Load() >= timestamps[len(timestamps)-1] {
log.Info("skip delete due to delete record before lastDeltaTimestamp",
zap.Int64("segmentID", s.ID()),
zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load()))
return nil
}
var err error
GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
@ -899,6 +910,15 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
zap.Int64("segmentID", s.ID()),
)
s.deltaMut.Lock()
defer s.deltaMut.Unlock()
if s.lastDeltaTimestamp.Load() >= tss[len(tss)-1] {
log.Info("skip load delta data due to delete record before lastDeltaTimestamp",
zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load()))
return nil
}
ids, err := storage.ParsePrimaryKeysBatch2IDs(pks)
if err != nil {
return err