From 9824615efbef181ca9b850c45a80cd8abca1a977 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 21 Mar 2025 10:22:11 +0800 Subject: [PATCH] enhance: Close component in topological order when unsub channel (#40796) Related to #40795 --------- Signed-off-by: Congqi Xia --- internal/querynodev2/services.go | 3 ++- internal/util/pipeline/stream_pipeline.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index f406368b54..7b5fa3761d 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -357,10 +357,11 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC defer node.unsubscribingChannels.Remove(req.GetChannelName()) delegator, ok := node.delegators.GetAndRemove(req.GetChannelName()) if ok { + node.pipelineManager.Remove(req.GetChannelName()) + // close the delegator first to block all coming query/search requests delegator.Close() - node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) node.manager.Collection.Unref(req.GetCollectionID(), 1) } diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index c5e89d4c2b..88b9c98854 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -67,7 +67,11 @@ func (p *streamPipeline) work() { case <-p.closeCh: log.Ctx(context.TODO()).Debug("stream pipeline input closed") return - case msg := <-p.input: + case msg, ok := <-p.input: + if !ok { + log.Ctx(context.TODO()).Debug("stream pipeline input closed") + return + } p.lastAccessTime.Store(time.Now()) log.Ctx(context.TODO()).RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs))) p.pipeline.inputChannel <- msg @@ -158,12 +162,15 @@ func (p *streamPipeline) Start() error { func (p *streamPipeline) Close() { p.closeOnce.Do(func() { + // close datasource first + p.dispatcher.Deregister(p.vChannel) + // close stream input close(p.closeCh) p.closeWg.Wait() if p.scanner != nil { p.scanner.Close() } - p.dispatcher.Deregister(p.vChannel) + // close the underline pipeline p.pipeline.Close() }) }