From 7c1d35dcd676c1b601507473105e35d064e665bd Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 3 Jul 2023 15:04:25 +0800 Subject: [PATCH] Fix delete record may be re-applied (#24858) Signed-off-by: yah01 --- internal/querynode/load_segment_task.go | 23 ++++++++-- internal/querynode/segment.go | 58 +++++++++++++++++-------- internal/querynode/segment_loader.go | 15 +------ internal/querynode/segment_test.go | 12 +++++ internal/storage/stats.go | 1 + 5 files changed, 74 insertions(+), 35 deletions(-) diff --git a/internal/querynode/load_segment_task.go b/internal/querynode/load_segment_task.go index 1731770274..2e781bb1c3 100644 --- a/internal/querynode/load_segment_task.go +++ b/internal/querynode/load_segment_task.go @@ -102,10 +102,25 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { pos := deltaPosition runningGroup.Go(func() error { // reload data from dml channel - return l.node.loader.FromDmlCPLoadDelete(groupCtx, l.req.CollectionID, pos, - lo.FilterMap(l.req.Infos, func(info *queryPb.SegmentLoadInfo, _ int) (int64, bool) { - return info.GetSegmentID(), funcutil.SliceContain(loadDoneSegmentIDs, info.SegmentID) && info.GetInsertChannel() == pos.GetChannelName() - })) + segments := lo.FilterMap(l.req.Infos, func(info *queryPb.SegmentLoadInfo, _ int) (int64, bool) { + return info.GetSegmentID(), funcutil.SliceContain(loadDoneSegmentIDs, info.SegmentID) && info.GetInsertChannel() == pos.GetChannelName() + }) + err := l.node.loader.FromDmlCPLoadDelete(groupCtx, l.req.CollectionID, pos, segments) + if err != nil { + return err + } + + for _, id := range segments { + segment, err := l.node.metaReplica.getSegmentByID(id, segmentTypeSealed) + if err != nil { + return err + } + err = segment.FlushDelete() + if err != nil { + return err + } + } + return nil }) } err = runningGroup.Wait() diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 8c7b546940..ee4ffdbfdc 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -108,12 +108,10 @@ func (r *DeleteRecords) TryAppend(pks []primaryKey, timestamps []Timestamp) bool return true } -func (r *DeleteRecords) Flush() ([]primaryKey, []Timestamp) { +func (r *DeleteRecords) Flush(handler func([]primaryKey, []Timestamp) error) error { r.mut.Lock() defer r.mut.Unlock() - r.flushed = true - sort.Sort(ByTimestamp(r.records)) pks := make([]primaryKey, len(r.records)) timestamps := make([]Timestamp, len(r.records)) @@ -121,8 +119,16 @@ func (r *DeleteRecords) Flush() ([]primaryKey, []Timestamp) { pks[i] = r.records[i].pk timestamps[i] = r.records[i].timestamp } + + err := handler(pks, timestamps) + if err != nil { + return err + } + r.records = nil - return pks, timestamps + r.flushed = true + + return nil } // Segment is a wrapper of the underlying C-structure segment. @@ -136,10 +142,11 @@ type Segment struct { version UniqueID startPosition *internalpb.MsgPosition // for growing segment release - vChannelID Channel - lastMemSize int64 - lastRowCount int64 - deleteBuffer DeleteRecords + vChannelID Channel + lastMemSize int64 + lastRowCount int64 + deleteBuffer DeleteRecords + flushedDeletedTimestamp Timestamp recentlyModified *atomic.Bool segmentType *atomic.Int32 @@ -336,7 +343,7 @@ func (s *Segment) getDeletedCount() int64 { return -1 } - deletedCount := C.GetRowCount(s.segmentPtr) + deletedCount := C.GetDeletedCount(s.segmentPtr) return int64(deletedCount) } @@ -784,16 +791,26 @@ func (s *Segment) segmentDelete(entityIDs []primaryKey, timestamps []Timestamp) return nil } + return s.deleteImpl(entityIDs, timestamps) +} + +func (s *Segment) deleteImpl(pks []primaryKey, timestamps []Timestamp) error { s.mut.RLock() defer s.mut.RUnlock() if !s.healthy() { return fmt.Errorf("%w(segmentID=%d)", ErrSegmentUnhealthy, s.segmentID) } - return s.deleteImpl(entityIDs, timestamps) -} + start := sort.Search(len(timestamps), func(i int) bool { + return timestamps[i] >= s.flushedDeletedTimestamp+1 + }) + // all delete records have been applied, skip them + if start == len(timestamps) { + return nil + } + pks = pks[start:] + timestamps = timestamps[start:] -func (s *Segment) deleteImpl(pks []primaryKey, timestamps []Timestamp) error { var cSize = C.int64_t(len(pks)) var cTimestampsPtr = (*C.uint64_t)(&(timestamps)[0]) offset := C.PreDelete(s.segmentPtr, cSize) @@ -840,12 +857,19 @@ func (s *Segment) deleteImpl(pks []primaryKey, timestamps []Timestamp) error { } func (s *Segment) FlushDelete() error { - pks, tss := s.deleteBuffer.Flush() - if len(pks) == 0 { - return nil - } + return s.deleteBuffer.Flush(func(pks []primaryKey, tss []Timestamp) error { + if len(pks) == 0 { + return nil + } - return s.deleteImpl(pks, tss) + err := s.deleteImpl(pks, tss) + if err != nil { + return err + } + + s.flushedDeletedTimestamp = tss[len(pks)-1] + return nil + }) } // -------------------------------------------------------------------------------------- interfaces for sealed segment diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 0a96a443db..c8041c0352 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -227,19 +227,6 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, req *querypb.LoadS failedSetMetaSegmentIDs := make([]UniqueID, 0) for _, id := range loadDoneSegmentIDSet.Collect() { segment := newSegments[id] - err = segment.FlushDelete() - if err != nil { - log.Error("load segment failed, set segment to meta failed", - zap.Int64("collectionID", segment.collectionID), - zap.Int64("partitionID", segment.partitionID), - zap.Int64("segmentID", segment.segmentID), - zap.Int64("loadSegmentRequest msgID", req.Base.MsgID), - zap.Error(err)) - failedSetMetaSegmentIDs = append(failedSetMetaSegmentIDs, id) - loadDoneSegmentIDSet.Remove(id) - continue - } - err = loader.metaReplica.setSegment(segment) if err != nil { log.Error("load segment failed, set segment to meta failed", @@ -568,7 +555,7 @@ func (loader *segmentLoader) loadStatslog(ctx context.Context, segment *Segment, stats, err := storage.DeserializeStats(blobs) if err != nil { log.Warn("failed to deserialize stats", zap.Error(err)) - return err + return multierr.Combine(errBinlogCorrupted, err) } var size uint for _, stat := range stats { diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index d3ef60ee9c..b23d2853b5 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -225,6 +225,9 @@ func TestSegment_getDeletedCount(t *testing.T) { assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) + err = segment.FlushDelete() + assert.NoError(t, err) + insertMsg, err := genSimpleInsertMsg(schema, defaultMsgLength) assert.NoError(t, err) insertRecord := &segcorepb.InsertRecord{ @@ -1036,5 +1039,14 @@ func TestDeleteBuff(t *testing.T) { assert.NoError(t, err) err = seg.FlushDelete() assert.NoError(t, err) + deletedCount := seg.getDeletedCount() + assert.EqualValues(t, 2, deletedCount) + + // should delete nothing + err = seg.segmentDelete([]primaryKey{newInt64PrimaryKey(1), newInt64PrimaryKey(2)}, + []Timestamp{11, 12}) + assert.NoError(t, err) + deletedCount = seg.getDeletedCount() + assert.EqualValues(t, 2, deletedCount) }) } diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 053d83ffa3..908109608f 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -221,6 +221,7 @@ func DeserializeStats(blobs []*Blob) ([]*PrimaryKeyStats, error) { if len(blob.Value) == 0 { continue } + sr := &StatsReader{} sr.SetBuffer(blob.Value) stats, err := sr.GetPrimaryKeyStats()