From 212e17c4c5e76ba0c6df8040d6e383efdf61ae97 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 27 May 2025 10:12:27 +0800 Subject: [PATCH] fix: modify param to use less memory when flush and sync (#42102) issue: #42097 Signed-off-by: chyezh --- configs/milvus.yaml | 10 +++++----- pkg/util/paramtable/component_param.go | 14 +++++++------- pkg/util/paramtable/component_param_test.go | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 224384f8b2..40f413fe5c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -698,7 +698,7 @@ dataNode: flowGraph: maxQueueLength: 16 # Maximum length of task queue in flowgraph maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph - maxParallelSyncMgrTasks: 256 # The max concurrent sync task number of datanode sync mgr globally + maxParallelSyncMgrTasks: 64 # The max concurrent sync task number of datanode sync mgr globally skipMode: enable: true # Support skip some timetick message to reduce CPU usage skipNum: 4 # Consume one for every n records skipped @@ -1231,12 +1231,12 @@ streaming: memoryThreshold: 0.6 # The high watermark of total growing segment bytes for one streaming node, # If the total bytes of growing segment is greater than this threshold, - # a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.4 by default - growingSegmentBytesHwmThreshold: 0.4 + # a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.2 by default + growingSegmentBytesHwmThreshold: 0.2 # The lower watermark of total growing segment bytes for one streaming node, # growing segment flush process will try to flush some growing segment into sealed - # until the total bytes of growing segment is less than this threshold, 0.2 by default. - growingSegmentBytesLwmThreshold: 0.2 + # until the total bytes of growing segment is less than this threshold, 0.1 by default. + growingSegmentBytesLwmThreshold: 0.1 walRecovery: # The interval of persist recovery info, 10s by default. # Every the interval, the recovery info of wal will try to persist, and the checkpoint of wal can be advanced. diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index a9c2d8e9a4..681f326536 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -5133,13 +5133,13 @@ func (p *dataNodeConfig) init(base *BaseTable) { p.MaxParallelSyncMgrTasks = ParamItem{ Key: "dataNode.dataSync.maxParallelSyncMgrTasks", Version: "2.3.4", - DefaultValue: "256", + DefaultValue: "64", Doc: "The max concurrent sync task number of datanode sync mgr globally", Formatter: func(v string) string { concurrency := getAsInt(v) if concurrency < 1 { - log.Warn("positive parallel task number, reset to default 256", zap.String("value", v)) - return "256" // MaxParallelSyncMgrTasks must >= 1 + log.Warn("positive parallel task number, reset to default 64", zap.String("value", v)) + return "64" // MaxParallelSyncMgrTasks must >= 1 } return strconv.FormatInt(int64(concurrency), 10) }, @@ -5671,8 +5671,8 @@ the value should be in the range of (0, 1), 0.6 by default.`, Version: "2.6.0", Doc: `The high watermark of total growing segment bytes for one streaming node, If the total bytes of growing segment is greater than this threshold, -a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.4 by default`, - DefaultValue: "0.4", +a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.2 by default`, + DefaultValue: "0.2", Export: true, } p.FlushGrowingSegmentBytesHwmThreshold.Init(base.mgr) @@ -5682,8 +5682,8 @@ a flush process will be triggered to decrease total bytes of growing segment unt Version: "2.6.0", Doc: `The lower watermark of total growing segment bytes for one streaming node, growing segment flush process will try to flush some growing segment into sealed -until the total bytes of growing segment is less than this threshold, 0.2 by default.`, - DefaultValue: "0.2", +until the total bytes of growing segment is less than this threshold, 0.1 by default.`, + DefaultValue: "0.1", Export: true, } p.FlushGrowingSegmentBytesLwmThreshold.Init(base.mgr) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 4b2ead31d6..a69fe9a969 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -633,8 +633,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 100, params.StreamingCfg.WALRecoveryMaxDirtyMessage.GetAsInt()) assert.Equal(t, 10*time.Second, params.StreamingCfg.WALRecoveryPersistInterval.GetAsDurationByParse()) assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat()) - assert.Equal(t, float64(0.4), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat()) - assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat()) + assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat()) + assert.Equal(t, float64(0.1), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat()) assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()) assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse())