From 760223f80a1d9400cd22a8a71f31b7a8ec6b28d7 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Mon, 27 May 2024 01:25:40 +0800 Subject: [PATCH] fix: use seperate warmup pool and disable warmup by default (#33348) 1. use a small warmup pool to reduce the impact of warmup 2. change the warmup pool to nonblocking mode 3. disable warmup by default 4. remove the maximum size limit of 16 for the load pool issue: https://github.com/milvus-io/milvus/issues/32772 --------- Signed-off-by: bigsheeper Co-authored-by: xiaofanluan --- configs/milvus.yaml | 2 +- internal/querynodev2/segments/pool.go | 47 +++++++++++++++++---- internal/querynodev2/segments/pool_test.go | 21 +++++++++ internal/querynodev2/segments/segment.go | 4 +- pkg/util/paramtable/component_param.go | 2 +- pkg/util/paramtable/component_param_test.go | 2 +- 6 files changed, 64 insertions(+), 14 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index d1a3b7d855..91cfcc554a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -335,7 +335,7 @@ queryNode: # chunk cache during the load process. This approach has the potential to substantially reduce query/search latency # for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage; # 2. If set to "off," original vector data will only be loaded into the chunk cache during search/query. - warmup: async + warmup: off mmap: mmapEnabled: false # Enable mmap for loading data lazyload: diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index bbf21b91a7..cb025d8c74 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -37,12 +37,14 @@ var ( // and other operations (insert/delete/statistics/etc.) // since in concurrent situation, there operation may block each other in high payload - sqp atomic.Pointer[conc.Pool[any]] - sqOnce sync.Once - dp atomic.Pointer[conc.Pool[any]] - dynOnce sync.Once - loadPool atomic.Pointer[conc.Pool[any]] - loadOnce sync.Once + sqp atomic.Pointer[conc.Pool[any]] + sqOnce sync.Once + dp atomic.Pointer[conc.Pool[any]] + dynOnce sync.Once + loadPool atomic.Pointer[conc.Pool[any]] + loadOnce sync.Once + warmupPool atomic.Pointer[conc.Pool[any]] + warmupOnce sync.Once ) // initSQPool initialize @@ -80,9 +82,6 @@ func initLoadPool() { loadOnce.Do(func() { pt := paramtable.Get() poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt() - if poolSize > 16 { - poolSize = 16 - } pool := conc.NewPool[any]( poolSize, conc.WithPreAlloc(false), @@ -96,6 +95,23 @@ func initLoadPool() { }) } +func initWarmupPool() { + warmupOnce.Do(func() { + pt := paramtable.Get() + poolSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt() + pool := conc.NewPool[any]( + poolSize, + conc.WithPreAlloc(false), + conc.WithDisablePurge(false), + conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + conc.WithNonBlocking(true), // make warming up non blocking + ) + + warmupPool.Store(pool) + pt.Watch(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, config.NewHandler("qn.warmpool.lowpriority", ResizeWarmupPool)) + }) +} + // GetSQPool returns the singleton pool instance for search/query operations. func GetSQPool() *conc.Pool[any] { initSQPool() @@ -113,6 +129,11 @@ func GetLoadPool() *conc.Pool[any] { return loadPool.Load() } +func GetWarmupPool() *conc.Pool[any] { + initWarmupPool() + return warmupPool.Load() +} + func ResizeSQPool(evt *config.Event) { if evt.HasUpdated { pt := paramtable.Get() @@ -131,6 +152,14 @@ func ResizeLoadPool(evt *config.Event) { } } +func ResizeWarmupPool(evt *config.Event) { + if evt.HasUpdated { + pt := paramtable.Get() + newSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt() + resizePool(GetWarmupPool(), newSize, "WarmupPool") + } +} + func resizePool(pool *conc.Pool[any], newSize int, tag string) { log := log.Ctx(context.Background()). With( diff --git a/internal/querynodev2/segments/pool_test.go b/internal/querynodev2/segments/pool_test.go index 6c817bdb1e..868bce4186 100644 --- a/internal/querynodev2/segments/pool_test.go +++ b/internal/querynodev2/segments/pool_test.go @@ -82,6 +82,27 @@ func TestResizePools(t *testing.T) { assert.Equal(t, expectedCap, GetLoadPool().Cap()) }) + t.Run("WarmupPool", func(t *testing.T) { + expectedCap := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt() + + ResizeWarmupPool(&config.Event{ + HasUpdated: true, + }) + assert.Equal(t, expectedCap, GetWarmupPool().Cap()) + + pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64)) + ResizeWarmupPool(&config.Event{ + HasUpdated: true, + }) + assert.Equal(t, expectedCap, GetWarmupPool().Cap()) + + pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, "0") + ResizeWarmupPool(&config.Event{ + HasUpdated: true, + }) + assert.Equal(t, expectedCap, GetWarmupPool().Cap()) + }) + t.Run("error_pool", func(*testing.T) { pool := conc.NewDefaultPool[any]() c := pool.Cap() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 68f914d733..e4ad589e41 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1386,7 +1386,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { warmingUp := strings.ToLower(paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue()) switch warmingUp { case "sync": - GetLoadPool().Submit(func() (any, error) { + GetWarmupPool().Submit(func() (any, error) { cFieldID := C.int64_t(fieldID) status = C.WarmupChunkCache(s.ptr, cFieldID) if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil { @@ -1397,7 +1397,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { return nil, nil }).Await() case "async": - GetLoadPool().Submit(func() (any, error) { + GetWarmupPool().Submit(func() (any, error) { if !s.ptrLock.RLockIf(state.IsNotReleased) { return nil, nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 735987f994..317ad937f8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2357,7 +2357,7 @@ func (p *queryNodeConfig) init(base *BaseTable) { p.ChunkCacheWarmingUp = ParamItem{ Key: "queryNode.cache.warmup", Version: "2.3.6", - DefaultValue: "async", + DefaultValue: "off", Doc: `options: async, sync, off. Specifies the necessity for warming up the chunk cache. 1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index d3918c9d43..cba13b3893 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -339,7 +339,7 @@ func TestComponentParam(t *testing.T) { // chunk cache assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue()) - assert.Equal(t, "async", Params.ChunkCacheWarmingUp.GetValue()) + assert.Equal(t, "false", Params.ChunkCacheWarmingUp.GetValue()) // test small indexNlist/NProbe default params.Remove("queryNode.segcore.smallIndex.nlist")