diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8ea48f70ab..fd601f1159 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -491,8 +491,6 @@ dataNode: memory: forceSyncEnable: true # `true` to force sync if memory usage is too high forceSyncSegmentNum: 1 # number of segments to sync, segments with top largest buffer will be synced. - watermarkStandalone: 0.2 # memory watermark for standalone, upon reaching this watermark, segments will be synced. - watermarkCluster: 0.5 # memory watermark for cluster, upon reaching this watermark, segments will be synced. timetick: byRPC: true channel: diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go index 60dc7d9343..6392a4bec9 100644 --- a/internal/datanode/writebuffer/manager.go +++ b/internal/datanode/writebuffer/manager.go @@ -114,7 +114,7 @@ func (m *bufferManager) memoryCheck() { } totalMemory := hardware.GetMemoryCount() - memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryWatermark.GetAsFloat() + memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat() if float64(total) < memoryWatermark { log.RatedDebug(20, "skip force sync because memory level is not high enough", zap.Float64("current_total_memory_usage", toMB(float64(total))), diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 4338411aa3..910ab5f492 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -199,12 +199,12 @@ func (s *ManagerSuite) TestMemoryCheck() { param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50") param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "false") - param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.7") + param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.7") defer func() { param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key) param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key) - param.Reset(param.DataNodeCfg.MemoryWatermark.Key) + param.Reset(param.DataNodeCfg.MemoryForceSyncWatermark.Key) }() wb := NewMockWriteBuffer(s.T()) @@ -232,7 +232,7 @@ func (s *ManagerSuite) TestMemoryCheck() { <-time.After(time.Millisecond * 100) wb.AssertNotCalled(s.T(), "SetMemoryHighFlag") - param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.5") + param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.5") <-signal wb.AssertExpectations(s.T()) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 16fb435a36..7b8ab9d477 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3188,7 +3188,7 @@ type dataNodeConfig struct { MemoryForceSyncEnable ParamItem `refreshable:"true"` MemoryForceSyncSegmentNum ParamItem `refreshable:"true"` MemoryCheckInterval ParamItem `refreshable:"true"` - MemoryWatermark ParamItem `refreshable:"true"` + MemoryForceSyncWatermark ParamItem `refreshable:"true"` DataNodeTimeTickByRPC ParamItem `refreshable:"false"` // DataNode send timetick interval per collection @@ -3319,26 +3319,20 @@ func (p *dataNodeConfig) init(base *BaseTable) { p.MemoryCheckInterval.Init(base.mgr) if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode { - p.MemoryWatermark = ParamItem{ - Key: "datanode.memory.watermarkStandalone", - Version: "2.2.4", + p.MemoryForceSyncWatermark = ParamItem{ + Key: "datanode.memory.forceSyncWatermark", + Version: "2.4.0", DefaultValue: "0.2", } - } else if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.ClusterDeployMode { - p.MemoryWatermark = ParamItem{ - Key: "datanode.memory.watermarkCluster", - Version: "2.2.4", - DefaultValue: "0.5", - } } else { log.Info("DeployModeEnv is not set, use default", zap.Float64("default", 0.5)) - p.MemoryWatermark = ParamItem{ - Key: "datanode.memory.watermarkCluster", - Version: "2.2.4", + p.MemoryForceSyncWatermark = ParamItem{ + Key: "datanode.memory.forceSyncWatermark", + Version: "2.4.0", DefaultValue: "0.5", } } - p.MemoryWatermark.Init(base.mgr) + p.MemoryForceSyncWatermark.Init(base.mgr) p.FlushDeleteBufferBytes = ParamItem{ Key: "dataNode.segment.deleteBufBytes",