mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: Accelerate dispatcher building (#42500)
Reduce check interval to accelerate dispatcher building. issue: https://github.com/milvus-io/milvus/issues/42067 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
e299c533be
commit
ee9d08375d
@ -173,7 +173,7 @@ mq:
|
|||||||
pursuitBufferTime: 60 # pursuit mode buffer time in seconds
|
pursuitBufferTime: 60 # pursuit mode buffer time in seconds
|
||||||
mqBufSize: 16 # MQ client consumer buffer length
|
mqBufSize: 16 # MQ client consumer buffer length
|
||||||
dispatcher:
|
dispatcher:
|
||||||
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge
|
mergeCheckInterval: 0.1 # the interval time(in seconds) for dispatcher to check whether to merge
|
||||||
targetBufSize: 16 # the lenth of channel buffer for targe
|
targetBufSize: 16 # the lenth of channel buffer for targe
|
||||||
maxTolerantLag: 3 # Default value: "3", the timeout(in seconds) that target sends msgPack
|
maxTolerantLag: 3 # Default value: "3", the timeout(in seconds) that target sends msgPack
|
||||||
|
|
||||||
|
|||||||
@ -240,7 +240,7 @@ func (d *Dispatcher) work() {
|
|||||||
return
|
return
|
||||||
case pack := <-d.stream.Chan():
|
case pack := <-d.stream.Chan():
|
||||||
if pack == nil || len(pack.EndPositions) != 1 {
|
if pack == nil || len(pack.EndPositions) != 1 {
|
||||||
log.Error("consumed invalid msgPack")
|
log.Error("consumed invalid msgPack", zap.Any("pack", pack))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
d.curTs.Store(pack.EndPositions[0].GetTimestamp())
|
d.curTs.Store(pack.EndPositions[0].GetTimestamp())
|
||||||
|
|||||||
@ -125,8 +125,8 @@ func (c *dispatcherManager) Close() {
|
|||||||
func (c *dispatcherManager) Run() {
|
func (c *dispatcherManager) Run() {
|
||||||
log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("pchannel", c.pchannel))
|
log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("pchannel", c.pchannel))
|
||||||
log.Info("dispatcherManager is running...")
|
log.Info("dispatcherManager is running...")
|
||||||
ticker1 := time.NewTicker(10 * time.Second)
|
ticker1 := time.NewTicker(30 * time.Second)
|
||||||
ticker2 := time.NewTicker(paramtable.Get().MQCfg.MergeCheckInterval.GetAsDuration(time.Second))
|
ticker2 := time.NewTicker(paramtable.Get().MQCfg.CheckInterval.GetAsDuration(time.Second))
|
||||||
defer ticker1.Stop()
|
defer ticker1.Stop()
|
||||||
defer ticker2.Stop()
|
defer ticker2.Stop()
|
||||||
for {
|
for {
|
||||||
|
|||||||
@ -181,7 +181,7 @@ func TestManager(t *testing.T) {
|
|||||||
d.curTs.Store(1000)
|
d.curTs.Store(1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key
|
checkIntervalK := paramtable.Get().MQCfg.CheckInterval.Key
|
||||||
paramtable.Get().Save(checkIntervalK, "0.01")
|
paramtable.Get().Save(checkIntervalK, "0.01")
|
||||||
defer paramtable.Get().Reset(checkIntervalK)
|
defer paramtable.Get().Reset(checkIntervalK)
|
||||||
|
|
||||||
|
|||||||
@ -544,10 +544,10 @@ type MQConfig struct {
|
|||||||
IgnoreBadPosition ParamItem `refreshable:"true"`
|
IgnoreBadPosition ParamItem `refreshable:"true"`
|
||||||
|
|
||||||
// msgdispatcher
|
// msgdispatcher
|
||||||
MergeCheckInterval ParamItem `refreshable:"false"`
|
CheckInterval ParamItem `refreshable:"false"`
|
||||||
TargetBufSize ParamItem `refreshable:"false"`
|
TargetBufSize ParamItem `refreshable:"false"`
|
||||||
MaxTolerantLag ParamItem `refreshable:"true"`
|
MaxTolerantLag ParamItem `refreshable:"true"`
|
||||||
MaxPositionTsGap ParamItem `refreshable:"true"`
|
MaxPositionTsGap ParamItem `refreshable:"true"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initializes the MQConfig object with a BaseTable.
|
// Init initializes the MQConfig object with a BaseTable.
|
||||||
@ -580,14 +580,14 @@ Valid values: [default, pulsar, kafka, rocksmq, woodpecker]`,
|
|||||||
}
|
}
|
||||||
p.TargetBufSize.Init(base.mgr)
|
p.TargetBufSize.Init(base.mgr)
|
||||||
|
|
||||||
p.MergeCheckInterval = ParamItem{
|
p.CheckInterval = ParamItem{
|
||||||
Key: "mq.dispatcher.mergeCheckInterval",
|
Key: "mq.dispatcher.mergeCheckInterval",
|
||||||
Version: "2.4.4",
|
Version: "2.4.4",
|
||||||
DefaultValue: "1",
|
DefaultValue: "0.1",
|
||||||
Doc: `the interval time(in seconds) for dispatcher to check whether to merge`,
|
Doc: `the interval time(in seconds) for dispatcher to check whether to merge`,
|
||||||
Export: true,
|
Export: true,
|
||||||
}
|
}
|
||||||
p.MergeCheckInterval.Init(base.mgr)
|
p.CheckInterval.Init(base.mgr)
|
||||||
|
|
||||||
p.MaxPositionTsGap = ParamItem{
|
p.MaxPositionTsGap = ParamItem{
|
||||||
Key: "mq.dispatcher.maxPositionGapInMinutes",
|
Key: "mq.dispatcher.maxPositionGapInMinutes",
|
||||||
|
|||||||
@ -35,7 +35,7 @@ func TestServiceParam(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("test MQConfig", func(t *testing.T) {
|
t.Run("test MQConfig", func(t *testing.T) {
|
||||||
Params := &SParams.MQCfg
|
Params := &SParams.MQCfg
|
||||||
assert.Equal(t, 1*time.Second, Params.MergeCheckInterval.GetAsDuration(time.Second))
|
assert.Equal(t, 100*time.Millisecond, Params.CheckInterval.GetAsDuration(time.Second))
|
||||||
assert.Equal(t, 16, Params.TargetBufSize.GetAsInt())
|
assert.Equal(t, 16, Params.TargetBufSize.GetAsInt())
|
||||||
assert.Equal(t, 3*time.Second, Params.MaxTolerantLag.GetAsDuration(time.Second))
|
assert.Equal(t, 3*time.Second, Params.MaxTolerantLag.GetAsDuration(time.Second))
|
||||||
assert.Equal(t, 60*time.Minute, Params.MaxPositionTsGap.GetAsDuration(time.Minute))
|
assert.Equal(t, 60*time.Minute, Params.MaxPositionTsGap.GetAsDuration(time.Minute))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user