From 0c6354018b2d5eecb4008b96ff97cf6d559ca27a Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 6 Jun 2024 10:41:52 +0800 Subject: [PATCH] enhance: Avoid load bf in delegator when qn worker has no more memory(#33557) (#33650) pr: #33557 query coord send load request to delegator, delegator load bf first, then forward load request to qn worker. but when qn worker has no more memory, it will return load failed immediatelly. then delegator roll back the loaded bf. query coord wil retry the load request, and delegator will load and roll back bf again and again. this PR delay the loading bf step until load segment succeed in worker. Signed-off-by: Wei Liu --- .../querynodev2/delegator/delegator_data.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index be4870a34c..5c4525258c 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -424,16 +424,6 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg return err } - // load bloom filter only when candidate not exists - infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool { - return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID) - }) - candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...) - if err != nil { - log.Warn("failed to load bloom filter set for segment", zap.Error(err)) - return err - } - req.Base.TargetID = req.GetDstNodeID() log.Debug("worker loads segments...") @@ -490,6 +480,16 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 { sd.GenerateLevel0DeletionCache() } else { + // load bloom filter only when candidate not exists + infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool { + return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID) + }) + candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...) + if err != nil { + log.Warn("failed to load bloom filter set for segment", zap.Error(err)) + return err + } + log.Debug("load delete...") err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries) if err != nil {