From c50fe71163332028c55281edf4014904e43f509c Mon Sep 17 00:00:00 2001 From: SimFG Date: Mon, 23 Sep 2024 16:33:18 +0800 Subject: [PATCH] fix: long buffering causes mq to be unable to receive messages. (#36420) - issue: #36397 Signed-off-by: SimFG --- configs/milvus.yaml | 1 + internal/datacoord/import_util.go | 3 +- pkg/mq/msgstream/mq_msgstream.go | 9 +++- pkg/mq/msgstream/mq_msgstream_test.go | 47 +++++++++++++-------- pkg/util/paramtable/component_param_test.go | 2 + pkg/util/paramtable/service_param.go | 10 +++++ 6 files changed, 50 insertions(+), 22 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 987c30ecd6..cd9db3944b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -161,6 +161,7 @@ mq: enablePursuitMode: true # Default value: "true" pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes + pursuitBufferTime: 60 # pursuit mode buffer time in seconds mqBufSize: 16 # MQ client consumer buffer length dispatcher: mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 84af224e51..3408744fe4 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -23,14 +23,13 @@ import ( "sort" "time" - "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 89fe6f7fd3..846f9d447f 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -629,7 +629,7 @@ func isDMLMsg(msg TsMsg) bool { return msg.Type() == commonpb.MsgType_Insert || msg.Type() == commonpb.MsgType_Delete } -func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool { +func (ms *MqTtMsgStream) continueBuffering(endTs, size uint64, startTime time.Time) bool { if ms.ctx.Err() != nil { return false } @@ -649,6 +649,10 @@ func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool { return false } + if time.Since(startTime) > paramtable.Get().ServiceParam.MQCfg.PursuitBufferTime.GetAsDuration(time.Second) { + return false + } + endTime, _ := tsoutil.ParseTS(endTs) return time.Since(endTime) > paramtable.Get().ServiceParam.MQCfg.PursuitLag.GetAsDuration(time.Second) } @@ -677,10 +681,11 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { // endMsgPositions := make([]*msgpb.MsgPosition, 0) startPositions := make(map[string]*msgpb.MsgPosition) endPositions := make(map[string]*msgpb.MsgPosition) + startBufTime := time.Now() var endTs uint64 var size uint64 - for ms.continueBuffering(endTs, size) { + for ms.continueBuffering(endTs, size, startBufTime) { ms.consumerLock.Lock() // wait all channels get ttMsg for _, consumer := range ms.consumers { diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index 1fbefef9e1..968d455b77 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -1543,36 +1543,47 @@ func TestMqttStream_continueBuffering(t *testing.T) { Params.Save(Params.ServiceParam.MQCfg.PursuitBufferSize.Key, "1024") type testCase struct { - tag string - endTs uint64 - size uint64 - expect bool + tag string + endTs uint64 + size uint64 + expect bool + startTime time.Time } currTs := tsoutil.ComposeTSByTime(time.Now(), 0) cases := []testCase{ { - tag: "first_run", - endTs: 0, - size: 0, - expect: true, + tag: "first_run", + endTs: 0, + size: 0, + expect: true, + startTime: time.Now(), }, { - tag: "lag_large", - endTs: 1, - size: 10, - expect: false, + tag: "lag_large", + endTs: 1, + size: 10, + expect: false, + startTime: time.Now(), }, { - tag: "currTs", - endTs: currTs, - size: 10, - expect: false, + tag: "currTs", + endTs: currTs, + size: 10, + expect: false, + startTime: time.Now(), + }, + { + tag: "bufferTs", + endTs: 10, + size: 0, + expect: false, + startTime: time.Now().Add(-time.Hour), }, } for _, tc := range cases { t.Run(tc.tag, func(t *testing.T) { - assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size)) + assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, tc.startTime)) }) } }) @@ -1618,7 +1629,7 @@ func TestMqttStream_continueBuffering(t *testing.T) { } for _, tc := range cases { t.Run(tc.tag, func(t *testing.T) { - assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size)) + assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, time.Now())) }) } }) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 4ce1e6fccf..33ab0c8b43 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -645,6 +645,8 @@ func TestCachedParam(t *testing.T) { assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) + assert.Equal(t, 60, params.ServiceParam.MQCfg.PursuitBufferTime.GetAsInt()) + assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64()) assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64()) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 70516b0bf2..152e7e7046 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -484,6 +484,7 @@ type MQConfig struct { EnablePursuitMode ParamItem `refreshable:"true"` PursuitLag ParamItem `refreshable:"true"` PursuitBufferSize ParamItem `refreshable:"true"` + PursuitBufferTime ParamItem `refreshable:"true"` MQBufSize ParamItem `refreshable:"false"` ReceiveBufSize ParamItem `refreshable:"false"` @@ -561,6 +562,15 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`, } p.PursuitBufferSize.Init(base.mgr) + p.PursuitBufferTime = ParamItem{ + Key: "mq.pursuitBufferTime", + Version: "2.4.12", + DefaultValue: "60", // 60 s + Doc: `pursuit mode buffer time in seconds`, + Export: true, + } + p.PursuitBufferTime.Init(base.mgr) + p.MQBufSize = ParamItem{ Key: "mq.mqBufSize", Version: "2.3.0",