fix: use binlog counter to trigger flush but not stats log (#37037)

issue: #36804

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2024-10-23 15:07:29 +08:00 committed by GitHub
parent 4746f47282
commit f3d9d05a28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 44 additions and 34 deletions

View File

@ -528,8 +528,8 @@ dataCoord:
# The max idle time of segment in seconds, 10*60. # The max idle time of segment in seconds, 10*60.
maxIdleTime: 600 maxIdleTime: 600
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed. minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
# The max number of binlog file for one segment, the segment will be sealed if # The max number of binlog (which is equal to the binlog file num of primary key) for one segment,
# the number of binlog file reaches to max value. # the segment will be sealed if the number of binlog file reaches to max value.
maxBinlogFileNumber: 32 maxBinlogFileNumber: 32
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows). # (smallProportion * segment max # of rows).

View File

@ -161,10 +161,12 @@ func sealL1SegmentByLifetime(lifetime time.Duration) segmentSealPolicyFunc {
func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyFunc { func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyFunc {
return func(segment *SegmentInfo, ts Timestamp) (bool, string) { return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
logFileCounter := 0 logFileCounter := 0
for _, fieldBinlog := range segment.GetStatslogs() { for _, fieldBinlog := range segment.GetBinlogs() {
// Only count the binlog file number of the first field which is equal to the binlog file number of the primary field.
// Remove the multiplier generated by the number of fields.
logFileCounter += len(fieldBinlog.GetBinlogs()) logFileCounter += len(fieldBinlog.GetBinlogs())
break
} }
return logFileCounter >= maxBinlogFileNumber, return logFileCounter >= maxBinlogFileNumber,
fmt.Sprintf("Segment binlog number too large, binlog number: %d, max binlog number: %d", logFileCounter, maxBinlogFileNumber) fmt.Sprintf("Segment binlog number too large, binlog number: %d, max binlog number: %d", logFileCounter, maxBinlogFileNumber)
} }

View File

@ -626,7 +626,7 @@ func TestTryToSealSegment(t *testing.T) {
segments := segmentManager.meta.segments.segments segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments)) assert.Equal(t, 1, len(segments))
for _, seg := range segments { for _, seg := range segments {
seg.Statslogs = []*datapb.FieldBinlog{ seg.Binlogs = []*datapb.FieldBinlog{
{ {
FieldID: 1, FieldID: 1,
Binlogs: []*datapb.Binlog{ Binlogs: []*datapb.Binlog{

View File

@ -124,9 +124,10 @@ func (c *channelLifetime) Run() error {
} }
if tt, ok := t.(*syncmgr.SyncTask); ok { if tt, ok := t.(*syncmgr.SyncTask); ok {
insertLogs, _, _ := tt.Binlogs() insertLogs, _, _ := tt.Binlogs()
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{
stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))}, BinLogCounterIncr: 1,
) BinLogFileCounterIncr: uint64(len(insertLogs)),
})
} }
}) })
if err != nil { if err != nil {

View File

@ -23,7 +23,7 @@ func GetSegmentAsyncSealPolicy() []SegmentAsyncSealPolicy {
// TODO: dynamic policy can be applied here in future. // TODO: dynamic policy can be applied here in future.
return []SegmentAsyncSealPolicy{ return []SegmentAsyncSealPolicy{
&sealByCapacity{}, &sealByCapacity{},
&sealByBinlogFileNumber{}, &sealByBinlogNumber{},
&sealByLifetime{}, &sealByLifetime{},
&sealByIdleTime{}, &sealByIdleTime{},
} }
@ -57,23 +57,23 @@ func (p *sealByCapacity) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyRes
} }
} }
// sealByBinlogFileNumberExtraInfo is the extra info of the seal by binlog file number policy. // sealByBinlogFileExtraInfo is the extra info of the seal by binlog file number policy.
type sealByBinlogFileNumberExtraInfo struct { type sealByBinlogFileExtraInfo struct {
BinLogFileNumberLimit int BinLogNumberLimit int
} }
// sealByBinlogFileNumber is a policy to seal the segment by the binlog file number. // sealByBinlogNumber is a policy to seal the segment by the binlog file number.
type sealByBinlogFileNumber struct{} type sealByBinlogNumber struct{}
// ShouldBeSealed checks if the segment should be sealed, and return the reason string. // ShouldBeSealed checks if the segment should be sealed, and return the reason string.
func (p *sealByBinlogFileNumber) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult { func (p *sealByBinlogNumber) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult {
limit := paramtable.Get().DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt() limit := paramtable.Get().DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt()
shouldBeSealed := stats.BinLogCounter >= uint64(limit) shouldBeSealed := stats.BinLogCounter >= uint64(limit)
return SealPolicyResult{ return SealPolicyResult{
PolicyName: "binlog_file_number", PolicyName: "binlog_number",
ShouldBeSealed: shouldBeSealed, ShouldBeSealed: shouldBeSealed,
ExtraInfo: &sealByBinlogFileNumberExtraInfo{ ExtraInfo: &sealByBinlogFileExtraInfo{
BinLogFileNumberLimit: limit, BinLogNumberLimit: limit,
}, },
} }
} }

View File

@ -7,13 +7,15 @@ import (
) )
// SegmentStats is the usage stats of a segment. // SegmentStats is the usage stats of a segment.
// The SegmentStats is imprecise, so it is not promised to be recoverable for performance.
type SegmentStats struct { type SegmentStats struct {
Insert InsertMetrics Insert InsertMetrics
MaxBinarySize uint64 // MaxBinarySize of current segment should be assigned, it's a fixed value when segment is transfer int growing. MaxBinarySize uint64 // MaxBinarySize of current segment should be assigned, it's a fixed value when segment is transfer int growing.
CreateTime time.Time // created timestamp of this segment, it's a fixed value when segment is created, not a tso. CreateTime time.Time // created timestamp of this segment, it's a fixed value when segment is created, not a tso.
LastModifiedTime time.Time // LastWriteTime is the last write time of this segment, it's not a tso, just a local time. LastModifiedTime time.Time // LastWriteTime is the last write time of this segment, it's not a tso, just a local time.
BinLogCounter uint64 // BinLogCounter is the counter of binlog, it's an async stat not real time. BinLogCounter uint64 // BinLogCounter is the counter of binlog (equal to the binlog file count of primary key), it's an async stat not real time.
ReachLimit bool // ReachLimit is a flag to indicate the segment reach the limit once. BinLogFileCounter uint64 // BinLogFileCounter is the counter of binlog files, it's an async stat not real time.
ReachLimit bool // ReachLimit is a flag to indicate the segment reach the limit once.
} }
// NewSegmentStatFromProto creates a new segment assignment stat from proto. // NewSegmentStatFromProto creates a new segment assignment stat from proto.
@ -50,7 +52,8 @@ func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentS
// SyncOperationMetrics is the metrics of sync operation. // SyncOperationMetrics is the metrics of sync operation.
type SyncOperationMetrics struct { type SyncOperationMetrics struct {
BinLogCounterIncr uint64 // the counter increment of bin log. BinLogCounterIncr uint64 // the counter increment of bin log
BinLogFileCounterIncr uint64 // the counter increment of bin log file
} }
// AllocRows alloc space of rows on current segment. // AllocRows alloc space of rows on current segment.
@ -74,6 +77,7 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 {
// UpdateOnSync updates the stats of segment on sync. // UpdateOnSync updates the stats of segment on sync.
func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) { func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
s.BinLogCounter += f.BinLogCounterIncr s.BinLogCounter += f.BinLogCounterIncr
s.BinLogFileCounter += f.BinLogFileCounterIncr
} }
// Copy copies the segment stats. // Copy copies the segment stats.

View File

@ -43,10 +43,11 @@ func TestSegmentStats(t *testing.T) {
Rows: 100, Rows: 100,
BinarySize: 200, BinarySize: 200,
}, },
MaxBinarySize: 400, MaxBinarySize: 400,
CreateTime: now, CreateTime: now,
LastModifiedTime: now, LastModifiedTime: now,
BinLogCounter: 3, BinLogCounter: 3,
BinLogFileCounter: 4,
} }
insert1 := InsertMetrics{ insert1 := InsertMetrics{
@ -69,7 +70,9 @@ func TestSegmentStats(t *testing.T) {
assert.Equal(t, stat.Insert.BinarySize, uint64(320)) assert.Equal(t, stat.Insert.BinarySize, uint64(320))
stat.UpdateOnSync(SyncOperationMetrics{ stat.UpdateOnSync(SyncOperationMetrics{
BinLogCounterIncr: 4, BinLogCounterIncr: 4,
BinLogFileCounterIncr: 9,
}) })
assert.Equal(t, uint64(7), stat.BinLogCounter) assert.Equal(t, uint64(7), stat.BinLogCounter)
assert.Equal(t, uint64(13), stat.BinLogFileCounter)
} }

View File

@ -217,13 +217,13 @@ var (
Help: "Total of append message to wal", Help: "Total of append message to wal",
}, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName) }, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName)
WALAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{ WALAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
Name: "append_message_duration_seconds", Name: "append_message_duration_seconds",
Help: "Duration of wal append message", Help: "Duration of wal append message",
Buckets: secondsBuckets, Buckets: secondsBuckets,
}, WALChannelLabelName, StatusLabelName) }, WALChannelLabelName, StatusLabelName)
WALImplsAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{ WALImplsAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
Name: "impls_append_message_duration_seconds", Name: "impls_append_message_duration_seconds",
Help: "Duration of wal impls append message", Help: "Duration of wal impls append message",
Buckets: secondsBuckets, Buckets: secondsBuckets,

View File

@ -3417,8 +3417,8 @@ The max idle time of segment in seconds, 10*60.`,
Key: "dataCoord.segment.maxBinlogFileNumber", Key: "dataCoord.segment.maxBinlogFileNumber",
Version: "2.2.0", Version: "2.2.0",
DefaultValue: "32", DefaultValue: "32",
Doc: `The max number of binlog file for one segment, the segment will be sealed if Doc: `The max number of binlog (which is equal to the binlog file num of primary key) for one segment,
the number of binlog file reaches to max value.`, the segment will be sealed if the number of binlog file reaches to max value.`,
Export: true, Export: true,
} }
p.SegmentMaxBinlogFileNumber.Init(base.mgr) p.SegmentMaxBinlogFileNumber.Init(base.mgr)