mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: Add dynamic cgo pool for proxy CGO call (#34768)
Related to #34705 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
2ac7164c39
commit
e4e18cb8c3
@ -24,15 +24,53 @@ package proxy
|
|||||||
import "C"
|
import "C"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CheckVecIndexWithDataTypeExist(name string, dType schemapb.DataType) bool {
|
var (
|
||||||
cIndexName := C.CString(name)
|
dp atomic.Pointer[conc.Pool[any]]
|
||||||
cType := uint32(dType)
|
dynOnce sync.Once
|
||||||
defer C.free(unsafe.Pointer(cIndexName))
|
)
|
||||||
check := bool(C.CheckVecIndexWithDataType(cIndexName, cType))
|
|
||||||
return check
|
func initDynamicPool() {
|
||||||
|
dynOnce.Do(func() {
|
||||||
|
pool := conc.NewPool[any](
|
||||||
|
hardware.GetCPUNum(),
|
||||||
|
conc.WithPreAlloc(false),
|
||||||
|
conc.WithDisablePurge(false),
|
||||||
|
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
|
||||||
|
)
|
||||||
|
|
||||||
|
dp.Store(pool)
|
||||||
|
log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum()))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDynamicPool returns the singleton pool for dynamic cgo operations.
|
||||||
|
func GetDynamicPool() *conc.Pool[any] {
|
||||||
|
initDynamicPool()
|
||||||
|
return dp.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func CheckVecIndexWithDataTypeExist(name string, dType schemapb.DataType) bool {
|
||||||
|
var result bool
|
||||||
|
GetDynamicPool().Submit(func() (any, error) {
|
||||||
|
cIndexName := C.CString(name)
|
||||||
|
cType := uint32(dType)
|
||||||
|
defer C.free(unsafe.Pointer(cIndexName))
|
||||||
|
result = bool(C.CheckVecIndexWithDataType(cIndexName, cType))
|
||||||
|
return nil, nil
|
||||||
|
}).Await()
|
||||||
|
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user