diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index eaa24181c4..de67cbaf8d 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -327,9 +327,10 @@ dataNode:
     syncPeriod: 600 # Seconds, 10min
   memory:
-    forceSyncEnable: false # `true` to force sync if memory usage is too high
-    forceSyncThreshold: 0.6 # forceSync only take effects when memory usage ratio > forceSyncThreshold
-    forceSyncSegmentRatio: 0.3 # ratio of segments to sync, top largest forceSyncSegmentRatio segments will be synced
+    forceSyncEnable: true # `true` to force sync if memory usage is too high
+    forceSyncSegmentNum: 1 # number of segments to sync, segments with top largest buffer will be synced.
+    watermarkStandalone: 0.2 # memory watermark for standalone, upon reaching this watermark, segments will be synced.
+    watermarkCluster: 0.5 # memory watermark for cluster, upon reaching this watermark, segments will be synced.
 
 # Configures the system log output.
 log:
diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go
index 718c565b2e..aabdb2b2a6 100644
--- a/internal/datanode/channel_meta.go
+++ b/internal/datanode/channel_meta.go
@@ -24,6 +24,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/samber/lo"
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
@@ -33,8 +37,6 @@ import (
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
-	"github.com/samber/lo"
-	"go.uber.org/zap"
 )
 
 type (
@@ -88,6 +90,10 @@ type Channel interface {
 	setCurDeleteBuffer(segmentID UniqueID, buf *DelDataBuf)
 	rollDeleteBuffer(segmentID UniqueID)
 	evictHistoryDeleteBuffer(segmentID UniqueID, endPos *internalpb.MsgPosition)
+
+	// getTotalMemorySize returns the sum of memory sizes of segments.
+	getTotalMemorySize() int64
+	forceToSync()
 }
 
 // ChannelMeta contains channel meta and the latest segments infos of the channel.
@@ -100,6 +106,7 @@ type ChannelMeta struct {
 	segMu    sync.RWMutex
 	segments map[UniqueID]*Segment
 
+	needToSync   *atomic.Bool
 	syncPolicies []segmentSyncPolicy
 
 	metaService *metaService
@@ -118,6 +125,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
 
 		segments: make(map[UniqueID]*Segment),
 
+		needToSync:   atomic.NewBool(false),
 		syncPolicies: []segmentSyncPolicy{
 			syncPeriodically(),
 			syncMemoryTooHigh(),
@@ -252,20 +260,14 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
 		validSegs = append(validSegs, seg)
 	}
 
-	segIDsToSync := make([]UniqueID, 0)
-	toSyncSegIDDict := make(map[UniqueID]bool, 0)
+	segIDsToSync := typeutil.NewUniqueSet()
 	for _, policy := range c.syncPolicies {
-		toSyncSegments := policy(validSegs, ts)
-		for _, segID := range toSyncSegments {
-			if _, ok := toSyncSegIDDict[segID]; ok {
-				continue
-			} else {
-				toSyncSegIDDict[segID] = true
-				segIDsToSync = append(segIDsToSync, segID)
-			}
+		segments := policy(validSegs, ts, c.needToSync)
+		for _, segID := range segments {
+			segIDsToSync.Insert(segID)
 		}
 	}
-	return segIDsToSync
+	return segIDsToSync.Collect()
 }
 
 func (c *ChannelMeta) setSegmentLastSyncTs(segID UniqueID, ts Timestamp) {
@@ -802,3 +804,17 @@ func (c *ChannelMeta) evictHistoryDeleteBuffer(segmentID UniqueID, endPos *inter
 	}
 	log.Warn("cannot find segment when evictHistoryDeleteBuffer", zap.Int64("segmentID", segmentID))
 }
+
+func (c *ChannelMeta) forceToSync() {
+	c.needToSync.Store(true)
+}
+
+func (c *ChannelMeta) getTotalMemorySize() int64 {
+	c.segMu.RLock()
+	defer c.segMu.RUnlock()
+	var res int64
+	for _, segment := range c.segments {
+		res += segment.memorySize
+	}
+	return res
+}
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index ba86a2240f..e9b1caaede 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -519,6 +519,8 @@ func (node *DataNode) Start() error {
 	// Start node watch node
 	go node.StartWatchChannels(node.ctx)
 
+	go node.flowgraphManager.start()
+
 	Params.DataNodeCfg.CreatedTime = time.Now()
 	Params.DataNodeCfg.UpdatedTime = time.Now()
 
@@ -702,6 +704,7 @@ func (node *DataNode) Stop() error {
 	node.cancel()
 
 	node.flowgraphManager.dropAll()
+	node.flowgraphManager.stop()
 
 	if node.rowIDAllocator != nil {
 		log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index 28ba0fab1d..9a40aef98d 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -62,7 +62,7 @@ func TestMain(t *testing.M) {
 	defer os.RemoveAll(path)
 
 	Params.DataNodeCfg.InitAlias("datanode-alias-1")
-	Params.Init()
+	Params.InitOnce()
 	// change to specific channel for test
 	Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
 
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index d608fe4cf1..930d5a14c3 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -62,7 +62,6 @@ type insertBufferNode struct {
 	ttLogger *timeTickLogger
 	ttMerger *mergedTimeTickerSender
 
-	syncPolicies  []segmentSyncPolicy
 	lastTimestamp Timestamp
 }
 
@@ -300,9 +299,14 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
 
 // updateSegmentsMemorySize updates segments' memory size in channel meta
 func (ibNode *insertBufferNode) updateSegmentsMemorySize(seg2Upload []UniqueID) {
 	for _, segID := range seg2Upload {
-		if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
-			ibNode.channel.updateSegmentMemorySize(segID, bd.memorySize())
+		var memorySize int64
+		if buffer, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
+			memorySize += buffer.memorySize()
 		}
+		if buffer, ok := ibNode.channel.getCurDeleteBuffer(segID); ok {
+			memorySize += buffer.GetLogSize()
+		}
+		ibNode.channel.updateSegmentMemorySize(segID, memorySize)
 	}
 }
 
@@ -458,6 +462,7 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
 func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *internalpb.MsgPosition) []UniqueID {
 	syncTasks := ibNode.FillInSyncTasks(fgMsg, seg2Upload)
 	segmentsToSync := make([]UniqueID, 0, len(syncTasks))
+	ibNode.channel.(*ChannelMeta).needToSync.Store(false)
 
 	for _, task := range syncTasks {
 		log.Info("insertBufferNode syncing BufferData",
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 23b507db63..477c963fe8 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+	"go.uber.org/atomic"
 )
 
 var insertNodeTestDir = "/tmp/milvus_test/insert_node"
@@ -346,6 +347,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
 	channel := &ChannelMeta{
 		collectionID: collMeta.ID,
 		segments:     make(map[UniqueID]*Segment),
+		needToSync:   atomic.NewBool(false),
 	}
 
 	channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
@@ -589,6 +591,7 @@ func TestRollBF(t *testing.T) {
 	channel := &ChannelMeta{
 		collectionID: collMeta.ID,
 		segments:     make(map[UniqueID]*Segment),
+		needToSync:   atomic.NewBool(false),
 	}
 
 	channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go
index 78607dea93..f730e6aedd 100644
--- a/internal/datanode/flow_graph_manager.go
+++ b/internal/datanode/flow_graph_manager.go
@@ -18,22 +18,86 @@ package datanode
 
 import (
 	"fmt"
+	"sort"
 	"sync"
+	"time"
+
+	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-
-	"go.uber.org/zap"
+	"github.com/milvus-io/milvus/internal/util/hardware"
 )
 
 type flowgraphManager struct {
 	flowgraphs sync.Map // vChannelName -> dataSyncService
+
+	closeCh   chan struct{}
+	closeOnce sync.Once
 }
 
 func newFlowgraphManager() *flowgraphManager {
-	return &flowgraphManager{}
+	return &flowgraphManager{
+		closeCh: make(chan struct{}),
+	}
+}
+
+func (fm *flowgraphManager) start() {
+	ticker := time.NewTicker(3 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-fm.closeCh:
+			return
+		case <-ticker.C:
+			fm.execute(hardware.GetMemoryCount())
+		}
+	}
+}
+
+func (fm *flowgraphManager) stop() {
+	fm.closeOnce.Do(func() {
+		close(fm.closeCh)
+	})
+}
+
+func (fm *flowgraphManager) execute(totalMemory uint64) {
+	if !Params.DataNodeCfg.MemoryForceSyncEnable {
+		return
+	}
+
+	var total int64
+	channels := make([]struct {
+		channel    string
+		bufferSize int64
+	}, 0)
+	fm.flowgraphs.Range(func(key, value interface{}) bool {
+		size := value.(*dataSyncService).channel.getTotalMemorySize()
+		channels = append(channels, struct {
+			channel    string
+			bufferSize int64
+		}{key.(string), size})
+		total += size
+		return true
+	})
+	if len(channels) == 0 {
+		return
+	}
+
+	if float64(total) < float64(totalMemory)*Params.DataNodeCfg.MemoryWatermark {
+		return
+	}
+
+	sort.Slice(channels, func(i, j int) bool {
+		return channels[i].bufferSize > channels[j].bufferSize
+	})
+	if fg, ok := fm.flowgraphs.Load(channels[0].channel); ok { // sync the first channel with the largest memory usage
+		fg.(*dataSyncService).channel.forceToSync()
+		log.Info("notify flowgraph to sync",
+			zap.String("channel", channels[0].channel), zap.Int64("bufferSize", channels[0].bufferSize))
+	}
+}
 
 func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *tickler) error {
diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go
index 4707058ade..5671c93e32 100644
--- a/internal/datanode/flow_graph_manager_test.go
+++ b/internal/datanode/flow_graph_manager_test.go
@@ -18,6 +18,7 @@ package datanode
 
 import (
 	"context"
+	"fmt"
 	"testing"
 
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
@@ -190,4 +191,48 @@ func TestFlowGraphManager(t *testing.T) {
 		assert.False(t, ok)
 		assert.Nil(t, fg)
 	})
+
+	t.Run("test execute", func(t *testing.T) {
+		tests := []struct {
+			testName         string
+			totalMemory      uint64
+			watermark        float64
+			memorySizes      []int64
+			expectNeedToSync []bool
+		}{
+			{"test over the watermark", 100, 0.5,
+				[]int64{15, 16, 17, 18}, []bool{false, false, false, true}},
+			{"test below the watermark", 100, 0.5,
+				[]int64{1, 2, 3, 4}, []bool{false, false, false, false}},
+		}
+
+		fm.dropAll()
+		const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
+		for _, test := range tests {
+			t.Run(test.testName, func(t *testing.T) {
+				Params.DataNodeCfg.MemoryWatermark = test.watermark
+				for i, memorySize := range test.memorySizes {
+					vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
+					vchan := &datapb.VchannelInfo{
+						ChannelName: vchannel,
+					}
+					err = fm.addAndStart(node, vchan, nil, genTestTickler())
+					assert.NoError(t, err)
+					fg, ok := fm.flowgraphs.Load(vchannel)
+					assert.True(t, ok)
+					err = fg.(*dataSyncService).channel.addSegment(addSegmentReq{segID: 0})
+					assert.NoError(t, err)
+					fg.(*dataSyncService).channel.updateSegmentMemorySize(0, memorySize)
+					fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Store(false)
+				}
+				fm.execute(test.totalMemory)
+				for i, needToSync := range test.expectNeedToSync {
+					vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
+					fg, ok := fm.flowgraphs.Load(vchannel)
+					assert.True(t, ok)
+					assert.Equal(t, needToSync, fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Load())
+				}
+			})
+		}
+	})
 }
diff --git a/internal/datanode/io_pool_test.go b/internal/datanode/io_pool_test.go
index 7f51aa02fa..b6217976fb 100644
--- a/internal/datanode/io_pool_test.go
+++ b/internal/datanode/io_pool_test.go
@@ -10,7 +10,6 @@ import (
 )
 
 func Test_getOrCreateIOPool(t *testing.T) {
-	Params.InitOnce()
 	ioConcurrency := Params.DataNodeCfg.IOConcurrency
 	Params.DataNodeCfg.IOConcurrency = 64
 	defer func() { Params.DataNodeCfg.IOConcurrency = ioConcurrency }()
diff --git a/internal/datanode/segment_sync_policy.go b/internal/datanode/segment_sync_policy.go
index fa30be76a5..a70efba8b0 100644
--- a/internal/datanode/segment_sync_policy.go
+++ b/internal/datanode/segment_sync_policy.go
@@ -20,18 +20,21 @@ import (
 	"math"
 	"sort"
 
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/util/hardware"
-	"github.com/milvus-io/milvus/internal/util/tsoutil"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/util/tsoutil"
 )
 
+const minSyncSize = 0.5 * 1024 * 1024
+
 // segmentsSyncPolicy sync policy applies to segments
-type segmentSyncPolicy func(segments []*Segment, ts Timestamp) []UniqueID
+type segmentSyncPolicy func(segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID
 
 // syncPeriodically get segmentSyncPolicy with segments sync periodically.
 func syncPeriodically() segmentSyncPolicy {
-	return func(segments []*Segment, ts Timestamp) []UniqueID {
+	return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
 		segsToSync := make([]UniqueID, 0)
 		for _, seg := range segments {
 			endTime := tsoutil.PhysicalTime(ts)
@@ -51,25 +54,24 @@ func syncPeriodically() segmentSyncPolicy {
 
 // syncMemoryTooHigh force sync the largest segment.
 func syncMemoryTooHigh() segmentSyncPolicy {
-	return func(segments []*Segment, ts Timestamp) []UniqueID {
-		if Params.DataNodeCfg.MemoryForceSyncEnable &&
-			hardware.GetMemoryUseRatio() >= Params.DataNodeCfg.MemoryForceSyncThreshold &&
-			len(segments) >= 1 {
-			toSyncSegmentNum := int(math.Max(float64(len(segments))*Params.DataNodeCfg.MemoryForceSyncSegmentRatio, 1.0))
-			toSyncSegmentIDs := make([]UniqueID, 0)
-			sort.Slice(segments, func(i, j int) bool {
-				return segments[i].memorySize > segments[j].memorySize
-			})
-			for i := 0; i < toSyncSegmentNum; i++ {
-				toSyncSegmentIDs = append(toSyncSegmentIDs, segments[i].segmentID)
-			}
-			log.Debug("sync segment due to memory usage is too high",
-				zap.Int64s("toSyncSegmentIDs", toSyncSegmentIDs),
-				zap.Int("inputSegmentNum", len(segments)),
-				zap.Int("toSyncSegmentNum", len(toSyncSegmentIDs)),
-				zap.Float64("memoryUsageRatio", hardware.GetMemoryUseRatio()))
-			return toSyncSegmentIDs
+	return func(segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID {
+		if len(segments) == 0 || !needToSync.Load() {
+			return nil
 		}
-		return []UniqueID{}
+		sort.Slice(segments, func(i, j int) bool {
+			return segments[i].memorySize > segments[j].memorySize
+		})
+		syncSegments := make([]UniqueID, 0)
+		syncSegmentsNum := math.Min(float64(Params.DataNodeCfg.MemoryForceSyncSegmentNum), float64(len(segments)))
+		for i := 0; i < int(syncSegmentsNum); i++ {
+			if segments[i].memorySize < minSyncSize { // prevent generating too many small binlogs
+				break
+			}
+			syncSegments = append(syncSegments, segments[i].segmentID)
+			log.Info("sync segment due to memory usage is too high",
+				zap.Int64("segmentID", segments[i].segmentID),
+				zap.Int64("memorySize", segments[i].memorySize))
+		}
+		return syncSegments
 	}
 }
diff --git a/internal/datanode/segment_sync_policy_test.go b/internal/datanode/segment_sync_policy_test.go
index 4bad755827..27fa3c85e3 100644
--- a/internal/datanode/segment_sync_policy_test.go
+++ b/internal/datanode/segment_sync_policy_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"go.uber.org/atomic"
 
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 )
@@ -49,26 +50,46 @@ func TestSyncPeriodically(t *testing.T) {
 			if !test.isBufferEmpty {
 				segment.curInsertBuf = &BufferData{}
 			}
-			res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0))
+			res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0), nil)
 			assert.Equal(t, test.shouldSyncNum, len(res))
 		})
 	}
 }
 
 func TestSyncMemoryTooHigh(t *testing.T) {
-	s1 := &Segment{segmentID: 1, memorySize: 1}
-	s2 := &Segment{segmentID: 2, memorySize: 2}
-	s3 := &Segment{segmentID: 3, memorySize: 3}
-	s4 := &Segment{segmentID: 4, memorySize: 4}
-	s5 := &Segment{segmentID: 5, memorySize: 5}
+	tests := []struct {
+		testName        string
+		syncSegmentNum  int
+		needToSync      bool
+		memorySizesInMB []float64
+		shouldSyncSegs  []UniqueID
+	}{
+		{"test normal 1", 3, true,
+			[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4, 3}},
+		{"test normal 2", 2, true,
+			[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4}},
+		{"test normal 3", 5, true,
+			[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4, 3, 2, 1}},
+		{"test needToSync false", 3, false,
+			[]float64{1, 2, 3, 4, 5}, []UniqueID{}},
+		{"test syncSegmentNum 1", 1, true,
+			[]float64{1, 2, 3, 4, 5}, []UniqueID{5}},
+		{"test with small segment", 3, true,
+			[]float64{0.1, 0.1, 0.1, 4, 5}, []UniqueID{5, 4}},
+	}
 
-	Params.DataNodeCfg.MemoryForceSyncEnable = true
-	Params.DataNodeCfg.MemoryForceSyncThreshold = 0.0
-	Params.DataNodeCfg.MemoryForceSyncSegmentRatio = 0.6
-	policy := syncMemoryTooHigh()
-	segs := policy([]*Segment{s3, s4, s2, s1, s5}, 0)
-	assert.Equal(t, 3, len(segs))
-	assert.Equal(t, int64(5), segs[0])
-	assert.Equal(t, int64(4), segs[1])
-	assert.Equal(t, int64(3), segs[2])
+	for _, test := range tests {
+		t.Run(test.testName, func(t *testing.T) {
+			Params.DataNodeCfg.MemoryForceSyncSegmentNum = test.syncSegmentNum
+			policy := syncMemoryTooHigh()
+			segments := make([]*Segment, len(test.memorySizesInMB))
+			for i := range segments {
+				segments[i] = &Segment{
+					segmentID: UniqueID(i + 1), memorySize: int64(test.memorySizesInMB[i] * 1024 * 1024),
+				}
+			}
+			segs := policy(segments, 0, atomic.NewBool(test.needToSync))
+			assert.ElementsMatch(t, segs, test.shouldSyncSegs)
+		})
+	}
 }
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index c14a65a48c..6348611113 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -22,6 +22,8 @@ import (
 	"time"
 
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/util/metricsinfo"
+	"github.com/shirou/gopsutil/v3/disk"
 	"go.uber.org/zap"
 )
 
@@ -1566,9 +1568,9 @@ type dataNodeConfig struct {
 	UpdatedTime time.Time
 
 	// memory management
-	MemoryForceSyncEnable       bool
-	MemoryForceSyncThreshold    float64
-	MemoryForceSyncSegmentRatio float64
+	MemoryForceSyncEnable     bool
+	MemoryForceSyncSegmentNum int
+	MemoryWatermark           float64
 }
 
 func (p *dataNodeConfig) init(base *BaseTable) {
@@ -1585,8 +1587,8 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	p.initChannelWatchPath()
 
 	p.initMemoryForceSyncEnable()
-	p.initMemoryForceSyncRatio()
-	p.initMemoryForceSyncSegmentRatio()
+	p.initMemoryWatermark()
+	p.initMemoryForceSyncSegmentNum()
 }
 
 // InitAlias init this DataNode alias
@@ -1659,12 +1661,24 @@ func (p *dataNodeConfig) initMemoryForceSyncEnable() {
 	p.MemoryForceSyncEnable = p.Base.ParseBool("datanode.memory.forceSyncEnable", true)
 }
 
-func (p *dataNodeConfig) initMemoryForceSyncRatio() {
-	p.MemoryForceSyncThreshold = p.Base.ParseFloatWithDefault("datanode.memory.forceSyncThreshold", 0.7)
+func (p *dataNodeConfig) initMemoryWatermark() {
+	if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode {
+		p.MemoryWatermark = p.Base.ParseFloatWithDefault("datanode.memory.watermarkStandalone", 0.2)
+		return
+	}
+	if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.ClusterDeployMode {
+		p.MemoryWatermark = p.Base.ParseFloatWithDefault("datanode.memory.watermarkCluster", 0.5)
+		return
+	}
+	log.Warn("DeployModeEnv is not set, use default", zap.Float64("default", 0.5))
+	p.MemoryWatermark = 0.5
 }
 
-func (p *dataNodeConfig) initMemoryForceSyncSegmentRatio() {
-	p.MemoryForceSyncSegmentRatio = p.Base.ParseFloatWithDefault("datanode.memory.forceSyncSegmentRatio", 0.3)
+func (p *dataNodeConfig) initMemoryForceSyncSegmentNum() {
+	p.MemoryForceSyncSegmentNum = p.Base.ParseIntWithDefault("datanode.memory.forceSyncSegmentNum", 1)
+	if p.MemoryForceSyncSegmentNum < 1 {
+		p.MemoryForceSyncSegmentNum = 1
+	}
 }
 
 // /////////////////////////////////////////////////////////////////////////////