diff --git a/pkg/mq/msgstream/mqwrapper/kafka/pool.go b/pkg/mq/msgstream/mqwrapper/kafka/pool.go index 6a235c800b..ad82b0b74f 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/pool.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/pool.go @@ -34,15 +34,17 @@ var ( ) func initPool() { - pool := conc.NewPool[any]( - hardware.GetCPUNum(), - conc.WithPreAlloc(false), - conc.WithDisablePurge(false), - conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal - ) + initOnce.Do(func() { + pool := conc.NewPool[any]( + hardware.GetCPUNum(), + conc.WithPreAlloc(false), + conc.WithDisablePurge(false), + conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + ) - kafkaCPool.Store(pool) - log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum())) + kafkaCPool.Store(pool) + log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum())) + }) } // GetSQPool returns the singleton pool instance for search/query operations.