From bdd65871eacad0d5418b12d088d89c3aefc097c3 Mon Sep 17 00:00:00 2001
From: sparknack
Date: Fri, 1 Aug 2025 21:31:37 +0800
Subject: [PATCH] enhance: tiered storage: estimate segment loading resource usage while considering eviction (#43323)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

issue: #41435

After introducing the caching layer's lazy loading and eviction mechanisms, most parts of a segment won't be loaded into memory or disk immediately, even if the segment is marked as LOADED. This means physical resource usage may be very low. However, we still need to reserve enough resources for the segments marked as LOADED. Thus, the resource usage estimation performed during segment loading, which is currently based on physical resource usage only, needs to change.

To address this, this patch introduces the concept of logical resource usage. It can be thought of as the base reserved resource for each LOADED segment. A segment's logical resource usage is derived from its final evictable and inevictable resource usage and is calculated as follows:

```
SLR = SFPIER + evictable_cache_ratio * SFPER
```

which is equivalent to

```
SLR = (SFPIER + SFPER) - (1.0 - evictable_cache_ratio) * SFPER
```

`SLR`: The logical resource usage of a segment.
`SFPIER`: The final physical inevictable resource usage of a segment.
`SFPER`: The final physical evictable resource usage of a segment.
`evictable_cache_ratio`: The ratio of a segment's evictable resources that can be cached locally. The higher the ratio, the more physical memory is reserved for evictable data.

When loading a segment, two types of resource usage are taken into account.

The first is the estimated maximum physical resource usage:

```
PPR = HPR + CPR + SMPR - SFPER
```

`PPR`: The predicted physical resource usage after the current segment is allowed to load.
`HPR`: The physical resource usage obtained from hardware information.
`CPR`: The total physical resource usage of segments that have been committed but not yet loaded. When a new segment is allowed to load, `CPR' = CPR + (SMPR - SFPER)`; when one of the committed segments finishes loading, `CPR' = CPR - (SMPR - SFPER)`.
`SMPR`: The maximum physical resource usage of the current segment.
`SFPER`: The final physical evictable resource usage of the current segment.

The second is the estimated logical resource usage; this check only applies when eviction is enabled:

```
PLR = LLR + CLR + SLR
```

`PLR`: The predicted logical resource usage after the current segment is allowed to load.
`LLR`: The total logical resource usage of all loaded segments. When a new segment finishes loading, `LLR' = LLR + SLR`.
`CLR`: The total logical resource usage of segments that have been committed but not yet loaded. When a new segment is allowed to load, `CLR' = CLR + SLR`; when one of the committed segments finishes loading, `CLR' = CLR - SLR`.
`SLR`: The logical resource usage of the current segment.

The segment is allowed to load only when `PPR < PRL && PLR < PRL`, where `PRL` is the physical resource limit of the querynode.
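The admission logic above can be summarized in a short sketch. This is a minimal illustration only, assuming byte-sized quantities; the function and variable names are hypothetical and do not mirror the actual implementation in `segment_loader.go` (`checkSegmentSize` / `checkLogicalSegmentSize`) beyond the formulas.

```go
package main

import "fmt"

// allowedToLoad applies the two checks from the description:
// physical  PPR = HPR + CPR + SMPR - SFPER
// logical   PLR = LLR + CLR + SLR, with SLR = SFPIER + ratio * SFPER
func allowedToLoad(
	hpr, cpr uint64, // HPR: current physical usage; CPR: committed physical usage
	llr, clr uint64, // LLR: loaded logical usage; CLR: committed logical usage
	smpr, sfpier, sfper uint64, // SMPR, SFPIER, SFPER of the candidate segment
	evictableCacheRatio float64, // evictable_cache_ratio
	prl uint64, // PRL: physical resource limit of the querynode
	evictionEnabled bool,
) bool {
	ppr := hpr + cpr + smpr - sfper
	if ppr >= prl {
		return false
	}
	if !evictionEnabled {
		// the logical check only applies when eviction is enabled
		return true
	}
	slr := sfpier + uint64(evictableCacheRatio*float64(sfper))
	plr := llr + clr + slr
	return plr < prl
}

func main() {
	const gib = uint64(1) << 30
	// Example: 4 GiB used physically, 2 GiB reserved logically; candidate segment
	// peaks at 3 GiB with 1 GiB inevictable + 2 GiB evictable final usage;
	// cache ratio 1.0, 8 GiB limit -> PPR = 5 GiB, PLR = 5 GiB -> allowed.
	fmt.Println(allowedToLoad(4*gib, 0, 2*gib, 0, 3*gib, 1*gib, 2*gib, 1.0, 8*gib, true))
}
```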
--------- Signed-off-by: Shawn Wang --- configs/milvus.yaml | 14 ++ .../querynodev2/delegator/delegator_data.go | 1 + internal/querynodev2/segments/manager.go | 47 ++++ internal/querynodev2/segments/manager_test.go | 1 + .../segments/mock_segment_manager.go | 111 ++++++++++ .../querynodev2/segments/retrieve_test.go | 2 + internal/querynodev2/segments/search_test.go | 2 + internal/querynodev2/segments/segment.go | 24 +- .../querynodev2/segments/segment_loader.go | 205 +++++++++++++++--- internal/querynodev2/segments/segment_test.go | 2 + internal/querynodev2/server_test.go | 1 + pkg/util/paramtable/component_param.go | 44 ++++ 12 files changed, 415 insertions(+), 39 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8a6a2d3514..48486380e5 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -481,6 +481,20 @@ queryNode: # Enable eviction for Tiered Storage. Defaults to false. # Note that if eviction is enabled, cache data loaded during sync warmup is also subject to eviction. evictionEnabled: false + # This ratio estimates how much evictable memory can be cached. + # The higher the ratio, the more physical memory is reserved for evictable memory, + # resulting in fewer evictions but fewer segments can be loaded. + # Conversely, a lower ratio results in more evictions but allows more segments to be loaded. + # This parameter is only valid when eviction is enabled. + # It defaults to 1.0 (meaning all evictable memory is cached), with a valid range of [0.0, 1.0]. + evictableMemoryCacheRatio: 1 + # This ratio estimates how much evictable disk space can be cached. + # The higher the ratio, the more physical disk space is reserved for evictable disk usage, + # resulting in fewer evictions but fewer segments can be loaded. + # Conversely, a lower ratio results in more evictions but allows more segments to be loaded. + # This parameter is only valid when eviction is enabled. + # It defaults to 1.0 (meaning all evictable disk is cached), with a valid range of [0.0, 1.0]. + evictableDiskCacheRatio: 1 # Time in seconds after which an unaccessed cache cell will be evicted. # If a cached data hasn't been accessed again after this time since its last access, it will be evicted. # If set to 0, time based eviction is disabled. diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index a8790c251b..e4c2594458 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -100,6 +100,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { growing, err = segments.NewSegment( context.Background(), sd.collection, + sd.segmentManager, segments.SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{ diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index a96b06cb70..2226f98d34 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -27,6 +27,7 @@ import "C" import ( "context" "fmt" + "sync" "go.uber.org/zap" "golang.org/x/sync/singleflight" @@ -197,6 +198,10 @@ type SegmentManager interface { // Deprecated: quick fix critical issue: #30857 // TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction. 
Exist(segmentID typeutil.UniqueID, typ SegmentType) bool + + AddLogicalResource(usage ResourceUsage) + SubLogicalResource(usage ResourceUsage) + GetLogicalResource() ResourceUsage } var _ SegmentManager = (*segmentManager)(nil) @@ -366,6 +371,12 @@ type segmentManager struct { growingOnReleasingSegments *typeutil.ConcurrentSet[int64] sealedOnReleasingSegments *typeutil.ConcurrentSet[int64] + + // logicalResource is the logical resource usage for all loaded segments of this querynode segment manager, + // which is to avoid memory and disk pressure when loading too many segments after eviction is enabled. + // only MemorySize and DiskSize are used, other fields are ignored. + logicalResource ResourceUsage + logicalResourceLock sync.Mutex } func NewSegmentManager() *segmentManager { @@ -374,9 +385,45 @@ func NewSegmentManager() *segmentManager { secondaryIndex: newSecondarySegmentIndex(), growingOnReleasingSegments: typeutil.NewConcurrentSet[int64](), sealedOnReleasingSegments: typeutil.NewConcurrentSet[int64](), + logicalResourceLock: sync.Mutex{}, } } +func (mgr *segmentManager) AddLogicalResource(usage ResourceUsage) { + mgr.logicalResourceLock.Lock() + defer mgr.logicalResourceLock.Unlock() + + mgr.logicalResource.MemorySize += usage.MemorySize + mgr.logicalResource.DiskSize += usage.DiskSize +} + +func (mgr *segmentManager) SubLogicalResource(usage ResourceUsage) { + mgr.logicalResourceLock.Lock() + defer mgr.logicalResourceLock.Unlock() + + // avoid overflow of memory and disk size + if mgr.logicalResource.MemorySize < usage.MemorySize { + mgr.logicalResource.MemorySize = 0 + log.Warn("Logical memory size would be negative, setting to 0") + } else { + mgr.logicalResource.MemorySize -= usage.MemorySize + } + + if mgr.logicalResource.DiskSize < usage.DiskSize { + mgr.logicalResource.DiskSize = 0 + log.Warn("Logical disk size would be negative, setting to 0") + } else { + mgr.logicalResource.DiskSize -= usage.DiskSize + } +} + +func (mgr *segmentManager) GetLogicalResource() ResourceUsage { + mgr.logicalResourceLock.Lock() + defer mgr.logicalResourceLock.Unlock() + + return mgr.logicalResource +} + // put is the internal put method updating both global segments and secondary index. 
func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) { mgr.globalSegments.Put(ctx, segmentType, segment) diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 7d4b1568a8..30931385e6 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -59,6 +59,7 @@ func (s *ManagerSuite) SetupTest() { segment, err := NewSegment( context.Background(), collection, + s.mgr, s.types[i], 0, &querypb.SegmentLoadInfo{ diff --git a/internal/querynodev2/segments/mock_segment_manager.go b/internal/querynodev2/segments/mock_segment_manager.go index b069a54926..ebebbcd3c6 100644 --- a/internal/querynodev2/segments/mock_segment_manager.go +++ b/internal/querynodev2/segments/mock_segment_manager.go @@ -25,6 +25,39 @@ func (_m *MockSegmentManager) EXPECT() *MockSegmentManager_Expecter { return &MockSegmentManager_Expecter{mock: &_m.Mock} } +// AddLogicalResource provides a mock function with given fields: usage +func (_m *MockSegmentManager) AddLogicalResource(usage ResourceUsage) { + _m.Called(usage) +} + +// MockSegmentManager_AddLogicalResource_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddLogicalResource' +type MockSegmentManager_AddLogicalResource_Call struct { + *mock.Call +} + +// AddLogicalResource is a helper method to define mock.On call +// - usage ResourceUsage +func (_e *MockSegmentManager_Expecter) AddLogicalResource(usage interface{}) *MockSegmentManager_AddLogicalResource_Call { + return &MockSegmentManager_AddLogicalResource_Call{Call: _e.mock.On("AddLogicalResource", usage)} +} + +func (_c *MockSegmentManager_AddLogicalResource_Call) Run(run func(usage ResourceUsage)) *MockSegmentManager_AddLogicalResource_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(ResourceUsage)) + }) + return _c +} + +func (_c *MockSegmentManager_AddLogicalResource_Call) Return() *MockSegmentManager_AddLogicalResource_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegmentManager_AddLogicalResource_Call) RunAndReturn(run func(ResourceUsage)) *MockSegmentManager_AddLogicalResource_Call { + _c.Run(run) + return _c +} + // Clear provides a mock function with given fields: ctx func (_m *MockSegmentManager) Clear(ctx context.Context) { _m.Called(ctx) @@ -451,6 +484,51 @@ func (_c *MockSegmentManager_GetGrowing_Call) RunAndReturn(run func(int64) Segme return _c } +// GetLogicalResource provides a mock function with no fields +func (_m *MockSegmentManager) GetLogicalResource() ResourceUsage { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetLogicalResource") + } + + var r0 ResourceUsage + if rf, ok := ret.Get(0).(func() ResourceUsage); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(ResourceUsage) + } + + return r0 +} + +// MockSegmentManager_GetLogicalResource_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLogicalResource' +type MockSegmentManager_GetLogicalResource_Call struct { + *mock.Call +} + +// GetLogicalResource is a helper method to define mock.On call +func (_e *MockSegmentManager_Expecter) GetLogicalResource() *MockSegmentManager_GetLogicalResource_Call { + return &MockSegmentManager_GetLogicalResource_Call{Call: _e.mock.On("GetLogicalResource")} +} + +func (_c *MockSegmentManager_GetLogicalResource_Call) Run(run func()) *MockSegmentManager_GetLogicalResource_Call { + _c.Call.Run(func(args mock.Arguments) { + 
run() + }) + return _c +} + +func (_c *MockSegmentManager_GetLogicalResource_Call) Return(_a0 ResourceUsage) *MockSegmentManager_GetLogicalResource_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegmentManager_GetLogicalResource_Call) RunAndReturn(run func() ResourceUsage) *MockSegmentManager_GetLogicalResource_Call { + _c.Call.Return(run) + return _c +} + // GetSealed provides a mock function with given fields: segmentID func (_m *MockSegmentManager) GetSealed(segmentID int64) Segment { ret := _m.Called(segmentID) @@ -726,6 +804,39 @@ func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(context.Contex return _c } +// SubLogicalResource provides a mock function with given fields: usage +func (_m *MockSegmentManager) SubLogicalResource(usage ResourceUsage) { + _m.Called(usage) +} + +// MockSegmentManager_SubLogicalResource_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubLogicalResource' +type MockSegmentManager_SubLogicalResource_Call struct { + *mock.Call +} + +// SubLogicalResource is a helper method to define mock.On call +// - usage ResourceUsage +func (_e *MockSegmentManager_Expecter) SubLogicalResource(usage interface{}) *MockSegmentManager_SubLogicalResource_Call { + return &MockSegmentManager_SubLogicalResource_Call{Call: _e.mock.On("SubLogicalResource", usage)} +} + +func (_c *MockSegmentManager_SubLogicalResource_Call) Run(run func(usage ResourceUsage)) *MockSegmentManager_SubLogicalResource_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(ResourceUsage)) + }) + return _c +} + +func (_c *MockSegmentManager_SubLogicalResource_Call) Return() *MockSegmentManager_SubLogicalResource_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegmentManager_SubLogicalResource_Call) RunAndReturn(run func(ResourceUsage)) *MockSegmentManager_SubLogicalResource_Call { + _c.Run(run) + return _c +} + // Unpin provides a mock function with given fields: _a0 func (_m *MockSegmentManager) Unpin(_a0 []Segment) { _m.Called(_a0) diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 0efa4a6a81..4f3edd6ca8 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -88,6 +88,7 @@ func (suite *RetrieveSuite) SetupTest() { suite.sealed, err = NewSegment(ctx, suite.collection, + suite.manager.Segment, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ @@ -117,6 +118,7 @@ func (suite *RetrieveSuite) SetupTest() { suite.growing, err = NewSegment(ctx, suite.collection, + suite.manager.Segment, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{ diff --git a/internal/querynodev2/segments/search_test.go b/internal/querynodev2/segments/search_test.go index 6181dce738..1772a2636a 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -80,6 +80,7 @@ func (suite *SearchSuite) SetupTest() { suite.sealed, err = NewSegment(ctx, suite.collection, + suite.manager.Segment, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ @@ -109,6 +110,7 @@ func (suite *SearchSuite) SetupTest() { suite.growing, err = NewSegment(ctx, suite.collection, + suite.manager.Segment, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{ diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index b9af7114b7..ce2d6acdc4 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -228,7 +228,7 @@ func (s 
*baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { return s.bloomFilterSet.BatchPkExist(lc) } -// ResourceUsageEstimate returns the estimated resource usage of the segment. +// ResourceUsageEstimate returns the final estimated resource usage of the segment. func (s *baseSegment) ResourceUsageEstimate() ResourceUsage { if s.segmentType == SegmentTypeGrowing { // Growing segment cannot do resource usage estimate. @@ -240,10 +240,14 @@ func (s *baseSegment) ResourceUsageEstimate() ResourceUsage { } usage, err := getResourceUsageEstimateOfSegment(s.collection.Schema(), s.LoadInfo(), resourceEstimateFactor{ - memoryUsageFactor: 1.0, - memoryIndexUsageFactor: 1.0, - EnableInterminSegmentIndex: false, - deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + memoryUsageFactor: 1.0, + memoryIndexUsageFactor: 1.0, + EnableInterminSegmentIndex: false, + tempSegmentIndexFactor: 0.0, + deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(), + TieredEvictableMemoryCacheRatio: paramtable.Get().QueryNodeCfg.TieredEvictableMemoryCacheRatio.GetAsFloat(), + TieredEvictableDiskCacheRatio: paramtable.Get().QueryNodeCfg.TieredEvictableDiskCacheRatio.GetAsFloat(), }) if err != nil { // Should never failure, if failed, segment should never be loaded. @@ -280,6 +284,7 @@ var _ Segment = (*LocalSegment)(nil) // Segment is a wrapper of the underlying C-structure segment. type LocalSegment struct { baseSegment + manager SegmentManager ptrLock *state.LoadStateLock ptr C.CSegmentInterface // TODO: Remove in future, after move load index into segcore package. // always keep same with csegment.RawPtr(), for eaiser to access, @@ -298,6 +303,7 @@ type LocalSegment struct { func NewSegment(ctx context.Context, collection *Collection, + manager SegmentManager, segmentType SegmentType, version int64, loadInfo *querypb.SegmentLoadInfo, @@ -352,6 +358,7 @@ func NewSegment(ctx context.Context, segment := &LocalSegment{ baseSegment: base, + manager: manager, ptrLock: locker, ptr: C.CSegmentInterface(csegment.RawPointer()), csegment: csegment, @@ -1249,6 +1256,8 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error } func (s *LocalSegment) FinishLoad() error { + usage := s.ResourceUsageEstimate() + s.manager.AddLogicalResource(usage) return s.csegment.FinishLoad() } @@ -1304,11 +1313,16 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) { return } + usage := s.ResourceUsageEstimate() + GetDynamicPool().Submit(func() (any, error) { C.DeleteSegment(ptr) return nil, nil }).Await() + // release reserved resource after the segment resource is really released. 
+ s.manager.SubLogicalResource(usage) + log.Info("delete segment from memory") } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 75929b0116..32ca4f1cdc 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -122,6 +122,7 @@ func GetResourceEstimate(estimate *C.LoadResourceRequest) ResourceEstimate { type requestResourceResult struct { Resource LoadResource + LogicalResource LoadResource CommittedResource LoadResource ConcurrencyLevel int } @@ -146,11 +147,14 @@ func (r *LoadResource) IsZero() bool { } type resourceEstimateFactor struct { - memoryUsageFactor float64 - memoryIndexUsageFactor float64 - EnableInterminSegmentIndex bool - tempSegmentIndexFactor float64 - deltaDataExpansionFactor float64 + memoryUsageFactor float64 + memoryIndexUsageFactor float64 + EnableInterminSegmentIndex bool + tempSegmentIndexFactor float64 + deltaDataExpansionFactor float64 + TieredEvictionEnabled bool + TieredEvictableMemoryCacheRatio float64 + TieredEvictableDiskCacheRatio float64 } func NewLoader( @@ -223,6 +227,7 @@ type segmentLoader struct { mut sync.Mutex // guards committedResource committedResource LoadResource + committedLogicalResource LoadResource committedResourceNotifier *syncutil.VersionedNotifier duf *diskUsageFetcher @@ -290,7 +295,7 @@ func (loader *segmentLoader) Load(ctx context.Context, log.Warn("request resource failed", zap.Error(err)) return nil, err } - defer loader.freeRequest(requestResourceResult.Resource) + defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource) } newSegments := typeutil.NewConcurrentMap[int64, Segment]() loaded := typeutil.NewConcurrentMap[int64, Segment]() @@ -312,6 +317,7 @@ func (loader *segmentLoader) Load(ctx context.Context, segment, err := NewSegment( ctx, collection, + loader.manager.Segment, segmentType, version, loadInfo, @@ -466,10 +472,10 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer zap.Int64s("segmentIDs", segmentIDs), ) - memoryUsage := hardware.GetUsedMemoryCount() + physicalMemoryUsage := hardware.GetUsedMemoryCount() totalMemory := hardware.GetMemoryCount() - diskUsage, err := loader.duf.GetDiskUsage() + physicalDiskUsage, err := loader.duf.GetDiskUsage() if err != nil { return requestResourceResult{}, errors.Wrap(err, "get local used size failed") } @@ -482,26 +488,35 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer CommittedResource: loader.committedResource, } - if loader.committedResource.MemorySize+memoryUsage >= totalMemory { - return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory)) - } else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap { - return result, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap)) + if loader.committedResource.MemorySize+physicalMemoryUsage >= totalMemory { + return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+physicalMemoryUsage), float32(totalMemory)) + } else if loader.committedResource.DiskSize+uint64(physicalDiskUsage) >= diskCap { + return result, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(physicalDiskUsage)), float32(diskCap)) } result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos)) - mu, du, err := 
loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage) + mu, du, err := loader.checkSegmentSize(ctx, infos, totalMemory, physicalMemoryUsage, physicalDiskUsage) if err != nil { log.Warn("no sufficient resource to load segments", zap.Error(err)) return result, err } - result.Resource.MemorySize += mu - result.Resource.DiskSize += du + lmu, ldu, err := loader.checkLogicalSegmentSize(ctx, infos, totalMemory) + if err != nil { + log.Warn("no sufficient resource to load segments", zap.Error(err)) + return result, err + } + + result.Resource.MemorySize = mu + result.Resource.DiskSize = du + result.LogicalResource.MemorySize = lmu + result.LogicalResource.DiskSize = ldu toMB := func(mem uint64) float64 { return float64(mem) / 1024 / 1024 } loader.committedResource.Add(result.Resource) + loader.committedLogicalResource.Add(result.LogicalResource) log.Info("request resource for loading segments (unit in MiB)", zap.Float64("memory", toMB(result.Resource.MemorySize)), zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)), @@ -513,11 +528,12 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer } // freeRequest returns request memory & storage usage request. -func (loader *segmentLoader) freeRequest(resource LoadResource) { +func (loader *segmentLoader) freeRequest(resource LoadResource, logicalResource LoadResource) { loader.mut.Lock() defer loader.mut.Unlock() loader.committedResource.Sub(resource) + loader.committedLogicalResource.Sub(logicalResource) loader.committedResourceNotifier.NotifyAll() } @@ -983,7 +999,8 @@ func (loader *segmentLoader) LoadLazySegment(ctx context.Context, log.Ctx(ctx).Warn("request resource failed", zap.Error(err)) return err } - defer loader.freeRequest(resource) + // NOTE: logical resource is not used for lazy load, so set it to zero + defer loader.freeRequest(resource, LoadResource{}) return loader.LoadSegment(ctx, segment, loadInfo) } @@ -1352,7 +1369,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, log.Warn("request resource failed", zap.Error(err)) return err } - defer loader.freeRequest(requestResourceResult.Resource) + defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource) return loader.loadDeltalogs(ctx, segment, deltaLogs) } @@ -1431,10 +1448,97 @@ func JoinIDPath(ids ...int64) string { return path.Join(idStr...) } +// After introducing the caching layer's lazy loading and eviction mechanisms, most parts of a segment won't be +// loaded into memory or disk immediately, even if the segment is marked as LOADED. This means physical resource +// usage may be very low. +// However, we still need to reserve enough resources for the segments marked as LOADED. The reserved resource is +// treated as the logical resource usage. Logical resource usage is based on the segment final resource usage. 
+// checkLogicalSegmentSize checks whether the memory & disk is sufficient to load the segments, +// returns the memory & disk logical usage while loading if possible to load, otherwise, returns error +func (loader *segmentLoader) checkLogicalSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, totalMem uint64) (uint64, uint64, error) { + if !paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() { + return 0, 0, nil + } + + if len(segmentLoadInfos) == 0 { + return 0, 0, nil + } + + log := log.Ctx(ctx).With( + zap.Int64("collectionID", segmentLoadInfos[0].GetCollectionID()), + ) + + toMB := func(mem uint64) float64 { + return float64(mem) / 1024 / 1024 + } + + logicalMemUsage := loader.manager.Segment.GetLogicalResource().MemorySize + logicalDiskUsage := loader.manager.Segment.GetLogicalResource().DiskSize + + logicalMemUsage += loader.committedLogicalResource.MemorySize + logicalDiskUsage += loader.committedLogicalResource.DiskSize + + // logical resource usage is based on the segment final resource usage, + // so we need to estimate the final resource usage of the segments + finalFactor := resourceEstimateFactor{ + memoryUsageFactor: 1.0, + memoryIndexUsageFactor: 1.0, + EnableInterminSegmentIndex: false, + tempSegmentIndexFactor: 0.0, + deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(), + TieredEvictableMemoryCacheRatio: paramtable.Get().QueryNodeCfg.TieredEvictableMemoryCacheRatio.GetAsFloat(), + TieredEvictableDiskCacheRatio: paramtable.Get().QueryNodeCfg.TieredEvictableDiskCacheRatio.GetAsFloat(), + } + predictLogicalMemUsage := logicalMemUsage + predictLogicalDiskUsage := logicalDiskUsage + for _, loadInfo := range segmentLoadInfos { + collection := loader.manager.Collection.Get(loadInfo.GetCollectionID()) + finalUsage, err := getResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, finalFactor) + if err != nil { + log.Warn( + "failed to estimate final resource usage of segment", + zap.Int64("collectionID", loadInfo.GetCollectionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), + zap.Error(err)) + return 0, 0, err + } + + log.Debug("segment logical resource for loading", + zap.Int64("segmentID", loadInfo.GetSegmentID()), + zap.Float64("memoryUsage(MB)", toMB(finalUsage.MemorySize)), + zap.Float64("diskUsage(MB)", toMB(finalUsage.DiskSize)), + ) + predictLogicalDiskUsage += finalUsage.DiskSize + predictLogicalMemUsage += finalUsage.MemorySize + } + + log.Info("predict memory and disk logical usage after loaded (in MiB)", + zap.Float64("predictLogicalMemUsage(MB)", toMB(predictLogicalMemUsage)), + zap.Float64("predictLogicalDiskUsage(MB)", toMB(predictLogicalDiskUsage)), + ) + + if predictLogicalMemUsage > uint64(float64(totalMem)*paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) { + return 0, 0, fmt.Errorf("load segment failed, OOM if load, predictMemUsage = %v MB, totalMem = %v MB thresholdFactor = %f", + toMB(predictLogicalMemUsage), + toMB(totalMem), + paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) + } + + if predictLogicalDiskUsage > uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())*paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) { + return 0, 0, merr.WrapErrServiceDiskLimitExceeded(float32(predictLogicalDiskUsage), float32(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()), 
fmt.Sprintf("load segment failed, disk space is not enough, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f", + toMB(predictLogicalDiskUsage), + toMB(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())), + paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat())) + } + + return predictLogicalMemUsage - logicalMemUsage, predictLogicalDiskUsage - logicalDiskUsage, nil +} + // checkSegmentSize checks whether the memory & disk is sufficient to load the segments // returns the memory & disk usage while loading if possible to load, // otherwise, returns error -func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) { +func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, totalMem, memUsage uint64, localDiskUsage int64) (uint64, uint64, error) { if len(segmentLoadInfos) == 0 { return 0, 0, nil } @@ -1454,12 +1558,17 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize - factor := resourceEstimateFactor{ + maxFactor := resourceEstimateFactor{ memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(), memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), EnableInterminSegmentIndex: paramtable.Get().QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool(), tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(), deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(), + // NOTE: when tiered eviction is enabled, maxUsage should only consider the inevictable memory & disk usage. + // All evictable memory & disk usage should be removed from estimation, so set both cache ratios to 0. 
+ TieredEvictableMemoryCacheRatio: 0.0, + TieredEvictableDiskCacheRatio: 0.0, } maxSegmentSize := uint64(0) predictMemUsage := memUsage @@ -1468,10 +1577,10 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn mmapFieldCount := 0 for _, loadInfo := range segmentLoadInfos { collection := loader.manager.Collection.Get(loadInfo.GetCollectionID()) - usage, err := getResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, factor) + loadingUsage, err := getResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, maxFactor) if err != nil { log.Warn( - "failed to estimate resource usage of segment", + "failed to estimate max resource usage of segment", zap.Int64("collectionID", loadInfo.GetCollectionID()), zap.Int64("segmentID", loadInfo.GetSegmentID()), zap.Error(err)) @@ -1480,16 +1589,16 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn log.Debug("segment resource for loading", zap.Int64("segmentID", loadInfo.GetSegmentID()), - zap.Float64("memoryUsage(MB)", toMB(usage.MemorySize)), - zap.Float64("diskUsage(MB)", toMB(usage.DiskSize)), - zap.Float64("memoryLoadFactor", factor.memoryUsageFactor), + zap.Float64("memoryUsage(MB)", toMB(loadingUsage.MemorySize)), + zap.Float64("diskUsage(MB)", toMB(loadingUsage.DiskSize)), + zap.Float64("memoryLoadFactor", maxFactor.memoryUsageFactor), ) - mmapFieldCount += usage.MmapFieldCount - predictDiskUsage += usage.DiskSize - predictMemUsage += usage.MemorySize - predictGpuMemUsage = usage.FieldGpuMemorySize - if usage.MemorySize > maxSegmentSize { - maxSegmentSize = usage.MemorySize + mmapFieldCount += loadingUsage.MmapFieldCount + predictDiskUsage += loadingUsage.DiskSize + predictMemUsage += loadingUsage.MemorySize + predictGpuMemUsage = loadingUsage.FieldGpuMemorySize + if loadingUsage.MemorySize > maxSegmentSize { + maxSegmentSize = loadingUsage.MemorySize } } @@ -1532,6 +1641,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn // getResourceUsageEstimateOfSegment estimates the resource usage of the segment func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) { var segmentMemorySize, segmentDiskSize uint64 + var segmentEvictableMemorySize, segmentEvictableDiskSize uint64 var indexMemorySize uint64 var mmapFieldCount int var fieldGpuMemorySize []uint64 @@ -1574,6 +1684,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn } indexMemorySize += estimateResult.MaxMemoryCost segmentDiskSize += estimateResult.MaxDiskCost + if multiplyFactor.TieredEvictionEnabled { + // to avoid burst memory allocation during index loading, use final cost to estimate evictable size + segmentEvictableMemorySize += estimateResult.FinalMemoryCost + segmentEvictableDiskSize += estimateResult.FinalDiskCost + } if vecindexmgr.GetVecIndexMgrInstance().IsGPUVecIndex(common.GetIndexType(fieldIndexInfo.IndexParams)) { fieldGpuMemorySize = append(fieldGpuMemorySize, estimateResult.MaxMemoryCost) } @@ -1627,8 +1742,14 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn mmapVectorField := paramtable.Get().QueryNodeCfg.MmapVectorField.GetAsBool() if mmapVectorField { segmentDiskSize += binlogSize + if multiplyFactor.TieredEvictionEnabled { + segmentEvictableDiskSize += binlogSize + } } else { segmentMemorySize += binlogSize + if multiplyFactor.TieredEvictionEnabled { + 
segmentEvictableMemorySize += binlogSize + } } continue } @@ -1636,22 +1757,33 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn // missing mapping, shall be "0" group for storage v2 if fieldSchema == nil { segmentMemorySize += binlogSize + if multiplyFactor.TieredEvictionEnabled { + segmentEvictableMemorySize += binlogSize + } continue } mmapEnabled := isDataMmapEnable(fieldSchema) if !mmapEnabled || common.IsSystemField(fieldSchema.GetFieldID()) { segmentMemorySize += binlogSize + // system field is not evictable, skip evictable size calculation + if !common.IsSystemField(fieldSchema.GetFieldID()) && multiplyFactor.TieredEvictionEnabled { + segmentEvictableMemorySize += binlogSize + } if DoubleMemorySystemField(fieldSchema.GetFieldID()) || DoubleMemoryDataType(fieldSchema.GetDataType()) { segmentMemorySize += binlogSize } } else { segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog)) + if multiplyFactor.TieredEvictionEnabled { + segmentEvictableDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog)) + } } } // get size of stats data for _, fieldBinlog := range loadInfo.Statslogs { segmentMemorySize += uint64(getBinlogDataMemorySize(fieldBinlog)) + // stats data is not evictable, skip evictable size calculation } // get size of delete data @@ -1668,10 +1800,15 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn expansionFactor = multiplyFactor.deltaDataExpansionFactor } segmentMemorySize += uint64(float64(memSize) * expansionFactor) + // deltalog is not evictable, skip evictable size calculation } + + evictableMemoryUncacheSize := uint64(float64(segmentEvictableMemorySize) * (1.0 - multiplyFactor.TieredEvictableMemoryCacheRatio)) + evictableDiskUncacheSize := uint64(float64(segmentEvictableDiskSize) * (1.0 - multiplyFactor.TieredEvictableDiskCacheRatio)) + return &ResourceUsage{ - MemorySize: segmentMemorySize + indexMemorySize, - DiskSize: segmentDiskSize, + MemorySize: segmentMemorySize + indexMemorySize - evictableMemoryUncacheSize, + DiskSize: segmentDiskSize - evictableDiskUncacheSize, MmapFieldCount: mmapFieldCount, FieldGpuMemorySize: fieldGpuMemorySize, }, nil @@ -1758,7 +1895,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, if err != nil { return err } - defer loader.freeRequest(requestResourceResult.Resource) + defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource) log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos))) metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Inc() diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 26189db18b..b5b6194f2f 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -70,6 +70,7 @@ func (suite *SegmentSuite) SetupTest() { suite.sealed, err = NewSegment(ctx, suite.collection, + suite.manager.Segment, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ @@ -113,6 +114,7 @@ func (suite *SegmentSuite) SetupTest() { suite.growing, err = NewSegment(ctx, suite.collection, + suite.manager.Segment, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{ diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index 810e61ab6e..87480b0997 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -229,6 +229,7 @@ func (suite *QueryNodeSuite) TestStop() { segment, 
err := segments.NewSegment( context.Background(), collection, + suite.node.manager.Segment, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{ diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6f3af3a3fa..596f1d5574 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2839,6 +2839,8 @@ type queryNodeConfig struct { TieredDiskLowWatermarkRatio ParamItem `refreshable:"false"` TieredDiskHighWatermarkRatio ParamItem `refreshable:"false"` TieredEvictionEnabled ParamItem `refreshable:"false"` + TieredEvictableMemoryCacheRatio ParamItem `refreshable:"false"` + TieredEvictableDiskCacheRatio ParamItem `refreshable:"false"` TieredCacheTouchWindowMs ParamItem `refreshable:"false"` TieredEvictionIntervalMs ParamItem `refreshable:"false"` TieredLoadingMemoryFactor ParamItem `refreshable:"false"` @@ -3048,6 +3050,48 @@ Note that if eviction is enabled, cache data loaded during sync warmup is also s } p.TieredEvictionEnabled.Init(base.mgr) + p.TieredEvictableMemoryCacheRatio = ParamItem{ + Key: "queryNode.segcore.tieredStorage.evictableMemoryCacheRatio", + Version: "2.6.0", + DefaultValue: "1.0", + Formatter: func(v string) string { + ratio := getAsFloat(v) + if ratio < 0 || ratio > 1 { + return "1.0" + } + return fmt.Sprintf("%f", ratio) + }, + Doc: `This ratio estimates how much evictable memory can be cached. +The higher the ratio, the more physical memory is reserved for evictable memory, +resulting in fewer evictions but fewer segments can be loaded. +Conversely, a lower ratio results in more evictions but allows more segments to be loaded. +This parameter is only valid when eviction is enabled. +It defaults to 1.0 (meaning all evictable memory is cached), with a valid range of [0.0, 1.0].`, + Export: true, + } + p.TieredEvictableMemoryCacheRatio.Init(base.mgr) + + p.TieredEvictableDiskCacheRatio = ParamItem{ + Key: "queryNode.segcore.tieredStorage.evictableDiskCacheRatio", + Version: "2.6.0", + DefaultValue: "1.0", + Formatter: func(v string) string { + ratio := getAsFloat(v) + if ratio < 0 || ratio > 1 { + return "1.0" + } + return fmt.Sprintf("%f", ratio) + }, + Doc: `This ratio estimates how much evictable disk space can be cached. +The higher the ratio, the more physical disk space is reserved for evictable disk usage, +resulting in fewer evictions but fewer segments can be loaded. +Conversely, a lower ratio results in more evictions but allows more segments to be loaded. +This parameter is only valid when eviction is enabled. +It defaults to 1.0 (meaning all evictable disk is cached), with a valid range of [0.0, 1.0].`, + Export: true, + } + p.TieredEvictableDiskCacheRatio.Init(base.mgr) + p.TieredMemoryLowWatermarkRatio = ParamItem{ Key: "queryNode.segcore.tieredStorage.memoryLowWatermarkRatio", Version: "2.6.0",