mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: [2.4] Get all children deltalogs for segment to load (#40964)
issue: https://github.com/milvus-io/milvus/issues/40207 master pr: https://github.com/milvus-io/milvus/pull/40956 2.5 pr: #40957 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
4898b9d3c8
commit
a84ba1967d
@ -395,16 +395,38 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
|
|||||||
}
|
}
|
||||||
infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs()))
|
infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs()))
|
||||||
channelCPs := make(map[string]*msgpb.MsgPosition)
|
channelCPs := make(map[string]*msgpb.MsgPosition)
|
||||||
|
|
||||||
|
var getChildrenDelta func(id UniqueID) ([]*datapb.FieldBinlog, error)
|
||||||
|
getChildrenDelta = func(id UniqueID) ([]*datapb.FieldBinlog, error) {
|
||||||
|
children, ok := s.meta.GetCompactionTo(id)
|
||||||
|
// double-check the segment, maybe the segment is being dropped concurrently.
|
||||||
|
if !ok {
|
||||||
|
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
|
||||||
|
err := merr.WrapErrSegmentNotFound(id)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
allDeltaLogs := make([]*datapb.FieldBinlog, 0)
|
||||||
|
for _, child := range children {
|
||||||
|
clonedChild := child.Clone()
|
||||||
|
// child segment should decompress binlog path
|
||||||
|
binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs())
|
||||||
|
allDeltaLogs = append(allDeltaLogs, clonedChild.GetDeltalogs()...)
|
||||||
|
allChildrenDeltas, err := getChildrenDelta(child.GetID())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
allDeltaLogs = append(allDeltaLogs, allChildrenDeltas...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return allDeltaLogs, nil
|
||||||
|
}
|
||||||
|
|
||||||
for _, id := range req.SegmentIDs {
|
for _, id := range req.SegmentIDs {
|
||||||
var info *SegmentInfo
|
var info *SegmentInfo
|
||||||
if req.IncludeUnHealthy {
|
if req.IncludeUnHealthy {
|
||||||
info = s.meta.GetSegment(id)
|
info = s.meta.GetSegment(id)
|
||||||
// TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock.
|
|
||||||
// Too much modification need to be applied to SegmentInfo, a refactor is needed.
|
|
||||||
children, ok := s.meta.GetCompactionTo(id)
|
|
||||||
|
|
||||||
// info may be not-nil, but ok is false when the segment is being dropped concurrently.
|
// info may be not-nil, but ok is false when the segment is being dropped concurrently.
|
||||||
if info == nil || !ok {
|
if info == nil {
|
||||||
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
|
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
|
||||||
err := merr.WrapErrSegmentNotFound(id)
|
err := merr.WrapErrSegmentNotFound(id)
|
||||||
resp.Status = merr.Status(err)
|
resp.Status = merr.Status(err)
|
||||||
@ -412,13 +434,14 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
|
|||||||
}
|
}
|
||||||
|
|
||||||
clonedInfo := info.Clone()
|
clonedInfo := info.Clone()
|
||||||
for _, child := range children {
|
// We should retrieve the deltalog of all child segments,
|
||||||
clonedChild := child.Clone()
|
// but due to the compaction constraint based on indexed segment, there will be at most two generations.
|
||||||
// child segment should decompress binlog path
|
allChildrenDeltalogs, err := getChildrenDelta(id)
|
||||||
binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs())
|
if err != nil {
|
||||||
clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, clonedChild.GetDeltalogs()...)
|
resp.Status = merr.Status(err)
|
||||||
clonedInfo.DmlPosition = clonedChild.GetDmlPosition()
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, allChildrenDeltalogs...)
|
||||||
segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
|
segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
|
||||||
infos = append(infos, clonedInfo.SegmentInfo)
|
infos = append(infos, clonedInfo.SegmentInfo)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user