Use buffer size at memory sync policy (#22797)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2023-03-21 12:11:57 +08:00 committed by GitHub
parent e6296a92df
commit fce815e414
12 changed files with 245 additions and 72 deletions

View File

@ -327,9 +327,10 @@ dataNode:
syncPeriod: 600 # Seconds, 10min
memory:
forceSyncEnable: false # `true` to force sync if memory usage is too high
forceSyncThreshold: 0.6 # forceSync only take effects when memory usage ratio > forceSyncThreshold
forceSyncSegmentRatio: 0.3 # ratio of segments to sync, top largest forceSyncSegmentRatio segments will be synced
forceSyncEnable: true # `true` to force sync if memory usage is too high
forceSyncSegmentNum: 1 # number of segments to sync; the segments with the largest buffers will be synced.
watermarkStandalone: 0.2 # memory watermark for standalone; once it is reached, segments will be synced.
watermarkCluster: 0.5 # memory watermark for cluster; once it is reached, segments will be synced.
# Configures the system log output.
log:
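These settings replace the old ratio-based trigger with per-deployment memory watermarks. As a rough, hypothetical illustration (the helper and the numbers below are not part of this change), a watermark maps to an absolute buffer budget on a given host:

// Hypothetical illustration only: how the watermark settings above translate
// into an absolute buffer budget for a host.
package main

import "fmt"

func bufferBudget(totalMemoryBytes uint64, watermark float64) uint64 {
	return uint64(float64(totalMemoryBytes) * watermark)
}

func main() {
	const gib = uint64(1) << 30
	// Standalone (watermarkStandalone: 0.2) on a 16 GiB host: force sync kicks
	// in once roughly 3.2 GiB of data is buffered across all channels.
	fmt.Println(bufferBudget(16*gib, 0.2)>>20, "MiB")
	// Cluster (watermarkCluster: 0.5) on the same host: roughly 8 GiB.
	fmt.Println(bufferBudget(16*gib, 0.5)>>20, "MiB")
}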

View File

@ -24,6 +24,10 @@ import (
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
@ -33,8 +37,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
"go.uber.org/zap"
)
type (
@ -88,6 +90,10 @@ type Channel interface {
setCurDeleteBuffer(segmentID UniqueID, buf *DelDataBuf)
rollDeleteBuffer(segmentID UniqueID)
evictHistoryDeleteBuffer(segmentID UniqueID, endPos *internalpb.MsgPosition)
// getTotalMemorySize returns the sum of memory sizes of segments.
getTotalMemorySize() int64
forceToSync()
}
// ChannelMeta contains channel meta and the latest segment infos of the channel.
@ -100,6 +106,7 @@ type ChannelMeta struct {
segMu sync.RWMutex
segments map[UniqueID]*Segment
needToSync *atomic.Bool
syncPolicies []segmentSyncPolicy
metaService *metaService
@ -118,6 +125,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
segments: make(map[UniqueID]*Segment),
needToSync: atomic.NewBool(false),
syncPolicies: []segmentSyncPolicy{
syncPeriodically(),
syncMemoryTooHigh(),
@ -252,20 +260,14 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
validSegs = append(validSegs, seg)
}
segIDsToSync := make([]UniqueID, 0)
toSyncSegIDDict := make(map[UniqueID]bool, 0)
segIDsToSync := typeutil.NewUniqueSet()
for _, policy := range c.syncPolicies {
toSyncSegments := policy(validSegs, ts)
for _, segID := range toSyncSegments {
if _, ok := toSyncSegIDDict[segID]; ok {
continue
} else {
toSyncSegIDDict[segID] = true
segIDsToSync = append(segIDsToSync, segID)
}
segments := policy(validSegs, ts, c.needToSync)
for _, segID := range segments {
segIDsToSync.Insert(segID)
}
}
return segIDsToSync
return segIDsToSync.Collect()
}
func (c *ChannelMeta) setSegmentLastSyncTs(segID UniqueID, ts Timestamp) {
@ -802,3 +804,17 @@ func (c *ChannelMeta) evictHistoryDeleteBuffer(segmentID UniqueID, endPos *inter
}
log.Warn("cannot find segment when evictHistoryDeleteBuffer", zap.Int64("segmentID", segmentID))
}
func (c *ChannelMeta) forceToSync() {
c.needToSync.Store(true)
}
func (c *ChannelMeta) getTotalMemorySize() int64 {
c.segMu.RLock()
defer c.segMu.RUnlock()
var res int64
for _, segment := range c.segments {
res += segment.memorySize
}
return res
}
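The new forceToSync/getTotalMemorySize pair works through a single atomic flag: a monitor sets needToSync, and the sync path consumes and clears it on the next flowgraph tick. A stripped-down sketch of that handoff, with hypothetical types but the same go.uber.org/atomic package this file imports:

// Minimal sketch of the set-then-consume flag pattern, not the actual Milvus
// types: a monitor marks the channel dirty via forceToSync, and the sync path
// acts once and clears the flag (as insertBufferNode.Sync does below).
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

type fakeChannel struct {
	needToSync *atomic.Bool
}

// forceToSync is the monitor side: it only flips the flag.
func (c *fakeChannel) forceToSync() { c.needToSync.Store(true) }

// maybeSync is the consumer side: it reports work only when the flag is set
// and resets it so one force request triggers exactly one sync round.
func (c *fakeChannel) maybeSync() bool {
	if !c.needToSync.Load() {
		return false
	}
	c.needToSync.Store(false)
	return true
}

func main() {
	ch := &fakeChannel{needToSync: atomic.NewBool(false)}
	fmt.Println(ch.maybeSync()) // false: nothing has been forced yet
	ch.forceToSync()            // the memory monitor decides usage is too high
	fmt.Println(ch.maybeSync()) // true: one sync round runs
	fmt.Println(ch.maybeSync()) // false: the flag was cleared
}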

View File

@ -519,6 +519,8 @@ func (node *DataNode) Start() error {
// Start node watch node
go node.StartWatchChannels(node.ctx)
go node.flowgraphManager.start()
Params.DataNodeCfg.CreatedTime = time.Now()
Params.DataNodeCfg.UpdatedTime = time.Now()
@ -702,6 +704,7 @@ func (node *DataNode) Stop() error {
node.cancel()
node.flowgraphManager.dropAll()
node.flowgraphManager.stop()
if node.rowIDAllocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))

View File

@ -62,7 +62,7 @@ func TestMain(t *testing.M) {
defer os.RemoveAll(path)
Params.DataNodeCfg.InitAlias("datanode-alias-1")
Params.Init()
Params.InitOnce()
// change to specific channel for test
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())

View File

@ -62,7 +62,6 @@ type insertBufferNode struct {
ttLogger *timeTickLogger
ttMerger *mergedTimeTickerSender
syncPolicies []segmentSyncPolicy
lastTimestamp Timestamp
}
@ -300,9 +299,14 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
// updateSegmentsMemorySize updates segments' memory size in channel meta
func (ibNode *insertBufferNode) updateSegmentsMemorySize(seg2Upload []UniqueID) {
for _, segID := range seg2Upload {
if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
ibNode.channel.updateSegmentMemorySize(segID, bd.memorySize())
var memorySize int64
if buffer, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
memorySize += buffer.memorySize()
}
if buffer, ok := ibNode.channel.getCurDeleteBuffer(segID); ok {
memorySize += buffer.GetLogSize()
}
ibNode.channel.updateSegmentMemorySize(segID, memorySize)
}
}
@ -458,6 +462,7 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *internalpb.MsgPosition) []UniqueID {
syncTasks := ibNode.FillInSyncTasks(fgMsg, seg2Upload)
segmentsToSync := make([]UniqueID, 0, len(syncTasks))
ibNode.channel.(*ChannelMeta).needToSync.Store(false)
for _, task := range syncTasks {
log.Info("insertBufferNode syncing BufferData",

View File

@ -42,6 +42,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
)
var insertNodeTestDir = "/tmp/milvus_test/insert_node"
@ -346,6 +347,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
channel := &ChannelMeta{
collectionID: collMeta.ID,
segments: make(map[UniqueID]*Segment),
needToSync: atomic.NewBool(false),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
@ -589,6 +591,7 @@ func TestRollBF(t *testing.T) {
channel := &ChannelMeta{
collectionID: collMeta.ID,
segments: make(map[UniqueID]*Segment),
needToSync: atomic.NewBool(false),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)

View File

@ -18,22 +18,86 @@ package datanode
import (
"fmt"
"sort"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/hardware"
)
type flowgraphManager struct {
flowgraphs sync.Map // vChannelName -> dataSyncService
closeCh chan struct{}
closeOnce sync.Once
}
func newFlowgraphManager() *flowgraphManager {
return &flowgraphManager{}
return &flowgraphManager{
closeCh: make(chan struct{}),
}
}
func (fm *flowgraphManager) start() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-fm.closeCh:
return
case <-ticker.C:
fm.execute(hardware.GetMemoryCount())
}
}
}
func (fm *flowgraphManager) stop() {
fm.closeOnce.Do(func() {
close(fm.closeCh)
})
}
func (fm *flowgraphManager) execute(totalMemory uint64) {
if !Params.DataNodeCfg.MemoryForceSyncEnable {
return
}
var total int64
channels := make([]struct {
channel string
bufferSize int64
}, 0)
fm.flowgraphs.Range(func(key, value interface{}) bool {
size := value.(*dataSyncService).channel.getTotalMemorySize()
channels = append(channels, struct {
channel string
bufferSize int64
}{key.(string), size})
total += size
return true
})
if len(channels) == 0 {
return
}
if float64(total) < float64(totalMemory)*Params.DataNodeCfg.MemoryWatermark {
return
}
sort.Slice(channels, func(i, j int) bool {
return channels[i].bufferSize > channels[j].bufferSize
})
if fg, ok := fm.flowgraphs.Load(channels[0].channel); ok { // sync the first channel with the largest memory usage
fg.(*dataSyncService).channel.forceToSync()
log.Info("notify flowgraph to sync",
zap.String("channel", channels[0].channel), zap.Int64("bufferSize", channels[0].bufferSize))
}
}
func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *tickler) error {
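execute boils down to a threshold check plus a sort: if the combined buffer size of all flowgraphs reaches totalMemory * MemoryWatermark, only the channel holding the largest buffer is told to force-sync. The standalone sketch below (hypothetical names) reproduces that decision with the same numbers as the test cases added later in this commit:

// Standalone sketch of the decision inside flowgraphManager.execute; the types
// and helper are hypothetical stand-ins, the arithmetic mirrors the tests.
package main

import (
	"fmt"
	"sort"
)

type channelBuffer struct {
	name string
	size int64 // total buffered bytes reported by the channel
}

// pickChannelToSync returns the channel that should be force-synced, or ""
// when the combined buffer size stays under totalMemory * watermark.
func pickChannelToSync(channels []channelBuffer, totalMemory uint64, watermark float64) string {
	var total int64
	for _, c := range channels {
		total += c.size
	}
	if len(channels) == 0 || float64(total) < float64(totalMemory)*watermark {
		return ""
	}
	sort.Slice(channels, func(i, j int) bool { return channels[i].size > channels[j].size })
	return channels[0].name
}

func main() {
	over := []channelBuffer{{"ch-0", 15}, {"ch-1", 16}, {"ch-2", 17}, {"ch-3", 18}}
	under := []channelBuffer{{"ch-0", 1}, {"ch-1", 2}, {"ch-2", 3}, {"ch-3", 4}}
	fmt.Println(pickChannelToSync(over, 100, 0.5))  // "ch-3": 66 >= 100*0.5, largest buffer wins
	fmt.Println(pickChannelToSync(under, 100, 0.5)) // "":     10 < 50, no sync is forced
}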

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"testing"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
@ -190,4 +191,48 @@ func TestFlowGraphManager(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, fg)
})
t.Run("test execute", func(t *testing.T) {
tests := []struct {
testName string
totalMemory uint64
watermark float64
memorySizes []int64
expectNeedToSync []bool
}{
{"test over the watermark", 100, 0.5,
[]int64{15, 16, 17, 18}, []bool{false, false, false, true}},
{"test below the watermark", 100, 0.5,
[]int64{1, 2, 3, 4}, []bool{false, false, false, false}},
}
fm.dropAll()
const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
Params.DataNodeCfg.MemoryWatermark = test.watermark
for i, memorySize := range test.memorySizes {
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
vchan := &datapb.VchannelInfo{
ChannelName: vchannel,
}
err = fm.addAndStart(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
fg, ok := fm.flowgraphs.Load(vchannel)
assert.True(t, ok)
err = fg.(*dataSyncService).channel.addSegment(addSegmentReq{segID: 0})
assert.NoError(t, err)
fg.(*dataSyncService).channel.updateSegmentMemorySize(0, memorySize)
fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Store(false)
}
fm.execute(test.totalMemory)
for i, needToSync := range test.expectNeedToSync {
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
fg, ok := fm.flowgraphs.Load(vchannel)
assert.True(t, ok)
assert.Equal(t, needToSync, fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Load())
}
})
}
})
}

View File

@ -10,7 +10,6 @@ import (
)
func Test_getOrCreateIOPool(t *testing.T) {
Params.InitOnce()
ioConcurrency := Params.DataNodeCfg.IOConcurrency
Params.DataNodeCfg.IOConcurrency = 64
defer func() { Params.DataNodeCfg.IOConcurrency = ioConcurrency }()

View File

@ -20,18 +20,21 @@ import (
"math"
"sort"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
const minSyncSize = 0.5 * 1024 * 1024
// segmentSyncPolicy is a sync policy applied to segments.
type segmentSyncPolicy func(segments []*Segment, ts Timestamp) []UniqueID
type segmentSyncPolicy func(segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID
// syncPeriodically returns a segmentSyncPolicy that syncs segments periodically.
func syncPeriodically() segmentSyncPolicy {
return func(segments []*Segment, ts Timestamp) []UniqueID {
return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
segsToSync := make([]UniqueID, 0)
for _, seg := range segments {
endTime := tsoutil.PhysicalTime(ts)
@ -51,25 +54,24 @@ func syncPeriodically() segmentSyncPolicy {
// syncMemoryTooHigh force-syncs the segments with the largest buffers.
func syncMemoryTooHigh() segmentSyncPolicy {
return func(segments []*Segment, ts Timestamp) []UniqueID {
if Params.DataNodeCfg.MemoryForceSyncEnable &&
hardware.GetMemoryUseRatio() >= Params.DataNodeCfg.MemoryForceSyncThreshold &&
len(segments) >= 1 {
toSyncSegmentNum := int(math.Max(float64(len(segments))*Params.DataNodeCfg.MemoryForceSyncSegmentRatio, 1.0))
toSyncSegmentIDs := make([]UniqueID, 0)
sort.Slice(segments, func(i, j int) bool {
return segments[i].memorySize > segments[j].memorySize
})
for i := 0; i < toSyncSegmentNum; i++ {
toSyncSegmentIDs = append(toSyncSegmentIDs, segments[i].segmentID)
}
log.Debug("sync segment due to memory usage is too high",
zap.Int64s("toSyncSegmentIDs", toSyncSegmentIDs),
zap.Int("inputSegmentNum", len(segments)),
zap.Int("toSyncSegmentNum", len(toSyncSegmentIDs)),
zap.Float64("memoryUsageRatio", hardware.GetMemoryUseRatio()))
return toSyncSegmentIDs
return func(segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID {
if len(segments) == 0 || !needToSync.Load() {
return nil
}
return []UniqueID{}
sort.Slice(segments, func(i, j int) bool {
return segments[i].memorySize > segments[j].memorySize
})
syncSegments := make([]UniqueID, 0)
syncSegmentsNum := math.Min(float64(Params.DataNodeCfg.MemoryForceSyncSegmentNum), float64(len(segments)))
for i := 0; i < int(syncSegmentsNum); i++ {
if segments[i].memorySize < minSyncSize { // prevent generating too many small binlogs
break
}
syncSegments = append(syncSegments, segments[i].segmentID)
log.Info("sync segment due to high memory usage",
zap.Int64("segmentID", segments[i].segmentID),
zap.Int64("memorySize", segments[i].memorySize))
}
return syncSegments
}
}
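The rewritten policy picks at most MemoryForceSyncSegmentNum segments, largest buffers first, and stops at the first segment under the minSyncSize floor so it does not flush tiny binlogs. A hedged, self-contained sketch of that selection rule (stand-in types, same constant), using the numbers from the "test with small segment" case below:

// Sketch of the selection rule in syncMemoryTooHigh: take up to maxNum
// segments, largest buffers first, and stop at the 0.5 MiB floor.
package main

import (
	"fmt"
	"sort"
)

const minSyncSize = 0.5 * 1024 * 1024

type seg struct {
	id   int64
	size int64
}

func pickSegments(segs []seg, maxNum int) []int64 {
	sort.Slice(segs, func(i, j int) bool { return segs[i].size > segs[j].size })
	picked := make([]int64, 0, maxNum)
	for i := 0; i < maxNum && i < len(segs); i++ {
		if segs[i].size < minSyncSize { // mirror of the "too many small binlogs" guard
			break
		}
		picked = append(picked, segs[i].id)
	}
	return picked
}

func main() {
	mib := int64(1024 * 1024)
	segs := []seg{{1, mib / 10}, {2, mib / 10}, {3, mib / 10}, {4, 4 * mib}, {5, 5 * mib}}
	fmt.Println(pickSegments(segs, 3)) // [5 4]: the three 0.1 MiB segments fall under the floor
}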

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
@ -49,26 +50,46 @@ func TestSyncPeriodically(t *testing.T) {
if !test.isBufferEmpty {
segment.curInsertBuf = &BufferData{}
}
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0))
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0), nil)
assert.Equal(t, test.shouldSyncNum, len(res))
})
}
}
func TestSyncMemoryTooHigh(t *testing.T) {
s1 := &Segment{segmentID: 1, memorySize: 1}
s2 := &Segment{segmentID: 2, memorySize: 2}
s3 := &Segment{segmentID: 3, memorySize: 3}
s4 := &Segment{segmentID: 4, memorySize: 4}
s5 := &Segment{segmentID: 5, memorySize: 5}
tests := []struct {
testName string
syncSegmentNum int
needToSync bool
memorySizesInMB []float64
shouldSyncSegs []UniqueID
}{
{"test normal 1", 3, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4, 3}},
{"test normal 2", 2, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4}},
{"test normal 3", 5, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4, 3, 2, 1}},
{"test needToSync false", 3, false,
[]float64{1, 2, 3, 4, 5}, []UniqueID{}},
{"test syncSegmentNum 1", 1, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5}},
{"test with small segment", 3, true,
[]float64{0.1, 0.1, 0.1, 4, 5}, []UniqueID{5, 4}},
}
Params.DataNodeCfg.MemoryForceSyncEnable = true
Params.DataNodeCfg.MemoryForceSyncThreshold = 0.0
Params.DataNodeCfg.MemoryForceSyncSegmentRatio = 0.6
policy := syncMemoryTooHigh()
segs := policy([]*Segment{s3, s4, s2, s1, s5}, 0)
assert.Equal(t, 3, len(segs))
assert.Equal(t, int64(5), segs[0])
assert.Equal(t, int64(4), segs[1])
assert.Equal(t, int64(3), segs[2])
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
Params.DataNodeCfg.MemoryForceSyncSegmentNum = test.syncSegmentNum
policy := syncMemoryTooHigh()
segments := make([]*Segment, len(test.memorySizesInMB))
for i := range segments {
segments[i] = &Segment{
segmentID: UniqueID(i + 1), memorySize: int64(test.memorySizesInMB[i] * 1024 * 1024),
}
}
segs := policy(segments, 0, atomic.NewBool(test.needToSync))
assert.ElementsMatch(t, segs, test.shouldSyncSegs)
})
}
}

View File

@ -22,6 +22,8 @@ import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/shirou/gopsutil/v3/disk"
"go.uber.org/zap"
)
@ -1566,9 +1568,9 @@ type dataNodeConfig struct {
UpdatedTime time.Time
// memory management
MemoryForceSyncEnable bool
MemoryForceSyncThreshold float64
MemoryForceSyncSegmentRatio float64
MemoryForceSyncEnable bool
MemoryForceSyncSegmentNum int
MemoryWatermark float64
}
func (p *dataNodeConfig) init(base *BaseTable) {
@ -1585,8 +1587,8 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.initChannelWatchPath()
p.initMemoryForceSyncEnable()
p.initMemoryForceSyncRatio()
p.initMemoryForceSyncSegmentRatio()
p.initMemoryWatermark()
p.initMemoryForceSyncSegmentNum()
}
// InitAlias init this DataNode alias
@ -1659,12 +1661,24 @@ func (p *dataNodeConfig) initMemoryForceSyncEnable() {
p.MemoryForceSyncEnable = p.Base.ParseBool("datanode.memory.forceSyncEnable", true)
}
func (p *dataNodeConfig) initMemoryForceSyncRatio() {
p.MemoryForceSyncThreshold = p.Base.ParseFloatWithDefault("datanode.memory.forceSyncThreshold", 0.7)
func (p *dataNodeConfig) initMemoryWatermark() {
if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode {
p.MemoryWatermark = p.Base.ParseFloatWithDefault("datanode.memory.watermarkStandalone", 0.2)
return
}
if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.ClusterDeployMode {
p.MemoryWatermark = p.Base.ParseFloatWithDefault("datanode.memory.watermarkCluster", 0.5)
return
}
log.Warn("DeployModeEnv is not set, use default", zap.Float64("default", 0.5))
p.MemoryWatermark = 0.5
}
func (p *dataNodeConfig) initMemoryForceSyncSegmentRatio() {
p.MemoryForceSyncSegmentRatio = p.Base.ParseFloatWithDefault("datanode.memory.forceSyncSegmentRatio", 0.3)
func (p *dataNodeConfig) initMemoryForceSyncSegmentNum() {
p.MemoryForceSyncSegmentNum = p.Base.ParseIntWithDefault("datanode.memory.forceSyncSegmentNum", 1)
if p.MemoryForceSyncSegmentNum < 1 {
p.MemoryForceSyncSegmentNum = 1
}
}
// /////////////////////////////////////////////////////////////////////////////
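initMemoryWatermark chooses the watermark by deploy mode and falls back to the cluster default when the mode is unset. A minimal sketch of that fallback (the mode strings below are illustrative placeholders, not the metricsinfo constants):

// Sketch of the deploy-mode fallback implemented by initMemoryWatermark.
package main

import "fmt"

func watermarkFor(deployMode string) float64 {
	switch deployMode {
	case "standalone":
		return 0.2 // datanode.memory.watermarkStandalone default
	case "cluster":
		return 0.5 // datanode.memory.watermarkCluster default
	default:
		return 0.5 // unknown mode: fall back to the cluster default and warn
	}
}

func main() {
	fmt.Println(watermarkFor("standalone"), watermarkFor("cluster"), watermarkFor(""))
}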