enhance: Avoid load bf in delegator when qn worker has no more memory(#33557) (#33650)

pr: #33557

query coord send load request to delegator, delegator load bf first,
then forward load request to qn worker. but when qn worker has no more
memory, it will return load failed immediatelly. then delegator roll
back the loaded bf. query coord wil retry the load request, and
delegator will load and roll back bf again and again.

this PR delay the loading bf step until load segment succeed in worker.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-06-06 10:41:52 +08:00 committed by GitHub
parent f2917f5bdf
commit 0c6354018b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -424,16 +424,6 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
return err
}
// load bloom filter only when candidate not exists
infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID)
})
candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
if err != nil {
log.Warn("failed to load bloom filter set for segment", zap.Error(err))
return err
}
req.Base.TargetID = req.GetDstNodeID()
log.Debug("worker loads segments...")
@ -490,6 +480,16 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
sd.GenerateLevel0DeletionCache()
} else {
// load bloom filter only when candidate not exists
infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID)
})
candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
if err != nil {
log.Warn("failed to load bloom filter set for segment", zap.Error(err))
return err
}
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
if err != nil {