mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Make bulkLoad failed if quota exceeds and remove autoindex config (#19770)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
82dd1bba1c
commit
22b32465e7
@ -468,7 +468,3 @@ quotaAndLimits:
|
||||
maxReadResultRate: -1 # MB/s, default no limit
|
||||
# coolOffSpeed is the speed of search&query rates cool off.
|
||||
coolOffSpeed: 0.9 # (0, 1]
|
||||
|
||||
# AutoIndexConfig
|
||||
autoIndex:
|
||||
enable: false
|
||||
|
||||
@ -477,9 +477,18 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
||||
s.NumOfRows = cp.GetNumOfRows()
|
||||
modSegments[cp.GetSegmentID()] = s
|
||||
}
|
||||
var totalSize int64
|
||||
segments := make([]*datapb.SegmentInfo, 0, len(modSegments))
|
||||
for _, seg := range modSegments {
|
||||
segments = append(segments, seg.SegmentInfo)
|
||||
totalSize += seg.getSegmentSize()
|
||||
}
|
||||
// check disk quota
|
||||
for _, seg := range m.segments.GetSegments() {
|
||||
totalSize += seg.getSegmentSize()
|
||||
}
|
||||
if float64(totalSize) >= Params.QuotaConfig.DiskQuota {
|
||||
return fmt.Errorf("UpdateFlushSegmentsInfo failed: disk quota exceeds if update, segID = %d", segmentID)
|
||||
}
|
||||
if err := m.catalog.AlterSegments(m.ctx, segments); err != nil {
|
||||
log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd",
|
||||
|
||||
@ -514,6 +514,7 @@ func TestGetUnFlushedSegments(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateFlushSegmentsInfo(t *testing.T) {
|
||||
Params.Init()
|
||||
t.Run("normal", func(t *testing.T) {
|
||||
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "")
|
||||
assert.Nil(t, err)
|
||||
@ -590,6 +591,30 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
|
||||
assert.Nil(t, segmentInfo.Binlogs)
|
||||
assert.Nil(t, segmentInfo.StartPosition)
|
||||
})
|
||||
|
||||
t.Run("test exceed disk quota", func(t *testing.T) {
|
||||
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "")
|
||||
assert.Nil(t, err)
|
||||
|
||||
diskQuotaBackup := Params.QuotaConfig.DiskQuota
|
||||
const (
|
||||
diskQuota = 5 * 1024 * 1024 * 1024
|
||||
segmentSize = 3 * 1024 * 1024 * 1024
|
||||
)
|
||||
Params.QuotaConfig.DiskQuota = diskQuota
|
||||
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing,
|
||||
Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: segmentSize}}}},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0")}}}
|
||||
err = meta.AddSegment(segment1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog1")},
|
||||
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog1")},
|
||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: segmentSize}}}},
|
||||
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
|
||||
assert.Error(t, err)
|
||||
Params.QuotaConfig.DiskQuota = diskQuotaBackup
|
||||
})
|
||||
}
|
||||
|
||||
func TestSaveHandoffMeta(t *testing.T) {
|
||||
|
||||
@ -437,6 +437,7 @@ func (p *quotaConfig) initTtProtectionEnabled() {
|
||||
|
||||
func (p *quotaConfig) initMaxTimeTickDelay() {
|
||||
if !p.TtProtectionEnabled {
|
||||
p.MaxTimeTickDelay = math.MaxInt64
|
||||
return
|
||||
}
|
||||
const defaultMaxTtDelay = 30.0
|
||||
@ -466,6 +467,7 @@ func (p *quotaConfig) initDataNodeMemoryLowWaterLevel() {
|
||||
|
||||
func (p *quotaConfig) initDataNodeMemoryHighWaterLevel() {
|
||||
if !p.MemProtectionEnabled {
|
||||
p.DataNodeMemoryHighWaterLevel = 1
|
||||
return
|
||||
}
|
||||
p.DataNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.dataNodeMemoryHighWaterLevel", defaultHighWaterLevel)
|
||||
@ -494,6 +496,7 @@ func (p *quotaConfig) initQueryNodeMemoryLowWaterLevel() {
|
||||
|
||||
func (p *quotaConfig) initQueryNodeMemoryHighWaterLevel() {
|
||||
if !p.MemProtectionEnabled {
|
||||
p.QueryNodeMemoryLowWaterLevel = defaultLowWaterLevel
|
||||
return
|
||||
}
|
||||
p.QueryNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.queryNodeMemoryHighWaterLevel", defaultHighWaterLevel)
|
||||
@ -537,6 +540,7 @@ func (p *quotaConfig) initQueueProtectionEnabled() {
|
||||
|
||||
func (p *quotaConfig) initNQInQueueThreshold() {
|
||||
if !p.QueueProtectionEnabled {
|
||||
p.NQInQueueThreshold = math.MaxInt64
|
||||
return
|
||||
}
|
||||
p.NQInQueueThreshold = p.Base.ParseInt64WithDefault("quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold", math.MaxInt64)
|
||||
@ -548,6 +552,7 @@ func (p *quotaConfig) initNQInQueueThreshold() {
|
||||
|
||||
func (p *quotaConfig) initQueueLatencyThreshold() {
|
||||
if !p.QueueProtectionEnabled {
|
||||
p.QueueLatencyThreshold = defaultMax
|
||||
return
|
||||
}
|
||||
p.QueueLatencyThreshold = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold", defaultMax)
|
||||
@ -579,9 +584,6 @@ func (p *quotaConfig) initMaxReadResultRate() {
|
||||
func (p *quotaConfig) initCoolOffSpeed() {
|
||||
const defaultSpeed = 0.9
|
||||
p.CoolOffSpeed = defaultSpeed
|
||||
if !p.QueueProtectionEnabled {
|
||||
return
|
||||
}
|
||||
p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.coolOffSpeed", defaultSpeed)
|
||||
// (0, 1]
|
||||
if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 {
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
package paramtable
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -84,8 +85,8 @@ func TestQuotaParam(t *testing.T) {
|
||||
t.Run("test limit reading", func(t *testing.T) {
|
||||
assert.False(t, qc.ForceDenyReading)
|
||||
assert.Equal(t, false, qc.QueueProtectionEnabled)
|
||||
assert.Equal(t, int64(0), qc.NQInQueueThreshold)
|
||||
assert.Equal(t, float64(0), qc.QueueLatencyThreshold)
|
||||
assert.Equal(t, int64(math.MaxInt64), qc.NQInQueueThreshold)
|
||||
assert.Equal(t, defaultMax, qc.QueueLatencyThreshold)
|
||||
assert.Equal(t, false, qc.ResultProtectionEnabled)
|
||||
assert.Equal(t, defaultMax, qc.MaxReadResultRate)
|
||||
assert.Equal(t, 0.9, qc.CoolOffSpeed)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user