From 902f6506caaf4d799de45ce5bbcb1ccb5e5b1ff7 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 2 Apr 2025 16:32:22 +0800 Subject: [PATCH] fix: Get all children deltalogs for segment to load (#40956) issue: #40207 Signed-off-by: Cai Zhang --- internal/datacoord/services.go | 45 +++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index ded7771cd0..417a11e26c 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -439,16 +439,38 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs())) channelCPs := make(map[string]*msgpb.MsgPosition) + + var getChildrenDelta func(id UniqueID) ([]*datapb.FieldBinlog, error) + getChildrenDelta = func(id UniqueID) ([]*datapb.FieldBinlog, error) { + children, ok := s.meta.GetCompactionTo(id) + // double-check the segment, maybe the segment is being dropped concurrently. + if !ok { + log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id)) + err := merr.WrapErrSegmentNotFound(id) + return nil, err + } + allDeltaLogs := make([]*datapb.FieldBinlog, 0) + for _, child := range children { + clonedChild := child.Clone() + // child segment should decompress binlog path + binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs()) + allDeltaLogs = append(allDeltaLogs, clonedChild.GetDeltalogs()...) + allChildrenDeltas, err := getChildrenDelta(child.GetID()) + if err != nil { + return nil, err + } + allDeltaLogs = append(allDeltaLogs, allChildrenDeltas...) + } + + return allDeltaLogs, nil + } + for _, id := range req.SegmentIDs { var info *SegmentInfo if req.IncludeUnHealthy { info = s.meta.GetSegment(ctx, id) - // TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock. - // Too much modification need to be applied to SegmentInfo, a refactor is needed. - children, ok := s.meta.GetCompactionTo(id) - // info may be not-nil, but ok is false when the segment is being dropped concurrently. - if info == nil || !ok { + if info == nil { log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id)) err := merr.WrapErrSegmentNotFound(id) resp.Status = merr.Status(err) @@ -456,13 +478,14 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } clonedInfo := info.Clone() - for _, child := range children { - clonedChild := child.Clone() - // child segment should decompress binlog path - binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs()) - clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, clonedChild.GetDeltalogs()...) - clonedInfo.DmlPosition = clonedChild.GetDmlPosition() + // We should retrieve the deltalog of all child segments, + // but due to the compaction constraint based on indexed segment, there will be at most two generations. + allChildrenDeltalogs, err := getChildrenDelta(id) + if err != nil { + resp.Status = merr.Status(err) + return resp, nil } + clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, allChildrenDeltalogs...) segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo) infos = append(infos, clonedInfo.SegmentInfo) } else {