mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Use singleton delete pool and avoid goroutine leakage (#37220)
Related to #36887 Previously using newly create pool per request shall cause goroutine leakage. This PR change this behavior by using singleton delete pool. This change could also provide better concurrency control over delete memory usage. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
26028f4137
commit
224d797f94
@ -46,6 +46,9 @@ var (
|
||||
warmupPool atomic.Pointer[conc.Pool[any]]
|
||||
warmupOnce sync.Once
|
||||
|
||||
deletePool atomic.Pointer[conc.Pool[struct{}]]
|
||||
deletePoolOnce sync.Once
|
||||
|
||||
bfPool atomic.Pointer[conc.Pool[any]]
|
||||
bfApplyOnce sync.Once
|
||||
)
|
||||
@ -131,6 +134,13 @@ func initBFApplyPool() {
|
||||
})
|
||||
}
|
||||
|
||||
func initDeletePool() {
|
||||
deletePoolOnce.Do(func() {
|
||||
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
|
||||
deletePool.Store(pool)
|
||||
})
|
||||
}
|
||||
|
||||
// GetSQPool returns the singleton pool instance for search/query operations.
|
||||
func GetSQPool() *conc.Pool[any] {
|
||||
initSQPool()
|
||||
@ -158,6 +168,11 @@ func GetBFApplyPool() *conc.Pool[any] {
|
||||
return bfPool.Load()
|
||||
}
|
||||
|
||||
func GetDeletePool() *conc.Pool[struct{}] {
|
||||
initDeletePool()
|
||||
return deletePool.Load()
|
||||
}
|
||||
|
||||
func ResizeSQPool(evt *config.Event) {
|
||||
if evt.HasUpdated {
|
||||
pt := paramtable.Get()
|
||||
|
||||
@ -19,7 +19,6 @@ package querynodev2
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@ -1423,7 +1422,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch
|
||||
|
||||
// control the execution batch parallel with P number
|
||||
// maybe it shall be lower in case of heavy CPU usage may impacting search/query
|
||||
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
|
||||
pool := segments.GetDeletePool()
|
||||
futures := make([]*conc.Future[struct{}], 0, len(segs))
|
||||
errSet := typeutil.NewConcurrentSet[int64]()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user