diff --git a/pkg/mq/msgdispatcher/dispatcher_test.go b/pkg/mq/msgdispatcher/dispatcher_test.go index 2ef4bbff66..456dcda18d 100644 --- a/pkg/mq/msgdispatcher/dispatcher_test.go +++ b/pkg/mq/msgdispatcher/dispatcher_test.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" - "github.com/milvus-io/milvus/pkg/util/lifetime" ) func TestDispatcher(t *testing.T) { @@ -71,18 +70,15 @@ func TestDispatcher(t *testing.T) { d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false) assert.NoError(t, err) output := make(chan *msgstream.MsgPack, 1024) - d.AddTarget(&target{ - vchannel: "mock_vchannel_0", - pos: nil, - ch: output, - cancelCh: lifetime.NewSafeChan(), - }) - d.AddTarget(&target{ - vchannel: "mock_vchannel_1", - pos: nil, - ch: nil, - cancelCh: lifetime.NewSafeChan(), - }) + + getTarget := func(vchannel string, pos *Pos, ch chan *msgstream.MsgPack) *target { + target := newTarget(vchannel, pos) + target.ch = ch + return target + } + + d.AddTarget(getTarget("mock_vchannel_0", nil, output)) + d.AddTarget(getTarget("mock_vchannel_1", nil, nil)) num := d.TargetNum() assert.Equal(t, 2, num) @@ -106,12 +102,8 @@ func TestDispatcher(t *testing.T) { t.Run("test concurrent send and close", func(t *testing.T) { for i := 0; i < 100; i++ { output := make(chan *msgstream.MsgPack, 1024) - target := &target{ - vchannel: "mock_vchannel_0", - pos: nil, - ch: output, - cancelCh: lifetime.NewSafeChan(), - } + target := newTarget("mock_vchannel_0", nil) + target.ch = output assert.Equal(t, cap(output), cap(target.ch)) wg := &sync.WaitGroup{} for j := 0; j < 100; j++ { diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index 3b2a5a48e7..cce4aa5fa2 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -36,16 +36,21 @@ type target struct { closeMu sync.Mutex closeOnce sync.Once closed bool + maxLag time.Duration + timer *time.Timer cancelCh lifetime.SafeChan } func newTarget(vchannel string, pos *Pos) *target { + maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second) t := &target{ vchannel: vchannel, ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()), pos: pos, cancelCh: lifetime.NewSafeChan(), + maxLag: maxTolerantLag, + timer: time.NewTimer(maxTolerantLag), } t.closed = false return t @@ -57,6 +62,7 @@ func (t *target) close() { defer t.closeMu.Unlock() t.closeOnce.Do(func() { t.closed = true + t.timer.Stop() close(t.ch) }) } @@ -67,13 +73,13 @@ func (t *target) send(pack *MsgPack) error { if t.closed { return nil } - maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second) + t.timer.Reset(t.maxLag) select { case <-t.cancelCh.CloseCh(): log.Info("target closed", zap.String("vchannel", t.vchannel)) return nil - case <-time.After(maxTolerantLag): - return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag) + case <-t.timer.C: + return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, t.maxLag) case t.ch <- pack: return nil } diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 176ab9e210..79f3e2c82a 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -103,7 +103,7 @@ func SkipRemote(skip bool) Option { } } -func skipEnv(skip bool) Option { +func SkipEnv(skip bool) Option { return func(bt *baseTableConfig) { bt.skipEnv = skip } @@ -112,7 +112,7 @@ func skipEnv(skip bool) Option { // NewBaseTableFromYamlOnly only used in migration tool. // Maybe we shouldn't limit the configDir internally. func NewBaseTableFromYamlOnly(yaml string) *BaseTable { - return NewBaseTable(Files([]string{yaml}), SkipRemote(true), skipEnv(true)) + return NewBaseTable(Files([]string{yaml}), SkipRemote(true), SkipEnv(true)) } func NewBaseTable(opts ...Option) *BaseTable { diff --git a/pkg/util/typeutil/map.go b/pkg/util/typeutil/map.go index 973d6e127f..be4f5af5b7 100644 --- a/pkg/util/typeutil/map.go +++ b/pkg/util/typeutil/map.go @@ -121,3 +121,12 @@ func (m *ConcurrentMap[K, V]) Values() []V { }) return ret } + +func (m *ConcurrentMap[K, V]) Keys() []K { + ret := make([]K, m.Len()) + m.inner.Range(func(key, value any) bool { + ret = append(ret, key.(K)) + return true + }) + return ret +}