Zhen Ye 8905b042f1
fix: add proportion for capacity seal policy in streaming flusher (#36761)
issue: #36760

---------

Signed-off-by: chyezh <chyezh@outlook.com>
2024-10-14 14:47:22 +08:00

84 lines
2.8 KiB
Go

package stats
import (
"time"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
)
// SegmentStats is the usage stats of a segment.
// It groups the insert metrics of the segment with its size limit, its
// timestamps and its binlog counter.
type SegmentStats struct {
// Insert holds the insert metrics (rows and binary size) collected for this segment.
Insert InsertMetrics
// MaxBinarySize is the maximum binary size that can be assigned to this segment;
// it is a fixed value once the segment is transferred into growing.
MaxBinarySize uint64
// CreateTime is the creation time of this segment; it is a fixed value set when
// the segment is created, and it is not a tso.
CreateTime time.Time
// LastModifiedTime is the last write time of this segment; it is not a tso, just a local time.
LastModifiedTime time.Time
// BinLogCounter is the counter of binlog; it is an async stat, not real time.
BinLogCounter uint64
// ReachLimit is a flag indicating that the segment has reached its limit at least once.
// It is not part of the proto conversions in this file.
ReachLimit bool
}
// NewSegmentStatFromProto builds a SegmentStats from its proto representation.
//
// A nil statProto yields a nil result. The two timestamp fields of the proto
// are interpreted as nanoseconds since the Unix epoch. ReachLimit has no
// counterpart in the proto and is left at its zero value.
func NewSegmentStatFromProto(statProto *streamingpb.SegmentAssignmentStat) *SegmentStats {
	if statProto == nil {
		return nil
	}

	stats := new(SegmentStats)

	// Insert metrics.
	stats.Insert.Rows = statProto.InsertedRows
	stats.Insert.BinarySize = statProto.InsertedBinarySize

	// Capacity and binlog bookkeeping.
	stats.MaxBinarySize = statProto.MaxBinarySize
	stats.BinLogCounter = statProto.BinlogCounter

	// Timestamps are stored as Unix nanoseconds in the proto.
	stats.CreateTime = time.Unix(0, statProto.CreateTimestampNanoseconds)
	stats.LastModifiedTime = time.Unix(0, statProto.LastModifiedTimestampNanoseconds)

	return stats
}
// NewProtoFromSegmentStat builds the proto representation of a SegmentStats.
//
// A nil stat yields a nil result. CreateTime and LastModifiedTime are encoded
// as nanoseconds since the Unix epoch. ReachLimit has no counterpart in the
// proto and is not written.
func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentStat {
	if stat == nil {
		return nil
	}

	// Convert the timestamps up front so the message population below is a
	// plain field-by-field copy.
	createdNanos := stat.CreateTime.UnixNano()
	modifiedNanos := stat.LastModifiedTime.UnixNano()

	pb := &streamingpb.SegmentAssignmentStat{}
	pb.MaxBinarySize = stat.MaxBinarySize
	pb.InsertedRows = stat.Insert.Rows
	pb.InsertedBinarySize = stat.Insert.BinarySize
	pb.CreateTimestampNanoseconds = createdNanos
	pb.BinlogCounter = stat.BinLogCounter
	pb.LastModifiedTimestampNanoseconds = modifiedNanos
	return pb
}
// SyncOperationMetrics is the metrics of sync operation.
// It is the argument type of SegmentStats.UpdateOnSync.
type SyncOperationMetrics struct {
// BinLogCounterIncr is the increment to apply to the binlog counter.
BinLogCounterIncr uint64
}
// AllocRows tries to reserve room on the current segment for the insert
// described by m.
//
// If m.BinarySize is larger than the remaining capacity reported by
// BinaryCanBeAssign, nothing is recorded, ReachLimit is set and false is
// returned. Otherwise m is collected into Insert, LastModifiedTime is set to
// the current local time and true is returned.
func (s *SegmentStats) AllocRows(m InsertMetrics) bool {
	remaining := s.BinaryCanBeAssign()
	if remaining < m.BinarySize {
		// Remember that this segment rejected an insert for lack of capacity.
		s.ReachLimit = true
		return false
	}

	s.Insert.Collect(m)
	s.LastModifiedTime = time.Now()
	return true
}
// BinaryCanBeAssign returns the binary size that can still be inserted into
// this segment, i.e. MaxBinarySize minus the binary size already inserted.
//
// Both operands are unsigned, so a plain subtraction would wrap around to a
// very large value if the inserted size were ever greater than MaxBinarySize.
// In that case the segment is reported as having no capacity left (0).
func (s *SegmentStats) BinaryCanBeAssign() uint64 {
	if s.Insert.BinarySize >= s.MaxBinarySize {
		return 0
	}
	return s.MaxBinarySize - s.Insert.BinarySize
}
// UpdateOnSync applies the result of a sync operation to the segment stats by
// adding f.BinLogCounterIncr to BinLogCounter.
func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
	s.BinLogCounter = s.BinLogCounter + f.BinLogCounterIncr
}
// Copy returns a pointer to a new SegmentStats whose fields are a by-value
// copy of the receiver's fields; later changes to one do not affect the other.
func (s *SegmentStats) Copy() *SegmentStats {
	dup := new(SegmentStats)
	*dup = *s
	return dup
}