enhance: Put release segment and other misc cgo call into pool (#38186)

Related to #30273

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-12-05 11:04:40 +08:00 committed by GitHub
parent af288a9c21
commit 618f0cb728
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 28 additions and 13 deletions

View File

@ -1272,20 +1272,25 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
return
}
C.DeleteSegment(ptr)
localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
GetDynamicPool().Submit(func() (any, error) {
C.DeleteSegment(ptr)
localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
return nil, nil
}).Await()
log.Info("delete segment from memory")
}
// ReleaseSegmentData releases the segment data.
func (s *LocalSegment) ReleaseSegmentData() {
C.ClearSegmentData(s.ptr)
GetDynamicPool().Submit(func() (any, error) {
C.ClearSegmentData(s.ptr)
return nil, nil
}).Await()
for _, indexInfo := range s.Indexes() {
indexInfo.IsLoaded = false
}
@ -1309,7 +1314,10 @@ func (s *LocalSegment) startRelease(scope ReleaseScope) state.LoadStateLockGuard
}
func (s *LocalSegment) RemoveFieldFile(fieldId int64) {
C.RemoveFieldFile(s.ptr, C.int64_t(fieldId))
GetDynamicPool().Submit(func() (any, error) {
C.RemoveFieldFile(s.ptr, C.int64_t(fieldId))
return nil, nil
}).Await()
}
func (s *LocalSegment) RemoveUnusedFieldFiles() error {

View File

@ -1479,8 +1479,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
var estimateResult ResourceEstimate
err := GetCLoadInfoWithFunc(ctx, fieldSchema, loadInfo, fieldIndexInfo, func(c *LoadIndexInfo) error {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
GetDynamicPool().Submit(func() (any, error) {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
return nil, nil
}).Await()
return nil
})
if err != nil {

View File

@ -183,8 +183,12 @@ func mergeRequestCost(requestCosts []*internalpb.CostAggregation) *internalpb.Co
}
func getIndexEngineVersion() (minimal, current int32) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
return int32(cMinimal), int32(cCurrent)
GetDynamicPool().Submit(func() (any, error) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
minimal, current = int32(cMinimal), int32(cCurrent)
return nil, nil
}).Await()
return minimal, current
}
// getSegmentMetricLabel returns the label for segment metrics.