From a84ba1967d8840301c0ff6093ba41dd8a1cce24c Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 31 Mar 2025 14:36:22 +0800 Subject: [PATCH] fix: [2.4] Get all children deltalogs for segment to load (#40964) issue: https://github.com/milvus-io/milvus/issues/40207 master pr: https://github.com/milvus-io/milvus/pull/40956 2.5 pr: #40957 Signed-off-by: Cai Zhang --- internal/datacoord/services.go | 45 +++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b87bc414d1..54fd3334ce 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -395,16 +395,38 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs())) channelCPs := make(map[string]*msgpb.MsgPosition) + + var getChildrenDelta func(id UniqueID) ([]*datapb.FieldBinlog, error) + getChildrenDelta = func(id UniqueID) ([]*datapb.FieldBinlog, error) { + children, ok := s.meta.GetCompactionTo(id) + // double-check the segment, maybe the segment is being dropped concurrently. + if !ok { + log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id)) + err := merr.WrapErrSegmentNotFound(id) + return nil, err + } + allDeltaLogs := make([]*datapb.FieldBinlog, 0) + for _, child := range children { + clonedChild := child.Clone() + // child segment should decompress binlog path + binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs()) + allDeltaLogs = append(allDeltaLogs, clonedChild.GetDeltalogs()...) + allChildrenDeltas, err := getChildrenDelta(child.GetID()) + if err != nil { + return nil, err + } + allDeltaLogs = append(allDeltaLogs, allChildrenDeltas...) + } + + return allDeltaLogs, nil + } + for _, id := range req.SegmentIDs { var info *SegmentInfo if req.IncludeUnHealthy { info = s.meta.GetSegment(id) - // TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock. - // Too much modification need to be applied to SegmentInfo, a refactor is needed. - children, ok := s.meta.GetCompactionTo(id) - // info may be not-nil, but ok is false when the segment is being dropped concurrently. - if info == nil || !ok { + if info == nil { log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id)) err := merr.WrapErrSegmentNotFound(id) resp.Status = merr.Status(err) @@ -412,13 +434,14 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } clonedInfo := info.Clone() - for _, child := range children { - clonedChild := child.Clone() - // child segment should decompress binlog path - binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs()) - clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, clonedChild.GetDeltalogs()...) - clonedInfo.DmlPosition = clonedChild.GetDmlPosition() + // We should retrieve the deltalog of all child segments, + // but due to the compaction constraint based on indexed segment, there will be at most two generations. + allChildrenDeltalogs, err := getChildrenDelta(id) + if err != nil { + resp.Status = merr.Status(err) + return resp, nil } + clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, allChildrenDeltalogs...) segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo) infos = append(infos, clonedInfo.SegmentInfo) } else {