From 7beafe99a705a24b8e10e7638419e995c79802bb Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 13 May 2025 22:08:56 +0800 Subject: [PATCH] enhance: implement wal garbage collector with truncate api (#41770) issue: #41544 - add a truncator implementation into wal recovery storage. - add metrics for recovery storage. --------- Signed-off-by: chyezh --- configs/milvus.yaml | 9 ++ .../segment/manager/seal_queue.go | 3 - .../segment/manager/segment_manager.go | 2 - .../shard/shards/partition_manager.go | 22 ++- .../shard/shards/partition_manager_test.go | 2 + .../shard/shards/shard_manager.go | 45 +++--- .../shard/shards/shard_manager_collection.go | 9 +- .../shard/shards/shard_manager_partition.go | 9 +- .../shard/shards/shard_manager_segment.go | 17 ++- .../shard/shards/shard_manager_test.go | 2 +- .../interceptors/shard/stats/stats_manager.go | 8 +- .../server/wal/metricsutil/segment.go | 68 ++++++--- .../server/wal/recovery/config.go | 32 ++++ .../server/wal/recovery/config_test.go | 9 ++ .../server/wal/recovery/metrics.go | 78 ++++++++++ .../wal/recovery/recovery_background_task.go | 13 +- .../server/wal/recovery/recovery_persisted.go | 1 + .../server/wal/recovery/recovery_storage.go | 4 + .../wal/recovery/recovery_storage_impl.go | 15 ++ .../wal/recovery/recovery_storage_test.go | 5 + .../server/wal/recovery/recovery_stream.go | 2 + .../server/wal/recovery/wal_truncator.go | 137 ++++++++++++++++++ .../server/wal/recovery/wal_truncator_test.go | 40 +++++ pkg/metrics/streaming_service_metrics.go | 47 +++++- pkg/streaming/walimpls/impls/pulsar/wal.go | 3 +- pkg/util/paramtable/component_param.go | 24 +++ pkg/util/paramtable/component_param_test.go | 6 + 27 files changed, 538 insertions(+), 74 deletions(-) create mode 100644 internal/streamingnode/server/wal/recovery/metrics.go create mode 100644 internal/streamingnode/server/wal/recovery/wal_truncator.go create mode 100644 internal/streamingnode/server/wal/recovery/wal_truncator_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c8a3e448cb..fcacdab88d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1242,6 +1242,15 @@ streaming: # When the wal is on-closing, the recovery module will try to persist the recovery info for wal to make next recovery operation more fast. # If that persist operation exceeds this timeout, the wal recovery module will close right now. gracefulCloseTimeout: 3s + walTruncate: + # The interval of sampling wal checkpoint when truncate, 1m by default. + # Every time the checkpoint is persisted, the checkpoint will be sampled and used to be a candidate of truncate checkpoint. + # More samples, more frequent truncate, more memory usage. + sampleInterval: 1m + # The retention interval of wal truncate, 5m by default. + # If the sampled checkpoint is older than this interval, it will be used to truncate wal checkpoint. 
+ # Greater the interval, more wal storage usage, more redundant data in wal + retentionInterval: 5m # Any configuration related to the knowhere vector search engine knowhere: diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/seal_queue.go b/internal/streamingnode/server/wal/interceptors/segment/manager/seal_queue.go index cb55a49790..df9a46061f 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/seal_queue.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/seal_queue.go @@ -127,9 +127,6 @@ func (q *sealQueue) tryToSealSegments(ctx context.Context, segments ...*segmentA undone = append(undone, segment) continue } - q.metrics.ObserveSegmentFlushed( - string(segment.SealPolicy()), - int64(segment.GetStat().Insert.BinarySize)) q.logger.Info("segment has been flushed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Int64("partitionID", segment.GetPartitionID()), diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go b/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go index a1d009b549..465e171497 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/segment_manager.go @@ -46,7 +46,6 @@ func newSegmentAllocManagerFromProto( }, inner.GetSegmentId(), stat) stat = nil } - metrics.UpdateGrowingSegmentState(streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN, inner.GetState()) return &segmentAllocManager{ pchannel: pchannel, inner: inner, @@ -313,7 +312,6 @@ func (m *mutableSegmentAssignmentMeta) Commit(ctx context.Context) error { // if the state transferred from growing into others, remove the stats from stats manager. 
m.original.immutableStat = resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(m.original.GetSegmentID()) } - m.original.metrics.UpdateGrowingSegmentState(m.original.GetState(), m.modifiedCopy.GetState()) m.original.inner = m.modifiedCopy return nil } diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager.go b/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager.go index 765aa4253d..10d4aefed7 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager.go @@ -10,6 +10,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/policy" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/utils" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/util/syncutil" @@ -27,6 +28,7 @@ func newPartitionSegmentManager( segments map[int64]*segmentAllocManager, txnManager TxnManager, fencedAssignTimeTick uint64, + metrics *metricsutil.SegmentAssignMetrics, ) *partitionManager { for _, segment := range segments { if segment.CreateSegmentTimeTick() > fencedAssignTimeTick { @@ -44,6 +46,7 @@ func newPartitionSegmentManager( onAllocating: nil, segments: segments, fencedAssignTimeTick: fencedAssignTimeTick, + metrics: metrics, } m.SetLogger(logger.With(zap.String("vchannel", vchannel), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", paritionID))) return m @@ -63,6 +66,7 @@ type partitionManager struct { onAllocating chan struct{} // indicates that if the partition manager is on-allocating a new segment. segments map[int64]*segmentAllocManager // there will be very few segments in this list. fencedAssignTimeTick uint64 // the time tick that the assign operation is fenced. + metrics *metricsutil.SegmentAssignMetrics } // AddSegment adds a segment to the partition segment manager. @@ -76,6 +80,7 @@ func (m *partitionManager) AddSegment(s *segmentAllocManager) { panic("critical bug: create segment time tick is less than fenced assign time tick") } m.segments[s.GetSegmentID()] = s + m.metrics.ObserveCreateSegment() } // GetSegmentManager returns the segment manager of the given segment ID. 
@@ -117,6 +122,11 @@ func (m *partitionManager) FlushAndDropPartition(policy policy.SealPolicy) []int segmentIDs := make([]int64, 0, len(m.segments)) for _, segment := range m.segments { segment.Flush(policy) + m.metrics.ObserveSegmentFlushed( + string(segment.SealPolicy().Policy), + int64(segment.GetFlushedStat().Insert.Rows), + int64(segment.GetFlushedStat().Insert.BinarySize), + ) segmentIDs = append(segmentIDs, segment.GetSegmentID()) } m.segments = make(map[int64]*segmentAllocManager) @@ -134,6 +144,11 @@ func (m *partitionManager) FlushAndFenceSegmentUntil(timeTick uint64) []int64 { segmentIDs := make([]int64, 0, len(m.segments)) for _, segment := range m.segments { segment.Flush(policy.PolicyFenced(timeTick)) + m.metrics.ObserveSegmentFlushed( + string(segment.SealPolicy().Policy), + int64(segment.GetFlushedStat().Insert.Rows), + int64(segment.GetFlushedStat().Insert.BinarySize), + ) segmentIDs = append(segmentIDs, segment.GetSegmentID()) } m.segments = make(map[int64]*segmentAllocManager) @@ -157,6 +172,11 @@ func (m *partitionManager) AsyncFlushSegment(signal utils.SealSegmentSignal) err if !sm.IsFlushed() { sm.Flush(signal.SealPolicy) + m.metrics.ObserveSegmentFlushed( + string(sm.SealPolicy().Policy), + int64(sm.GetFlushedStat().Insert.Rows), + int64(sm.GetFlushedStat().Insert.BinarySize), + ) m.asyncFlushSegment(m.ctx, sm) } return nil @@ -190,7 +210,7 @@ func (m *partitionManager) assignSegment(req *AssignSegmentRequest) (*AssignSegm } // There is no segment can be allocated for the insert request. - // Ask a new peding segment to insert. + // Ask a new pending segment to insert. m.asyncAllocSegment() return nil, ErrWaitForNewSegment } diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager_test.go b/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager_test.go index 55af9801bd..c0534d592c 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/partition_manager_test.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/policy" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/stats" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/utils" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" @@ -76,6 +77,7 @@ func TestPartitionManager(t *testing.T) { }, &mockedTxnManager{}, 100, + metricsutil.NewSegmentAssignMetrics(channel.Name), ) createSegmentDone := make(chan struct{}, 1) flushSegmentDone := make(chan struct{}, 1) diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager.go b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager.go index a679a5baa6..277de561c2 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager.go @@ -55,6 +55,8 @@ func RecoverShardManager(param *ShardManagerRecoverParam) *ShardManager { logger := resource.Resource().Logger().With(log.FieldComponent("shard-manager")).With(zap.Stringer("pchannel", param.ChannelInfo)) // create managers list. 
managers := make(map[int64]*partitionManager) + segmentTotal := 0 + metrics := metricsutil.NewSegmentAssignMetrics(param.ChannelInfo.Name) for collectionID, collectionInfo := range collections { for partitionID := range collectionInfo.PartitionIDs { segmentManagers := make(map[int64]*segmentAllocManager, 0) @@ -76,26 +78,29 @@ func RecoverShardManager(param *ShardManagerRecoverParam) *ShardManager { segmentManagers, param.TxnManager, param.InitialRecoverSnapshot.Checkpoint.TimeTick, // use the checkpoint time tick to fence directly. + metrics, ) + segmentTotal += len(segmentManagers) } } m := &ShardManager{ - mu: sync.Mutex{}, - ctx: ctx, - cancel: cancel, - wal: param.WAL, - pchannel: param.ChannelInfo, - managers: managers, - collections: collections, - txnManager: param.TxnManager, - metrics: metricsutil.NewSegmentAssignMetrics(param.ChannelInfo.Name), + mu: sync.Mutex{}, + ctx: ctx, + cancel: cancel, + wal: param.WAL, + pchannel: param.ChannelInfo, + partitionManagers: managers, + collections: collections, + txnManager: param.TxnManager, + metrics: metrics, } m.SetLogger(logger) m.updateMetrics() + m.metrics.UpdateSegmentCount(segmentTotal) belongs := lo.Values(segmentBelongs) stats := make([]*stats.SegmentStats, 0, len(belongs)) for _, belong := range belongs { - stat := m.managers[belong.PartitionID].segments[belong.SegmentID].GetStatFromRecovery() + stat := m.partitionManagers[belong.PartitionID].segments[belong.SegmentID].GetStatFromRecovery() stats = append(stats, stat) } resource.Resource().SegmentStatsManager().RegisterSealOperator(m, belongs, stats) @@ -160,15 +165,15 @@ func newCollectionInfos(recoverInfos *recovery.RecoverySnapshot) map[int64]*Coll type ShardManager struct { log.Binder - mu sync.Mutex - ctx context.Context - cancel context.CancelFunc - wal *syncutil.Future[wal.WAL] - pchannel types.PChannelInfo - managers map[int64]*partitionManager // map partitionID to partition manager - collections map[int64]*CollectionInfo // map collectionID to collectionInfo - metrics *metricsutil.SegmentAssignMetrics - txnManager TxnManager + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + wal *syncutil.Future[wal.WAL] + pchannel types.PChannelInfo + partitionManagers map[int64]*partitionManager // map partitionID to partition manager + collections map[int64]*CollectionInfo // map collectionID to collectionInfo + metrics *metricsutil.SegmentAssignMetrics + txnManager TxnManager } type CollectionInfo struct { @@ -192,7 +197,7 @@ func (m *ShardManager) Close() { } func (m *ShardManager) updateMetrics() { - m.metrics.UpdatePartitionCount(len(m.managers)) + m.metrics.UpdatePartitionCount(len(m.partitionManagers)) m.metrics.UpdateCollectionCount(len(m.collections)) } diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_collection.go b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_collection.go index 52b6e33321..13c95ec6f7 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_collection.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_collection.go @@ -60,11 +60,11 @@ func (m *ShardManager) CreateCollection(msg message.ImmutableCreateCollectionMes m.collections[collectionID] = newCollectionInfo(vchannel, partitionIDs) for _, partitionID := range partitionIDs { - if _, ok := m.managers[partitionID]; ok { + if _, ok := m.partitionManagers[partitionID]; ok { logger.Warn("partition already exists", zap.Int64("partitionID", partitionID)) continue } - 
m.managers[partitionID] = newPartitionSegmentManager( + m.partitionManagers[partitionID] = newPartitionSegmentManager( m.ctx, m.Logger(), m.wal, @@ -75,6 +75,7 @@ func (m *ShardManager) CreateCollection(msg message.ImmutableCreateCollectionMes make(map[int64]*segmentAllocManager), m.txnManager, timetick, + m.metrics, ) } logger.Info("collection created in segment assignment service", zap.Int64s("partitionIDs", partitionIDs)) @@ -102,7 +103,7 @@ func (m *ShardManager) DropCollection(msg message.ImmutableDropCollectionMessage partitionIDs := make([]int64, 0, len(collectionInfo.PartitionIDs)) segmentIDs := make([]int64, 0, len(collectionInfo.PartitionIDs)) for partitionID := range collectionInfo.PartitionIDs { - pm, ok := m.managers[partitionID] + pm, ok := m.partitionManagers[partitionID] if !ok { logger.Warn("partition not exists", zap.Int64("partitionID", partitionID)) continue @@ -111,7 +112,7 @@ func (m *ShardManager) DropCollection(msg message.ImmutableDropCollectionMessage segments := pm.FlushAndDropPartition(policy.PolicyCollectionRemoved()) partitionIDs = append(partitionIDs, partitionID) segmentIDs = append(segmentIDs, segments...) - delete(m.managers, partitionID) + delete(m.partitionManagers, partitionID) } logger.Info("collection removed", zap.Int64s("partitionIDs", partitionIDs), zap.Int64s("segmentIDs", segmentIDs)) m.updateMetrics() diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_partition.go b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_partition.go index 454ce31c85..b09e6e713c 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_partition.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_partition.go @@ -64,11 +64,11 @@ func (m *ShardManager) CreatePartition(msg message.ImmutableCreatePartitionMessa } m.collections[collectionID].PartitionIDs[partitionID] = struct{}{} - if _, ok := m.managers[partitionID]; ok { + if _, ok := m.partitionManagers[partitionID]; ok { logger.Warn("partition manager already exists") return } - m.managers[partitionID] = newPartitionSegmentManager( + m.partitionManagers[partitionID] = newPartitionSegmentManager( m.ctx, m.Logger(), m.wal, @@ -79,6 +79,7 @@ func (m *ShardManager) CreatePartition(msg message.ImmutableCreatePartitionMessa make(map[int64]*segmentAllocManager), m.txnManager, tiemtick, + m.metrics, ) m.Logger().Info("partition created") m.updateMetrics() @@ -100,13 +101,13 @@ func (m *ShardManager) DropPartition(msg message.ImmutableDropPartitionMessageV1 } delete(m.collections[collectionID].PartitionIDs, partitionID) - pm, ok := m.managers[partitionID] + pm, ok := m.partitionManagers[partitionID] if !ok { logger.Warn("partition not exists", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) return } - delete(m.managers, partitionID) + delete(m.partitionManagers, partitionID) segmentIDs := pm.FlushAndDropPartition(policy.PolicyPartitionRemoved()) m.Logger().Info("partition removed", zap.Int64s("segmentIDs", segmentIDs)) m.updateMetrics() diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_segment.go b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_segment.go index fe6650764f..73baf0d0fe 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_segment.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_segment.go @@ -47,7 +47,7 @@ func (m *ShardManager) 
checkIfSegmentCanBeCreated(collectionID int64, partitionI return err } - if m := m.managers[partitionID].GetSegmentManager(segmentID); m != nil { + if m := m.partitionManagers[partitionID].GetSegmentManager(segmentID); m != nil { return ErrSegmentExists } return nil @@ -69,7 +69,7 @@ func (m *ShardManager) checkIfSegmentCanBeFlushed(collecionID int64, partitionID // segment can be flushed only if the segment exists, and its state is flushed. // pm must exists, because we have checked the partition exists. - pm := m.managers[partitionID] + pm := m.partitionManagers[partitionID] sm := pm.GetSegmentManager(segmentID) if sm == nil { return ErrSegmentNotFound @@ -92,7 +92,7 @@ func (m *ShardManager) CreateSegment(msg message.ImmutableCreateSegmentMessageV2 } s := newSegmentAllocManager(m.pchannel, msg) - pm, ok := m.managers[s.GetPartitionID()] + pm, ok := m.partitionManagers[s.GetPartitionID()] if !ok { panic("critical error: partition manager not found when a segment is created") } @@ -113,7 +113,7 @@ func (m *ShardManager) FlushSegment(msg message.ImmutableFlushMessageV2) { return } - pm := m.managers[partitionID] + pm := m.partitionManagers[partitionID] pm.MustRemoveFlushedSegment(segmentID) } @@ -122,7 +122,7 @@ func (m *ShardManager) AssignSegment(req *AssignSegmentRequest) (*AssignSegmentR m.mu.Lock() defer m.mu.Unlock() - if pm, ok := m.managers[req.PartitionID]; ok { + if pm, ok := m.partitionManagers[req.PartitionID]; ok { return pm.AssignSegment(req) } else { return nil, ErrPartitionNotFound @@ -137,7 +137,7 @@ func (m *ShardManager) WaitUntilGrowingSegmentReady(collectionID int64, partiton if err := m.checkIfPartitionExists(collectionID, partitonID); err != nil { return nil, err } - return m.managers[partitonID].WaitPendingGrowingSegmentReady(), nil + return m.partitionManagers[partitonID].WaitPendingGrowingSegmentReady(), nil } // FlushAndFenceSegmentAllocUntil flush all segment that contains the message which timetick is less than the incoming timetick. @@ -159,7 +159,7 @@ func (m *ShardManager) FlushAndFenceSegmentAllocUntil(collectionID int64, timeti // collect all partitions for partitionID := range collectionInfo.PartitionIDs { // Seal all segments and fence assign to the partition manager. - pm, ok := m.managers[partitionID] + pm, ok := m.partitionManagers[partitionID] if !ok { logger.Warn("partition not found when FlushAndFenceSegmentAllocUntil", zap.Int64("partitionID", partitionID)) continue @@ -181,9 +181,10 @@ func (m *ShardManager) AsyncFlushSegment(signal utils.SealSegmentSignal) { m.mu.Lock() defer m.mu.Unlock() - pm, ok := m.managers[signal.SegmentBelongs.PartitionID] + pm, ok := m.partitionManagers[signal.SegmentBelongs.PartitionID] if !ok { logger.Warn("partition not found when AsyncMustSeal, may be already dropped") + return } if err := pm.AsyncFlushSegment(signal); err != nil { logger.Warn("segment not found when AsyncMustSeal, may be already sealed", zap.Error(err)) diff --git a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_test.go b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_test.go index 7e4931e115..50f03b28cb 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_test.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_test.go @@ -200,7 +200,7 @@ func TestShardManager(t *testing.T) { WithTimeTick(600). WithLastConfirmedUseMessageID(). 
IntoImmutableMessage(rmq.NewRmqID(3)) - m.managers[10].onAllocating = make(chan struct{}) + m.partitionManagers[10].onAllocating = make(chan struct{}) ch, err := m.WaitUntilGrowingSegmentReady(7, 10) assert.NoError(t, err) select { diff --git a/internal/streamingnode/server/wal/interceptors/shard/stats/stats_manager.go b/internal/streamingnode/server/wal/interceptors/shard/stats/stats_manager.go index 918f871c00..5f9a661fa3 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/stats/stats_manager.go +++ b/internal/streamingnode/server/wal/interceptors/shard/stats/stats_manager.go @@ -174,12 +174,12 @@ func (m *StatsManager) registerNewGrowingSegment(belongs SegmentBelongs, stats * // Must be called after RegisterGrowingSegment and before UnregisterGrowingSegment. func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) error { shouldBeSealed, err := m.allocRows(segmentID, insert) - if err != nil { - return err - } if shouldBeSealed { m.worker.NotifySealSegment(segmentID, policy.PolicyCapacity()) } + if err != nil { + return err + } m.notifyIfTotalGrowingBytesOverHWM() return nil } @@ -215,7 +215,7 @@ func (m *StatsManager) allocRows(segmentID int64, insert InsertMetrics) (bool, e if stat.IsEmpty() { return false, ErrTooLargeInsert } - return false, ErrNotEnoughSpace + return stat.ShouldBeSealed(), ErrNotEnoughSpace } // notifyIfTotalGrowingBytesOverHWM notifies if the total bytes is over the high water mark. diff --git a/internal/streamingnode/server/wal/metricsutil/segment.go b/internal/streamingnode/server/wal/metricsutil/segment.go index 6e97470edf..9eadc0bd0f 100644 --- a/internal/streamingnode/server/wal/metricsutil/segment.go +++ b/internal/streamingnode/server/wal/metricsutil/segment.go @@ -4,22 +4,24 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/milvus-io/milvus/pkg/v2/metrics" - "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) +type GrowingSegmentState string + func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics { constLabel := prometheus.Labels{ metrics.NodeIDLabelName: paramtable.GetStringNodeID(), metrics.WALChannelLabelName: pchannel, } return &SegmentAssignMetrics{ - constLabel: constLabel, - allocTotal: metrics.WALSegmentAllocTotal.MustCurryWith(constLabel), - segmentBytes: metrics.WALSegmentBytes.With(constLabel), - flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel), - partitionTotal: metrics.WALPartitionTotal.With(constLabel), - collectionTotal: metrics.WALCollectionTotal.With(constLabel), + constLabel: constLabel, + allocTotal: metrics.WALSegmentAllocTotal.With(constLabel), + segmentRowsTotal: metrics.WALSegmentRowsTotal.With(constLabel), + segmentBytes: metrics.WALSegmentBytes.With(constLabel), + flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel), + partitionTotal: metrics.WALPartitionTotal.With(constLabel), + collectionTotal: metrics.WALCollectionTotal.With(constLabel), } } @@ -27,26 +29,47 @@ func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics { type SegmentAssignMetrics struct { constLabel prometheus.Labels - allocTotal *prometheus.GaugeVec - segmentBytes prometheus.Observer - flushedTotal *prometheus.CounterVec - partitionTotal prometheus.Gauge - collectionTotal prometheus.Gauge + onAllocTotal prometheus.Gauge + onFlushTotal prometheus.Gauge + allocTotal prometheus.Gauge + segmentRowsTotal prometheus.Observer + segmentBytes prometheus.Observer + flushedTotal *prometheus.CounterVec + 
partitionTotal prometheus.Gauge + collectionTotal prometheus.Gauge } -// UpdateGrowingSegmentState updates the metrics of the segment assignment state. -func (m *SegmentAssignMetrics) UpdateGrowingSegmentState(from streamingpb.SegmentAssignmentState, to streamingpb.SegmentAssignmentState) { - if from != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN { - m.allocTotal.WithLabelValues(from.String()).Dec() - } - if to != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_FLUSHED { - m.allocTotal.WithLabelValues(to.String()).Inc() +// ObserveOnAllocating observe a allocating operation and return a guard function. +func (m *SegmentAssignMetrics) ObserveOnAllocating() func() { + m.onAllocTotal.Inc() + return func() { + m.onAllocTotal.Dec() } } -func (m *SegmentAssignMetrics) ObserveSegmentFlushed(policy string, bytes int64) { - m.segmentBytes.Observe(float64(bytes)) +// ObserveOnFlushing observe a flush operation and return a guard function. +func (m *SegmentAssignMetrics) ObseveOnFlushing() func() { + m.onFlushTotal.Inc() + return func() { + m.onFlushTotal.Dec() + } +} + +// ObserveCreateSegment increments the total number of growing segment. +func (m *SegmentAssignMetrics) ObserveCreateSegment() { + m.allocTotal.Inc() +} + +// ObserveSegmentFlushed records the number of bytes flushed and increments the total number of flushed segments. +func (m *SegmentAssignMetrics) ObserveSegmentFlushed(policy string, rows int64, bytes int64) { + m.allocTotal.Dec() m.flushedTotal.WithLabelValues(policy).Inc() + m.segmentRowsTotal.Observe(float64(rows)) + m.segmentBytes.Observe(float64(bytes)) +} + +func (m *SegmentAssignMetrics) UpdateSegmentCount(cnt int) { + m.allocTotal.Set(float64(cnt)) } func (m *SegmentAssignMetrics) UpdatePartitionCount(cnt int) { @@ -58,8 +81,9 @@ func (m *SegmentAssignMetrics) UpdateCollectionCount(cnt int) { } func (m *SegmentAssignMetrics) Close() { - metrics.WALSegmentAllocTotal.DeletePartialMatch(m.constLabel) + metrics.WALSegmentAllocTotal.Delete(m.constLabel) metrics.WALSegmentFlushedTotal.DeletePartialMatch(m.constLabel) + metrics.WALSegmentRowsTotal.Delete(m.constLabel) metrics.WALSegmentBytes.Delete(m.constLabel) metrics.WALPartitionTotal.Delete(m.constLabel) metrics.WALCollectionTotal.Delete(m.constLabel) diff --git a/internal/streamingnode/server/wal/recovery/config.go b/internal/streamingnode/server/wal/recovery/config.go index c89a14d72f..fdd0ce51eb 100644 --- a/internal/streamingnode/server/wal/recovery/config.go +++ b/internal/streamingnode/server/wal/recovery/config.go @@ -44,3 +44,35 @@ func (cfg *config) validate() error { } return nil } + +// newTruncatorConfig creates a new config for the truncator. +func newTruncatorConfig() *truncatorConfig { + params := paramtable.Get() + samplerInterval := params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse() + retentionInterval := params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse() + cfg := &truncatorConfig{ + sampleInterval: samplerInterval, + retentionInterval: retentionInterval, + } + if err := cfg.validate(); err != nil { + panic(err) + } + return cfg +} + +// truncatorConfig is the configuration for the truncator. +type truncatorConfig struct { + sampleInterval time.Duration // the interval to sample the checkpoint + retentionInterval time.Duration // the retention interval to sample the checkpoint +} + +// validate validates the truncator config. 
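+// validate only requires both intervals to be positive; no ordering between them
+// is enforced here. In practice sampleInterval should stay well below
+// retentionInterval, since roughly retentionInterval/sampleInterval candidate
+// checkpoints are buffered between truncations.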
+func (cfg *truncatorConfig) validate() error { + if cfg.sampleInterval <= 0 { + return errors.New("sampler interval must be greater than 0") + } + if cfg.retentionInterval <= 0 { + return errors.New("retention interval must be greater than 0") + } + return nil +} diff --git a/internal/streamingnode/server/wal/recovery/config_test.go b/internal/streamingnode/server/wal/recovery/config_test.go index ca3740cf68..71d53ac67b 100644 --- a/internal/streamingnode/server/wal/recovery/config_test.go +++ b/internal/streamingnode/server/wal/recovery/config_test.go @@ -49,3 +49,12 @@ func TestConfigValidate(t *testing.T) { }) } } + +func TestTruncatorConfig(t *testing.T) { + // Mock paramtable values + paramtable.Init() + cfg := newTruncatorConfig() + + assert.Equal(t, 1*time.Minute, cfg.sampleInterval) + assert.Equal(t, 5*time.Minute, cfg.retentionInterval) +} diff --git a/internal/streamingnode/server/wal/recovery/metrics.go b/internal/streamingnode/server/wal/recovery/metrics.go new file mode 100644 index 0000000000..2a879871cd --- /dev/null +++ b/internal/streamingnode/server/wal/recovery/metrics.go @@ -0,0 +1,78 @@ +package recovery + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/milvus-io/milvus/pkg/v2/metrics" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" +) + +func newRecoveryStorageMetrics(channelInfo types.PChannelInfo) *recoveryMetrics { + constLabels := prometheus.Labels{ + metrics.NodeIDLabelName: paramtable.GetStringNodeID(), + metrics.WALChannelLabelName: channelInfo.Name, + metrics.WALChannelTermLabelName: strconv.FormatInt(channelInfo.Term, 10), + } + return &recoveryMetrics{ + constLabels: constLabels, + info: metrics.WALRecoveryInfo.MustCurryWith(constLabels), + inconsistentEventTotal: metrics.WALRecoveryInconsistentEventTotal.With(constLabels), + isOnPersisting: metrics.WALRecoveryIsOnPersisting.With(constLabels), + inMemTimeTick: metrics.WALRecoveryInMemTimeTick.With(constLabels), + persistedTimeTick: metrics.WALRecoveryPersistedTimeTick.With(constLabels), + truncateTimeTick: metrics.WALTruncateTimeTick.With(constLabels), + } +} + +type recoveryMetrics struct { + constLabels prometheus.Labels + info *prometheus.GaugeVec + inconsistentEventTotal prometheus.Counter + isOnPersisting prometheus.Gauge + inMemTimeTick prometheus.Gauge + persistedTimeTick prometheus.Gauge + truncateTimeTick prometheus.Gauge +} + +// ObserveStateChange sets the state of the recovery storage metrics. 
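+// The previous state's series is deleted before the new state is set to 1, so
+// the gauge behaves like an enum: exactly one state label is non-zero at a time.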
+func (m *recoveryMetrics) ObserveStateChange(state string) { + metrics.WALRecoveryInfo.DeletePartialMatch(m.constLabels) + m.info.WithLabelValues(state).Set(1) +} + +func (m *recoveryMetrics) ObServeInMemMetrics(tickTime uint64) { + m.inMemTimeTick.Set(tsoutil.PhysicalTimeSeconds(tickTime)) +} + +func (m *recoveryMetrics) ObServePersistedMetrics(tickTime uint64) { + m.persistedTimeTick.Set(tsoutil.PhysicalTimeSeconds(tickTime)) +} + +func (m *recoveryMetrics) ObServeTruncateMetrics(tickTime uint64) { + m.truncateTimeTick.Set(tsoutil.PhysicalTimeSeconds(tickTime)) +} + +func (m *recoveryMetrics) ObserveInconsitentEvent() { + m.inconsistentEventTotal.Inc() +} + +func (m *recoveryMetrics) ObserveIsOnPersisting(onPersisting bool) { + if onPersisting { + m.isOnPersisting.Set(1) + } else { + m.isOnPersisting.Set(0) + } +} + +func (m *recoveryMetrics) Close() { + metrics.WALRecoveryInfo.DeletePartialMatch(m.constLabels) + metrics.WALRecoveryInconsistentEventTotal.DeletePartialMatch(m.constLabels) + metrics.WALRecoveryIsOnPersisting.DeletePartialMatch(m.constLabels) + metrics.WALRecoveryInMemTimeTick.DeletePartialMatch(m.constLabels) + metrics.WALRecoveryPersistedTimeTick.DeletePartialMatch(m.constLabels) + metrics.WALTruncateTimeTick.DeletePartialMatch(m.constLabels) +} diff --git a/internal/streamingnode/server/wal/recovery/recovery_background_task.go b/internal/streamingnode/server/wal/recovery/recovery_background_task.go index 3b70ea6b7a..4399a8a060 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_background_task.go +++ b/internal/streamingnode/server/wal/recovery/recovery_background_task.go @@ -62,6 +62,7 @@ func (rs *RecoveryStorage) persistDritySnapshotWhenClosing() { // persistDirtySnapshot persists the dirty snapshot to the catalog. func (rs *RecoveryStorage) persistDirtySnapshot(ctx context.Context, snapshot *RecoverySnapshot, lvl zapcore.Level) (err error) { + rs.metrics.ObserveIsOnPersisting(true) logger := rs.Logger().With( zap.String("checkpoint", snapshot.Checkpoint.MessageID.String()), zap.Uint64("checkpointTimeTick", snapshot.Checkpoint.TimeTick), @@ -74,6 +75,7 @@ func (rs *RecoveryStorage) persistDirtySnapshot(ctx context.Context, snapshot *R return } logger.Log(lvl, "persist dirty snapshot") + defer rs.metrics.ObserveIsOnPersisting(false) }() if err := rs.dropAllVirtualChannel(ctx, snapshot.VChannels); err != nil { @@ -109,10 +111,17 @@ func (rs *RecoveryStorage) persistDirtySnapshot(ctx context.Context, snapshot *R } // checkpoint updates should always be persisted after other updates success. - return rs.retryOperationWithBackoff(ctx, rs.Logger().With(zap.String("op", "persistCheckpoint")), func(ctx context.Context) error { + if err := rs.retryOperationWithBackoff(ctx, rs.Logger().With(zap.String("op", "persistCheckpoint")), func(ctx context.Context) error { return resource.Resource().StreamingNodeCatalog(). SaveConsumeCheckpoint(ctx, rs.channel.Name, snapshot.Checkpoint.IntoProto()) - }) + }); err != nil { + return err + } + + // sample the checkpoint for truncator to make wal truncation. + rs.metrics.ObServePersistedMetrics(snapshot.Checkpoint.TimeTick) + rs.truncator.SampleCheckpoint(snapshot.Checkpoint) + return } // dropAllVirtualChannel drops all virtual channels that are in the dropped state. 
diff --git a/internal/streamingnode/server/wal/recovery/recovery_persisted.go b/internal/streamingnode/server/wal/recovery/recovery_persisted.go index 7ac22a484e..68caebf231 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_persisted.go +++ b/internal/streamingnode/server/wal/recovery/recovery_persisted.go @@ -19,6 +19,7 @@ import ( // recoverRecoveryInfoFromMeta retrieves the recovery info for the given channel. func (r *RecoveryStorage) recoverRecoveryInfoFromMeta(ctx context.Context, walName string, channelInfo types.PChannelInfo, lastTimeTickMessage message.ImmutableMessage) error { + r.metrics.ObserveStateChange(recoveryStorageStatePersistRecovering) r.SetLogger(resource.Resource().Logger().With( log.FieldComponent(componentRecoveryStorage), zap.String("channel", channelInfo.String()), diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage.go b/internal/streamingnode/server/wal/recovery/recovery_storage.go index 605b24a081..c2dc711b28 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage.go @@ -5,6 +5,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" ) // RecoverySnapshot is the snapshot of the recovery info. @@ -31,6 +32,9 @@ type RecoveryStreamBuilder interface { // Build builds a recovery stream from the given channel info. // The recovery stream will return the messages from the start checkpoint to the end time tick. Build(param BuildRecoveryStreamParam) RecoveryStream + + // Return the underlying walimpls.WALImpls. + RWWALImpls() walimpls.WALImpls } // RecoveryStream is an interface that is used to recover the recovery storage from the WAL. diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go index 62debde843..a9f35d73b3 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go @@ -41,10 +41,17 @@ func RecoverRecoveryStorage( return nil, nil, err } // recovery storage start work. + rs.metrics.ObserveStateChange(recoveryStorageStateWorking) rs.SetLogger(resource.Resource().Logger().With( log.FieldComponent(componentRecoveryStorage), zap.String("channel", recoveryStreamBuilder.Channel().String()), zap.String("state", recoveryStorageStateWorking))) + rs.truncator = newSamplingTruncator( + snapshot.Checkpoint.Clone(), + recoveryStreamBuilder.RWWALImpls(), + rs.metrics, + ) + rs.truncator.SetLogger(rs.Logger()) go rs.backgroundTask() return rs, snapshot, nil } @@ -60,6 +67,7 @@ func newRecoveryStorage(channel types.PChannelInfo) *RecoveryStorage { dirtyCounter: 0, persistNotifier: make(chan struct{}, 1), gracefulClosed: false, + metrics: newRecoveryStorageMetrics(channel), } } @@ -78,6 +86,8 @@ type RecoveryStorage struct { // used to trigger the recovery persist operation. persistNotifier chan struct{} gracefulClosed bool + truncator *samplingTruncator + metrics *recoveryMetrics } // ObserveMessage is called when a new message is observed. @@ -92,6 +102,9 @@ func (r *RecoveryStorage) ObserveMessage(msg message.ImmutableMessage) { func (r *RecoveryStorage) Close() { r.backgroundTaskNotifier.Cancel() r.backgroundTaskNotifier.BlockUntilFinish() + // Stop the truncator. 
+ r.truncator.Close() + r.metrics.Close() } // notifyPersist notifies a persist operation. @@ -154,6 +167,7 @@ func (r *RecoveryStorage) observeMessage(msg message.ImmutableMessage) { checkpointUpdates := !r.checkpoint.MessageID.EQ(msg.LastConfirmedMessageID()) r.checkpoint.TimeTick = msg.TimeTick() r.checkpoint.MessageID = msg.LastConfirmedMessageID() + r.metrics.ObServeInMemMetrics(r.checkpoint.TimeTick) if checkpointUpdates { // only count the dirty if last confirmed message id is updated. @@ -375,4 +389,5 @@ func (r *RecoveryStorage) detectInconsistency(msg message.ImmutableMessage, reas // The log is not fatal in some cases. // because our meta is not atomic-updated, so these error may be logged if crashes when meta updated partially. r.Logger().Warn("inconsistency detected", fields...) + r.metrics.ObserveInconsitentEvent() } diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_test.go b/internal/streamingnode/server/wal/recovery/recovery_storage_test.go index bacb7e6724..861c28f57e 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_test.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_test.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -196,6 +197,10 @@ func (b *streamBuilder) segmentNum() int { return segmentNum } +func (b *streamBuilder) RWWALImpls() walimpls.WALImpls { + return nil +} + type testRecoveryStream struct { ch chan message.ImmutableMessage } diff --git a/internal/streamingnode/server/wal/recovery/recovery_stream.go b/internal/streamingnode/server/wal/recovery/recovery_stream.go index 8de1350004..be16b3643e 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_stream.go +++ b/internal/streamingnode/server/wal/recovery/recovery_stream.go @@ -19,6 +19,8 @@ func (r *RecoveryStorage) recoverFromStream( recoveryStreamBuilder RecoveryStreamBuilder, lastTimeTickMessage message.ImmutableMessage, ) (snapshot *RecoverySnapshot, err error) { + r.metrics.ObserveStateChange(recoveryStorageStateStreamRecovering) + r.metrics.ObServePersistedMetrics(r.checkpoint.TimeTick) r.SetLogger(resource.Resource().Logger().With( log.FieldComponent(componentRecoveryStorage), zap.String("channel", recoveryStreamBuilder.Channel().String()), diff --git a/internal/streamingnode/server/wal/recovery/wal_truncator.go b/internal/streamingnode/server/wal/recovery/wal_truncator.go new file mode 100644 index 0000000000..898974d421 --- /dev/null +++ b/internal/streamingnode/server/wal/recovery/wal_truncator.go @@ -0,0 +1,137 @@ +package recovery + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/v2/util/syncutil" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" +) + +// newSamplingTruncator creates a new sampling truncator. 
+func newSamplingTruncator( + checkpoint *WALCheckpoint, + truncator walimpls.WALImpls, + recoveryMetrics *recoveryMetrics, +) *samplingTruncator { + st := &samplingTruncator{ + notifier: syncutil.NewAsyncTaskNotifier[struct{}](), + cfg: newTruncatorConfig(), + truncator: truncator, + mu: sync.Mutex{}, + checkpointSamples: []*WALCheckpoint{checkpoint}, + lastTruncatedCheckpoint: nil, + lastSampled: time.Now(), + metrics: recoveryMetrics, + } + go st.background() + return st +} + +// samplingTruncator is a sampling truncator that samples the incoming checkpoint and truncates the WAL. +type samplingTruncator struct { + log.Binder + notifier *syncutil.AsyncTaskNotifier[struct{}] + cfg *truncatorConfig + truncator walimpls.WALImpls + + mu sync.Mutex + checkpointSamples []*WALCheckpoint // the samples of checkpoints + lastTruncatedCheckpoint *WALCheckpoint // the last truncated checkpoint + lastSampled time.Time // the last time the checkpoint is sampled + metrics *recoveryMetrics +} + +// SampleCheckpoint samples the incoming checkpoint and adds it to the checkpoint samples. +func (t *samplingTruncator) SampleCheckpoint(checkpoint *WALCheckpoint) { + t.mu.Lock() + defer t.mu.Unlock() + if time.Since(t.lastSampled) < t.cfg.sampleInterval { + return + } + + if len(t.checkpointSamples) == 0 || t.checkpointSamples[len(t.checkpointSamples)-1].MessageID.LT(checkpoint.MessageID) { + t.checkpointSamples = append(t.checkpointSamples, checkpoint) + } + t.lastSampled = time.Now() +} + +// background starts the background task of the sampling truncator. +func (t *samplingTruncator) background() { + ticker := time.NewTicker(t.cfg.sampleInterval / 2) + defer func() { + ticker.Stop() + t.notifier.Finish(struct{}{}) + t.Logger().Info("sampling truncator background task exit") + }() + + for { + select { + case <-t.notifier.Context().Done(): + return + case <-ticker.C: + t.applyTruncate() + } + } +} + +// consumeCheckpointSamples consumes the checkpoint samples and returns the truncate checkpoint. +func (t *samplingTruncator) consumeCheckpointSamples() *WALCheckpoint { + t.mu.Lock() + defer t.mu.Unlock() + + targetCheckpointIdx := -1 + for i := 0; i < len(t.checkpointSamples); i++ { + if time.Since(tsoutil.PhysicalTime(t.checkpointSamples[i].TimeTick)) < t.cfg.retentionInterval { + break + } + targetCheckpointIdx = i + } + if targetCheckpointIdx >= 0 { + checkpoint := t.checkpointSamples[targetCheckpointIdx] + t.checkpointSamples = t.checkpointSamples[targetCheckpointIdx+1:] + return checkpoint + } + return nil +} + +// applyTruncate applies the truncate operation. 
+func (t *samplingTruncator) applyTruncate() { + truncateCheckpoint := t.consumeCheckpointSamples() + if truncateCheckpoint == nil { + t.Logger().Debug("no checkpoint sample can be used to truncate wal") + return + } + logger := t.Logger().With(zap.String("messageID", truncateCheckpoint.MessageID.String()), zap.Uint64("timeTick", truncateCheckpoint.TimeTick)) + if t.lastTruncatedCheckpoint != nil { + logger = logger.With(zap.String("lastMessageID", t.lastTruncatedCheckpoint.MessageID.String()), zap.Uint64("lastTimeTick", t.lastTruncatedCheckpoint.TimeTick)) + if truncateCheckpoint.MessageID.EQ(t.lastTruncatedCheckpoint.MessageID) { + logger.Debug("checkpoint sample is the same, ignore the operation", zap.String("messageID", truncateCheckpoint.MessageID.String())) + t.lastTruncatedCheckpoint = truncateCheckpoint + t.metrics.ObServeTruncateMetrics(truncateCheckpoint.TimeTick) + return + } else if truncateCheckpoint.MessageID.LT(t.lastTruncatedCheckpoint.MessageID) { + logger.Warn("checkpoint sample is not in order, the wal may be corrupted", zap.String("targetMessageID", truncateCheckpoint.MessageID.String())) + return + } + } + + err := t.truncator.Truncate(t.notifier.Context(), truncateCheckpoint.MessageID) + if err != nil { + logger.Warn("failed to truncate wal, the checkpoint sample is lost", zap.Error(err)) + return + } + logger.Info("truncate wal") + t.lastTruncatedCheckpoint = truncateCheckpoint + t.metrics.ObServeTruncateMetrics(truncateCheckpoint.TimeTick) +} + +// Close closes the sampling truncator. +func (t *samplingTruncator) Close() { + t.notifier.Cancel() + t.notifier.BlockAndGetResult() +} diff --git a/internal/streamingnode/server/wal/recovery/wal_truncator_test.go b/internal/streamingnode/server/wal/recovery/wal_truncator_test.go new file mode 100644 index 0000000000..77f8113b6e --- /dev/null +++ b/internal/streamingnode/server/wal/recovery/wal_truncator_test.go @@ -0,0 +1,40 @@ +package recovery + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" +) + +func TestTruncator(t *testing.T) { + w := mock_walimpls.NewMockWALImpls(t) + w.EXPECT().Truncate(mock.Anything, mock.Anything).Return(nil) + paramtable.Get().Save(paramtable.Get().StreamingCfg.WALTruncateSampleInterval.Key, "1ms") + paramtable.Get().Save(paramtable.Get().StreamingCfg.WALTruncateRetentionInterval.Key, "2ms") + + truncator := newSamplingTruncator(&WALCheckpoint{ + MessageID: rmq.NewRmqID(1), + TimeTick: 1, + Magic: recoveryMagicStreamingInitialized, + }, w, newRecoveryStorageMetrics(types.PChannelInfo{Name: "test", Term: 1})) + + for i := 0; i < 20; i++ { + time.Sleep(1 * time.Millisecond) + for rand.Int31n(3) < 1 { + truncator.SampleCheckpoint(&WALCheckpoint{ + MessageID: rmq.NewRmqID(int64(i)), + TimeTick: tsoutil.ComposeTSByTime(time.Now(), 0), + Magic: recoveryMagicStreamingInitialized, + }) + } + } + truncator.Close() +} diff --git a/pkg/metrics/streaming_service_metrics.go b/pkg/metrics/streaming_service_metrics.go index 28b7aad40c..ea3f500248 100644 --- a/pkg/metrics/streaming_service_metrics.go +++ b/pkg/metrics/streaming_service_metrics.go @@ -30,10 +30,10 @@ const ( WALInterceptorLabelName = "interceptor_name" WALTxnStateLabelName = "state" WALFlusherStateLabelName = 
"state" + WALRecoveryStorageStateLabelName = "state" WALStateLabelName = "state" WALChannelLabelName = channelNameLabelName WALSegmentSealPolicyNameLabelName = "policy" - WALSegmentAllocStateLabelName = "state" WALMessageTypeLabelName = "message_type" WALChannelTermLabelName = "term" WALNameLabelName = "wal_name" @@ -250,13 +250,19 @@ var ( WALSegmentAllocTotal = newWALGaugeVec(prometheus.GaugeOpts{ Name: "segment_assign_segment_alloc_total", Help: "Total of segment alloc on wal", - }, WALChannelLabelName, WALSegmentAllocStateLabelName) + }, WALChannelLabelName) WALSegmentFlushedTotal = newWALCounterVec(prometheus.CounterOpts{ Name: "segment_assign_flushed_segment_total", Help: "Total of segment sealed on wal", }, WALChannelLabelName, WALSegmentSealPolicyNameLabelName) + WALSegmentRowsTotal = newWALHistogramVec(prometheus.HistogramOpts{ + Name: "segment_assign_segment_rows_total", + Help: "Total rows of segment alloc on wal", + Buckets: prometheus.ExponentialBucketsRange(128, 1048576, 10), // 5MB -> 1024MB + }, WALChannelLabelName) + WALSegmentBytes = newWALHistogramVec(prometheus.HistogramOpts{ Name: "segment_assign_segment_bytes", Help: "Bytes of segment alloc on wal", @@ -396,6 +402,36 @@ var ( Name: "flusher_time_tick", Help: "the final timetick tick of flusher seen", }, WALChannelLabelName, WALChannelTermLabelName) + + WALRecoveryInfo = newWALGaugeVec(prometheus.GaugeOpts{ + Name: "recovery_info", + Help: "Current info of recovery storage on current wal", + }, WALChannelLabelName, WALChannelTermLabelName, WALRecoveryStorageStateLabelName) + + WALRecoveryInMemTimeTick = newWALGaugeVec(prometheus.GaugeOpts{ + Name: "recovery_in_mem_time_tick", + Help: "the final timetick tick of recovery storage seen", + }, WALChannelLabelName, WALChannelTermLabelName) + + WALRecoveryPersistedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{ + Name: "recovery_persisted_time_tick", + Help: "the final persisted timetick tick of recovery storage seen", + }, WALChannelLabelName, WALChannelTermLabelName) + + WALRecoveryInconsistentEventTotal = newWALCounterVec(prometheus.CounterOpts{ + Name: "recovery_inconsistent_event_total", + Help: "Total of recovery inconsistent event", + }, WALChannelLabelName, WALChannelTermLabelName) + + WALRecoveryIsOnPersisting = newWALGaugeVec(prometheus.GaugeOpts{ + Name: "recovery_is_on_persisting", + Help: "Is recovery storage on persisting", + }, WALChannelLabelName, WALChannelTermLabelName) + + WALTruncateTimeTick = newWALGaugeVec(prometheus.GaugeOpts{ + Name: "truncate_time_tick", + Help: "the final timetick tick of truncator seen", + }, WALChannelLabelName, WALChannelTermLabelName) ) // RegisterStreamingServiceClient registers streaming service client metrics @@ -454,6 +490,7 @@ func registerWAL(registry *prometheus.Registry) { registry.MustRegister(WALGrowingSegmentLWMBytes) registry.MustRegister(WALSegmentAllocTotal) registry.MustRegister(WALSegmentFlushedTotal) + registry.MustRegister(WALSegmentRowsTotal) registry.MustRegister(WALSegmentBytes) registry.MustRegister(WALPartitionTotal) registry.MustRegister(WALCollectionTotal) @@ -480,6 +517,12 @@ func registerWAL(registry *prometheus.Registry) { registry.MustRegister(WALScannerTxnBufBytes) registry.MustRegister(WALFlusherInfo) registry.MustRegister(WALFlusherTimeTick) + registry.MustRegister(WALRecoveryInfo) + registry.MustRegister(WALRecoveryInMemTimeTick) + registry.MustRegister(WALRecoveryPersistedTimeTick) + registry.MustRegister(WALRecoveryInconsistentEventTotal) + registry.MustRegister(WALRecoveryIsOnPersisting) + 
registry.MustRegister(WALTruncateTimeTick) } func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec { diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index b8c9712963..6ba724f0aa 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -84,7 +84,8 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { Topic: w.Channel().Name, SubscriptionName: truncateCursorSubscriptionName, Type: pulsar.Exclusive, - MaxPendingChunkedMessage: 1, + MaxPendingChunkedMessage: 0, + StartMessageIDInclusive: true, }) if err != nil { return err diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d9be8a266c..99daf67584 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -5412,6 +5412,8 @@ type streamingConfig struct { WALRecoveryPersistInterval ParamItem `refreshable:"true"` WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"` WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"` + WALTruncateSampleInterval ParamItem `refreshable:"true"` + WALTruncateRetentionInterval ParamItem `refreshable:"true"` } func (p *streamingConfig) init(base *BaseTable) { @@ -5615,6 +5617,28 @@ If that persist operation exceeds this timeout, the wal recovery module will clo Export: true, } p.WALRecoveryGracefulCloseTimeout.Init(base.mgr) + + p.WALTruncateSampleInterval = ParamItem{ + Key: "streaming.walTruncate.sampleInterval", + Version: "2.6.0", + Doc: `The interval of sampling wal checkpoint when truncate, 1m by default. +Every time the checkpoint is persisted, the checkpoint will be sampled and used to be a candidate of truncate checkpoint. +More samples, more frequent truncate, more memory usage.`, + DefaultValue: "1m", + Export: true, + } + p.WALTruncateSampleInterval.Init(base.mgr) + + p.WALTruncateRetentionInterval = ParamItem{ + Key: "streaming.walTruncate.retentionInterval", + Version: "2.6.0", + Doc: `The retention interval of wal truncate, 5m by default. +If the sampled checkpoint is older than this interval, it will be used to truncate wal checkpoint. +Greater the interval, more wal storage usage, more redundant data in wal`, + DefaultValue: "5m", + Export: true, + } + p.WALTruncateRetentionInterval.Init(base.mgr) } // runtimeConfig is just a private environment value table. 
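Both knobs are marked refreshable:"true", so they can be overridden at runtime through paramtable, the same way the tests below do. A small sketch (values are illustrative, not recommendations):

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

func main() {
	paramtable.Init()
	params := paramtable.Get()

	// Illustrative values only: sample a candidate every 30s and keep roughly
	// 2m of WAL behind the persisted checkpoint.
	params.Save(params.StreamingCfg.WALTruncateSampleInterval.Key, "30s")
	params.Save(params.StreamingCfg.WALTruncateRetentionInterval.Key, "2m")

	fmt.Println(params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse())
	fmt.Println(params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse())
}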
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index e30109e636..813cbd47f7 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -631,6 +631,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat()) assert.Equal(t, float64(0.4), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat()) assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat()) + assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()) + assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse()) params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s") params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s") @@ -652,6 +654,8 @@ func TestComponentParam(t *testing.T) { params.Save(params.StreamingCfg.FlushMemoryThreshold.Key, "0.7") params.Save(params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.Key, "0.25") params.Save(params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.Key, "0.15") + params.Save(params.StreamingCfg.WALTruncateSampleInterval.Key, "1m") + params.Save(params.StreamingCfg.WALTruncateRetentionInterval.Key, "30m") assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()) assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat()) @@ -672,6 +676,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, float64(0.7), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat()) assert.Equal(t, float64(0.25), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat()) assert.Equal(t, float64(0.15), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat()) + assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()) + assert.Equal(t, 30*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse()) }) t.Run("channel config priority", func(t *testing.T) {
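A back-of-envelope reading of the default values asserted above, under the assumption of steady checkpoint traffic (at least one persisted checkpoint per sample interval):

package main

import (
	"fmt"
	"time"
)

func main() {
	sample := 1 * time.Minute    // streaming.walTruncate.sampleInterval default
	retention := 5 * time.Minute // streaming.walTruncate.retentionInterval default

	// Rough upper bound on buffered candidate checkpoints, assuming one is
	// sampled per sample interval and consumed once older than retention.
	maxCandidates := int(retention/sample) + 1

	// WAL data survives for at least the retention interval after its
	// checkpoint persists, plus up to about one sample interval of lag
	// from sampling and the half-interval truncation ticker.
	worstCaseLag := retention + sample

	fmt.Println(maxCandidates, worstCaseLag) // 6 6m0s
}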