diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index bb655a0208..f0623c4e31 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -293,6 +293,7 @@ type LocalSegment struct { rowNum *atomic.Int64 insertCount *atomic.Int64 + deltaMut sync.Mutex lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] // indexID -> IndexedFieldInfo @@ -716,6 +717,16 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe } defer s.ptrLock.Unpin() + s.deltaMut.Lock() + defer s.deltaMut.Unlock() + + if s.lastDeltaTimestamp.Load() >= timestamps[len(timestamps)-1] { + log.Info("skip delete due to delete record before lastDeltaTimestamp", + zap.Int64("segmentID", s.ID()), + zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load())) + return nil + } + var err error GetDynamicPool().Submit(func() (any, error) { start := time.Now() @@ -899,6 +910,15 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del zap.Int64("segmentID", s.ID()), ) + s.deltaMut.Lock() + defer s.deltaMut.Unlock() + + if s.lastDeltaTimestamp.Load() >= tss[len(tss)-1] { + log.Info("skip load delta data due to delete record before lastDeltaTimestamp", + zap.Uint64("lastDeltaTimestamp", s.lastDeltaTimestamp.Load())) + return nil + } + ids, err := storage.ParsePrimaryKeysBatch2IDs(pks) if err != nil { return err