enhance: implement wal garbage collector with truncate api (#41770)

issue: #41544

- add a truncator implementation to the WAL recovery storage.
- add metrics for the recovery storage.
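In outline, the truncator samples the recovery checkpoints that get persisted and, once a sample has aged past the retention interval, truncates the WAL up to that checkpoint. Below is a minimal, self-contained sketch of that rule, assuming simplified stand-in types rather than the actual Milvus checkpoint structs:

// sketch.go: illustrative only; the types and helpers here are NOT the Milvus API.
package main

import (
	"fmt"
	"time"
)

// checkpoint stands in for the recovery WALCheckpoint: an opaque position plus
// the wall-clock time at which it was sampled.
type checkpoint struct {
	id      int64
	sampled time.Time
}

func main() {
	const (
		sampleInterval    = 1 * time.Minute // streaming.walTruncate.sampleInterval
		retentionInterval = 5 * time.Minute // streaming.walTruncate.retentionInterval
	)

	var (
		samples     []checkpoint
		lastSampled time.Time
	)

	// offer records a persisted checkpoint as a truncation candidate, but only
	// if at least sampleInterval has passed since the previous sample.
	offer := func(cp checkpoint, now time.Time) {
		if now.Sub(lastSampled) < sampleInterval {
			return
		}
		samples = append(samples, cp)
		lastSampled = now
	}

	// pick returns the newest sample that is older than retentionInterval and
	// drops it (and everything older) from the candidate list.
	pick := func(now time.Time) (checkpoint, bool) {
		idx := -1
		for i, cp := range samples {
			if now.Sub(cp.sampled) < retentionInterval {
				break
			}
			idx = i
		}
		if idx < 0 {
			return checkpoint{}, false
		}
		cp := samples[idx]
		samples = samples[idx+1:]
		return cp, true
	}

	now := time.Now()
	offer(checkpoint{id: 1, sampled: now.Add(-10 * time.Minute)}, now.Add(-10*time.Minute))
	offer(checkpoint{id: 2, sampled: now.Add(-2 * time.Minute)}, now.Add(-2*time.Minute))
	if cp, ok := pick(now); ok {
		// Only checkpoint 1 is old enough, so the WAL would be truncated up to it.
		fmt.Println("truncate WAL up to checkpoint", cp.id)
	}
}

The real logic lives in the samplingTruncator added in this PR, driven by the streaming.walTruncate.sampleInterval and streaming.walTruncate.retentionInterval settings introduced below.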

---------

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-05-13 22:08:56 +08:00 committed by GitHub
parent 8bc9ae9a9e
commit 7beafe99a7
27 changed files with 538 additions and 74 deletions

View File

@ -1242,6 +1242,15 @@ streaming:
# When the wal is closing, the recovery module will try to persist the recovery info so that the next recovery operation is faster.
# If that persist operation exceeds this timeout, the wal recovery module will close immediately.
gracefulCloseTimeout: 3s
walTruncate:
# The interval at which wal checkpoints are sampled for truncation, 1m by default.
# Every time a checkpoint is persisted, it may be sampled as a candidate truncation checkpoint.
# More samples mean more frequent truncation and higher memory usage.
sampleInterval: 1m
# The retention interval of wal truncation, 5m by default.
# If a sampled checkpoint is older than this interval, it will be used to truncate the wal.
# The greater the interval, the more wal storage is used and the more redundant data is kept in the wal.
retentionInterval: 5m
# Any configuration related to the knowhere vector search engine
knowhere:

View File

@ -127,9 +127,6 @@ func (q *sealQueue) tryToSealSegments(ctx context.Context, segments ...*segmentA
undone = append(undone, segment)
continue
}
q.metrics.ObserveSegmentFlushed(
string(segment.SealPolicy()),
int64(segment.GetStat().Insert.BinarySize))
q.logger.Info("segment has been flushed",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),

View File

@ -46,7 +46,6 @@ func newSegmentAllocManagerFromProto(
}, inner.GetSegmentId(), stat)
stat = nil
}
metrics.UpdateGrowingSegmentState(streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN, inner.GetState())
return &segmentAllocManager{
pchannel: pchannel,
inner: inner,
@ -313,7 +312,6 @@ func (m *mutableSegmentAssignmentMeta) Commit(ctx context.Context) error {
// if the state transferred from growing into others, remove the stats from stats manager.
m.original.immutableStat = resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(m.original.GetSegmentID())
}
m.original.metrics.UpdateGrowingSegmentState(m.original.GetState(), m.modifiedCopy.GetState())
m.original.inner = m.modifiedCopy
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/utils"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
@ -27,6 +28,7 @@ func newPartitionSegmentManager(
segments map[int64]*segmentAllocManager,
txnManager TxnManager,
fencedAssignTimeTick uint64,
metrics *metricsutil.SegmentAssignMetrics,
) *partitionManager {
for _, segment := range segments {
if segment.CreateSegmentTimeTick() > fencedAssignTimeTick {
@ -44,6 +46,7 @@ func newPartitionSegmentManager(
onAllocating: nil,
segments: segments,
fencedAssignTimeTick: fencedAssignTimeTick,
metrics: metrics,
}
m.SetLogger(logger.With(zap.String("vchannel", vchannel), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", paritionID)))
return m
@ -63,6 +66,7 @@ type partitionManager struct {
onAllocating chan struct{} // indicates that if the partition manager is on-allocating a new segment.
segments map[int64]*segmentAllocManager // there will be very few segments in this list.
fencedAssignTimeTick uint64 // the time tick that the assign operation is fenced.
metrics *metricsutil.SegmentAssignMetrics
}
// AddSegment adds a segment to the partition segment manager.
@ -76,6 +80,7 @@ func (m *partitionManager) AddSegment(s *segmentAllocManager) {
panic("critical bug: create segment time tick is less than fenced assign time tick")
}
m.segments[s.GetSegmentID()] = s
m.metrics.ObserveCreateSegment()
}
// GetSegmentManager returns the segment manager of the given segment ID.
@ -117,6 +122,11 @@ func (m *partitionManager) FlushAndDropPartition(policy policy.SealPolicy) []int
segmentIDs := make([]int64, 0, len(m.segments))
for _, segment := range m.segments {
segment.Flush(policy)
m.metrics.ObserveSegmentFlushed(
string(segment.SealPolicy().Policy),
int64(segment.GetFlushedStat().Insert.Rows),
int64(segment.GetFlushedStat().Insert.BinarySize),
)
segmentIDs = append(segmentIDs, segment.GetSegmentID())
}
m.segments = make(map[int64]*segmentAllocManager)
@ -134,6 +144,11 @@ func (m *partitionManager) FlushAndFenceSegmentUntil(timeTick uint64) []int64 {
segmentIDs := make([]int64, 0, len(m.segments))
for _, segment := range m.segments {
segment.Flush(policy.PolicyFenced(timeTick))
m.metrics.ObserveSegmentFlushed(
string(segment.SealPolicy().Policy),
int64(segment.GetFlushedStat().Insert.Rows),
int64(segment.GetFlushedStat().Insert.BinarySize),
)
segmentIDs = append(segmentIDs, segment.GetSegmentID())
}
m.segments = make(map[int64]*segmentAllocManager)
@ -157,6 +172,11 @@ func (m *partitionManager) AsyncFlushSegment(signal utils.SealSegmentSignal) err
if !sm.IsFlushed() {
sm.Flush(signal.SealPolicy)
m.metrics.ObserveSegmentFlushed(
string(sm.SealPolicy().Policy),
int64(sm.GetFlushedStat().Insert.Rows),
int64(sm.GetFlushedStat().Insert.BinarySize),
)
m.asyncFlushSegment(m.ctx, sm)
}
return nil
@ -190,7 +210,7 @@ func (m *partitionManager) assignSegment(req *AssignSegmentRequest) (*AssignSegm
}
// There is no segment can be allocated for the insert request.
// Ask a new peding segment to insert.
// Ask a new pending segment to insert.
m.asyncAllocSegment()
return nil, ErrWaitForNewSegment
}

View File

@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/stats"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/utils"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
@ -76,6 +77,7 @@ func TestPartitionManager(t *testing.T) {
},
&mockedTxnManager{},
100,
metricsutil.NewSegmentAssignMetrics(channel.Name),
)
createSegmentDone := make(chan struct{}, 1)
flushSegmentDone := make(chan struct{}, 1)

View File

@ -55,6 +55,8 @@ func RecoverShardManager(param *ShardManagerRecoverParam) *ShardManager {
logger := resource.Resource().Logger().With(log.FieldComponent("shard-manager")).With(zap.Stringer("pchannel", param.ChannelInfo))
// create managers list.
managers := make(map[int64]*partitionManager)
segmentTotal := 0
metrics := metricsutil.NewSegmentAssignMetrics(param.ChannelInfo.Name)
for collectionID, collectionInfo := range collections {
for partitionID := range collectionInfo.PartitionIDs {
segmentManagers := make(map[int64]*segmentAllocManager, 0)
@ -76,26 +78,29 @@ func RecoverShardManager(param *ShardManagerRecoverParam) *ShardManager {
segmentManagers,
param.TxnManager,
param.InitialRecoverSnapshot.Checkpoint.TimeTick, // use the checkpoint time tick to fence directly.
metrics,
)
segmentTotal += len(segmentManagers)
}
}
m := &ShardManager{
mu: sync.Mutex{},
ctx: ctx,
cancel: cancel,
wal: param.WAL,
pchannel: param.ChannelInfo,
managers: managers,
collections: collections,
txnManager: param.TxnManager,
metrics: metricsutil.NewSegmentAssignMetrics(param.ChannelInfo.Name),
mu: sync.Mutex{},
ctx: ctx,
cancel: cancel,
wal: param.WAL,
pchannel: param.ChannelInfo,
partitionManagers: managers,
collections: collections,
txnManager: param.TxnManager,
metrics: metrics,
}
m.SetLogger(logger)
m.updateMetrics()
m.metrics.UpdateSegmentCount(segmentTotal)
belongs := lo.Values(segmentBelongs)
stats := make([]*stats.SegmentStats, 0, len(belongs))
for _, belong := range belongs {
stat := m.managers[belong.PartitionID].segments[belong.SegmentID].GetStatFromRecovery()
stat := m.partitionManagers[belong.PartitionID].segments[belong.SegmentID].GetStatFromRecovery()
stats = append(stats, stat)
}
resource.Resource().SegmentStatsManager().RegisterSealOperator(m, belongs, stats)
@ -160,15 +165,15 @@ func newCollectionInfos(recoverInfos *recovery.RecoverySnapshot) map[int64]*Coll
type ShardManager struct {
log.Binder
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
wal *syncutil.Future[wal.WAL]
pchannel types.PChannelInfo
managers map[int64]*partitionManager // map partitionID to partition manager
collections map[int64]*CollectionInfo // map collectionID to collectionInfo
metrics *metricsutil.SegmentAssignMetrics
txnManager TxnManager
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
wal *syncutil.Future[wal.WAL]
pchannel types.PChannelInfo
partitionManagers map[int64]*partitionManager // map partitionID to partition manager
collections map[int64]*CollectionInfo // map collectionID to collectionInfo
metrics *metricsutil.SegmentAssignMetrics
txnManager TxnManager
}
type CollectionInfo struct {
@ -192,7 +197,7 @@ func (m *ShardManager) Close() {
}
func (m *ShardManager) updateMetrics() {
m.metrics.UpdatePartitionCount(len(m.managers))
m.metrics.UpdatePartitionCount(len(m.partitionManagers))
m.metrics.UpdateCollectionCount(len(m.collections))
}

View File

@ -60,11 +60,11 @@ func (m *ShardManager) CreateCollection(msg message.ImmutableCreateCollectionMes
m.collections[collectionID] = newCollectionInfo(vchannel, partitionIDs)
for _, partitionID := range partitionIDs {
if _, ok := m.managers[partitionID]; ok {
if _, ok := m.partitionManagers[partitionID]; ok {
logger.Warn("partition already exists", zap.Int64("partitionID", partitionID))
continue
}
m.managers[partitionID] = newPartitionSegmentManager(
m.partitionManagers[partitionID] = newPartitionSegmentManager(
m.ctx,
m.Logger(),
m.wal,
@ -75,6 +75,7 @@ func (m *ShardManager) CreateCollection(msg message.ImmutableCreateCollectionMes
make(map[int64]*segmentAllocManager),
m.txnManager,
timetick,
m.metrics,
)
}
logger.Info("collection created in segment assignment service", zap.Int64s("partitionIDs", partitionIDs))
@ -102,7 +103,7 @@ func (m *ShardManager) DropCollection(msg message.ImmutableDropCollectionMessage
partitionIDs := make([]int64, 0, len(collectionInfo.PartitionIDs))
segmentIDs := make([]int64, 0, len(collectionInfo.PartitionIDs))
for partitionID := range collectionInfo.PartitionIDs {
pm, ok := m.managers[partitionID]
pm, ok := m.partitionManagers[partitionID]
if !ok {
logger.Warn("partition not exists", zap.Int64("partitionID", partitionID))
continue
@ -111,7 +112,7 @@ func (m *ShardManager) DropCollection(msg message.ImmutableDropCollectionMessage
segments := pm.FlushAndDropPartition(policy.PolicyCollectionRemoved())
partitionIDs = append(partitionIDs, partitionID)
segmentIDs = append(segmentIDs, segments...)
delete(m.managers, partitionID)
delete(m.partitionManagers, partitionID)
}
logger.Info("collection removed", zap.Int64s("partitionIDs", partitionIDs), zap.Int64s("segmentIDs", segmentIDs))
m.updateMetrics()

View File

@ -64,11 +64,11 @@ func (m *ShardManager) CreatePartition(msg message.ImmutableCreatePartitionMessa
}
m.collections[collectionID].PartitionIDs[partitionID] = struct{}{}
if _, ok := m.managers[partitionID]; ok {
if _, ok := m.partitionManagers[partitionID]; ok {
logger.Warn("partition manager already exists")
return
}
m.managers[partitionID] = newPartitionSegmentManager(
m.partitionManagers[partitionID] = newPartitionSegmentManager(
m.ctx,
m.Logger(),
m.wal,
@ -79,6 +79,7 @@ func (m *ShardManager) CreatePartition(msg message.ImmutableCreatePartitionMessa
make(map[int64]*segmentAllocManager),
m.txnManager,
tiemtick,
m.metrics,
)
m.Logger().Info("partition created")
m.updateMetrics()
@ -100,13 +101,13 @@ func (m *ShardManager) DropPartition(msg message.ImmutableDropPartitionMessageV1
}
delete(m.collections[collectionID].PartitionIDs, partitionID)
pm, ok := m.managers[partitionID]
pm, ok := m.partitionManagers[partitionID]
if !ok {
logger.Warn("partition not exists", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
return
}
delete(m.managers, partitionID)
delete(m.partitionManagers, partitionID)
segmentIDs := pm.FlushAndDropPartition(policy.PolicyPartitionRemoved())
m.Logger().Info("partition removed", zap.Int64s("segmentIDs", segmentIDs))
m.updateMetrics()

View File

@ -47,7 +47,7 @@ func (m *ShardManager) checkIfSegmentCanBeCreated(collectionID int64, partitionI
return err
}
if m := m.managers[partitionID].GetSegmentManager(segmentID); m != nil {
if m := m.partitionManagers[partitionID].GetSegmentManager(segmentID); m != nil {
return ErrSegmentExists
}
return nil
@ -69,7 +69,7 @@ func (m *ShardManager) checkIfSegmentCanBeFlushed(collecionID int64, partitionID
// segment can be flushed only if the segment exists, and its state is flushed.
// pm must exists, because we have checked the partition exists.
pm := m.managers[partitionID]
pm := m.partitionManagers[partitionID]
sm := pm.GetSegmentManager(segmentID)
if sm == nil {
return ErrSegmentNotFound
@ -92,7 +92,7 @@ func (m *ShardManager) CreateSegment(msg message.ImmutableCreateSegmentMessageV2
}
s := newSegmentAllocManager(m.pchannel, msg)
pm, ok := m.managers[s.GetPartitionID()]
pm, ok := m.partitionManagers[s.GetPartitionID()]
if !ok {
panic("critical error: partition manager not found when a segment is created")
}
@ -113,7 +113,7 @@ func (m *ShardManager) FlushSegment(msg message.ImmutableFlushMessageV2) {
return
}
pm := m.managers[partitionID]
pm := m.partitionManagers[partitionID]
pm.MustRemoveFlushedSegment(segmentID)
}
@ -122,7 +122,7 @@ func (m *ShardManager) AssignSegment(req *AssignSegmentRequest) (*AssignSegmentR
m.mu.Lock()
defer m.mu.Unlock()
if pm, ok := m.managers[req.PartitionID]; ok {
if pm, ok := m.partitionManagers[req.PartitionID]; ok {
return pm.AssignSegment(req)
} else {
return nil, ErrPartitionNotFound
@ -137,7 +137,7 @@ func (m *ShardManager) WaitUntilGrowingSegmentReady(collectionID int64, partiton
if err := m.checkIfPartitionExists(collectionID, partitonID); err != nil {
return nil, err
}
return m.managers[partitonID].WaitPendingGrowingSegmentReady(), nil
return m.partitionManagers[partitonID].WaitPendingGrowingSegmentReady(), nil
}
// FlushAndFenceSegmentAllocUntil flush all segment that contains the message which timetick is less than the incoming timetick.
@ -159,7 +159,7 @@ func (m *ShardManager) FlushAndFenceSegmentAllocUntil(collectionID int64, timeti
// collect all partitions
for partitionID := range collectionInfo.PartitionIDs {
// Seal all segments and fence assign to the partition manager.
pm, ok := m.managers[partitionID]
pm, ok := m.partitionManagers[partitionID]
if !ok {
logger.Warn("partition not found when FlushAndFenceSegmentAllocUntil", zap.Int64("partitionID", partitionID))
continue
@ -181,9 +181,10 @@ func (m *ShardManager) AsyncFlushSegment(signal utils.SealSegmentSignal) {
m.mu.Lock()
defer m.mu.Unlock()
pm, ok := m.managers[signal.SegmentBelongs.PartitionID]
pm, ok := m.partitionManagers[signal.SegmentBelongs.PartitionID]
if !ok {
logger.Warn("partition not found when AsyncMustSeal, may be already dropped")
return
}
if err := pm.AsyncFlushSegment(signal); err != nil {
logger.Warn("segment not found when AsyncMustSeal, may be already sealed", zap.Error(err))

View File

@ -200,7 +200,7 @@ func TestShardManager(t *testing.T) {
WithTimeTick(600).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(3))
m.managers[10].onAllocating = make(chan struct{})
m.partitionManagers[10].onAllocating = make(chan struct{})
ch, err := m.WaitUntilGrowingSegmentReady(7, 10)
assert.NoError(t, err)
select {

View File

@ -174,12 +174,12 @@ func (m *StatsManager) registerNewGrowingSegment(belongs SegmentBelongs, stats *
// Must be called after RegisterGrowingSegment and before UnregisterGrowingSegment.
func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) error {
shouldBeSealed, err := m.allocRows(segmentID, insert)
if err != nil {
return err
}
if shouldBeSealed {
m.worker.NotifySealSegment(segmentID, policy.PolicyCapacity())
}
if err != nil {
return err
}
m.notifyIfTotalGrowingBytesOverHWM()
return nil
}
@ -215,7 +215,7 @@ func (m *StatsManager) allocRows(segmentID int64, insert InsertMetrics) (bool, e
if stat.IsEmpty() {
return false, ErrTooLargeInsert
}
return false, ErrNotEnoughSpace
return stat.ShouldBeSealed(), ErrNotEnoughSpace
}
// notifyIfTotalGrowingBytesOverHWM notifies if the total bytes is over the high water mark.

View File

@ -4,22 +4,24 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type GrowingSegmentState string
func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
return &SegmentAssignMetrics{
constLabel: constLabel,
allocTotal: metrics.WALSegmentAllocTotal.MustCurryWith(constLabel),
segmentBytes: metrics.WALSegmentBytes.With(constLabel),
flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel),
partitionTotal: metrics.WALPartitionTotal.With(constLabel),
collectionTotal: metrics.WALCollectionTotal.With(constLabel),
constLabel: constLabel,
allocTotal: metrics.WALSegmentAllocTotal.With(constLabel),
segmentRowsTotal: metrics.WALSegmentRowsTotal.With(constLabel),
segmentBytes: metrics.WALSegmentBytes.With(constLabel),
flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel),
partitionTotal: metrics.WALPartitionTotal.With(constLabel),
collectionTotal: metrics.WALCollectionTotal.With(constLabel),
}
}
@ -27,26 +29,47 @@ func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics {
type SegmentAssignMetrics struct {
constLabel prometheus.Labels
allocTotal *prometheus.GaugeVec
segmentBytes prometheus.Observer
flushedTotal *prometheus.CounterVec
partitionTotal prometheus.Gauge
collectionTotal prometheus.Gauge
onAllocTotal prometheus.Gauge
onFlushTotal prometheus.Gauge
allocTotal prometheus.Gauge
segmentRowsTotal prometheus.Observer
segmentBytes prometheus.Observer
flushedTotal *prometheus.CounterVec
partitionTotal prometheus.Gauge
collectionTotal prometheus.Gauge
}
// UpdateGrowingSegmentState updates the metrics of the segment assignment state.
func (m *SegmentAssignMetrics) UpdateGrowingSegmentState(from streamingpb.SegmentAssignmentState, to streamingpb.SegmentAssignmentState) {
if from != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN {
m.allocTotal.WithLabelValues(from.String()).Dec()
}
if to != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_FLUSHED {
m.allocTotal.WithLabelValues(to.String()).Inc()
// ObserveOnAllocating observes an allocating operation and returns a guard function.
func (m *SegmentAssignMetrics) ObserveOnAllocating() func() {
m.onAllocTotal.Inc()
return func() {
m.onAllocTotal.Dec()
}
}
func (m *SegmentAssignMetrics) ObserveSegmentFlushed(policy string, bytes int64) {
m.segmentBytes.Observe(float64(bytes))
// ObserveOnFlushing observes a flush operation and returns a guard function.
func (m *SegmentAssignMetrics) ObserveOnFlushing() func() {
m.onFlushTotal.Inc()
return func() {
m.onFlushTotal.Dec()
}
}
// ObserveCreateSegment increments the total number of growing segments.
func (m *SegmentAssignMetrics) ObserveCreateSegment() {
m.allocTotal.Inc()
}
// ObserveSegmentFlushed records the rows and bytes of a flushed segment, increments the flushed segment total, and decrements the growing segment gauge.
func (m *SegmentAssignMetrics) ObserveSegmentFlushed(policy string, rows int64, bytes int64) {
m.allocTotal.Dec()
m.flushedTotal.WithLabelValues(policy).Inc()
m.segmentRowsTotal.Observe(float64(rows))
m.segmentBytes.Observe(float64(bytes))
}
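// UpdateSegmentCount sets the growing segment gauge to the given count.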
func (m *SegmentAssignMetrics) UpdateSegmentCount(cnt int) {
m.allocTotal.Set(float64(cnt))
}
func (m *SegmentAssignMetrics) UpdatePartitionCount(cnt int) {
@ -58,8 +81,9 @@ func (m *SegmentAssignMetrics) UpdateCollectionCount(cnt int) {
}
func (m *SegmentAssignMetrics) Close() {
metrics.WALSegmentAllocTotal.DeletePartialMatch(m.constLabel)
metrics.WALSegmentAllocTotal.Delete(m.constLabel)
metrics.WALSegmentFlushedTotal.DeletePartialMatch(m.constLabel)
metrics.WALSegmentRowsTotal.Delete(m.constLabel)
metrics.WALSegmentBytes.Delete(m.constLabel)
metrics.WALPartitionTotal.Delete(m.constLabel)
metrics.WALCollectionTotal.Delete(m.constLabel)

View File

@ -44,3 +44,35 @@ func (cfg *config) validate() error {
}
return nil
}
// newTruncatorConfig creates a new config for the truncator.
func newTruncatorConfig() *truncatorConfig {
params := paramtable.Get()
samplerInterval := params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()
retentionInterval := params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse()
cfg := &truncatorConfig{
sampleInterval: samplerInterval,
retentionInterval: retentionInterval,
}
if err := cfg.validate(); err != nil {
panic(err)
}
return cfg
}
// truncatorConfig is the configuration for the truncator.
type truncatorConfig struct {
sampleInterval time.Duration // the minimum interval between two checkpoint samples
retentionInterval time.Duration // how long a sampled checkpoint must age before it is used to truncate the wal
}
// validate validates the truncator config.
func (cfg *truncatorConfig) validate() error {
if cfg.sampleInterval <= 0 {
return errors.New("sampler interval must be greater than 0")
}
if cfg.retentionInterval <= 0 {
return errors.New("retention interval must be greater than 0")
}
return nil
}

View File

@ -49,3 +49,12 @@ func TestConfigValidate(t *testing.T) {
})
}
}
func TestTruncatorConfig(t *testing.T) {
// Initialize paramtable so the truncator config defaults are available.
paramtable.Init()
cfg := newTruncatorConfig()
assert.Equal(t, 1*time.Minute, cfg.sampleInterval)
assert.Equal(t, 5*time.Minute, cfg.retentionInterval)
}

View File

@ -0,0 +1,78 @@
package recovery
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
func newRecoveryStorageMetrics(channelInfo types.PChannelInfo) *recoveryMetrics {
constLabels := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: channelInfo.Name,
metrics.WALChannelTermLabelName: strconv.FormatInt(channelInfo.Term, 10),
}
return &recoveryMetrics{
constLabels: constLabels,
info: metrics.WALRecoveryInfo.MustCurryWith(constLabels),
inconsistentEventTotal: metrics.WALRecoveryInconsistentEventTotal.With(constLabels),
isOnPersisting: metrics.WALRecoveryIsOnPersisting.With(constLabels),
inMemTimeTick: metrics.WALRecoveryInMemTimeTick.With(constLabels),
persistedTimeTick: metrics.WALRecoveryPersistedTimeTick.With(constLabels),
truncateTimeTick: metrics.WALTruncateTimeTick.With(constLabels),
}
}
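// recoveryMetrics bundles the prometheus metrics of the recovery storage for a single pchannel term.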
type recoveryMetrics struct {
constLabels prometheus.Labels
info *prometheus.GaugeVec
inconsistentEventTotal prometheus.Counter
isOnPersisting prometheus.Gauge
inMemTimeTick prometheus.Gauge
persistedTimeTick prometheus.Gauge
truncateTimeTick prometheus.Gauge
}
// ObserveStateChange sets the state of the recovery storage metrics.
func (m *recoveryMetrics) ObserveStateChange(state string) {
metrics.WALRecoveryInfo.DeletePartialMatch(m.constLabels)
m.info.WithLabelValues(state).Set(1)
}
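// ObServeInMemMetrics records the time tick of the latest in-memory checkpoint observed by the recovery storage.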
func (m *recoveryMetrics) ObServeInMemMetrics(tickTime uint64) {
m.inMemTimeTick.Set(tsoutil.PhysicalTimeSeconds(tickTime))
}
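// ObServePersistedMetrics records the time tick of the last persisted checkpoint.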
func (m *recoveryMetrics) ObServePersistedMetrics(tickTime uint64) {
m.persistedTimeTick.Set(tsoutil.PhysicalTimeSeconds(tickTime))
}
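// ObServeTruncateMetrics records the time tick of the checkpoint used for the last wal truncation.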
func (m *recoveryMetrics) ObServeTruncateMetrics(tickTime uint64) {
m.truncateTimeTick.Set(tsoutil.PhysicalTimeSeconds(tickTime))
}
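// ObserveInconsitentEvent counts an inconsistency detected between the wal and the recovered meta.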
func (m *recoveryMetrics) ObserveInconsitentEvent() {
m.inconsistentEventTotal.Inc()
}
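// ObserveIsOnPersisting reports whether a dirty-snapshot persist operation is currently in progress.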
func (m *recoveryMetrics) ObserveIsOnPersisting(onPersisting bool) {
if onPersisting {
m.isOnPersisting.Set(1)
} else {
m.isOnPersisting.Set(0)
}
}
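// Close removes all recovery metrics labeled with this channel and term.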
func (m *recoveryMetrics) Close() {
metrics.WALRecoveryInfo.DeletePartialMatch(m.constLabels)
metrics.WALRecoveryInconsistentEventTotal.DeletePartialMatch(m.constLabels)
metrics.WALRecoveryIsOnPersisting.DeletePartialMatch(m.constLabels)
metrics.WALRecoveryInMemTimeTick.DeletePartialMatch(m.constLabels)
metrics.WALRecoveryPersistedTimeTick.DeletePartialMatch(m.constLabels)
metrics.WALTruncateTimeTick.DeletePartialMatch(m.constLabels)
}

View File

@ -62,6 +62,7 @@ func (rs *RecoveryStorage) persistDritySnapshotWhenClosing() {
// persistDirtySnapshot persists the dirty snapshot to the catalog.
func (rs *RecoveryStorage) persistDirtySnapshot(ctx context.Context, snapshot *RecoverySnapshot, lvl zapcore.Level) (err error) {
rs.metrics.ObserveIsOnPersisting(true)
logger := rs.Logger().With(
zap.String("checkpoint", snapshot.Checkpoint.MessageID.String()),
zap.Uint64("checkpointTimeTick", snapshot.Checkpoint.TimeTick),
@ -74,6 +75,7 @@ func (rs *RecoveryStorage) persistDirtySnapshot(ctx context.Context, snapshot *R
return
}
logger.Log(lvl, "persist dirty snapshot")
defer rs.metrics.ObserveIsOnPersisting(false)
}()
if err := rs.dropAllVirtualChannel(ctx, snapshot.VChannels); err != nil {
@ -109,10 +111,17 @@ func (rs *RecoveryStorage) persistDirtySnapshot(ctx context.Context, snapshot *R
}
// checkpoint updates should always be persisted after other updates success.
return rs.retryOperationWithBackoff(ctx, rs.Logger().With(zap.String("op", "persistCheckpoint")), func(ctx context.Context) error {
if err := rs.retryOperationWithBackoff(ctx, rs.Logger().With(zap.String("op", "persistCheckpoint")), func(ctx context.Context) error {
return resource.Resource().StreamingNodeCatalog().
SaveConsumeCheckpoint(ctx, rs.channel.Name, snapshot.Checkpoint.IntoProto())
})
}); err != nil {
return err
}
// sample the persisted checkpoint so the truncator can use it for wal truncation.
rs.metrics.ObServePersistedMetrics(snapshot.Checkpoint.TimeTick)
rs.truncator.SampleCheckpoint(snapshot.Checkpoint)
return
}
// dropAllVirtualChannel drops all virtual channels that are in the dropped state.

View File

@ -19,6 +19,7 @@ import (
// recoverRecoveryInfoFromMeta retrieves the recovery info for the given channel.
func (r *RecoveryStorage) recoverRecoveryInfoFromMeta(ctx context.Context, walName string, channelInfo types.PChannelInfo, lastTimeTickMessage message.ImmutableMessage) error {
r.metrics.ObserveStateChange(recoveryStorageStatePersistRecovering)
r.SetLogger(resource.Resource().Logger().With(
log.FieldComponent(componentRecoveryStorage),
zap.String("channel", channelInfo.String()),

View File

@ -5,6 +5,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
)
// RecoverySnapshot is the snapshot of the recovery info.
@ -31,6 +32,9 @@ type RecoveryStreamBuilder interface {
// Build builds a recovery stream from the given channel info.
// The recovery stream will return the messages from the start checkpoint to the end time tick.
Build(param BuildRecoveryStreamParam) RecoveryStream
// RWWALImpls returns the underlying walimpls.WALImpls.
RWWALImpls() walimpls.WALImpls
}
// RecoveryStream is an interface that is used to recover the recovery storage from the WAL.

View File

@ -41,10 +41,17 @@ func RecoverRecoveryStorage(
return nil, nil, err
}
// recovery storage start work.
rs.metrics.ObserveStateChange(recoveryStorageStateWorking)
rs.SetLogger(resource.Resource().Logger().With(
log.FieldComponent(componentRecoveryStorage),
zap.String("channel", recoveryStreamBuilder.Channel().String()),
zap.String("state", recoveryStorageStateWorking)))
rs.truncator = newSamplingTruncator(
snapshot.Checkpoint.Clone(),
recoveryStreamBuilder.RWWALImpls(),
rs.metrics,
)
rs.truncator.SetLogger(rs.Logger())
go rs.backgroundTask()
return rs, snapshot, nil
}
@ -60,6 +67,7 @@ func newRecoveryStorage(channel types.PChannelInfo) *RecoveryStorage {
dirtyCounter: 0,
persistNotifier: make(chan struct{}, 1),
gracefulClosed: false,
metrics: newRecoveryStorageMetrics(channel),
}
}
@ -78,6 +86,8 @@ type RecoveryStorage struct {
// used to trigger the recovery persist operation.
persistNotifier chan struct{}
gracefulClosed bool
truncator *samplingTruncator
metrics *recoveryMetrics
}
// ObserveMessage is called when a new message is observed.
@ -92,6 +102,9 @@ func (r *RecoveryStorage) ObserveMessage(msg message.ImmutableMessage) {
func (r *RecoveryStorage) Close() {
r.backgroundTaskNotifier.Cancel()
r.backgroundTaskNotifier.BlockUntilFinish()
// Stop the truncator.
r.truncator.Close()
r.metrics.Close()
}
// notifyPersist notifies a persist operation.
@ -154,6 +167,7 @@ func (r *RecoveryStorage) observeMessage(msg message.ImmutableMessage) {
checkpointUpdates := !r.checkpoint.MessageID.EQ(msg.LastConfirmedMessageID())
r.checkpoint.TimeTick = msg.TimeTick()
r.checkpoint.MessageID = msg.LastConfirmedMessageID()
r.metrics.ObServeInMemMetrics(r.checkpoint.TimeTick)
if checkpointUpdates {
// only count the dirty if last confirmed message id is updated.
@ -375,4 +389,5 @@ func (r *RecoveryStorage) detectInconsistency(msg message.ImmutableMessage, reas
// The log is not fatal in some cases.
// because our meta is not atomic-updated, so these error may be logged if crashes when meta updated partially.
r.Logger().Warn("inconsistency detected", fields...)
r.metrics.ObserveInconsitentEvent()
}

View File

@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -196,6 +197,10 @@ func (b *streamBuilder) segmentNum() int {
return segmentNum
}
func (b *streamBuilder) RWWALImpls() walimpls.WALImpls {
return nil
}
type testRecoveryStream struct {
ch chan message.ImmutableMessage
}

View File

@ -19,6 +19,8 @@ func (r *RecoveryStorage) recoverFromStream(
recoveryStreamBuilder RecoveryStreamBuilder,
lastTimeTickMessage message.ImmutableMessage,
) (snapshot *RecoverySnapshot, err error) {
r.metrics.ObserveStateChange(recoveryStorageStateStreamRecovering)
r.metrics.ObServePersistedMetrics(r.checkpoint.TimeTick)
r.SetLogger(resource.Resource().Logger().With(
log.FieldComponent(componentRecoveryStorage),
zap.String("channel", recoveryStreamBuilder.Channel().String()),

View File

@ -0,0 +1,137 @@
package recovery
import (
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
// newSamplingTruncator creates a new sampling truncator.
func newSamplingTruncator(
checkpoint *WALCheckpoint,
truncator walimpls.WALImpls,
recoveryMetrics *recoveryMetrics,
) *samplingTruncator {
st := &samplingTruncator{
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
cfg: newTruncatorConfig(),
truncator: truncator,
mu: sync.Mutex{},
checkpointSamples: []*WALCheckpoint{checkpoint},
lastTruncatedCheckpoint: nil,
lastSampled: time.Now(),
metrics: recoveryMetrics,
}
go st.background()
return st
}
// samplingTruncator is a sampling truncator that samples the incoming checkpoint and truncates the WAL.
type samplingTruncator struct {
log.Binder
notifier *syncutil.AsyncTaskNotifier[struct{}]
cfg *truncatorConfig
truncator walimpls.WALImpls
mu sync.Mutex
checkpointSamples []*WALCheckpoint // the samples of checkpoints
lastTruncatedCheckpoint *WALCheckpoint // the last truncated checkpoint
lastSampled time.Time // the last time the checkpoint is sampled
metrics *recoveryMetrics
}
// SampleCheckpoint samples the incoming checkpoint and adds it to the checkpoint samples.
func (t *samplingTruncator) SampleCheckpoint(checkpoint *WALCheckpoint) {
t.mu.Lock()
defer t.mu.Unlock()
if time.Since(t.lastSampled) < t.cfg.sampleInterval {
return
}
if len(t.checkpointSamples) == 0 || t.checkpointSamples[len(t.checkpointSamples)-1].MessageID.LT(checkpoint.MessageID) {
t.checkpointSamples = append(t.checkpointSamples, checkpoint)
}
t.lastSampled = time.Now()
}
// background starts the background task of the sampling truncator.
func (t *samplingTruncator) background() {
ticker := time.NewTicker(t.cfg.sampleInterval / 2)
defer func() {
ticker.Stop()
t.notifier.Finish(struct{}{})
t.Logger().Info("sampling truncator background task exit")
}()
for {
select {
case <-t.notifier.Context().Done():
return
case <-ticker.C:
t.applyTruncate()
}
}
}
// consumeCheckpointSamples consumes the checkpoint samples and returns the truncate checkpoint.
func (t *samplingTruncator) consumeCheckpointSamples() *WALCheckpoint {
t.mu.Lock()
defer t.mu.Unlock()
targetCheckpointIdx := -1
for i := 0; i < len(t.checkpointSamples); i++ {
if time.Since(tsoutil.PhysicalTime(t.checkpointSamples[i].TimeTick)) < t.cfg.retentionInterval {
break
}
targetCheckpointIdx = i
}
if targetCheckpointIdx >= 0 {
checkpoint := t.checkpointSamples[targetCheckpointIdx]
t.checkpointSamples = t.checkpointSamples[targetCheckpointIdx+1:]
return checkpoint
}
return nil
}
// applyTruncate applies the truncate operation.
func (t *samplingTruncator) applyTruncate() {
truncateCheckpoint := t.consumeCheckpointSamples()
if truncateCheckpoint == nil {
t.Logger().Debug("no checkpoint sample can be used to truncate wal")
return
}
logger := t.Logger().With(zap.String("messageID", truncateCheckpoint.MessageID.String()), zap.Uint64("timeTick", truncateCheckpoint.TimeTick))
if t.lastTruncatedCheckpoint != nil {
logger = logger.With(zap.String("lastMessageID", t.lastTruncatedCheckpoint.MessageID.String()), zap.Uint64("lastTimeTick", t.lastTruncatedCheckpoint.TimeTick))
if truncateCheckpoint.MessageID.EQ(t.lastTruncatedCheckpoint.MessageID) {
logger.Debug("checkpoint sample is the same, ignore the operation", zap.String("messageID", truncateCheckpoint.MessageID.String()))
t.lastTruncatedCheckpoint = truncateCheckpoint
t.metrics.ObServeTruncateMetrics(truncateCheckpoint.TimeTick)
return
} else if truncateCheckpoint.MessageID.LT(t.lastTruncatedCheckpoint.MessageID) {
logger.Warn("checkpoint sample is not in order, the wal may be corrupted", zap.String("targetMessageID", truncateCheckpoint.MessageID.String()))
return
}
}
err := t.truncator.Truncate(t.notifier.Context(), truncateCheckpoint.MessageID)
if err != nil {
logger.Warn("failed to truncate wal, the checkpoint sample is lost", zap.Error(err))
return
}
logger.Info("truncate wal")
t.lastTruncatedCheckpoint = truncateCheckpoint
t.metrics.ObServeTruncateMetrics(truncateCheckpoint.TimeTick)
}
// Close closes the sampling truncator.
func (t *samplingTruncator) Close() {
t.notifier.Cancel()
t.notifier.BlockAndGetResult()
}

View File

@ -0,0 +1,40 @@
package recovery
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
func TestTruncator(t *testing.T) {
w := mock_walimpls.NewMockWALImpls(t)
w.EXPECT().Truncate(mock.Anything, mock.Anything).Return(nil)
paramtable.Get().Save(paramtable.Get().StreamingCfg.WALTruncateSampleInterval.Key, "1ms")
paramtable.Get().Save(paramtable.Get().StreamingCfg.WALTruncateRetentionInterval.Key, "2ms")
truncator := newSamplingTruncator(&WALCheckpoint{
MessageID: rmq.NewRmqID(1),
TimeTick: 1,
Magic: recoveryMagicStreamingInitialized,
}, w, newRecoveryStorageMetrics(types.PChannelInfo{Name: "test", Term: 1}))
for i := 0; i < 20; i++ {
time.Sleep(1 * time.Millisecond)
for rand.Int31n(3) < 1 {
truncator.SampleCheckpoint(&WALCheckpoint{
MessageID: rmq.NewRmqID(int64(i)),
TimeTick: tsoutil.ComposeTSByTime(time.Now(), 0),
Magic: recoveryMagicStreamingInitialized,
})
}
}
truncator.Close()
}

View File

@ -30,10 +30,10 @@ const (
WALInterceptorLabelName = "interceptor_name"
WALTxnStateLabelName = "state"
WALFlusherStateLabelName = "state"
WALRecoveryStorageStateLabelName = "state"
WALStateLabelName = "state"
WALChannelLabelName = channelNameLabelName
WALSegmentSealPolicyNameLabelName = "policy"
WALSegmentAllocStateLabelName = "state"
WALMessageTypeLabelName = "message_type"
WALChannelTermLabelName = "term"
WALNameLabelName = "wal_name"
@ -250,13 +250,19 @@ var (
WALSegmentAllocTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_segment_alloc_total",
Help: "Total of segment alloc on wal",
}, WALChannelLabelName, WALSegmentAllocStateLabelName)
}, WALChannelLabelName)
WALSegmentFlushedTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "segment_assign_flushed_segment_total",
Help: "Total of segment sealed on wal",
}, WALChannelLabelName, WALSegmentSealPolicyNameLabelName)
WALSegmentRowsTotal = newWALHistogramVec(prometheus.HistogramOpts{
Name: "segment_assign_segment_rows_total",
Help: "Total rows of segment alloc on wal",
Buckets: prometheus.ExponentialBucketsRange(128, 1048576, 10), // 128 rows -> 1,048,576 rows
}, WALChannelLabelName)
WALSegmentBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "segment_assign_segment_bytes",
Help: "Bytes of segment alloc on wal",
@ -396,6 +402,36 @@ var (
Name: "flusher_time_tick",
Help: "the final timetick tick of flusher seen",
}, WALChannelLabelName, WALChannelTermLabelName)
WALRecoveryInfo = newWALGaugeVec(prometheus.GaugeOpts{
Name: "recovery_info",
Help: "Current info of recovery storage on current wal",
}, WALChannelLabelName, WALChannelTermLabelName, WALRecoveryStorageStateLabelName)
WALRecoveryInMemTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "recovery_in_mem_time_tick",
Help: "the final timetick tick of recovery storage seen",
}, WALChannelLabelName, WALChannelTermLabelName)
WALRecoveryPersistedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "recovery_persisted_time_tick",
Help: "the final persisted timetick tick of recovery storage seen",
}, WALChannelLabelName, WALChannelTermLabelName)
WALRecoveryInconsistentEventTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "recovery_inconsistent_event_total",
Help: "Total of recovery inconsistent event",
}, WALChannelLabelName, WALChannelTermLabelName)
WALRecoveryIsOnPersisting = newWALGaugeVec(prometheus.GaugeOpts{
Name: "recovery_is_on_persisting",
Help: "Is recovery storage on persisting",
}, WALChannelLabelName, WALChannelTermLabelName)
WALTruncateTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "truncate_time_tick",
Help: "the final timetick tick of truncator seen",
}, WALChannelLabelName, WALChannelTermLabelName)
)
// RegisterStreamingServiceClient registers streaming service client metrics
@ -454,6 +490,7 @@ func registerWAL(registry *prometheus.Registry) {
registry.MustRegister(WALGrowingSegmentLWMBytes)
registry.MustRegister(WALSegmentAllocTotal)
registry.MustRegister(WALSegmentFlushedTotal)
registry.MustRegister(WALSegmentRowsTotal)
registry.MustRegister(WALSegmentBytes)
registry.MustRegister(WALPartitionTotal)
registry.MustRegister(WALCollectionTotal)
@ -480,6 +517,12 @@ func registerWAL(registry *prometheus.Registry) {
registry.MustRegister(WALScannerTxnBufBytes)
registry.MustRegister(WALFlusherInfo)
registry.MustRegister(WALFlusherTimeTick)
registry.MustRegister(WALRecoveryInfo)
registry.MustRegister(WALRecoveryInMemTimeTick)
registry.MustRegister(WALRecoveryPersistedTimeTick)
registry.MustRegister(WALRecoveryInconsistentEventTotal)
registry.MustRegister(WALRecoveryIsOnPersisting)
registry.MustRegister(WALTruncateTimeTick)
}
func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {

View File

@ -84,7 +84,8 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error {
Topic: w.Channel().Name,
SubscriptionName: truncateCursorSubscriptionName,
Type: pulsar.Exclusive,
MaxPendingChunkedMessage: 1,
MaxPendingChunkedMessage: 0,
StartMessageIDInclusive: true,
})
if err != nil {
return err

View File

@ -5412,6 +5412,8 @@ type streamingConfig struct {
WALRecoveryPersistInterval ParamItem `refreshable:"true"`
WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"`
WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"`
WALTruncateSampleInterval ParamItem `refreshable:"true"`
WALTruncateRetentionInterval ParamItem `refreshable:"true"`
}
func (p *streamingConfig) init(base *BaseTable) {
@ -5615,6 +5617,28 @@ If that persist operation exceeds this timeout, the wal recovery module will clo
Export: true,
}
p.WALRecoveryGracefulCloseTimeout.Init(base.mgr)
p.WALTruncateSampleInterval = ParamItem{
Key: "streaming.walTruncate.sampleInterval",
Version: "2.6.0",
Doc: `The interval at which wal checkpoints are sampled for truncation, 1m by default.
Every time a checkpoint is persisted, it may be sampled as a candidate truncation checkpoint.
More samples mean more frequent truncation and higher memory usage.`,
DefaultValue: "1m",
Export: true,
}
p.WALTruncateSampleInterval.Init(base.mgr)
p.WALTruncateRetentionInterval = ParamItem{
Key: "streaming.walTruncate.retentionInterval",
Version: "2.6.0",
Doc: `The retention interval of wal truncation, 5m by default.
If a sampled checkpoint is older than this interval, it will be used to truncate the wal.
The greater the interval, the more wal storage is used and the more redundant data is kept in the wal.`,
DefaultValue: "5m",
Export: true,
}
p.WALTruncateRetentionInterval.Init(base.mgr)
}
// runtimeConfig is just a private environment value table.

View File

@ -631,6 +631,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat())
assert.Equal(t, float64(0.4), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat())
assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat())
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse())
assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse())
params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s")
@ -652,6 +654,8 @@ func TestComponentParam(t *testing.T) {
params.Save(params.StreamingCfg.FlushMemoryThreshold.Key, "0.7")
params.Save(params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.Key, "0.25")
params.Save(params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.Key, "0.15")
params.Save(params.StreamingCfg.WALTruncateSampleInterval.Key, "1m")
params.Save(params.StreamingCfg.WALTruncateRetentionInterval.Key, "30m")
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
@ -672,6 +676,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, float64(0.7), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat())
assert.Equal(t, float64(0.25), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat())
assert.Equal(t, float64(0.15), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat())
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse())
assert.Equal(t, 30*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse())
})
t.Run("channel config priority", func(t *testing.T) {