From 4a3d98d88cd7ca8e74e5a78d523ceb6ed8b1eba3 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Mon, 29 Jul 2024 00:25:49 +0800 Subject: [PATCH] enhance: Fast close msgdispatcher target (#34803) /kind improvement issue: https://github.com/milvus-io/milvus/issues/34075 Signed-off-by: bigsheeper --- pkg/mq/msgdispatcher/dispatcher_test.go | 4 ++++ pkg/mq/msgdispatcher/target.go | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/pkg/mq/msgdispatcher/dispatcher_test.go b/pkg/mq/msgdispatcher/dispatcher_test.go index 2ee5469b4b..e7fb703921 100644 --- a/pkg/mq/msgdispatcher/dispatcher_test.go +++ b/pkg/mq/msgdispatcher/dispatcher_test.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/lifetime" ) func TestDispatcher(t *testing.T) { @@ -77,11 +78,13 @@ func TestDispatcher(t *testing.T) { vchannel: "mock_vchannel_0", pos: nil, ch: output, + cancelCh: lifetime.NewSafeChan(), }) d.AddTarget(&target{ vchannel: "mock_vchannel_1", pos: nil, ch: nil, + cancelCh: lifetime.NewSafeChan(), }) num := d.TargetNum() assert.Equal(t, 2, num) @@ -110,6 +113,7 @@ func TestDispatcher(t *testing.T) { vchannel: "mock_vchannel_0", pos: nil, ch: output, + cancelCh: lifetime.NewSafeChan(), } assert.Equal(t, cap(output), cap(target.ch)) wg := &sync.WaitGroup{} diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index 8fd231e296..3b2a5a48e7 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -21,6 +21,10 @@ import ( "sync" "time" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -32,6 +36,8 @@ type target struct { closeMu sync.Mutex closeOnce sync.Once closed bool + + cancelCh lifetime.SafeChan } func newTarget(vchannel string, pos *Pos) *target { @@ -39,12 +45,14 @@ func newTarget(vchannel string, pos *Pos) *target { vchannel: vchannel, ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()), pos: pos, + cancelCh: lifetime.NewSafeChan(), } t.closed = false return t } func (t *target) close() { + t.cancelCh.Close() t.closeMu.Lock() defer t.closeMu.Unlock() t.closeOnce.Do(func() { @@ -61,6 +69,9 @@ func (t *target) send(pack *MsgPack) error { } maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second) select { + case <-t.cancelCh.CloseCh(): + log.Info("target closed", zap.String("vchannel", t.vchannel)) + return nil case <-time.After(maxTolerantLag): return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag) case t.ch <- pack: