From c7b5c23ff6192eb0f120f688b728982d94a5c764 Mon Sep 17 00:00:00 2001
From: Zhen Ye
Date: Tue, 6 Jan 2026 20:53:24 +0800
Subject: [PATCH] enhance: filter the empty timetick from consuming side (#46541)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

issue: #46540

An empty timetick only synchronizes the clocks of the different components in Milvus, so it can be ignored once timeticks carry LSN/MVCC semantics. Some components, however, still need empty timeticks to trigger operations such as flush and tsafe updates, so we only slow them down instead of dropping them entirely (by default the WAL flusher still dispatches one at least every second and the delegator pipeline still consumes one at least every minute; both intervals are configurable).

- Core invariant: with LSN/MVCC semantics, consumers only need (a) the first timetick that advances the latest-required-MVCC timestamp, to unblock MVCC-dependent waits, and (b) occasional periodic timeticks, bounded by the configured filter interval, for clock synchronization. Frequent non-persisted empty timeticks can therefore be suppressed without breaking MVCC correctness.
- Logic removed/simplified: per-message dispatch and consumption of frequent non-persisted empty timeticks is suppressed. An MVCC-aware filter, emptyTimeTickSlowdowner (internal/util/pipeline/consuming_slowdown.go), short-circuits frequent empty timeticks in the stream pipeline (internal/util/pipeline/stream_pipeline.go), and the WAL flusher rate-limits non-persisted timetick dispatch to one emission per configured interval (internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go); the delegator exposes GetLatestRequiredMVCCTimeTick to drive the filter (internal/querynodev2/delegator/delegator.go).
- Why this does NOT introduce data loss or regressions: the slowdowner always refreshes latestRequiredMVCCTimeTick via GetLatestRequiredMVCCTimeTick and (1) never filters timeticks below latestRequiredMVCCTimeTick, so existing tsafe/flush waits stay unblocked, and (2) always lets the first timetick at or above latestRequiredMVCCTimeTick pass, to notify pending MVCC waits. Separately, the WAL flusher suppression applies only to non-persisted timeticks and still emits once its threshold elapses, preserving the periodic clock-sync messages that flush/tsafe rely on.
- Enhancement summary (where it takes effect): adds GetLatestRequiredMVCCTimeTick on ShardDelegator and the LastestMVCCTimeTickGetter interface, wires emptyTimeTickSlowdowner into NewPipelineWithStream (internal/util/pipeline), and adds WAL flusher rate-limiting plus metrics (internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go, pkg/metrics) to reduce CPU and dispatch overhead while keeping MVCC correctness and periodic synchronization.
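For reviewers, the consuming-side decision described above boils down to the condensed sketch below. Field and method names are simplified for illustration only; the actual implementation in internal/util/pipeline/consuming_slowdown.go (in the diff that follows) additionally maintains Prometheus counters and watches the DelegatorEmptyTimeTickMaxFilterInterval config item.

    package sketch

    // slowdowner mirrors emptyTimeTickSlowdowner with simplified names.
    type slowdowner struct {
        getLatestRequiredMVCC func() uint64 // backed by ShardDelegator.GetLatestRequiredMVCCTimeTick
        latestRequiredMVCC    uint64
        notified              bool   // true once a timetick >= latestRequiredMVCC has passed through
        lastConsumed          uint64 // last timetick that was not filtered
        maxFilterIntervalMs   int64  // DelegatorEmptyTimeTickMaxFilterInterval, in milliseconds
    }

    // filter reports whether a msg pack with end timestamp endTs and msgCount
    // messages can be dropped on the consuming side.
    func (s *slowdowner) filter(endTs uint64, msgCount int) bool {
        if msgCount != 0 {
            s.lastConsumed = endTs
            return false // packs that carry data are never filtered
        }
        // Refresh the required MVCC timestamp from the delegator.
        if tt := s.getLatestRequiredMVCC(); tt > s.latestRequiredMVCC {
            s.latestRequiredMVCC, s.notified = tt, false
        }
        if endTs < s.latestRequiredMVCC {
            s.lastConsumed = endTs
            return false // (1) keep advancing tsafe toward pending MVCC waits
        }
        if !s.notified {
            s.notified = true
            s.lastConsumed = endTs
            return false // (2) the first timetick >= required MVCC wakes the waits
        }
        // Physical-time gap between the hybrid timestamps (the real code uses
        // tsoutil.CalculateDuration; 18 logical bits assumed per the TSO layout).
        if int64(endTs>>18)-int64(s.lastConsumed>>18) >= s.maxFilterIntervalMs {
            s.lastConsumed = endTs
            return false // (3) periodic clock-sync timeticks still pass
        }
        return true // frequent empty timetick: drop it
    }

The WAL flusher applies the analogous interval check, but only to non-persisted timetick messages, comparing the incoming timetick against lastDispatchTimeTick with FlushEmptyTimeTickMaxFilterInterval as the threshold, so checkpoint updates and GetFlushState still see a timetick at least once per interval.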
--------- Signed-off-by: chyezh --- internal/.mockery.yaml | 3 + .../mock_LastestMVCCTimeTickGetter.go | 77 ++++++++++ internal/querynodev2/delegator/delegator.go | 63 ++++++-- .../querynodev2/delegator/delegator_test.go | 33 ++-- .../querynodev2/delegator/mock_delegator.go | 45 ++++++ internal/querynodev2/pipeline/pipeline.go | 2 +- .../querynodev2/pipeline/pipeline_test.go | 1 + .../flusher/flusherimpl/msg_handler_impl.go | 2 +- .../flusherimpl/msg_handler_impl_test.go | 2 +- .../server/flusher/flusherimpl/wal_flusher.go | 38 ++++- .../timetick/mvcc/mvcc_manager.go | 6 + .../timetick/mvcc/mvcc_manager_test.go | 30 ++-- internal/util/pipeline/consuming_slowdown.go | 142 ++++++++++++++++++ .../util/pipeline/consuming_slowdown_test.go | 61 ++++++++ internal/util/pipeline/stream_pipeline.go | 24 ++- .../util/pipeline/stream_pipeline_test.go | 5 +- pkg/metrics/streaming_service_metrics.go | 18 +++ pkg/util/funcutil/random.go | 9 +- pkg/util/paramtable/component_param.go | 27 ++++ pkg/util/paramtable/component_param_test.go | 2 + tests/go_client/common/utils.go | 12 +- tests/go_client/testcases/helper/helper.go | 2 +- .../go_client/testcases/helper/test_setup.go | 8 +- 23 files changed, 538 insertions(+), 74 deletions(-) create mode 100644 internal/mocks/util/mock_pipeline/mock_LastestMVCCTimeTickGetter.go create mode 100644 internal/util/pipeline/consuming_slowdown.go create mode 100644 internal/util/pipeline/consuming_slowdown_test.go diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index c050a2e0f6..51c9f2a8ad 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -111,6 +111,9 @@ packages: github.com/milvus-io/milvus/internal/util/searchutil/optimizers: interfaces: QueryHook: + github.com/milvus-io/milvus/internal/util/pipeline: + interfaces: + LastestMVCCTimeTickGetter: github.com/milvus-io/milvus/internal/flushcommon/util: interfaces: MsgHandler: diff --git a/internal/mocks/util/mock_pipeline/mock_LastestMVCCTimeTickGetter.go b/internal/mocks/util/mock_pipeline/mock_LastestMVCCTimeTickGetter.go new file mode 100644 index 0000000000..84e851407d --- /dev/null +++ b/internal/mocks/util/mock_pipeline/mock_LastestMVCCTimeTickGetter.go @@ -0,0 +1,77 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. 
+ +package mock_pipeline + +import mock "github.com/stretchr/testify/mock" + +// MockLastestMVCCTimeTickGetter is an autogenerated mock type for the LastestMVCCTimeTickGetter type +type MockLastestMVCCTimeTickGetter struct { + mock.Mock +} + +type MockLastestMVCCTimeTickGetter_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLastestMVCCTimeTickGetter) EXPECT() *MockLastestMVCCTimeTickGetter_Expecter { + return &MockLastestMVCCTimeTickGetter_Expecter{mock: &_m.Mock} +} + +// GetLatestRequiredMVCCTimeTick provides a mock function with no fields +func (_m *MockLastestMVCCTimeTickGetter) GetLatestRequiredMVCCTimeTick() uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetLatestRequiredMVCCTimeTick") + } + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestRequiredMVCCTimeTick' +type MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call struct { + *mock.Call +} + +// GetLatestRequiredMVCCTimeTick is a helper method to define mock.On call +func (_e *MockLastestMVCCTimeTickGetter_Expecter) GetLatestRequiredMVCCTimeTick() *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call { + return &MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call{Call: _e.mock.On("GetLatestRequiredMVCCTimeTick")} +} + +func (_c *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call) Run(run func()) *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call) Return(_a0 uint64) *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call) RunAndReturn(run func() uint64) *MockLastestMVCCTimeTickGetter_GetLatestRequiredMVCCTimeTick_Call { + _c.Call.Return(run) + return _c +} + +// NewMockLastestMVCCTimeTickGetter creates a new instance of MockLastestMVCCTimeTickGetter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockLastestMVCCTimeTickGetter(t interface { + mock.TestingT + Cleanup(func()) +}) *MockLastestMVCCTimeTickGetter { + mock := &MockLastestMVCCTimeTickGetter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 84da1820d2..74bf2fea53 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -99,6 +99,7 @@ type ShardDelegator interface { TryCleanExcludedSegments(ts uint64) // tsafe + GetLatestRequiredMVCCTimeTick() uint64 UpdateTSafe(ts uint64) GetTSafe() uint64 @@ -169,6 +170,10 @@ type shardDelegator struct { // streaming data catch-up state catchingUpStreamingData *atomic.Bool + + // latest required mvcc timestamp for the delegator + // for slow down the delegator consumption and reduce the timetick dispatch frequency. + latestRequiredMVCCTimeTick *atomic.Uint64 } // getLogger returns the zap logger with pre-defined shard attributes. 
@@ -705,6 +710,7 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta } // wait tsafe + sd.updateLatestRequiredMVCCTimestamp(req.Req.GuaranteeTimestamp) _, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp) if err != nil { log.Warn("delegator GetStatistics failed to wait tsafe", zap.Error(err)) @@ -926,6 +932,12 @@ func (sd *shardDelegator) speedupGuranteeTS( mvccTS uint64, isIterator bool, ) uint64 { + // because the mvcc speed up will make the guarantee timestamp smaller. + // and the update latest required mvcc timestamp and mvcc speed up are executed concurrently. + // so we update the latest required mvcc timestamp first, then the mvcc speed up will not affect the latest required mvcc timestamp. + // to make the new incoming mvcc can be seen by the timetick_slowdowner. + sd.updateLatestRequiredMVCCTimestamp(guaranteeTS) + // when 1. streaming service is disable, // 2. consistency level is not strong, // 3. cannot speed iterator, because current client of milvus doesn't support shard level mvcc. @@ -944,6 +956,7 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "Delegator-waitTSafe") defer sp.End() log := sd.getLogger(ctx) + // already safe to search latestTSafe := sd.latestTsafe.Load() if latestTSafe >= ts { @@ -998,6 +1011,31 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err } } +// GetLatestRequiredMVCCTimeTick returns the latest required mvcc timestamp for the delegator. +func (sd *shardDelegator) GetLatestRequiredMVCCTimeTick() uint64 { + if sd.catchingUpStreamingData.Load() { + // delegator need to catch up the streaming data when startup, + // If the empty timetick is filtered, the load operation will be blocked. + // We want the delegator to catch up the streaming data, and load done as soon as possible, + // so we always return the current time as the latest required mvcc timestamp. + return tsoutil.GetCurrentTime() + } + return sd.latestRequiredMVCCTimeTick.Load() +} + +// updateLatestRequiredMVCCTimestamp updates the latest required mvcc timestamp for the delegator. +func (sd *shardDelegator) updateLatestRequiredMVCCTimestamp(ts uint64) { + for { + previousTs := sd.latestRequiredMVCCTimeTick.Load() + if ts <= previousTs { + return + } + if sd.latestRequiredMVCCTimeTick.CompareAndSwap(previousTs, ts) { + return + } + } +} + // updateTSafe read current tsafe value from tsafeManager. 
func (sd *shardDelegator) UpdateTSafe(tsafe uint64) { log := sd.getLogger(context.Background()).WithRateGroup(fmt.Sprintf("UpdateTSafe-%s", sd.vchannelName), 1, 60) @@ -1212,18 +1250,19 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni distribution: NewDistribution(channel, queryView), deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock, []string{fmt.Sprint(paramtable.GetNodeID()), channel}), - pkOracle: pkoracle.NewPkOracle(), - latestTsafe: atomic.NewUint64(startTs), - loader: loader, - queryHook: queryHook, - chunkManager: chunkManager, - partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot), - excludedSegments: excludedSegments, - functionRunners: make(map[int64]function.FunctionRunner), - analyzerRunners: make(map[UniqueID]function.Analyzer), - isBM25Field: make(map[int64]bool), - l0ForwardPolicy: policy, - catchingUpStreamingData: atomic.NewBool(true), + pkOracle: pkoracle.NewPkOracle(), + latestTsafe: atomic.NewUint64(startTs), + loader: loader, + queryHook: queryHook, + chunkManager: chunkManager, + partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot), + excludedSegments: excludedSegments, + functionRunners: make(map[int64]function.FunctionRunner), + analyzerRunners: make(map[UniqueID]function.Analyzer), + isBM25Field: make(map[int64]bool), + l0ForwardPolicy: policy, + catchingUpStreamingData: atomic.NewBool(true), + latestRequiredMVCCTimeTick: atomic.NewUint64(0), } for _, tf := range collection.Schema().GetFunctions() { diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 96a954dc99..1fa1e408c6 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -1934,7 +1934,8 @@ func TestDelegatorSearchBM25InvalidMetricType(t *testing.T) { searchReq.Req.MetricType = metric.IP sd := &shardDelegator{ - isBM25Field: map[int64]bool{101: true}, + isBM25Field: map[int64]bool{101: true}, + latestRequiredMVCCTimeTick: atomic.NewUint64(0), } _, err := sd.search(context.Background(), searchReq, []SnapshotItem{}, []SegmentEntry{}, map[int64]int64{}) @@ -2049,7 +2050,8 @@ func TestDelegatorCatchingUpStreamingData(t *testing.T) { t.Run("initial state is catching up", func(t *testing.T) { // Create a minimal delegator to test CatchingUpStreamingData sd := &shardDelegator{ - catchingUpStreamingData: atomic.NewBool(true), + catchingUpStreamingData: atomic.NewBool(true), + latestRequiredMVCCTimeTick: atomic.NewUint64(0), } assert.True(t, sd.CatchingUpStreamingData()) }) @@ -2060,10 +2062,11 @@ func TestDelegatorCatchingUpStreamingData(t *testing.T) { defer mockParam.UnPatch() sd := &shardDelegator{ - vchannelName: "test-channel", - latestTsafe: atomic.NewUint64(0), - catchingUpStreamingData: atomic.NewBool(true), - tsCond: sync.NewCond(&sync.Mutex{}), + vchannelName: "test-channel", + latestTsafe: atomic.NewUint64(0), + catchingUpStreamingData: atomic.NewBool(true), + tsCond: sync.NewCond(&sync.Mutex{}), + latestRequiredMVCCTimeTick: atomic.NewUint64(0), } // Initially catching up @@ -2083,10 +2086,11 @@ func TestDelegatorCatchingUpStreamingData(t *testing.T) { defer mockParam.UnPatch() sd := &shardDelegator{ - vchannelName: "test-channel", - latestTsafe: atomic.NewUint64(0), - catchingUpStreamingData: atomic.NewBool(true), - tsCond: sync.NewCond(&sync.Mutex{}), + vchannelName: "test-channel", + latestTsafe: atomic.NewUint64(0), + catchingUpStreamingData: atomic.NewBool(true), + 
tsCond: sync.NewCond(&sync.Mutex{}), + latestRequiredMVCCTimeTick: atomic.NewUint64(0), } // Initially catching up @@ -2106,10 +2110,11 @@ func TestDelegatorCatchingUpStreamingData(t *testing.T) { defer mockParam.UnPatch() sd := &shardDelegator{ - vchannelName: "test-channel", - latestTsafe: atomic.NewUint64(0), - catchingUpStreamingData: atomic.NewBool(true), - tsCond: sync.NewCond(&sync.Mutex{}), + vchannelName: "test-channel", + latestTsafe: atomic.NewUint64(0), + catchingUpStreamingData: atomic.NewBool(true), + tsCond: sync.NewCond(&sync.Mutex{}), + latestRequiredMVCCTimeTick: atomic.NewUint64(0), } // Initially catching up diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index 335e5ced51..041dd2bb20 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -348,6 +348,51 @@ func (_c *MockShardDelegator_GetHighlight_Call) RunAndReturn(run func(context.Co return _c } +// GetLatestRequiredMVCCTimeTick provides a mock function with no fields +func (_m *MockShardDelegator) GetLatestRequiredMVCCTimeTick() uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetLatestRequiredMVCCTimeTick") + } + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestRequiredMVCCTimeTick' +type MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call struct { + *mock.Call +} + +// GetLatestRequiredMVCCTimeTick is a helper method to define mock.On call +func (_e *MockShardDelegator_Expecter) GetLatestRequiredMVCCTimeTick() *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call { + return &MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call{Call: _e.mock.On("GetLatestRequiredMVCCTimeTick")} +} + +func (_c *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call) Run(run func()) *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call) Return(_a0 uint64) *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call) RunAndReturn(run func() uint64) *MockShardDelegator_GetLatestRequiredMVCCTimeTick_Call { + _c.Call.Return(run) + return _c +} + // GetPartitionStatsVersions provides a mock function with given fields: ctx func (_m *MockShardDelegator) GetPartitionStatsVersions(ctx context.Context) map[int64]int64 { ret := _m.Called(ctx) diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index 3536df2dff..36acc5c9ac 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -55,7 +55,7 @@ func NewPipeLine( p := &pipeline{ collectionID: collectionID, - StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel), + StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel, delegator), } filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength) diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 130a1a410e..5d17cb8b03 100644 --- 
a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -139,6 +139,7 @@ func (suite *PipelineTestSuite) TestBasic() { suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Run(func(ts uint64) { close(done) }).Return() + suite.delegator.EXPECT().GetLatestRequiredMVCCTimeTick().Return(0).Maybe() // build pipleine manager := &segments.Manager{ diff --git a/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl.go b/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl.go index 28da812f1b..b26e9e1c18 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl.go +++ b/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl.go @@ -102,7 +102,7 @@ func (impl *msgHandlerImpl) HandleManualFlush(flushMsg message.ImmutableManualFl if err := impl.wbMgr.SealSegments(context.Background(), vchannel, flushMsg.Header().SegmentIds); err != nil { return errors.Wrap(err, "failed to seal segments") } - if err := impl.wbMgr.FlushChannel(context.Background(), vchannel, flushMsg.Header().FlushTs); err != nil { + if err := impl.wbMgr.FlushChannel(context.Background(), vchannel, flushMsg.TimeTick()); err != nil { return errors.Wrap(err, "failed to flush channel") } // may be redundant. return nil diff --git a/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go b/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go index bd4dace2be..2e775da75c 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go +++ b/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go @@ -84,7 +84,7 @@ func TestFlushMsgHandler_HandleManualFlush(t *testing.T) { handler := newMsgHandler(wbMgr) msgID := mock_message.NewMockMessageID(t) - im, err := message.AsImmutableManualFlushMessageV2(msg.IntoImmutableMessage(msgID)) + im, err := message.AsImmutableManualFlushMessageV2(msg.WithTimeTick(1000).IntoImmutableMessage(msgID)) assert.NoError(t, err) err = handler.HandleManualFlush(im) assert.Error(t, err) diff --git a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go index 03908eed4d..45745cc519 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go +++ b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go @@ -4,6 +4,7 @@ import ( "context" "github.com/cockroachdb/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/zap" @@ -15,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/recovery" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor" "github.com/milvus-io/milvus/pkg/v2/streaming/util/options" @@ -22,6 +24,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/syncutil" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) var errChannelLifetimeUnrecoverable = errors.New("channel lifetime unrecoverable") @@ -42,19 +45,22 @@ func RecoverWALFlusher(param *RecoverWALFlusherParam) *WALFlusherImpl { logger: resource.Resource().Logger().With( log.FieldComponent("flusher"), zap.String("pchannel", param.ChannelInfo.String())), - metrics: 
newFlusherMetrics(param.ChannelInfo), - RecoveryStorage: param.RecoveryStorage, + metrics: newFlusherMetrics(param.ChannelInfo), + emptyTimeTickCounter: metrics.WALFlusherEmptyTimeTickFilteredTotal.WithLabelValues(paramtable.GetStringNodeID(), param.ChannelInfo.Name), + RecoveryStorage: param.RecoveryStorage, } go flusher.Execute(param.RecoverySnapshot) return flusher } type WALFlusherImpl struct { - notifier *syncutil.AsyncTaskNotifier[struct{}] - wal *syncutil.Future[wal.WAL] - flusherComponents *flusherComponents - logger *log.MLogger - metrics *flusherMetrics + notifier *syncutil.AsyncTaskNotifier[struct{}] + wal *syncutil.Future[wal.WAL] + flusherComponents *flusherComponents + logger *log.MLogger + metrics *flusherMetrics + lastDispatchTimeTick uint64 // The last time tick that the message is dispatched. + emptyTimeTickCounter prometheus.Counter recovery.RecoveryStorage } @@ -206,6 +212,24 @@ func (impl *WALFlusherImpl) generateScanner(ctx context.Context, l wal.WAL, chec // dispatch dispatches the message to the related handler for flusher components. func (impl *WALFlusherImpl) dispatch(msg message.ImmutableMessage) (err error) { + if msg.MessageType() == message.MessageTypeTimeTick && !msg.IsPersisted() { + // Currently, milvus use the timetick to synchronize the system periodically, + // so the wal will still produce empty timetick message after the last write operation is done. + // When there're huge amount of vchannel in one pchannel, every time tick will be dispatched, + // which will waste a lot of cpu resources. + // So we only dispatch the timetick message when the timetick-lastDispatchTimeTick is greater than a threshold. + timetick := msg.TimeTick() + threshold := paramtable.Get().StreamingCfg.FlushEmptyTimeTickMaxFilterInterval.GetAsDurationByParse() + if tsoutil.CalculateDuration(timetick, impl.lastDispatchTimeTick) < threshold.Milliseconds() { + impl.emptyTimeTickCounter.Inc() + return + } + } + timetick := msg.TimeTick() + defer func() { + impl.lastDispatchTimeTick = timetick + }() + // TODO: should be removed at 3.0, after merge the flusher logic into recovery storage. // only for truncate api now. if bh := msg.BroadcastHeader(); bh != nil && bh.AckSyncUp { diff --git a/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager.go b/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager.go index 2faaeb2180..d158529291 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager.go @@ -44,6 +44,12 @@ func (cm *MVCCManager) GetMVCCOfVChannel(vchannel string) VChannelMVCC { // UpdateMVCC updates the mvcc state by incoming message. func (cm *MVCCManager) UpdateMVCC(msg message.MutableMessage) { + if !msg.IsPersisted() { + // A unpersisted message is always a time tick message that is used to sync up the system time. + // No data change should be made by this message so it should be ignored in the mvcc manager. 
+ return + } + tt := msg.TimeTick() msgType := msg.MessageType() vchannel := msg.VChannel() diff --git a/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager_test.go b/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager_test.go index b4364fcd55..16beb3a957 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/mvcc/mvcc_manager_test.go @@ -14,37 +14,43 @@ func TestNewMVCCManager(t *testing.T) { v := cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 100, Confirmed: true}) - cm.UpdateMVCC(createTestMessage(t, 101, "vc1", message.MessageTypeInsert, false)) + cm.UpdateMVCC(createTestMessage(t, 101, "vc1", message.MessageTypeInsert, false, true)) v = cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 101, Confirmed: false}) v = cm.GetMVCCOfVChannel("vc2") assert.Equal(t, v, VChannelMVCC{Timetick: 100, Confirmed: true}) - cm.UpdateMVCC(createTestMessage(t, 102, "", message.MessageTypeTimeTick, false)) + cm.UpdateMVCC(createTestMessage(t, 102, "", message.MessageTypeTimeTick, false, true)) v = cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true}) v = cm.GetMVCCOfVChannel("vc2") assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true}) - cm.UpdateMVCC(createTestMessage(t, 103, "vc1", message.MessageTypeInsert, true)) + cm.UpdateMVCC(createTestMessage(t, 103, "vc1", message.MessageTypeInsert, true, true)) v = cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true}) v = cm.GetMVCCOfVChannel("vc2") assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true}) - cm.UpdateMVCC(createTestMessage(t, 104, "vc1", message.MessageTypeCommitTxn, true)) + cm.UpdateMVCC(createTestMessage(t, 104, "vc1", message.MessageTypeCommitTxn, true, true)) v = cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: false}) v = cm.GetMVCCOfVChannel("vc2") assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true}) - cm.UpdateMVCC(createTestMessage(t, 104, "", message.MessageTypeTimeTick, false)) + cm.UpdateMVCC(createTestMessage(t, 104, "", message.MessageTypeTimeTick, false, true)) v = cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true}) v = cm.GetMVCCOfVChannel("vc2") assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true}) - cm.UpdateMVCC(createTestMessage(t, 101, "", message.MessageTypeTimeTick, false)) + cm.UpdateMVCC(createTestMessage(t, 101, "", message.MessageTypeTimeTick, false, true)) + v = cm.GetMVCCOfVChannel("vc1") + assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true}) + v = cm.GetMVCCOfVChannel("vc2") + assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true}) + + cm.UpdateMVCC(createTestMessage(t, 1000, "", message.MessageTypeTimeTick, false, false)) v = cm.GetMVCCOfVChannel("vc1") assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true}) v = cm.GetMVCCOfVChannel("vc2") @@ -57,15 +63,17 @@ func createTestMessage( vchannel string, msgType message.MessageType, txTxn bool, + persist bool, ) message.MutableMessage { msg := mock_message.NewMockMutableMessage(t) - msg.EXPECT().TimeTick().Return(tt) - msg.EXPECT().VChannel().Return(vchannel) - msg.EXPECT().MessageType().Return(msgType) + msg.EXPECT().IsPersisted().Return(persist) + msg.EXPECT().TimeTick().Return(tt).Maybe() + msg.EXPECT().VChannel().Return(vchannel).Maybe() + 
msg.EXPECT().MessageType().Return(msgType).Maybe() if txTxn { - msg.EXPECT().TxnContext().Return(&message.TxnContext{}) + msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() return msg } - msg.EXPECT().TxnContext().Return(nil) + msg.EXPECT().TxnContext().Return(nil).Maybe() return msg } diff --git a/internal/util/pipeline/consuming_slowdown.go b/internal/util/pipeline/consuming_slowdown.go new file mode 100644 index 0000000000..803ac5dce6 --- /dev/null +++ b/internal/util/pipeline/consuming_slowdown.go @@ -0,0 +1,142 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/v2/config" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/metrics" + "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" +) + +var ( + thresholdUpdateIntervalMs = atomic.NewInt64(60 * 1000) + thresholdUpdateIntervalWatchOnce = &sync.Once{} +) + +type LastestMVCCTimeTickGetter interface { + GetLatestRequiredMVCCTimeTick() uint64 +} + +// newEmptyTimeTickSlowdowner creates a new consumingSlowdowner instance. 
+func newEmptyTimeTickSlowdowner(lastestMVCCTimeTickGetter LastestMVCCTimeTickGetter, vChannel string) *emptyTimeTickSlowdowner { + thresholdUpdateIntervalWatchOnce.Do(updateThresholdWithConfiguration) + + nodeID := paramtable.GetStringNodeID() + pchannel := funcutil.ToPhysicalChannel(vChannel) + emptyTimeTickFilteredCounter := metrics.WALDelegatorEmptyTimeTickFilteredTotal.WithLabelValues(nodeID, pchannel) + tsafeTimeTickUnfilteredCounter := metrics.WALDelegatorTsafeTimeTickUnfilteredTotal.WithLabelValues(nodeID, pchannel) + return &emptyTimeTickSlowdowner{ + lastestMVCCTimeTickGetter: lastestMVCCTimeTickGetter, + + lastestMVCCTimeTick: 0, + lastestMVCCTimeTickNotified: false, + lastConsumedTimeTick: 0, + + emptyTimeTickFilteredCounter: emptyTimeTickFilteredCounter, + tsafeTimeTickUnfilteredCounter: tsafeTimeTickUnfilteredCounter, + } +} + +func updateThresholdWithConfiguration() { + params := paramtable.Get() + interval := params.StreamingCfg.DelegatorEmptyTimeTickMaxFilterInterval.GetAsDurationByParse() + log.Info("delegator empty time tick max filter interval initialized", zap.Duration("interval", interval)) + thresholdUpdateIntervalMs.Store(interval.Milliseconds()) + params.Watch(params.StreamingCfg.DelegatorEmptyTimeTickMaxFilterInterval.Key, config.NewHandler( + params.StreamingCfg.DelegatorEmptyTimeTickMaxFilterInterval.Key, + func(_ *config.Event) { + previousInterval := thresholdUpdateIntervalMs.Load() + newInterval := params.StreamingCfg.DelegatorEmptyTimeTickMaxFilterInterval.GetAsDurationByParse() + log.Info("delegator empty time tick max filter interval updated", + zap.Duration("previousInterval", time.Duration(previousInterval)), + zap.Duration("interval", newInterval)) + thresholdUpdateIntervalMs.Store(newInterval.Milliseconds()) + }, + )) +} + +type emptyTimeTickSlowdowner struct { + lastestMVCCTimeTickGetter LastestMVCCTimeTickGetter + + lastestMVCCTimeTick uint64 + lastestMVCCTimeTickNotified bool + lastConsumedTimeTick uint64 + thresholdMs int64 + lastTimeThresholdUpdated time.Duration + + emptyTimeTickFilteredCounter prometheus.Counter + tsafeTimeTickUnfilteredCounter prometheus.Counter +} + +// Filter filters the message by the consuming slowdowner. +// if true, the message should be filtered out. +// if false, the message should be processed. +func (sd *emptyTimeTickSlowdowner) Filter(msg *msgstream.MsgPack) (filtered bool) { + defer func() { + if !filtered { + sd.lastConsumedTimeTick = msg.EndTs + return + } + sd.emptyTimeTickFilteredCounter.Inc() + }() + + if len(msg.Msgs) != 0 { + return false + } + + timetick := msg.EndTs + + // handle the case that if there's a pending + sd.updateLastestMVCCTimeTick() + if timetick < sd.lastestMVCCTimeTick { + // catch up the latest time tick to make the tsafe check pass. + // every time tick should be handled, + // otherwise the search/query operation which tsafe is less than the latest mvcc time tick will be blocked. + sd.tsafeTimeTickUnfilteredCounter.Inc() + return false + } + + // if the timetick is greater than the lastestMVCCTimeTick, it means all tsafe checks can be passed by it. + // so we mark the notified flag to true, stop the mvcc check, then the threshold check will be activated. + if !sd.lastestMVCCTimeTickNotified && timetick >= sd.lastestMVCCTimeTick { + sd.lastestMVCCTimeTickNotified = true + // This is the first time tick satisfying the tsafe check, so we should NOT filter it. 
+ sd.tsafeTimeTickUnfilteredCounter.Inc() + return false + } + + // For monitoring, we should sync the time tick at least once every threshold. + return tsoutil.CalculateDuration(timetick, sd.lastConsumedTimeTick) < thresholdUpdateIntervalMs.Load() +} + +func (sd *emptyTimeTickSlowdowner) updateLastestMVCCTimeTick() { + if newIncoming := sd.lastestMVCCTimeTickGetter.GetLatestRequiredMVCCTimeTick(); newIncoming > sd.lastestMVCCTimeTick { + sd.lastestMVCCTimeTick = newIncoming + sd.lastestMVCCTimeTickNotified = false + } +} diff --git a/internal/util/pipeline/consuming_slowdown_test.go b/internal/util/pipeline/consuming_slowdown_test.go new file mode 100644 index 0000000000..d5f58def30 --- /dev/null +++ b/internal/util/pipeline/consuming_slowdown_test.go @@ -0,0 +1,61 @@ +package pipeline + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus/internal/mocks/util/mock_pipeline" + "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" +) + +func TestConsumingSlowdown(t *testing.T) { + g := mock_pipeline.NewMockLastestMVCCTimeTickGetter(t) + + var latestRequiredMVCCTimeTick uint64 + + g.EXPECT().GetLatestRequiredMVCCTimeTick().RunAndReturn(func() uint64 { + return latestRequiredMVCCTimeTick + }) + + sd := newEmptyTimeTickSlowdowner(g, "vchannel") + + now := time.Now().UnixMilli() + ts := tsoutil.ComposeTS(now, 0) + filtered := sd.Filter(&msgstream.MsgPack{EndTs: ts}) + require.False(t, filtered) + + ts = tsoutil.ComposeTS(now+1, 0) + latestRequiredMVCCTimeTick = ts + 5 + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts}) + require.False(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 4}) + require.False(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 5}) + require.False(t, filtered) + + latestRequiredMVCCTimeTick = ts + 10 + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 7}) + require.False(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 10}) + require.False(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 11}) + require.True(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 12, Msgs: make([]msgstream.TsMsg, 10)}) + require.False(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: ts + 13}) + require.True(t, filtered) + + filtered = sd.Filter(&msgstream.MsgPack{EndTs: tsoutil.ComposeTS(now+int64(30*time.Millisecond), 0)}) + require.False(t, filtered) +} diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index e296c38499..b006c31088 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -52,7 +52,8 @@ type streamPipeline struct { closeWg sync.WaitGroup closeOnce sync.Once - lastAccessTime *atomic.Time + lastAccessTime *atomic.Time + emptyTimeTickSlowdowner *emptyTimeTickSlowdowner } func (p *streamPipeline) work() { @@ -67,7 +68,16 @@ func (p *streamPipeline) work() { log.Ctx(context.TODO()).Debug("stream pipeline input closed") return } + p.lastAccessTime.Store(time.Now()) + // Currently, milvus use the timetick to synchronize the system periodically, + // so the wal will still produce empty timetick message after the last write operation is done. + // When there're huge amount of vchannel in one pchannel, it will introduce a great overhead. + // So we filter out the empty time tick message as much as possible. + // TODO: After 3.0, we can remove the filter logic by LSN+MVCC. 
+ if p.emptyTimeTickSlowdowner.Filter(msg) { + continue + } log.Ctx(context.TODO()).RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs))) p.pipeline.inputChannel <- msg p.pipeline.process() @@ -149,6 +159,7 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time.Duration, enableTtChecker bool, vChannel string, + lastestMVCCTimeTickGetter LastestMVCCTimeTickGetter, ) StreamPipeline { pipeline := &streamPipeline{ pipeline: &pipeline{ @@ -156,11 +167,12 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval: nodeTtInterval, enableTtChecker: enableTtChecker, }, - dispatcher: dispatcher, - vChannel: vChannel, - closeCh: make(chan struct{}), - closeWg: sync.WaitGroup{}, - lastAccessTime: atomic.NewTime(time.Now()), + dispatcher: dispatcher, + vChannel: vChannel, + closeCh: make(chan struct{}), + closeWg: sync.WaitGroup{}, + lastAccessTime: atomic.NewTime(time.Now()), + emptyTimeTickSlowdowner: newEmptyTimeTickSlowdowner(lastestMVCCTimeTickGetter, vChannel), } return pipeline diff --git a/internal/util/pipeline/stream_pipeline_test.go b/internal/util/pipeline/stream_pipeline_test.go index 4cb6a43031..18f1a2a60d 100644 --- a/internal/util/pipeline/stream_pipeline_test.go +++ b/internal/util/pipeline/stream_pipeline_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/mocks/util/mock_pipeline" "github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -50,7 +51,9 @@ func (suite *StreamPipelineSuite) SetupTest() { suite.msgDispatcher = msgdispatcher.NewMockClient(suite.T()) suite.msgDispatcher.EXPECT().Register(mock.Anything, mock.Anything).Return(suite.inChannel, nil) suite.msgDispatcher.EXPECT().Deregister(suite.channel) - suite.pipeline = NewPipelineWithStream(suite.msgDispatcher, 0, false, suite.channel) + getter := mock_pipeline.NewMockLastestMVCCTimeTickGetter(suite.T()) + getter.EXPECT().GetLatestRequiredMVCCTimeTick().Return(0) + suite.pipeline = NewPipelineWithStream(suite.msgDispatcher, 0, false, suite.channel, getter) suite.length = 4 } diff --git a/pkg/metrics/streaming_service_metrics.go b/pkg/metrics/streaming_service_metrics.go index 94cfefa3e1..b21b79b4b4 100644 --- a/pkg/metrics/streaming_service_metrics.go +++ b/pkg/metrics/streaming_service_metrics.go @@ -466,6 +466,21 @@ var ( Name: "recovery_is_on_persisting", Help: "Is recovery storage on persisting", }, WALChannelLabelName, WALChannelTermLabelName) + + WALDelegatorEmptyTimeTickFilteredTotal = newWALCounterVec(prometheus.CounterOpts{ + Name: "delegator_empty_time_tick_filtered_total", + Help: "Total of empty time tick filtered", + }, WALChannelLabelName) + + WALDelegatorTsafeTimeTickUnfilteredTotal = newWALCounterVec(prometheus.CounterOpts{ + Name: "delegator_tsafe_time_tick_unfiltered_total", + Help: "Total of empty time tick unfiltered because of tsafe", + }, WALChannelLabelName) + + WALFlusherEmptyTimeTickFilteredTotal = newWALCounterVec(prometheus.CounterOpts{ + Name: "flusher_empty_time_tick_filtered_total", + Help: "Total of empty time tick filtered", + }, WALChannelLabelName) ) // RegisterStreamingServiceClient registers streaming service client metrics @@ -568,6 +583,9 @@ func registerWAL(registry *prometheus.Registry) { registry.MustRegister(WALRecoveryPersistedTimeTick) registry.MustRegister(WALRecoveryInconsistentEventTotal) 
registry.MustRegister(WALRecoveryIsOnPersisting) + registry.MustRegister(WALDelegatorEmptyTimeTickFilteredTotal) + registry.MustRegister(WALDelegatorTsafeTimeTickUnfilteredTotal) + registry.MustRegister(WALFlusherEmptyTimeTickFilteredTotal) } func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec { diff --git a/pkg/util/funcutil/random.go b/pkg/util/funcutil/random.go index 45f3588130..187557b199 100644 --- a/pkg/util/funcutil/random.go +++ b/pkg/util/funcutil/random.go @@ -19,22 +19,15 @@ package funcutil import ( "fmt" "math/rand" - "time" ) -var r *rand.Rand - -func init() { - r = rand.New(rand.NewSource(time.Now().UnixNano())) -} - var letterRunes = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") // RandomBytes returns a batch of random string func RandomBytes(n int) []byte { b := make([]byte, n) for i := range b { - b[i] = letterRunes[r.Intn(len(letterRunes))] + b[i] = letterRunes[rand.Intn(len(letterRunes))] } return b } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c1168ca97c..d356063d6a 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -6542,6 +6542,10 @@ type streamingConfig struct { WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"` WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"` WALRecoverySchemaExpirationTolerance ParamItem `refreshable:"true"` + + // Empty TimeTick Filtering configration + DelegatorEmptyTimeTickMaxFilterInterval ParamItem `refreshable:"true"` + FlushEmptyTimeTickMaxFilterInterval ParamItem `refreshable:"true"` } func (p *streamingConfig) init(base *BaseTable) { @@ -6897,6 +6901,29 @@ If the schema is older than (the channel checkpoint - tolerance), it will be rem Export: false, } p.WALRecoverySchemaExpirationTolerance.Init(base.mgr) + + p.DelegatorEmptyTimeTickMaxFilterInterval = ParamItem{ + Key: "streaming.delegator.emptyTimeTick.maxFilterInterval", + Version: "2.6.9", + Doc: `The max filter interval for empty time tick of delegator, 1m by default. +If the interval since last timetick is less than this config, the empty time tick will be filtered.`, + DefaultValue: "1m", + Export: false, + } + p.DelegatorEmptyTimeTickMaxFilterInterval.Init(base.mgr) + + p.FlushEmptyTimeTickMaxFilterInterval = ParamItem{ + Key: "streaming.flush.emptyTimeTick.maxFilterInterval", + Version: "2.6.9", + Doc: `The max filter interval for empty time tick of flush, 1s by default. +If the interval since last timetick is less than this config, the empty time tick will be filtered. +Because current flusher need the empty time tick to trigger the cp update, +too huge threshold will block the GetFlushState operation, +so we set 1 second here as a threshold.`, + DefaultValue: "1s", + Export: false, + } + p.FlushEmptyTimeTickMaxFilterInterval.Init(base.mgr) } // runtimeConfig is just a private environment value table. 
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index f7f6b32094..86fefd5251 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -715,6 +715,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 10*time.Minute, params.StreamingCfg.FlushL0MaxLifetime.GetAsDurationByParse()) assert.Equal(t, 500000, params.StreamingCfg.FlushL0MaxRowNum.GetAsInt()) assert.Equal(t, int64(32*1024*1024), params.StreamingCfg.FlushL0MaxSize.GetAsSize()) + assert.Equal(t, 1*time.Minute, params.StreamingCfg.DelegatorEmptyTimeTickMaxFilterInterval.GetAsDurationByParse()) + assert.Equal(t, 1*time.Second, params.StreamingCfg.FlushEmptyTimeTickMaxFilterInterval.GetAsDurationByParse()) params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s") params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s") diff --git a/tests/go_client/common/utils.go b/tests/go_client/common/utils.go index 59668ca1d1..500cf92f77 100644 --- a/tests/go_client/common/utils.go +++ b/tests/go_client/common/utils.go @@ -7,7 +7,6 @@ import ( "math/rand" "reflect" "strings" - "time" "github.com/x448/float16" "go.uber.org/zap" @@ -16,19 +15,12 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" ) -var ( - letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - r *rand.Rand -) - -func init() { - r = rand.New(rand.NewSource(time.Now().UnixNano())) -} +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func GenRandomString(prefix string, n int) string { b := make([]rune, n) for i := range b { - b[i] = letterRunes[r.Intn(len(letterRunes))] + b[i] = letterRunes[rand.Intn(len(letterRunes))] } str := fmt.Sprintf("%s_%s", prefix, string(b)) return str diff --git a/tests/go_client/testcases/helper/helper.go b/tests/go_client/testcases/helper/helper.go index f4fc43406d..b88cb9f6c8 100644 --- a/tests/go_client/testcases/helper/helper.go +++ b/tests/go_client/testcases/helper/helper.go @@ -151,7 +151,7 @@ func GetAllFieldsName(schema entity.Schema) []string { // CreateDefaultMilvusClient creates a new client with default configuration func CreateDefaultMilvusClient(ctx context.Context, t *testing.T) *base.MilvusClient { t.Helper() - mc, err := base.NewMilvusClient(ctx, defaultClientConfig) + mc, err := base.NewMilvusClient(ctx, GetDefaultClientConfig()) common.CheckErr(t, err, true) t.Cleanup(func() { diff --git a/tests/go_client/testcases/helper/test_setup.go b/tests/go_client/testcases/helper/test_setup.go index 40e6c9224b..885bd2c899 100644 --- a/tests/go_client/testcases/helper/test_setup.go +++ b/tests/go_client/testcases/helper/test_setup.go @@ -7,6 +7,7 @@ import ( "time" "go.uber.org/zap" + "google.golang.org/grpc" client "github.com/milvus-io/milvus/client/v2/milvusclient" "github.com/milvus-io/milvus/pkg/v2/log" @@ -30,7 +31,12 @@ func setDefaultClientConfig(cfg *client.ClientConfig) { } func GetDefaultClientConfig() *client.ClientConfig { - return defaultClientConfig + newCfg := *defaultClientConfig + dialOptions := newCfg.DialOptions + newDialOptions := make([]grpc.DialOption, len(dialOptions)) + copy(newDialOptions, dialOptions) + newCfg.DialOptions = newDialOptions + return &newCfg } func GetAddr() string {