enhance: make knowhere thread pool config refreshable (#45190)

Signed-off-by: chasingegg <chao.gao@zilliz.com>
This commit is contained in:
Gao 2025-11-04 18:33:33 +08:00 committed by GitHub
parent 966ebfbcab
commit 8f645760af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 43 additions and 7 deletions

View File

@ -430,6 +430,38 @@ func SetupCoreConfigChangelCallback() {
return nil
})
paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
factor, err := strconv.ParseFloat(newValue, 64)
if err != nil {
return err
}
if factor <= 0 || !paramtable.Get().QueryNodeCfg.EnableDisk.GetAsBool() {
factor = 1
} else if factor > 32 {
factor = 32
}
knowhereThreadPoolSize := uint32(float64(hardware.GetCPUNum()) * factor)
log.Info("UpdateKnowhereThreadPoolSize", zap.Uint32("knowhereThreadPoolSize", knowhereThreadPoolSize))
C.SegcoreSetKnowhereSearchThreadPoolNum(C.uint32_t(knowhereThreadPoolSize))
return nil
})
paramtable.Get().QueryNodeCfg.KnowhereFetchThreadPoolSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
factor, err := strconv.ParseFloat(newValue, 64)
if err != nil {
return err
}
if factor <= 0 {
factor = 1
} else if factor > 32 {
factor = 32
}
knowhereFetchThreadPoolSize := uint32(float64(hardware.GetCPUNum()) * factor)
log.Info("UpdateKnowhereFetchThreadPoolSize", zap.Uint32("knowhereFetchThreadPoolSize", knowhereFetchThreadPoolSize))
C.SegcoreSetKnowhereFetchThreadPoolNum(C.uint32_t(knowhereFetchThreadPoolSize))
return nil
})
paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
coefficient, err := strconv.ParseFloat(newValue, 64)
if err != nil {

View File

@ -3037,8 +3037,8 @@ type queryNodeConfig struct {
StatsPublishInterval ParamItem `refreshable:"true"`
// segcore
KnowhereFetchThreadPoolSize ParamItem `refreshable:"false"`
KnowhereThreadPoolSize ParamItem `refreshable:"false"`
KnowhereFetchThreadPoolSize ParamItem `refreshable:"true"`
KnowhereThreadPoolSize ParamItem `refreshable:"true"`
ChunkRows ParamItem `refreshable:"false"`
EnableInterminSegmentIndex ParamItem `refreshable:"false"`
InterimIndexNlist ParamItem `refreshable:"false"`
@ -3496,13 +3496,13 @@ If set to 0, time based eviction is disabled.`,
Version: "2.0.0",
DefaultValue: "4",
Formatter: func(v string) string {
factor := getAsInt64(v)
factor := getAsFloat(v)
if factor <= 0 || !p.EnableDisk.GetAsBool() {
factor = 1
} else if factor > 32 {
factor = 32
}
knowhereThreadPoolSize := uint32(hardware.GetCPUNum()) * uint32(factor)
knowhereThreadPoolSize := uint32(float64(hardware.GetCPUNum()) * factor)
return strconv.FormatUint(uint64(knowhereThreadPoolSize), 10)
},
Doc: "The number of threads in knowhere's thread pool. If disk is enabled, the pool size will multiply with knowhereThreadPoolNumRatio([1, 32]).",
@ -3513,15 +3513,19 @@ If set to 0, time based eviction is disabled.`,
p.KnowhereFetchThreadPoolSize = ParamItem{
Key: "queryNode.segcore.knowhereFetchThreadPoolNumRatio",
Version: "2.6.0",
DefaultValue: "4",
DefaultValue: "16",
Formatter: func(v string) string {
factor := getAsInt64(v)
factor := getAsFloat(v)
if factor <= 0 {
factor = 1
} else if factor > 32 {
factor = 32
}
knowhereFetchThreadPoolSize := uint32(hardware.GetCPUNum()) * uint32(factor)
knowhereFetchThreadPoolSize := uint32(float64(hardware.GetCPUNum()) * factor)
// avoid too many threads
if knowhereFetchThreadPoolSize > 100 {
knowhereFetchThreadPoolSize = 100
}
return strconv.FormatUint(uint64(knowhereFetchThreadPoolSize), 10)
},
Doc: "The number of threads in knowhere's fetch thread pool for object storage. The pool size will multiply with knowhereThreadPoolNumRatio([1, 32])",