mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
enhance: Close component in topological order when unsub channel (#40796)
Related to #40795 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
d7df78a6c9
commit
9824615efb
@ -357,10 +357,11 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
|
|||||||
defer node.unsubscribingChannels.Remove(req.GetChannelName())
|
defer node.unsubscribingChannels.Remove(req.GetChannelName())
|
||||||
delegator, ok := node.delegators.GetAndRemove(req.GetChannelName())
|
delegator, ok := node.delegators.GetAndRemove(req.GetChannelName())
|
||||||
if ok {
|
if ok {
|
||||||
|
node.pipelineManager.Remove(req.GetChannelName())
|
||||||
|
|
||||||
// close the delegator first to block all coming query/search requests
|
// close the delegator first to block all coming query/search requests
|
||||||
delegator.Close()
|
delegator.Close()
|
||||||
|
|
||||||
node.pipelineManager.Remove(req.GetChannelName())
|
|
||||||
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
|
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
|
||||||
node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -67,7 +67,11 @@ func (p *streamPipeline) work() {
|
|||||||
case <-p.closeCh:
|
case <-p.closeCh:
|
||||||
log.Ctx(context.TODO()).Debug("stream pipeline input closed")
|
log.Ctx(context.TODO()).Debug("stream pipeline input closed")
|
||||||
return
|
return
|
||||||
case msg := <-p.input:
|
case msg, ok := <-p.input:
|
||||||
|
if !ok {
|
||||||
|
log.Ctx(context.TODO()).Debug("stream pipeline input closed")
|
||||||
|
return
|
||||||
|
}
|
||||||
p.lastAccessTime.Store(time.Now())
|
p.lastAccessTime.Store(time.Now())
|
||||||
log.Ctx(context.TODO()).RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
|
log.Ctx(context.TODO()).RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
|
||||||
p.pipeline.inputChannel <- msg
|
p.pipeline.inputChannel <- msg
|
||||||
@ -158,12 +162,15 @@ func (p *streamPipeline) Start() error {
|
|||||||
|
|
||||||
func (p *streamPipeline) Close() {
|
func (p *streamPipeline) Close() {
|
||||||
p.closeOnce.Do(func() {
|
p.closeOnce.Do(func() {
|
||||||
|
// close datasource first
|
||||||
|
p.dispatcher.Deregister(p.vChannel)
|
||||||
|
// close stream input
|
||||||
close(p.closeCh)
|
close(p.closeCh)
|
||||||
p.closeWg.Wait()
|
p.closeWg.Wait()
|
||||||
if p.scanner != nil {
|
if p.scanner != nil {
|
||||||
p.scanner.Close()
|
p.scanner.Close()
|
||||||
}
|
}
|
||||||
p.dispatcher.Deregister(p.vChannel)
|
// close the underline pipeline
|
||||||
p.pipeline.Close()
|
p.pipeline.Close()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user