From 7721edf32a26c6abf5fe3e06e950b4ff9dff4be4 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 1 Sep 2025 14:29:52 +0800 Subject: [PATCH] enhance: Add mutex and range check preventing concurrent del (#44128) This PR adds a mutex prevent concurrent applying delete on same segment and check latestDeltaTimestamp to skip overlapping delete range Signed-off-by: Congqi Xia --- internal/querynodev2/segments/segment.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 73ad771d24..729e3cb394 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -30,6 +30,7 @@ import "C" import ( "context" "fmt" + "sync" "time" "unsafe" @@ -292,6 +293,7 @@ type LocalSegment struct { rowNum *atomic.Int64 insertCount *atomic.Int64 + deltaMut sync.Mutex lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] // indexID -> IndexedFieldInfo @@ -702,6 +704,16 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe } defer s.ptrLock.Unpin() + s.deltaMut.Lock() + defer s.deltaMut.Unlock() + + if s.lastDeltaTimestamp.Load() >= timestamps[len(timestamps)-1] { + log.Info("skip delete due to delete record before lastDeltaTimestamp", + zap.Int64("segmentID", s.ID()), + zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load())) + return nil + } + var err error GetDynamicPool().Submit(func() (any, error) { start := time.Now() @@ -887,6 +899,15 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del zap.Int64("segmentID", s.ID()), ) + s.deltaMut.Lock() + defer s.deltaMut.Unlock() + + if s.lastDeltaTimestamp.Load() >= tss[len(tss)-1] { + log.Info("skip load delta data due to delete record before lastDeltaTimestamp", + zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load())) + return nil + } + ids, err := storage.ParsePrimaryKeysBatch2IDs(pks) if err != nil { return err