From ea3817fbbc9d17b853cf6f2ecc272a94b571e526 Mon Sep 17 00:00:00 2001 From: MrPresent-Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Wed, 12 Jul 2023 20:22:29 +0800 Subject: [PATCH] refinement towards sync_cp_lag_too_behind_policy to avoid submit syncTasks too frequently(#25441) (#25442) (#25500) Signed-off-by: MrPresent-Han --- internal/datanode/segment_sync_policy.go | 35 ++++++++++---- internal/datanode/segment_sync_policy_test.go | 46 ++++++++++++++++++- pkg/util/paramtable/component_param.go | 12 ++++- pkg/util/paramtable/param_item.go | 4 ++ 4 files changed, 85 insertions(+), 12 deletions(-) diff --git a/internal/datanode/segment_sync_policy.go b/internal/datanode/segment_sync_policy.go index ed6b90b276..abbf46228d 100644 --- a/internal/datanode/segment_sync_policy.go +++ b/internal/datanode/segment_sync_policy.go @@ -21,11 +21,12 @@ import ( "sort" "time" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/tsoutil" + "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/log" ) const minSyncSize = 0.5 * 1024 * 1024 @@ -100,20 +101,34 @@ func syncCPLagTooBehind() segmentSyncPolicy { } return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID { - segmentsToSync := make([]UniqueID, 0) + segmentsSyncPairs := make([][2]int64, 0) for _, segment := range segments { - segmentMinTs := segmentMinTs(segment) - segmentStartTime := tsoutil.PhysicalTime(segmentMinTs) + if segment == nil || segment.sType.Load() == nil || segment.getType() != datapb.SegmentType_Flushed { + continue //cp behind check policy only towards flushed segments generated by compaction + } + segmentStartTime := tsoutil.PhysicalTime(segmentMinTs(segment)) cpLagDuration := tsoutil.PhysicalTime(ts).Sub(segmentStartTime) shouldSync := cpLagDuration > Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty() + lagInfo := [2]int64{segment.segmentID, cpLagDuration.Nanoseconds()} if shouldSync { - segmentsToSync = append(segmentsToSync, segment.segmentID) + segmentsSyncPairs = append(segmentsSyncPairs, lagInfo) } } - if len(segmentsToSync) > 0 { - log.Info("sync segment for cp lag behind too much", - zap.Int64s("segmentID", segmentsToSync)) + segmentsIDsToSync := make([]UniqueID, 0) + if len(segmentsSyncPairs) > 0 { + if uint16(len(segmentsSyncPairs)) > Params.DataNodeCfg.CpLagSyncLimit.GetAsUint16() { + //sort all segments according to the length of lag duration + sort.Slice(segmentsSyncPairs, func(i, j int) bool { + return segmentsSyncPairs[i][1] > segmentsSyncPairs[j][1] + }) + segmentsSyncPairs = segmentsSyncPairs[:Params.DataNodeCfg.CpLagSyncLimit.GetAsUint16()] + } + segmentsIDsToSync = lo.Map(segmentsSyncPairs, func(t [2]int64, _ int) int64 { + return t[0] + }) + log.Info("sync segment for cp lag behind too much", zap.Int("segmentCount", len(segmentsIDsToSync)), + zap.Int64s("segmentIDs", segmentsIDsToSync)) } - return segmentsToSync + return segmentsIDsToSync } } diff --git a/internal/datanode/segment_sync_policy_test.go b/internal/datanode/segment_sync_policy_test.go index 4af3057041..5feeb119ad 100644 --- a/internal/datanode/segment_sync_policy_test.go +++ b/internal/datanode/segment_sync_policy_test.go @@ -22,7 +22,11 @@ import ( "time" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" + + "github.com/samber/lo" "github.com/stretchr/testify/assert" "go.uber.org/atomic" ) @@ -98,6 +102,8 @@ func TestSyncMemoryTooHigh(t *testing.T) { func TestSyncCpLagBehindTooMuch(t *testing.T) { nowTs := tsoutil.ComposeTSByTime(time.Now(), 0) + paramtable.Get().Save(Params.DataNodeCfg.CpLagPeriod.Key, "60") + paramtable.Get().Save(Params.DataNodeCfg.CpLagSyncLimit.Key, "2") laggedTs := tsoutil.AddPhysicalDurationOnTs(nowTs, -2*Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second)) tests := []struct { testName string @@ -153,12 +159,50 @@ func TestSyncCpLagBehindTooMuch(t *testing.T) { }, []int64{1, 2}, }, + {"test_cp_sync_limit", + []*Segment{ + { + segmentID: 1, + historyInsertBuf: []*BufferData{ + { + startPos: &msgpb.MsgPosition{ + Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -3*time.Second), + }, + }, + }, + }, + { + segmentID: 2, + historyDeleteBuf: []*DelDataBuf{ + { + startPos: &msgpb.MsgPosition{ + Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -2*time.Second), + }, + }, + }, + }, + { + segmentID: 3, + historyDeleteBuf: []*DelDataBuf{ + { + startPos: &msgpb.MsgPosition{ + Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -1*time.Second), + }, + }, + }, + }, + }, + []int64{1, 2}, + }, } for _, test := range tests { t.Run(test.testName, func(t *testing.T) { + lo.ForEach(test.segments, func(segment *Segment, _ int) { + segment.setType(datapb.SegmentType_Flushed) + }) policy := syncCPLagTooBehind() ids := policy(test.segments, tsoutil.ComposeTSByTime(time.Now(), 0), nil) - assert.ElementsMatch(t, test.idsToSync, ids) + assert.Exactly(t, test.idsToSync, ids) }) } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 717bd92ad6..fb5b3fc8b6 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2221,6 +2221,7 @@ type dataNodeConfig struct { BinLogMaxSize ParamItem `refreshable:"true"` SyncPeriod ParamItem `refreshable:"true"` CpLagPeriod ParamItem `refreshable:"true"` + CpLagSyncLimit ParamItem `refreshable:"true"` // watchEvent WatchEventTicklerInterval ParamItem `refreshable:"false"` @@ -2345,11 +2346,20 @@ func (p *dataNodeConfig) init(base *BaseTable) { Key: "datanode.segment.cpLagPeriod", Version: "2.2.0", DefaultValue: "600", - Doc: "The period to sync segments if buffer is not empty.", + Doc: "The period to sync segments for cp lag period policy", Export: true, } p.CpLagPeriod.Init(base.mgr) + p.CpLagSyncLimit = ParamItem{ + Key: "datanode.segment.cpLagSyncLimit", + Version: "2.2.0", + DefaultValue: "10", + Doc: "The limit to sync segments for cp lag period policy", + Export: true, + } + p.CpLagSyncLimit.Init(base.mgr) + p.WatchEventTicklerInterval = ParamItem{ Key: "datanode.segment.watchEventTicklerInterval", Version: "2.2.3", diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index 640caf89a9..eb3be4e9d1 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -113,6 +113,10 @@ func (pi *ParamItem) GetAsUint32() uint32 { return uint32(getAsInt64(pi.GetValue())) } +func (pi *ParamItem) GetAsUint16() uint16 { + return uint16(getAsInt64(pi.GetValue())) +} + func (pi *ParamItem) GetAsInt64() int64 { return getAsInt64(pi.GetValue()) }