enhance: [2.6] add a disk quota for the loaded binlog size to prevent load failures of querynode (#44932)

issue: #41435 
pr: #44893

---------

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
This commit is contained in:
sparknack 2025-10-19 19:46:07 +08:00 committed by GitHub
parent ce8187d28e
commit 64b76b723f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 288 additions and 8 deletions

View File

@ -1209,6 +1209,7 @@ quotaAndLimits:
diskProtection:
enabled: true # When the total file size of object storage is greater than `diskQuota`, all DML requests will be rejected
diskQuota: -1 # MB, (0, +inf), default no limit
loadedDiskQuota: -1 # MB, (0, +inf), default no limit; caps the total binlog size of segments loaded on query nodes
diskQuotaPerDB: -1 # MB, (0, +inf), default no limit
diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit
diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit

View File

@ -153,6 +153,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
NumFlowGraph: node.pipelineManager.Num(),
},
GrowingSegmentsSize: totalGrowingSize,
LoadedBinlogSize: node.manager.Segment.GetLoadedBinlogSize(),
Effect: metricsinfo.NodeEffect{
NodeID: node.GetNodeID(),
CollectionIDs: lo.Keys(collections),

View File

@ -29,6 +29,7 @@ import (
"fmt"
"sync"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
@ -202,6 +203,10 @@ type SegmentManager interface {
AddLogicalResource(usage ResourceUsage)
SubLogicalResource(usage ResourceUsage)
GetLogicalResource() ResourceUsage
AddLoadedBinlogSize(size int64)
SubLoadedBinlogSize(size int64)
GetLoadedBinlogSize() int64
}
var _ SegmentManager = (*segmentManager)(nil)
@ -377,6 +382,9 @@ type segmentManager struct {
// only MemorySize and DiskSize are used, other fields are ignored.
logicalResource ResourceUsage
logicalResourceLock sync.Mutex
// loadedBinlogSize stats the total binlog size of all loaded segments of this querynode.
loadedBinlogSize atomic.Int64
}
func NewSegmentManager() *segmentManager {
@ -424,6 +432,39 @@ func (mgr *segmentManager) GetLogicalResource() ResourceUsage {
return mgr.logicalResource
}
// AddLoadedBinlogSize increases the tracked total binlog size of all loaded
// segments on this querynode by size bytes. Safe for concurrent use (atomic add).
func (mgr *segmentManager) AddLoadedBinlogSize(size int64) {
	mgr.loadedBinlogSize.Add(size)
}
// SubLoadedBinlogSize decreases the tracked total binlog size of loaded
// segments by size bytes, clamping the counter at zero so duplicate or
// concurrent subtractions can never drive it negative.
func (mgr *segmentManager) SubLoadedBinlogSize(size int64) {
	for {
		old := mgr.loadedBinlogSize.Load()
		next := old - size
		clamped := next < 0
		if clamped {
			next = 0
		}
		if !mgr.loadedBinlogSize.CompareAndSwap(old, next) {
			// Another goroutine updated the counter concurrently; reload and retry.
			continue
		}
		if clamped {
			log.Warn("Loaded binlog size subtraction exceeds current value, clamped to 0",
				zap.Int64("current", old),
				zap.Int64("subtracted", size))
		}
		return
	}
}
// GetLoadedBinlogSize reports the total binlog size of all loaded segments.
// A negative reading (which should not occur, since subtraction clamps at
// zero) is logged and reported as 0.
func (mgr *segmentManager) GetLoadedBinlogSize() int64 {
	size := mgr.loadedBinlogSize.Load()
	if size >= 0 {
		return size
	}
	log.Warn("Loaded binlog size is negative, returning 0", zap.Int64("current", size))
	return 0
}
// put is the internal put method updating both global segments and secondary index.
func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) {
mgr.globalSegments.Put(ctx, segmentType, segment)

View File

@ -190,3 +190,30 @@ func (s *ManagerSuite) TestIncreaseVersion() {
// TestManager runs the ManagerSuite test suite.
func TestManager(t *testing.T) {
	suite.Run(t, new(ManagerSuite))
}
// TestLoadedBinlogSizeAccounting verifies that add/sub operations on the
// loaded binlog size are accumulated correctly and that over-subtraction
// clamps the counter at zero.
func TestLoadedBinlogSizeAccounting(t *testing.T) {
	mgr := NewSegmentManager()

	// check asserts the current loaded binlog size; format receives the
	// observed value on failure.
	check := func(want int64, format string) {
		if got := mgr.GetLoadedBinlogSize(); got != want {
			t.Fatalf(format, got)
		}
	}

	check(0, "expected initial 0, got %d")
	mgr.AddLoadedBinlogSize(100)
	check(100, "expected 100 after add, got %d")
	mgr.AddLoadedBinlogSize(50)
	check(150, "expected 150 after add, got %d")
	mgr.SubLoadedBinlogSize(20)
	check(130, "expected 130 after sub, got %d")
	mgr.SubLoadedBinlogSize(1000)
	check(0, "expected clamp to 0, got %d")
}

View File

@ -25,6 +25,39 @@ func (_m *MockSegmentManager) EXPECT() *MockSegmentManager_Expecter {
return &MockSegmentManager_Expecter{mock: &_m.Mock}
}
// AddLoadedBinlogSize provides a mock function with given fields: size
// (mockery-generated; records the call so expectations can match on size).
func (_m *MockSegmentManager) AddLoadedBinlogSize(size int64) {
	_m.Called(size)
}
// MockSegmentManager_AddLoadedBinlogSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddLoadedBinlogSize'
type MockSegmentManager_AddLoadedBinlogSize_Call struct {
	*mock.Call
}
// AddLoadedBinlogSize is a helper method to define mock.On call
//   - size int64
func (_e *MockSegmentManager_Expecter) AddLoadedBinlogSize(size interface{}) *MockSegmentManager_AddLoadedBinlogSize_Call {
	return &MockSegmentManager_AddLoadedBinlogSize_Call{Call: _e.mock.On("AddLoadedBinlogSize", size)}
}
// Run registers a typed callback invoked with the call's int64 argument;
// returns _c for chaining.
func (_c *MockSegmentManager_AddLoadedBinlogSize_Call) Run(run func(size int64)) *MockSegmentManager_AddLoadedBinlogSize_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(int64))
	})
	return _c
}
// Return completes the expectation; the mocked method has no return values.
func (_c *MockSegmentManager_AddLoadedBinlogSize_Call) Return() *MockSegmentManager_AddLoadedBinlogSize_Call {
	_c.Call.Return()
	return _c
}
// RunAndReturn delegates to Run since the mocked method returns nothing.
func (_c *MockSegmentManager_AddLoadedBinlogSize_Call) RunAndReturn(run func(int64)) *MockSegmentManager_AddLoadedBinlogSize_Call {
	_c.Run(run)
	return _c
}
// AddLogicalResource provides a mock function with given fields: usage
func (_m *MockSegmentManager) AddLogicalResource(usage ResourceUsage) {
_m.Called(usage)
@ -484,6 +517,51 @@ func (_c *MockSegmentManager_GetGrowing_Call) RunAndReturn(run func(int64) Segme
return _c
}
// GetLoadedBinlogSize provides a mock function with no fields
func (_m *MockSegmentManager) GetLoadedBinlogSize() int64 {
	ret := _m.Called()

	// Panic rather than silently returning 0 when the test forgot to stub a
	// return value.
	if len(ret) == 0 {
		panic("no return value specified for GetLoadedBinlogSize")
	}

	var r0 int64
	// A func() int64 stub takes precedence over a plain stubbed value.
	if rf, ok := ret.Get(0).(func() int64); ok {
		r0 = rf()
	} else {
		r0 = ret.Get(0).(int64)
	}

	return r0
}
// MockSegmentManager_GetLoadedBinlogSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLoadedBinlogSize'
type MockSegmentManager_GetLoadedBinlogSize_Call struct {
	*mock.Call
}
// GetLoadedBinlogSize is a helper method to define mock.On call
func (_e *MockSegmentManager_Expecter) GetLoadedBinlogSize() *MockSegmentManager_GetLoadedBinlogSize_Call {
	return &MockSegmentManager_GetLoadedBinlogSize_Call{Call: _e.mock.On("GetLoadedBinlogSize")}
}
// Run registers a callback invoked when the mock is called (no arguments);
// returns _c for chaining.
func (_c *MockSegmentManager_GetLoadedBinlogSize_Call) Run(run func()) *MockSegmentManager_GetLoadedBinlogSize_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run()
	})
	return _c
}
// Return stubs the int64 value the mocked method yields.
func (_c *MockSegmentManager_GetLoadedBinlogSize_Call) Return(_a0 int64) *MockSegmentManager_GetLoadedBinlogSize_Call {
	_c.Call.Return(_a0)
	return _c
}
// RunAndReturn stubs the method with a function computing the return value.
func (_c *MockSegmentManager_GetLoadedBinlogSize_Call) RunAndReturn(run func() int64) *MockSegmentManager_GetLoadedBinlogSize_Call {
	_c.Call.Return(run)
	return _c
}
// GetLogicalResource provides a mock function with no fields
func (_m *MockSegmentManager) GetLogicalResource() ResourceUsage {
ret := _m.Called()
@ -804,6 +882,39 @@ func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(context.Contex
return _c
}
// SubLoadedBinlogSize provides a mock function with given fields: size
// (mockery-generated; records the call so expectations can match on size).
func (_m *MockSegmentManager) SubLoadedBinlogSize(size int64) {
	_m.Called(size)
}
// MockSegmentManager_SubLoadedBinlogSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubLoadedBinlogSize'
type MockSegmentManager_SubLoadedBinlogSize_Call struct {
	*mock.Call
}
// SubLoadedBinlogSize is a helper method to define mock.On call
//   - size int64
func (_e *MockSegmentManager_Expecter) SubLoadedBinlogSize(size interface{}) *MockSegmentManager_SubLoadedBinlogSize_Call {
	return &MockSegmentManager_SubLoadedBinlogSize_Call{Call: _e.mock.On("SubLoadedBinlogSize", size)}
}
// Run registers a typed callback invoked with the call's int64 argument;
// returns _c for chaining.
func (_c *MockSegmentManager_SubLoadedBinlogSize_Call) Run(run func(size int64)) *MockSegmentManager_SubLoadedBinlogSize_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(int64))
	})
	return _c
}
// Return completes the expectation; the mocked method has no return values.
func (_c *MockSegmentManager_SubLoadedBinlogSize_Call) Return() *MockSegmentManager_SubLoadedBinlogSize_Call {
	_c.Call.Return()
	return _c
}
// RunAndReturn delegates to Run since the mocked method returns nothing.
func (_c *MockSegmentManager_SubLoadedBinlogSize_Call) RunAndReturn(run func(int64)) *MockSegmentManager_SubLoadedBinlogSize_Call {
	_c.Run(run)
	return _c
}
// SubLogicalResource provides a mock function with given fields: usage
func (_m *MockSegmentManager) SubLogicalResource(usage ResourceUsage) {
_m.Called(usage)

View File

@ -312,6 +312,7 @@ type LocalSegment struct {
// cached results, to avoid too many CGO calls
memSize *atomic.Int64
binlogSize *atomic.Int64
rowNum *atomic.Int64
insertCount *atomic.Int64
@ -389,6 +390,7 @@ func NewSegment(ctx context.Context,
fieldJSONStats: make(map[int64]*querypb.JsonStatsInfo),
memSize: atomic.NewInt64(-1),
binlogSize: atomic.NewInt64(0),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
}
@ -1352,10 +1354,17 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error
}
// FinishLoad finalizes the underlying core segment after all binlogs are
// loaded, then records the segment's binlog size in the manager so the
// loaded-disk-quota check can account for it.
//
// Fix: a stale `return s.csegment.FinishLoad()` was left in the middle of the
// function, which (a) made the binlog-size accounting below unreachable and
// (b) would re-invoke FinishLoad a second time. It is removed here; the
// result of the single call at the top is checked instead.
func (s *LocalSegment) FinishLoad() error {
	err := s.csegment.FinishLoad()
	if err != nil {
		return err
	}

	// TODO: disable logical resource handling for now
	// usage := s.ResourceUsageEstimate()
	// s.manager.AddLogicalResource(usage)

	// Track the loaded binlog size globally and cache it on the segment so
	// Release subtracts exactly the amount added here.
	binlogSize := calculateSegmentMemorySize(s.LoadInfo())
	s.manager.AddLoadedBinlogSize(binlogSize)
	s.binlogSize.Store(binlogSize)
	return nil
}
type ReleaseScope int
@ -1424,6 +1433,13 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
// usage := s.ResourceUsageEstimate()
// s.manager.SubLogicalResource(usage)
binlogSize := s.binlogSize.Load()
if binlogSize > 0 {
// no concurrent change to s.binlogSize, so the subtraction is safe
s.manager.SubLoadedBinlogSize(binlogSize)
s.binlogSize.Store(0)
}
log.Info("delete segment from memory")
}

View File

@ -244,6 +244,24 @@ func calculateSegmentLogSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
return segmentSize
}
// calculateSegmentMemorySize sums the data sizes of every binlog referenced by
// the load info: field binlogs, statslogs, and deltalogs.
func calculateSegmentMemorySize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
	var total int64
	binlogGroups := [][]*datapb.FieldBinlog{
		segmentLoadInfo.BinlogPaths,
		segmentLoadInfo.Statslogs,
		segmentLoadInfo.Deltalogs,
	}
	for _, group := range binlogGroups {
		for _, fieldBinlog := range group {
			total += getBinlogDataMemorySize(fieldBinlog)
		}
	}
	return total
}
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
fieldSize := int64(0)
for _, binlog := range fieldBinlog.Binlogs {

View File

@ -1378,6 +1378,22 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
return err
}
// check disk quota of loaded collections
totalLoadedDiskQuota := Params.QuotaConfig.LoadedDiskQuota.GetAsFloat()
totalLoaded := 0.0
for _, queryNodeMetrics := range q.queryNodeMetrics {
// for streaming node, queryNodeMetrics.LoadedBinlogSize is always 0
totalLoaded += float64(queryNodeMetrics.LoadedBinlogSize)
}
if totalLoaded >= totalLoadedDiskQuota {
log.RatedWarn(10, "cluster loaded disk quota exceeded", zap.Float64("total loaded", totalLoaded), zap.Float64("total loaded disk quota", totalLoadedDiskQuota))
err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil)
if err != nil {
log.Warn("fail to force deny writing", zap.Error(err))
}
return err
}
collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
dbSizeInfo := make(map[int64]int64)
collections := make([]int64, 0)
@ -1434,7 +1450,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
dbIDs := make([]int64, 0)
checkDiskQuota := func(dbID, binlogSize int64, quota float64) {
appendIfExceeded := func(dbID, binlogSize int64, quota float64) {
if float64(binlogSize) >= quota {
log.RatedWarn(10, "db disk quota exceeded",
zap.Int64("db", dbID),
@ -1451,7 +1467,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
if dbDiskQuotaStr := db.GetProperty(common.DatabaseDiskQuotaKey); dbDiskQuotaStr != "" {
if dbDiskQuotaBytes, err := strconv.ParseFloat(dbDiskQuotaStr, 64); err == nil {
dbDiskQuotaMB := dbDiskQuotaBytes * 1024 * 1024
checkDiskQuota(dbID, binlogSize, dbDiskQuotaMB)
appendIfExceeded(dbID, binlogSize, dbDiskQuotaMB)
continue
} else {
log.Warn("invalid configuration for diskQuota.mb",
@ -1460,7 +1476,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
}
}
}
checkDiskQuota(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
appendIfExceeded(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
}
return dbIDs
}

View File

@ -884,7 +884,7 @@ func TestQuotaCenter(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, dc, core.tsoAllocator, meta)
quotaCenter.checkDiskQuota(nil)
checkLimiter := func(notEquals ...int64) {
checkCollectionLimiter := func(notEquals ...int64) {
for db, collections := range quotaCenter.writableCollections {
for collection := range collections {
limiters := quotaCenter.rateLimiter.GetCollectionLimiters(db, collection).GetLimiters()
@ -907,11 +907,21 @@ func TestQuotaCenter(t *testing.T) {
}
}
checkClusterLimiter := func() {
root := quotaCenter.rateLimiter.GetRootLimiters().GetLimiters()
a, _ := root.Get(internalpb.RateType_DMLInsert)
assert.Equal(t, Limit(0), a.Limit())
b, _ := root.Get(internalpb.RateType_DMLUpsert)
assert.Equal(t, Limit(0), b.Limit())
c, _ := root.Get(internalpb.RateType_DMLDelete)
assert.NotEqual(t, Limit(0), c.Limit())
}
// total DiskQuota exceeded
paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, "99")
paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "90")
quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{
TotalBinlogSize: 10 * 1024 * 1024,
TotalBinlogSize: 300 * 1024 * 1024,
CollectionBinlogSize: map[int64]int64{
1: 100 * 1024 * 1024,
2: 100 * 1024 * 1024,
@ -925,7 +935,7 @@ func TestQuotaCenter(t *testing.T) {
quotaCenter.collectionIDToDBID = collectionIDToDBID
quotaCenter.resetAllCurrentRates()
quotaCenter.checkDiskQuota(nil)
checkLimiter()
checkClusterLimiter()
paramtable.Get().Reset(Params.QuotaConfig.DiskQuota.Key)
paramtable.Get().Reset(Params.QuotaConfig.DiskQuotaPerCollection.Key)
@ -940,8 +950,24 @@ func TestQuotaCenter(t *testing.T) {
}
quotaCenter.resetAllCurrentRates()
quotaCenter.checkDiskQuota(nil)
checkLimiter(1)
checkCollectionLimiter(1)
paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, colQuotaBackup)
// loaded DiskQuota exceeded
loadedQuotaBackup := Params.QuotaConfig.LoadedDiskQuota.GetValue()
paramtable.Get().Save(Params.QuotaConfig.LoadedDiskQuota.Key, "99")
quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
1: {
LoadedBinlogSize: 100 * 1024 * 1024, // 100MB
},
}
quotaCenter.writableCollections = map[int64]map[int64][]int64{
0: collectionIDToPartitionIDs,
}
quotaCenter.resetAllCurrentRates()
quotaCenter.checkDiskQuota(nil)
checkClusterLimiter()
paramtable.Get().Save(Params.QuotaConfig.LoadedDiskQuota.Key, loadedQuotaBackup)
})
t.Run("test setRates", func(t *testing.T) {

View File

@ -62,6 +62,7 @@ type QueryNodeQuotaMetrics struct {
Rms []RateMetric
Fgm FlowGraphMetric
GrowingSegmentsSize int64
LoadedBinlogSize int64
Effect NodeEffect
DeleteBufferInfo DeleteBufferInfo
StreamingQuota *StreamingQuotaMetrics

View File

@ -153,6 +153,7 @@ type quotaConfig struct {
GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"`
DiskProtectionEnabled ParamItem `refreshable:"true"`
DiskQuota ParamItem `refreshable:"true"`
LoadedDiskQuota ParamItem `refreshable:"true"`
DiskQuotaPerDB ParamItem `refreshable:"true"`
DiskQuotaPerCollection ParamItem `refreshable:"true"`
DiskQuotaPerPartition ParamItem `refreshable:"true"`
@ -1904,6 +1905,27 @@ but the rate will not be lower than minRateRatio * dmlRate.`,
}
p.DiskQuota.Init(base.mgr)
p.LoadedDiskQuota = ParamItem{
Key: "quotaAndLimits.limitWriting.diskProtection.loadedDiskQuota",
Version: "2.6.4",
DefaultValue: quota,
Formatter: func(v string) string {
if !p.DiskProtectionEnabled.GetAsBool() {
return max
}
level := getAsFloat(v)
// (0, +inf)
if level <= 0 {
return max
}
// megabytes to bytes
return fmt.Sprintf("%f", megaBytes2Bytes(level))
},
Doc: "MB, (0, +inf), default no limit",
Export: true,
}
p.LoadedDiskQuota.Init(base.mgr)
p.DiskQuotaPerDB = ParamItem{
Key: "quotaAndLimits.limitWriting.diskProtection.diskQuotaPerDB",
Version: "2.4.1",