From 8905b042f15c951e93aa4373ff1dbc88fdf5e86a Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 14 Oct 2024 14:47:22 +0800 Subject: [PATCH] fix: add proportion for capacity seal policy in streaming flusher (#36761) issue: #36760 --------- Signed-off-by: chyezh --- .../flushcommon/pipeline/data_sync_service.go | 16 ++++++-- internal/flushcommon/writebuffer/options.go | 17 +++++++-- .../flushcommon/writebuffer/write_buffer.go | 38 ++++++++++++------- .../flusher/flusherimpl/channel_lifetime.go | 14 ++++++- .../segment/manager/pchannel_manager_test.go | 5 ++- .../policy/segment_limitation_policy.go | 5 ++- .../wal/interceptors/segment/stats/stats.go | 12 +++--- .../segment/stats/stats_manager.go | 6 +-- .../segment/stats/stats_manager_test.go | 4 +- .../interceptors/segment/stats/stats_test.go | 6 +-- 10 files changed, 84 insertions(+), 39 deletions(-) diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index 8512a088a1..983b66b0e6 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -221,6 +221,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, metacache metacache.MetaCache, unflushed, flushed []*datapb.SegmentInfo, input <-chan *msgstream.MsgPack, + wbTaskObserverCallback writebuffer.TaskObserverCallback, ) (*DataSyncService, error) { var ( channelName = info.GetVchan().GetChannelName() @@ -316,7 +317,8 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, // if fail to init flowgraph nodes. err = params.WriteBufferManager.Register(channelName, metacache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)), - writebuffer.WithIDAllocator(params.Allocator)) + writebuffer.WithIDAllocator(params.Allocator), + writebuffer.WithTaskObserverCallback(wbTaskObserverCallback)) if err != nil { log.Warn("failed to register channel buffer", zap.String("channel", channelName), zap.Error(err)) return nil, err @@ -353,10 +355,16 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil { return nil, err } - return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil) + return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil, nil) } -func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, input <-chan *msgstream.MsgPack) (*DataSyncService, error) { +func NewStreamingNodeDataSyncService( + initCtx context.Context, + pipelineParams *util.PipelineParams, + info *datapb.ChannelWatchInfo, + input <-chan *msgstream.MsgPack, + wbTaskObserverCallback writebuffer.TaskObserverCallback, +) (*DataSyncService, error) { // recover segment checkpoints var ( err error @@ -381,7 +389,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil { return nil, err } - return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, 
flushedSegmentInfos, input) + return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input, wbTaskObserverCallback) } func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService { diff --git a/internal/flushcommon/writebuffer/options.go b/internal/flushcommon/writebuffer/options.go index 2b3a5d9c51..dd43eb2c36 100644 --- a/internal/flushcommon/writebuffer/options.go +++ b/internal/flushcommon/writebuffer/options.go @@ -19,14 +19,17 @@ const ( type WriteBufferOption func(opt *writeBufferOption) +type TaskObserverCallback func(t syncmgr.Task, err error) + type writeBufferOption struct { deletePolicy string idAllocator allocator.Interface syncPolicies []SyncPolicy - pkStatsFactory metacache.PkStatsFactory - metaWriter syncmgr.MetaWriter - errorHandler func(error) + pkStatsFactory metacache.PkStatsFactory + metaWriter syncmgr.MetaWriter + errorHandler func(error) + taskObserverCallback TaskObserverCallback } func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption { @@ -85,3 +88,11 @@ func WithErrorHandler(handler func(err error)) WriteBufferOption { opt.errorHandler = handler } } + +// WithTaskObserverCallback sets the callback function for observing task status. +// The callback will be called when every task is executed, should be concurrent safe to be called. +func WithTaskObserverCallback(callback TaskObserverCallback) WriteBufferOption { + return func(opt *writeBufferOption) { + opt.taskObserverCallback = callback + } +} diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 5276cba880..1e6bfb7209 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -151,7 +151,8 @@ type writeBufferBase struct { checkpoint *msgpb.MsgPosition flushTimestamp *atomic.Uint64 - errHandler func(err error) + errHandler func(err error) + taskObserverCallback func(t syncmgr.Task, err error) // execute when a sync task finished, should be concurrent safe. 
// pre build logger logger *log.MLogger @@ -181,19 +182,20 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s } wb := &writeBufferBase{ - channelName: channel, - collectionID: metacache.Collection(), - collSchema: schema, - estSizePerRecord: estSize, - syncMgr: syncMgr, - metaWriter: option.metaWriter, - buffers: make(map[int64]*segmentBuffer), - metaCache: metacache, - serializer: serializer, - syncCheckpoint: newCheckpointCandiates(), - syncPolicies: option.syncPolicies, - flushTimestamp: flushTs, - errHandler: option.errorHandler, + channelName: channel, + collectionID: metacache.Collection(), + collSchema: schema, + estSizePerRecord: estSize, + syncMgr: syncMgr, + metaWriter: option.metaWriter, + buffers: make(map[int64]*segmentBuffer), + metaCache: metacache, + serializer: serializer, + syncCheckpoint: newCheckpointCandiates(), + syncPolicies: option.syncPolicies, + flushTimestamp: flushTs, + errHandler: option.errorHandler, + taskObserverCallback: option.taskObserverCallback, } wb.logger = log.With(zap.Int64("collectionID", wb.collectionID), @@ -342,6 +344,10 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) } result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { + if wb.taskObserverCallback != nil { + wb.taskObserverCallback(syncTask, err) + } + if err != nil { return err } @@ -654,6 +660,10 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) { } f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { + if wb.taskObserverCallback != nil { + wb.taskObserverCallback(syncTask, err) + } + if err != nil { return err } diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index 560c6f46e1..ace4d80c2f 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -26,10 +26,12 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/flushcommon/pipeline" + "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" adaptor2 "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor" "github.com/milvus-io/milvus/pkg/streaming/util/options" @@ -116,7 +118,17 @@ func (c *channelLifetime) Run() error { // Build and add pipeline. 
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams, - &datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan()) + &datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) { + if err != nil || t == nil { + return + } + if tt, ok := t.(*syncmgr.SyncTask); ok { + insertLogs, _, _ := tt.Binlogs() + resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), + stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))}, + ) + } + }) if err != nil { return err } diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go index cf118d695c..4bd6ad48b7 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager_test.go @@ -135,8 +135,8 @@ func TestSegmentAllocManager(t *testing.T) { assert.True(t, m.IsNoWaitSeal()) // Try to seal with a policy - resource.Resource().SegmentAssignStatsManager().UpdateOnFlush(6000, stats.FlushOperationMetrics{ - BinLogCounter: 100, + resource.Resource().SegmentAssignStatsManager().UpdateOnSync(6000, stats.SyncOperationMetrics{ + BinLogCounterIncr: 100, }) // ask a unacknowledgement seal for partition 3 to avoid seal operation. result, err = m.AssignSegment(ctx, &AssignSegmentRequest{ @@ -266,6 +266,7 @@ func initializeTestState(t *testing.T) { // s 6000g paramtable.Init() + paramtable.Get().DataCoordCfg.SegmentSealProportion.SwapTempValue("1.0") paramtable.Get().DataCoordCfg.SegmentSealProportionJitter.SwapTempValue("0.0") paramtable.Get().DataCoordCfg.SegmentMaxSize.SwapTempValue("1") diff --git a/internal/streamingnode/server/wal/interceptors/segment/policy/segment_limitation_policy.go b/internal/streamingnode/server/wal/interceptors/segment/policy/segment_limitation_policy.go index 7de8d3a502..f66a3c0edd 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/policy/segment_limitation_policy.go +++ b/internal/streamingnode/server/wal/interceptors/segment/policy/segment_limitation_policy.go @@ -29,6 +29,7 @@ type SegmentLimitationPolicy interface { type jitterSegmentLimitationPolicyExtraInfo struct { Jitter float64 JitterRatio float64 + Proportion float64 MaxSegmentSize uint64 } @@ -46,13 +47,15 @@ func (p jitterSegmentLimitationPolicy) GenerateLimitation() SegmentLimitation { jitterRatio = 1 } maxSegmentSize := uint64(paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024) - segmentSize := uint64(jitterRatio * float64(maxSegmentSize)) + proportion := paramtable.Get().DataCoordCfg.SegmentSealProportion.GetAsFloat() + segmentSize := uint64(jitterRatio * float64(maxSegmentSize) * proportion) return SegmentLimitation{ PolicyName: "jitter_segment_limitation", SegmentSize: segmentSize, ExtraInfo: jitterSegmentLimitationPolicyExtraInfo{ Jitter: jitter, JitterRatio: jitterRatio, + Proportion: proportion, MaxSegmentSize: maxSegmentSize, }, } diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go index 5f7d785c08..7d160d9dae 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go @@ -48,9 +48,9 @@ func NewProtoFromSegmentStat(stat *SegmentStats) 
*streamingpb.SegmentAssignmentS } } -// FlushOperationMetrics is the metrics of flush operation. -type FlushOperationMetrics struct { - BinLogCounter uint64 +// SyncOperationMetrics is the metrics of sync operation. +type SyncOperationMetrics struct { + BinLogCounterIncr uint64 // the counter increment of bin log. } // AllocRows alloc space of rows on current segment. @@ -71,9 +71,9 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 { return s.MaxBinarySize - s.Insert.BinarySize } -// UpdateOnFlush updates the stats of segment on flush. -func (s *SegmentStats) UpdateOnFlush(f FlushOperationMetrics) { - s.BinLogCounter = f.BinLogCounter +// UpdateOnSync updates the stats of segment on sync. +func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) { + s.BinLogCounter += f.BinLogCounterIncr } // Copy copies the segment stats. diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go index 864d1221a8..84241147b7 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go @@ -112,9 +112,9 @@ func (m *StatsManager) GetStatsOfSegment(segmentID int64) *SegmentStats { return m.segmentStats[segmentID].Copy() } -// UpdateOnFlush updates the stats of segment on flush. +// UpdateOnSync updates the stats of segment on sync. // It's an async update operation, so it's not necessary to do success. -func (m *StatsManager) UpdateOnFlush(segmentID int64, flush FlushOperationMetrics) { +func (m *StatsManager) UpdateOnSync(segmentID int64, syncMetric SyncOperationMetrics) { m.mu.Lock() defer m.mu.Unlock() @@ -122,7 +122,7 @@ func (m *StatsManager) UpdateOnFlush(segmentID int64, flush FlushOperationMetric if _, ok := m.segmentIndex[segmentID]; !ok { return } - m.segmentStats[segmentID].UpdateOnFlush(flush) + m.segmentStats[segmentID].UpdateOnSync(syncMetric) // binlog counter is updated, notify seal manager to do seal scanning. 
m.sealNotifier.AddAndNotify(m.segmentIndex[segmentID]) diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go index 47d53cc6e5..427244db06 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager_test.go @@ -60,11 +60,11 @@ func TestStatsManager(t *testing.T) { assert.Equal(t, uint64(350), m.pchannelStats["pchannel"].BinarySize) assert.Equal(t, uint64(250), m.pchannelStats["pchannel2"].BinarySize) - m.UpdateOnFlush(3, FlushOperationMetrics{BinLogCounter: 100}) + m.UpdateOnSync(3, SyncOperationMetrics{BinLogCounterIncr: 100}) <-m.SealNotifier().WaitChan() infos = m.SealNotifier().Get() assert.Len(t, infos, 1) - m.UpdateOnFlush(1000, FlushOperationMetrics{BinLogCounter: 100}) + m.UpdateOnSync(1000, SyncOperationMetrics{BinLogCounterIncr: 100}) shouldBlock(t, m.SealNotifier().WaitChan()) m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400}) diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go index bdef19f136..2224be9d46 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go @@ -68,8 +68,8 @@ func TestSegmentStats(t *testing.T) { assert.Equal(t, stat.Insert.Rows, uint64(160)) assert.Equal(t, stat.Insert.BinarySize, uint64(320)) - stat.UpdateOnFlush(FlushOperationMetrics{ - BinLogCounter: 4, + stat.UpdateOnSync(SyncOperationMetrics{ + BinLogCounterIncr: 4, }) - assert.Equal(t, uint64(4), stat.BinLogCounter) + assert.Equal(t, uint64(7), stat.BinLogCounter) }
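
Reviewer note: the core behavioral change is that the capacity-based seal limit computed by jitterSegmentLimitationPolicy is now scaled by DataCoordCfg.SegmentSealProportion in addition to the jitter ratio. The standalone sketch below only reproduces that arithmetic; the function name and the literal parameter values are assumptions for illustration, not the cluster defaults.

```go
package main

import "fmt"

// effectiveSealSize mirrors the arithmetic in the patched
// jitterSegmentLimitationPolicy.GenerateLimitation: the seal threshold is
// jitterRatio * maxSegmentSize * proportion, where proportion comes from
// DataCoordCfg.SegmentSealProportion. Values below are illustrative only.
func effectiveSealSize(maxSegmentSizeMB int64, jitterRatio, proportion float64) uint64 {
	maxSegmentSize := uint64(maxSegmentSizeMB * 1024 * 1024)
	return uint64(jitterRatio * float64(maxSegmentSize) * proportion)
}

func main() {
	// With a 1024 MB max segment, no jitter (ratio 1.0) and a 0.25 proportion,
	// a growing segment is sealed at roughly 256 MB instead of the full 1024 MB.
	// This is also why the test above pins SegmentSealProportion to "1.0",
	// keeping the pre-patch threshold.
	fmt.Println(effectiveSealSize(1024, 1.0, 0.25)) // 268435456
}
```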
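
The stats update also changes meaning: UpdateOnFlush overwrote BinLogCounter with the reported value, while UpdateOnSync adds BinLogCounterIncr to it, which is why the existing assertion now expects 7 (presumably 3 from the test fixture plus the increment of 4). A minimal sketch of the accumulate-on-sync semantics, using simplified stand-in types rather than the real SegmentStats:

```go
package main

import "fmt"

// SegmentStatsSketch is a simplified stand-in for the real SegmentStats; it
// only models the binlog counter relevant to this patch.
type SegmentStatsSketch struct {
	BinLogCounter uint64
}

// SyncOperationMetrics mirrors the renamed metrics struct: the field carries an
// increment per sync rather than an absolute flush-time counter.
type SyncOperationMetrics struct {
	BinLogCounterIncr uint64
}

// UpdateOnSync accumulates the increment, matching the patched
// SegmentStats.UpdateOnSync (the old UpdateOnFlush overwrote the counter).
func (s *SegmentStatsSketch) UpdateOnSync(m SyncOperationMetrics) {
	s.BinLogCounter += m.BinLogCounterIncr
}

func main() {
	s := &SegmentStatsSketch{BinLogCounter: 3}
	s.UpdateOnSync(SyncOperationMetrics{BinLogCounterIncr: 4})
	fmt.Println(s.BinLogCounter) // 7, matching the updated test expectation
}
```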