fix: modify param to use less memory when flush and sync (#42102)

issue: #42097

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-05-27 10:12:27 +08:00 committed by GitHub
parent 40bea4fc21
commit 212e17c4c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 14 deletions

View File

@ -698,7 +698,7 @@ dataNode:
flowGraph: flowGraph:
maxQueueLength: 16 # Maximum length of task queue in flowgraph maxQueueLength: 16 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
maxParallelSyncMgrTasks: 256 # The max concurrent sync task number of datanode sync mgr globally maxParallelSyncMgrTasks: 64 # The max concurrent sync task number of datanode sync mgr globally
skipMode: skipMode:
enable: true # Support skip some timetick message to reduce CPU usage enable: true # Support skip some timetick message to reduce CPU usage
skipNum: 4 # Consume one for every n records skipped skipNum: 4 # Consume one for every n records skipped
@ -1231,12 +1231,12 @@ streaming:
memoryThreshold: 0.6 memoryThreshold: 0.6
# The high watermark of total growing segment bytes for one streaming node, # The high watermark of total growing segment bytes for one streaming node,
# If the total bytes of growing segment is greater than this threshold, # If the total bytes of growing segment is greater than this threshold,
# a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.4 by default # a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.2 by default
growingSegmentBytesHwmThreshold: 0.4 growingSegmentBytesHwmThreshold: 0.2
# The lower watermark of total growing segment bytes for one streaming node, # The lower watermark of total growing segment bytes for one streaming node,
# growing segment flush process will try to flush some growing segment into sealed # growing segment flush process will try to flush some growing segment into sealed
# until the total bytes of growing segment is less than this threshold, 0.2 by default. # until the total bytes of growing segment is less than this threshold, 0.1 by default.
growingSegmentBytesLwmThreshold: 0.2 growingSegmentBytesLwmThreshold: 0.1
walRecovery: walRecovery:
# The interval of persist recovery info, 10s by default. # The interval of persist recovery info, 10s by default.
# Every the interval, the recovery info of wal will try to persist, and the checkpoint of wal can be advanced. # Every the interval, the recovery info of wal will try to persist, and the checkpoint of wal can be advanced.

View File

@ -5133,13 +5133,13 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.MaxParallelSyncMgrTasks = ParamItem{ p.MaxParallelSyncMgrTasks = ParamItem{
Key: "dataNode.dataSync.maxParallelSyncMgrTasks", Key: "dataNode.dataSync.maxParallelSyncMgrTasks",
Version: "2.3.4", Version: "2.3.4",
DefaultValue: "256", DefaultValue: "64",
Doc: "The max concurrent sync task number of datanode sync mgr globally", Doc: "The max concurrent sync task number of datanode sync mgr globally",
Formatter: func(v string) string { Formatter: func(v string) string {
concurrency := getAsInt(v) concurrency := getAsInt(v)
if concurrency < 1 { if concurrency < 1 {
log.Warn("positive parallel task number, reset to default 256", zap.String("value", v)) log.Warn("positive parallel task number, reset to default 64", zap.String("value", v))
return "256" // MaxParallelSyncMgrTasks must >= 1 return "64" // MaxParallelSyncMgrTasks must >= 1
} }
return strconv.FormatInt(int64(concurrency), 10) return strconv.FormatInt(int64(concurrency), 10)
}, },
@ -5671,8 +5671,8 @@ the value should be in the range of (0, 1), 0.6 by default.`,
Version: "2.6.0", Version: "2.6.0",
Doc: `The high watermark of total growing segment bytes for one streaming node, Doc: `The high watermark of total growing segment bytes for one streaming node,
If the total bytes of growing segment is greater than this threshold, If the total bytes of growing segment is greater than this threshold,
a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.4 by default`, a flush process will be triggered to decrease total bytes of growing segment until growingSegmentBytesLwmThreshold, 0.2 by default`,
DefaultValue: "0.4", DefaultValue: "0.2",
Export: true, Export: true,
} }
p.FlushGrowingSegmentBytesHwmThreshold.Init(base.mgr) p.FlushGrowingSegmentBytesHwmThreshold.Init(base.mgr)
@ -5682,8 +5682,8 @@ a flush process will be triggered to decrease total bytes of growing segment unt
Version: "2.6.0", Version: "2.6.0",
Doc: `The lower watermark of total growing segment bytes for one streaming node, Doc: `The lower watermark of total growing segment bytes for one streaming node,
growing segment flush process will try to flush some growing segment into sealed growing segment flush process will try to flush some growing segment into sealed
until the total bytes of growing segment is less than this threshold, 0.2 by default.`, until the total bytes of growing segment is less than this threshold, 0.1 by default.`,
DefaultValue: "0.2", DefaultValue: "0.1",
Export: true, Export: true,
} }
p.FlushGrowingSegmentBytesLwmThreshold.Init(base.mgr) p.FlushGrowingSegmentBytesLwmThreshold.Init(base.mgr)

View File

@ -633,8 +633,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 100, params.StreamingCfg.WALRecoveryMaxDirtyMessage.GetAsInt()) assert.Equal(t, 100, params.StreamingCfg.WALRecoveryMaxDirtyMessage.GetAsInt())
assert.Equal(t, 10*time.Second, params.StreamingCfg.WALRecoveryPersistInterval.GetAsDurationByParse()) assert.Equal(t, 10*time.Second, params.StreamingCfg.WALRecoveryPersistInterval.GetAsDurationByParse())
assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat()) assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat())
assert.Equal(t, float64(0.4), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat()) assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat())
assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat()) assert.Equal(t, float64(0.1), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat())
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()) assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse())
assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse()) assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse())