diff --git a/configs/milvus.yaml b/configs/milvus.yaml index d1a3b7d855..91cfcc554a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -335,7 +335,7 @@ queryNode: # chunk cache during the load process. This approach has the potential to substantially reduce query/search latency # for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage; # 2. If set to "off," original vector data will only be loaded into the chunk cache during search/query. - warmup: async + warmup: off mmap: mmapEnabled: false # Enable mmap for loading data lazyload: diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index bbf21b91a7..cb025d8c74 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -37,12 +37,14 @@ var ( // and other operations (insert/delete/statistics/etc.) // since in concurrent situation, there operation may block each other in high payload - sqp atomic.Pointer[conc.Pool[any]] - sqOnce sync.Once - dp atomic.Pointer[conc.Pool[any]] - dynOnce sync.Once - loadPool atomic.Pointer[conc.Pool[any]] - loadOnce sync.Once + sqp atomic.Pointer[conc.Pool[any]] + sqOnce sync.Once + dp atomic.Pointer[conc.Pool[any]] + dynOnce sync.Once + loadPool atomic.Pointer[conc.Pool[any]] + loadOnce sync.Once + warmupPool atomic.Pointer[conc.Pool[any]] + warmupOnce sync.Once ) // initSQPool initialize @@ -80,9 +82,6 @@ func initLoadPool() { loadOnce.Do(func() { pt := paramtable.Get() poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt() - if poolSize > 16 { - poolSize = 16 - } pool := conc.NewPool[any]( poolSize, conc.WithPreAlloc(false), @@ -96,6 +95,23 @@ func initLoadPool() { }) } +func initWarmupPool() { + warmupOnce.Do(func() { + pt := paramtable.Get() + poolSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt() + pool := conc.NewPool[any]( + poolSize, + conc.WithPreAlloc(false), + conc.WithDisablePurge(false), + conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + conc.WithNonBlocking(true), // make warming up non blocking + ) + + warmupPool.Store(pool) + pt.Watch(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, config.NewHandler("qn.warmpool.lowpriority", ResizeWarmupPool)) + }) +} + // GetSQPool returns the singleton pool instance for search/query operations. func GetSQPool() *conc.Pool[any] { initSQPool() @@ -113,6 +129,11 @@ func GetLoadPool() *conc.Pool[any] { return loadPool.Load() } +func GetWarmupPool() *conc.Pool[any] { + initWarmupPool() + return warmupPool.Load() +} + func ResizeSQPool(evt *config.Event) { if evt.HasUpdated { pt := paramtable.Get() @@ -131,6 +152,14 @@ func ResizeLoadPool(evt *config.Event) { } } +func ResizeWarmupPool(evt *config.Event) { + if evt.HasUpdated { + pt := paramtable.Get() + newSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt() + resizePool(GetWarmupPool(), newSize, "WarmupPool") + } +} + func resizePool(pool *conc.Pool[any], newSize int, tag string) { log := log.Ctx(context.Background()). With( diff --git a/internal/querynodev2/segments/pool_test.go b/internal/querynodev2/segments/pool_test.go index 6c817bdb1e..868bce4186 100644 --- a/internal/querynodev2/segments/pool_test.go +++ b/internal/querynodev2/segments/pool_test.go @@ -82,6 +82,27 @@ func TestResizePools(t *testing.T) { assert.Equal(t, expectedCap, GetLoadPool().Cap()) }) + t.Run("WarmupPool", func(t *testing.T) { + expectedCap := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt() + + ResizeWarmupPool(&config.Event{ + HasUpdated: true, + }) + assert.Equal(t, expectedCap, GetWarmupPool().Cap()) + + pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64)) + ResizeWarmupPool(&config.Event{ + HasUpdated: true, + }) + assert.Equal(t, expectedCap, GetWarmupPool().Cap()) + + pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, "0") + ResizeWarmupPool(&config.Event{ + HasUpdated: true, + }) + assert.Equal(t, expectedCap, GetWarmupPool().Cap()) + }) + t.Run("error_pool", func(*testing.T) { pool := conc.NewDefaultPool[any]() c := pool.Cap() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 68f914d733..e4ad589e41 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1386,7 +1386,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { warmingUp := strings.ToLower(paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue()) switch warmingUp { case "sync": - GetLoadPool().Submit(func() (any, error) { + GetWarmupPool().Submit(func() (any, error) { cFieldID := C.int64_t(fieldID) status = C.WarmupChunkCache(s.ptr, cFieldID) if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil { @@ -1397,7 +1397,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { return nil, nil }).Await() case "async": - GetLoadPool().Submit(func() (any, error) { + GetWarmupPool().Submit(func() (any, error) { if !s.ptrLock.RLockIf(state.IsNotReleased) { return nil, nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 735987f994..317ad937f8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2357,7 +2357,7 @@ func (p *queryNodeConfig) init(base *BaseTable) { p.ChunkCacheWarmingUp = ParamItem{ Key: "queryNode.cache.warmup", Version: "2.3.6", - DefaultValue: "async", + DefaultValue: "off", Doc: `options: async, sync, off. Specifies the necessity for warming up the chunk cache. 1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index d3918c9d43..cba13b3893 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -339,7 +339,7 @@ func TestComponentParam(t *testing.T) { // chunk cache assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue()) - assert.Equal(t, "async", Params.ChunkCacheWarmingUp.GetValue()) + assert.Equal(t, "false", Params.ChunkCacheWarmingUp.GetValue()) // test small indexNlist/NProbe default params.Remove("queryNode.segcore.smallIndex.nlist")