mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Refine sync_cp_lag_too_behind_policy to avoid submitting syncTasks too frequently (#25441) (#25442) (#25500)
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
This commit is contained in:
parent
205a7c430a
commit
ea3817fbbc
@ -21,11 +21,12 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
const minSyncSize = 0.5 * 1024 * 1024
|
||||
@ -100,20 +101,34 @@ func syncCPLagTooBehind() segmentSyncPolicy {
|
||||
}
|
||||
|
||||
return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
|
||||
segmentsToSync := make([]UniqueID, 0)
|
||||
segmentsSyncPairs := make([][2]int64, 0)
|
||||
for _, segment := range segments {
|
||||
segmentMinTs := segmentMinTs(segment)
|
||||
segmentStartTime := tsoutil.PhysicalTime(segmentMinTs)
|
||||
if segment == nil || segment.sType.Load() == nil || segment.getType() != datapb.SegmentType_Flushed {
|
||||
continue //cp behind check policy only towards flushed segments generated by compaction
|
||||
}
|
||||
segmentStartTime := tsoutil.PhysicalTime(segmentMinTs(segment))
|
||||
cpLagDuration := tsoutil.PhysicalTime(ts).Sub(segmentStartTime)
|
||||
shouldSync := cpLagDuration > Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty()
|
||||
lagInfo := [2]int64{segment.segmentID, cpLagDuration.Nanoseconds()}
|
||||
if shouldSync {
|
||||
segmentsToSync = append(segmentsToSync, segment.segmentID)
|
||||
segmentsSyncPairs = append(segmentsSyncPairs, lagInfo)
|
||||
}
|
||||
}
|
||||
if len(segmentsToSync) > 0 {
|
||||
log.Info("sync segment for cp lag behind too much",
|
||||
zap.Int64s("segmentID", segmentsToSync))
|
||||
segmentsIDsToSync := make([]UniqueID, 0)
|
||||
if len(segmentsSyncPairs) > 0 {
|
||||
if uint16(len(segmentsSyncPairs)) > Params.DataNodeCfg.CpLagSyncLimit.GetAsUint16() {
|
||||
//sort all segments according to the length of lag duration
|
||||
sort.Slice(segmentsSyncPairs, func(i, j int) bool {
|
||||
return segmentsSyncPairs[i][1] > segmentsSyncPairs[j][1]
|
||||
})
|
||||
segmentsSyncPairs = segmentsSyncPairs[:Params.DataNodeCfg.CpLagSyncLimit.GetAsUint16()]
|
||||
}
|
||||
segmentsIDsToSync = lo.Map(segmentsSyncPairs, func(t [2]int64, _ int) int64 {
|
||||
return t[0]
|
||||
})
|
||||
log.Info("sync segment for cp lag behind too much", zap.Int("segmentCount", len(segmentsIDsToSync)),
|
||||
zap.Int64s("segmentIDs", segmentsIDsToSync))
|
||||
}
|
||||
return segmentsToSync
|
||||
return segmentsIDsToSync
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,7 +22,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
@ -98,6 +102,8 @@ func TestSyncMemoryTooHigh(t *testing.T) {
|
||||
|
||||
func TestSyncCpLagBehindTooMuch(t *testing.T) {
|
||||
nowTs := tsoutil.ComposeTSByTime(time.Now(), 0)
|
||||
paramtable.Get().Save(Params.DataNodeCfg.CpLagPeriod.Key, "60")
|
||||
paramtable.Get().Save(Params.DataNodeCfg.CpLagSyncLimit.Key, "2")
|
||||
laggedTs := tsoutil.AddPhysicalDurationOnTs(nowTs, -2*Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second))
|
||||
tests := []struct {
|
||||
testName string
|
||||
@ -153,12 +159,50 @@ func TestSyncCpLagBehindTooMuch(t *testing.T) {
|
||||
},
|
||||
[]int64{1, 2},
|
||||
},
|
||||
{"test_cp_sync_limit",
|
||||
[]*Segment{
|
||||
{
|
||||
segmentID: 1,
|
||||
historyInsertBuf: []*BufferData{
|
||||
{
|
||||
startPos: &msgpb.MsgPosition{
|
||||
Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -3*time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
segmentID: 2,
|
||||
historyDeleteBuf: []*DelDataBuf{
|
||||
{
|
||||
startPos: &msgpb.MsgPosition{
|
||||
Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -2*time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
segmentID: 3,
|
||||
historyDeleteBuf: []*DelDataBuf{
|
||||
{
|
||||
startPos: &msgpb.MsgPosition{
|
||||
Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -1*time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]int64{1, 2},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.testName, func(t *testing.T) {
|
||||
lo.ForEach(test.segments, func(segment *Segment, _ int) {
|
||||
segment.setType(datapb.SegmentType_Flushed)
|
||||
})
|
||||
policy := syncCPLagTooBehind()
|
||||
ids := policy(test.segments, tsoutil.ComposeTSByTime(time.Now(), 0), nil)
|
||||
assert.ElementsMatch(t, test.idsToSync, ids)
|
||||
assert.Exactly(t, test.idsToSync, ids)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -2221,6 +2221,7 @@ type dataNodeConfig struct {
|
||||
BinLogMaxSize ParamItem `refreshable:"true"`
|
||||
SyncPeriod ParamItem `refreshable:"true"`
|
||||
CpLagPeriod ParamItem `refreshable:"true"`
|
||||
CpLagSyncLimit ParamItem `refreshable:"true"`
|
||||
|
||||
// watchEvent
|
||||
WatchEventTicklerInterval ParamItem `refreshable:"false"`
|
||||
@ -2345,11 +2346,20 @@ func (p *dataNodeConfig) init(base *BaseTable) {
|
||||
Key: "datanode.segment.cpLagPeriod",
|
||||
Version: "2.2.0",
|
||||
DefaultValue: "600",
|
||||
Doc: "The period to sync segments if buffer is not empty.",
|
||||
Doc: "The period to sync segments for cp lag period policy",
|
||||
Export: true,
|
||||
}
|
||||
p.CpLagPeriod.Init(base.mgr)
|
||||
|
||||
p.CpLagSyncLimit = ParamItem{
|
||||
Key: "datanode.segment.cpLagSyncLimit",
|
||||
Version: "2.2.0",
|
||||
DefaultValue: "10",
|
||||
Doc: "The limit to sync segments for cp lag period policy",
|
||||
Export: true,
|
||||
}
|
||||
p.CpLagSyncLimit.Init(base.mgr)
|
||||
|
||||
p.WatchEventTicklerInterval = ParamItem{
|
||||
Key: "datanode.segment.watchEventTicklerInterval",
|
||||
Version: "2.2.3",
|
||||
|
||||
@ -113,6 +113,10 @@ func (pi *ParamItem) GetAsUint32() uint32 {
|
||||
return uint32(getAsInt64(pi.GetValue()))
|
||||
}
|
||||
|
||||
// GetAsUint16 returns the item's current value as a uint16.
// The value from pi.GetValue() is passed through getAsInt64 and then converted
// with a plain uint16(...) conversion, so an int64 outside the uint16 range is
// truncated to its low 16 bits rather than rejected.
func (pi *ParamItem) GetAsUint16() uint16 {
|
||||
return uint16(getAsInt64(pi.GetValue()))
|
||||
}
|
||||
|
||||
// GetAsInt64 returns the item's current value as an int64 by passing the
// result of pi.GetValue() to getAsInt64.
func (pi *ParamItem) GetAsInt64() int64 {
|
||||
return getAsInt64(pi.GetValue())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user