From ee9d08375db2d6ccd04053970d4e06fbb8461df7 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 5 Jun 2025 16:36:32 +0800 Subject: [PATCH] enhance: Accelerate dispatcher building (#42500) Reduce check interval to accelerate dispatcher building. issue: https://github.com/milvus-io/milvus/issues/42067 Signed-off-by: bigsheeper --- configs/milvus.yaml | 2 +- pkg/mq/msgdispatcher/dispatcher.go | 2 +- pkg/mq/msgdispatcher/manager.go | 4 ++-- pkg/mq/msgdispatcher/manager_test.go | 2 +- pkg/util/paramtable/service_param.go | 14 +++++++------- pkg/util/paramtable/service_param_test.go | 2 +- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index b76b68b6e4..4dba2ee030 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -173,7 +173,7 @@ mq: pursuitBufferTime: 60 # pursuit mode buffer time in seconds mqBufSize: 16 # MQ client consumer buffer length dispatcher: - mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge + mergeCheckInterval: 0.1 # the interval time(in seconds) for dispatcher to check whether to merge targetBufSize: 16 # the lenth of channel buffer for targe maxTolerantLag: 3 # Default value: "3", the timeout(in seconds) that target sends msgPack diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index e3e2bfb32c..dd831e92db 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -240,7 +240,7 @@ func (d *Dispatcher) work() { return case pack := <-d.stream.Chan(): if pack == nil || len(pack.EndPositions) != 1 { - log.Error("consumed invalid msgPack") + log.Error("consumed invalid msgPack", zap.Any("pack", pack)) continue } d.curTs.Store(pack.EndPositions[0].GetTimestamp()) diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index cfcf09b60d..05a88766a5 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -125,8 +125,8 @@ func (c *dispatcherManager) Close() { func (c *dispatcherManager) Run() { log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("pchannel", c.pchannel)) log.Info("dispatcherManager is running...") - ticker1 := time.NewTicker(10 * time.Second) - ticker2 := time.NewTicker(paramtable.Get().MQCfg.MergeCheckInterval.GetAsDuration(time.Second)) + ticker1 := time.NewTicker(30 * time.Second) + ticker2 := time.NewTicker(paramtable.Get().MQCfg.CheckInterval.GetAsDuration(time.Second)) defer ticker1.Stop() defer ticker2.Stop() for { diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 1bded78c79..19777a1f6a 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -181,7 +181,7 @@ func TestManager(t *testing.T) { d.curTs.Store(1000) } - checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key + checkIntervalK := paramtable.Get().MQCfg.CheckInterval.Key paramtable.Get().Save(checkIntervalK, "0.01") defer paramtable.Get().Reset(checkIntervalK) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index a0300dd276..2be4bd6d83 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -544,10 +544,10 @@ type MQConfig struct { IgnoreBadPosition ParamItem `refreshable:"true"` // msgdispatcher - MergeCheckInterval ParamItem `refreshable:"false"` - TargetBufSize ParamItem `refreshable:"false"` - MaxTolerantLag ParamItem `refreshable:"true"` - MaxPositionTsGap ParamItem `refreshable:"true"` + CheckInterval ParamItem `refreshable:"false"` + TargetBufSize ParamItem `refreshable:"false"` + MaxTolerantLag ParamItem `refreshable:"true"` + MaxPositionTsGap ParamItem `refreshable:"true"` } // Init initializes the MQConfig object with a BaseTable. @@ -580,14 +580,14 @@ Valid values: [default, pulsar, kafka, rocksmq, woodpecker]`, } p.TargetBufSize.Init(base.mgr) - p.MergeCheckInterval = ParamItem{ + p.CheckInterval = ParamItem{ Key: "mq.dispatcher.mergeCheckInterval", Version: "2.4.4", - DefaultValue: "1", + DefaultValue: "0.1", Doc: `the interval time(in seconds) for dispatcher to check whether to merge`, Export: true, } - p.MergeCheckInterval.Init(base.mgr) + p.CheckInterval.Init(base.mgr) p.MaxPositionTsGap = ParamItem{ Key: "mq.dispatcher.maxPositionGapInMinutes", diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 7dfcbffac9..b28696e1d6 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -35,7 +35,7 @@ func TestServiceParam(t *testing.T) { t.Run("test MQConfig", func(t *testing.T) { Params := &SParams.MQCfg - assert.Equal(t, 1*time.Second, Params.MergeCheckInterval.GetAsDuration(time.Second)) + assert.Equal(t, 100*time.Millisecond, Params.CheckInterval.GetAsDuration(time.Second)) assert.Equal(t, 16, Params.TargetBufSize.GetAsInt()) assert.Equal(t, 3*time.Second, Params.MaxTolerantLag.GetAsDuration(time.Second)) assert.Equal(t, 60*time.Minute, Params.MaxPositionTsGap.GetAsDuration(time.Minute))