enhance: remove logical usage checks during segment loading (#44770)
issue: #41435
pr: #44743

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
parent 6ff30e8f9e
commit c72a19d174
@@ -488,20 +488,6 @@ queryNode:
 # Enable eviction for Tiered Storage. Defaults to false.
 # Note that if eviction is enabled, cache data loaded during sync warmup is also subject to eviction.
 evictionEnabled: false
-# This ratio estimates how much evictable memory can be cached.
-# The higher the ratio, the more physical memory is reserved for evictable memory,
-# resulting in fewer evictions but fewer segments can be loaded.
-# Conversely, a lower ratio results in more evictions but allows more segments to be loaded.
-# This parameter is only valid when eviction is enabled.
-# It defaults to 0.3 (meaning about 30% of evictable in-memory data can be cached), with a valid range of [0.0, 1.0].
-evictableMemoryCacheRatio: 0.3
-# This ratio estimates how much evictable disk space can be cached.
-# The higher the ratio, the more physical disk space is reserved for evictable disk usage,
-# resulting in fewer evictions but fewer segments can be loaded.
-# Conversely, a lower ratio results in more evictions but allows more segments to be loaded.
-# This parameter is only valid when eviction is enabled.
-# It defaults to 0.3 (meaning about 30% of evictable on-disk data can be cached), with a valid range of [0.0, 1.0].
-evictableDiskCacheRatio: 0.3
 # Enable background eviction for Tiered Storage. Defaults to false.
 # Background eviction is used to do periodic eviction in a separate thread.
 # And it will only work when both 'evictionEnabled' and 'backgroundEvictionEnabled' are set to 'true'.
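The removed comments describe a simple budget split: the ratio decides how much of the node's physical capacity is reserved for evictable cached data, and the remainder stays available for loading segments. A minimal Go sketch of that arithmetic; evictableBudget and the 64 GiB figure are illustrative, not Milvus code:

// Illustrative only: the cache-ratio split documented above, expressed as a
// helper. Names here are hypothetical, not part of the Milvus codebase.
package main

import "fmt"

// evictableBudget returns the bytes reserved for evictable cached data given
// a total capacity and a ratio clamped to the documented range [0.0, 1.0].
func evictableBudget(totalBytes int64, ratio float64) int64 {
	if ratio < 0.0 {
		ratio = 0.0
	} else if ratio > 1.0 {
		ratio = 1.0
	}
	return int64(float64(totalBytes) * ratio)
}

func main() {
	const totalMemory int64 = 64 << 30 // 64 GiB of physical memory
	cached := evictableBudget(totalMemory, 0.3)
	fmt.Printf("evictable cache: %d bytes, left for loading: %d bytes\n",
		cached, totalMemory-cached)
}

A higher ratio grows the first number and shrinks the second, which is the trade-off the removed comments state: fewer evictions, but fewer segments can be loaded.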
@@ -1351,8 +1351,9 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error
 }

 func (s *LocalSegment) FinishLoad() error {
-	usage := s.ResourceUsageEstimate()
-	s.manager.AddLogicalResource(usage)
+	// TODO: disable logical resource handling for now
+	// usage := s.ResourceUsageEstimate()
+	// s.manager.AddLogicalResource(usage)
 	return s.csegment.FinishLoad()
 }

@@ -1418,9 +1419,9 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
 		return nil, nil
 	}).Await()

-	// release reserved resource after the segment resource is really released.
-	usage := s.ResourceUsageEstimate()
-	s.manager.SubLogicalResource(usage)
+	// TODO: disable logical resource handling for now
+	// usage := s.ResourceUsageEstimate()
+	// s.manager.SubLogicalResource(usage)

 	log.Info("delete segment from memory")
 }
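The two hunks above comment out the logical-resource bookkeeping: FinishLoad previously reserved the segment's estimated usage via AddLogicalResource, and Release gave the estimate back via SubLogicalResource once the segment was actually freed. A minimal sketch of that add-on-load / subtract-on-release pattern, using simplified stand-in types (logicalUsage, logicalAccountant) rather than the real manager:

// Simplified sketch of the accounting pattern being disabled; the types here
// are stand-ins, not the Milvus ones.
package main

import "sync"

type logicalUsage struct {
	MemorySize int64
	DiskSize   int64
}

type logicalAccountant struct {
	mu    sync.Mutex
	total logicalUsage
}

// Add is the FinishLoad-side call: reserve the segment's estimated usage.
func (a *logicalAccountant) Add(u logicalUsage) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.total.MemorySize += u.MemorySize
	a.total.DiskSize += u.DiskSize
}

// Sub is the Release-side call: return the estimate once the segment's
// underlying resources have really been released.
func (a *logicalAccountant) Sub(u logicalUsage) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.total.MemorySize -= u.MemorySize
	a.total.DiskSize -= u.DiskSize
}

func main() {
	var acct logicalAccountant
	est := logicalUsage{MemorySize: 512 << 20, DiskSize: 1 << 30}
	acct.Add(est) // on FinishLoad
	acct.Sub(est) // on Release
	_ = acct.total
}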
@@ -294,7 +294,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
 			log.Warn("request resource failed", zap.Error(err))
 			return nil, err
 		}
-		defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource)
+		defer loader.freeRequestResource(requestResourceResult)
 	}
 	newSegments := typeutil.NewConcurrentMap[int64, Segment]()
 	loaded := typeutil.NewConcurrentMap[int64, Segment]()

@@ -504,12 +504,12 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer

 	result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))

-	// check logical resource first
-	lmu, ldu, err := loader.checkLogicalSegmentSize(ctx, infos, totalMemory)
-	if err != nil {
-		log.Warn("no sufficient logical resource to load segments", zap.Error(err))
-		return result, err
-	}
+	// TODO: disable logical resource checking for now
+	// lmu, ldu, err := loader.checkLogicalSegmentSize(ctx, infos, totalMemory)
+	// if err != nil {
+	// 	log.Warn("no sufficient logical resource to load segments", zap.Error(err))
+	// 	return result, err
+	// }

 	// then get physical resource usage for loading segments
 	mu, du, err := loader.checkSegmentSize(ctx, infos, totalMemory, physicalMemoryUsage, physicalDiskUsage)

@@ -520,11 +520,11 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer

 	result.Resource.MemorySize = mu
 	result.Resource.DiskSize = du
-	result.LogicalResource.MemorySize = lmu
-	result.LogicalResource.DiskSize = ldu
+	// result.LogicalResource.MemorySize = lmu
+	// result.LogicalResource.DiskSize = ldu

 	loader.committedResource.Add(result.Resource)
-	loader.committedLogicalResource.Add(result.LogicalResource)
+	// loader.committedLogicalResource.Add(result.LogicalResource)
 	log.Info("request resource for loading segments (unit in MiB)",
 		zap.Float64("memory", logutil.ToMB(float64(result.Resource.MemorySize))),
 		zap.Float64("committedMemory", logutil.ToMB(float64(loader.committedResource.MemorySize))),
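With checkLogicalSegmentSize commented out, requestResource now relies on the physical checkSegmentSize alone; the disabled check had rejected a request up front when the summed logical estimates could not fit. A rough sketch of that kind of sum-and-compare admission check, with hypothetical types and limits (the real function works on SegmentLoadInfo and node-level totals):

// Sketch of a sum-and-compare admission check in the spirit of the disabled
// checkLogicalSegmentSize. segmentEstimate and the limit arguments are
// hypothetical stand-ins.
package main

import (
	"errors"
	"fmt"
)

type segmentEstimate struct {
	MemorySize int64
	DiskSize   int64
}

var errInsufficientResource = errors.New("no sufficient logical resource to load segments")

// checkLogicalSize sums the per-segment estimates and fails fast if either
// total would exceed the corresponding limit.
func checkLogicalSize(estimates []segmentEstimate, memLimit, diskLimit int64) (int64, int64, error) {
	var mem, disk int64
	for _, e := range estimates {
		mem += e.MemorySize
		disk += e.DiskSize
	}
	if mem > memLimit || disk > diskLimit {
		return 0, 0, fmt.Errorf("%w: need %d B memory / %d B disk, limits %d / %d",
			errInsufficientResource, mem, disk, memLimit, diskLimit)
	}
	return mem, disk, nil
}

func main() {
	_, _, err := checkLogicalSize(
		[]segmentEstimate{{MemorySize: 4 << 30, DiskSize: 8 << 30}},
		2<<30, 16<<30)
	fmt.Println(err) // memory total exceeds the limit, so the request is rejected
}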
@@ -535,11 +535,14 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	return result, nil
 }

-// freeRequest returns request memory & storage usage request.
-func (loader *segmentLoader) freeRequest(resource LoadResource, logicalResource LoadResource) {
+// freeRequestResource returns request memory & storage usage request.
+func (loader *segmentLoader) freeRequestResource(requestResourceResult requestResourceResult) {
 	loader.mut.Lock()
 	defer loader.mut.Unlock()

+	resource := requestResourceResult.Resource
+	// logicalResource := requestResourceResult.LogicalResource
+
 	if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
 		C.ReleaseLoadingResource(C.CResourceUsage{
 			memory_bytes: C.int64_t(resource.MemorySize),

@@ -548,7 +551,7 @@ func (loader *segmentLoader) freeRequest(resource LoadResource, logicalResource
 	}

 	loader.committedResource.Sub(resource)
-	loader.committedLogicalResource.Sub(logicalResource)
+	// loader.committedLogicalResource.Sub(logicalResource)
 	loader.committedResourceNotifier.NotifyAll()
 }

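freeRequestResource now receives the whole request result instead of two separate LoadResource values, so the logical half can simply sit unused while it is disabled. The call sites in this diff access result.Resource, result.LogicalResource, and result.ConcurrencyLevel; the sketch below shows the shape those accesses imply. The actual definition is not part of this diff, so anything beyond those three fields (and the package wrapper) is an assumption:

// Shape implied by the call sites above; not the authoritative definitions.
package main

import "fmt"

type LoadResource struct {
	MemorySize int64
	DiskSize   int64
}

type requestResourceResult struct {
	Resource         LoadResource // physical memory/disk reserved for this load
	LogicalResource  LoadResource // logical estimate, currently left at zero
	ConcurrencyLevel int          // min(CPU count, number of segments to load)
}

func main() {
	r := requestResourceResult{Resource: LoadResource{MemorySize: 1 << 30}}
	fmt.Println(r.Resource.MemorySize, r.LogicalResource, r.ConcurrencyLevel)
}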
@@ -1067,19 +1070,19 @@ func (loader *segmentLoader) LoadLazySegment(ctx context.Context,
 	segment Segment,
 	loadInfo *querypb.SegmentLoadInfo,
 ) (err error) {
-	resource, err := loader.requestResourceWithTimeout(ctx, loadInfo)
+	result, err := loader.requestResourceWithTimeout(ctx, loadInfo)
 	if err != nil {
 		log.Ctx(ctx).Warn("request resource failed", zap.Error(err))
 		return err
 	}
-	// NOTE: logical resource is not used for lazy load, so set it to zero
-	defer loader.freeRequest(resource, LoadResource{})
+	defer loader.freeRequestResource(result)

 	return loader.LoadSegment(ctx, segment, loadInfo)
 }

 // requestResourceWithTimeout requests memory & storage to load segments with a timeout and retry.
-func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, error) {
+func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (requestResourceResult, error) {
 	retryInterval := paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)
 	timeoutStarted := false
 	for {

@@ -1087,7 +1090,7 @@ func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, inf

 		result, err := loader.requestResource(ctx, infos...)
 		if err == nil {
-			return result.Resource, nil
+			return result, nil
 		}

 		// start timeout if there's no committed resource in loading.

@@ -1106,7 +1109,7 @@ func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, inf
 		// if error is not caused by retry timeout, return it directly.
 		if err != nil && !errors.Is(err, errRetryTimerNotified) {
 			cancelWithRetryTimeout()
-			return LoadResource{}, err
+			return requestResourceResult{}, err
 		}
 		cancelWithRetryTimeout()
 	}
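requestResourceWithTimeout keeps retrying requestResource at LazyLoadRequestResourceRetryInterval until the caller's context expires, and the test below drives it with contextutil.WithTimeoutCause so the timeout cause comes back as the error. A generic sketch of that retry-until-deadline shape, using only standard-library calls; retryWithInterval, errRetryable and attempt are illustrative names, not Milvus code:

// Generic retry-until-deadline loop in the spirit of requestResourceWithTimeout.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errRetryable = errors.New("resource temporarily unavailable")

func retryWithInterval(ctx context.Context, interval time.Duration, attempt func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		err := attempt()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errRetryable) {
			return err // non-retryable errors are returned immediately
		}
		select {
		case <-ctx.Done():
			return context.Cause(ctx) // surface the timeout cause to the caller
		case <-ticker.C:
		}
	}
}

func main() {
	timeoutErr := errors.New("timeout")
	ctx, cancel := context.WithTimeoutCause(context.Background(), 500*time.Millisecond, timeoutErr)
	defer cancel()
	err := retryWithInterval(ctx, 100*time.Millisecond, func() error { return errRetryable })
	fmt.Println(errors.Is(err, timeoutErr)) // true: the timeout cause is reported
}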
@@ -1442,7 +1445,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
 		log.Warn("request resource failed", zap.Error(err))
 		return err
 	}
-	defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource)
+	defer loader.freeRequestResource(requestResourceResult)
 	return loader.loadDeltalogs(ctx, segment, deltaLogs)
 }

@@ -2185,7 +2188,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context,
 	if err != nil {
 		return err
 	}
-	defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource)
+	defer loader.freeRequestResource(requestResourceResult)

 	log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos)))
 	metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Inc()

@@ -900,9 +900,9 @@ func (suite *SegmentLoaderDetailSuite) TestRequestResource() {

 		paramtable.Get().Save(paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceTimeout.Key, "500")
 		paramtable.Get().Save(paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceRetryInterval.Key, "100")
-		resource, err := suite.loader.requestResourceWithTimeout(context.Background(), loadInfo)
+		result, err := suite.loader.requestResourceWithTimeout(context.Background(), loadInfo)
 		suite.NoError(err)
-		suite.EqualValues(1100000, resource.MemorySize)
+		suite.EqualValues(1100000, result.Resource.MemorySize)

 		suite.loader.committedResource.Add(LoadResource{
 			MemorySize: 1024 * 1024 * 1024 * 1024,

@@ -911,7 +911,7 @@ func (suite *SegmentLoaderDetailSuite) TestRequestResource() {
 		timeoutErr := errors.New("timeout")
 		ctx, cancel := contextutil.WithTimeoutCause(context.Background(), 1000*time.Millisecond, timeoutErr)
 		defer cancel()
-		resource, err = suite.loader.requestResourceWithTimeout(ctx, loadInfo)
+		result, err = suite.loader.requestResourceWithTimeout(ctx, loadInfo)
 		suite.Error(err)
 		suite.ErrorIs(err, timeoutErr)
 	})

@@ -3305,7 +3305,7 @@ resulting in fewer evictions but fewer segments can be loaded.
 Conversely, a lower ratio results in more evictions but allows more segments to be loaded.
 This parameter is only valid when eviction is enabled.
 It defaults to 0.3 (meaning about 30% of evictable in-memory data can be cached), with a valid range of [0.0, 1.0].`,
-		Export: true,
+		Export: false, // TODO: disabled for now, no need to export
 	}
 	p.TieredEvictableMemoryCacheRatio.Init(base.mgr)

@@ -3326,7 +3326,7 @@ resulting in fewer evictions but fewer segments can be loaded.
 Conversely, a lower ratio results in more evictions but allows more segments to be loaded.
 This parameter is only valid when eviction is enabled.
 It defaults to 0.3 (meaning about 30% of evictable on-disk data can be cached), with a valid range of [0.0, 1.0].`,
-		Export: true,
+		Export: false, // TODO: disabled for now, no need to export
 	}
 	p.TieredEvictableDiskCacheRatio.Init(base.mgr)
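Flipping Export to false keeps both ratio params registered (their Init calls remain) while presumably dropping them from the generated user-facing configuration, consistent with removing the corresponding entries from the YAML template above. A rough illustration of that filtering idea; paramItem is a stand-in type, not the actual paramtable.ParamItem, and the keys are abbreviated for illustration:

// Hypothetical illustration of what an export flag controls: only exported
// params are emitted into the user-facing config template.
package main

import "fmt"

type paramItem struct {
	Key          string
	DefaultValue string
	Export       bool
}

func exportedParams(items []paramItem) []paramItem {
	out := make([]paramItem, 0, len(items))
	for _, it := range items {
		if it.Export {
			out = append(out, it)
		}
	}
	return out
}

func main() {
	items := []paramItem{
		{Key: "queryNode.evictionEnabled", DefaultValue: "false", Export: true},
		{Key: "queryNode.evictableMemoryCacheRatio", DefaultValue: "0.3", Export: false},
	}
	for _, it := range exportedParams(items) {
		fmt.Printf("%s: %s\n", it.Key, it.DefaultValue) // only the exported entry prints
	}
}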