From cc730c1ee56f4b89f76e23bc4b10d57724274189 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 10 Jul 2023 17:36:28 +0800 Subject: [PATCH] fix consume growing from dml after release (#25421) Signed-off-by: Wei Liu --- internal/querynodev2/services.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 027b7d8f04..ee12431950 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -610,6 +610,21 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil } + // when we try to release a segment, add it to pipeline's exclude list first + // in case of consumed it's growing segment again + pipeline := node.pipelineManager.Get(req.GetShard()) + if pipeline != nil { + droppedInfos := lo.Map(req.GetSegmentIDs(), func(id int64, _ int) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: id, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: typeutil.MaxTimestamp, + }, + } + }) + pipeline.ExcludedSegments(droppedInfos...) + } + req.NeedTransfer = false err := delegator.ReleaseSegments(ctx, req, false) if err != nil {