diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 9f233e4fcf..f97a2dc82b 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -131,17 +131,16 @@ func (loader *segmentLoader) Load(ctx context.Context,
 		zap.String("segmentType", segmentType.String()),
 	)
 
+	if len(segments) == 0 {
+		log.Info("no segment to load")
+		return nil, nil
+	}
 	// Filter out loaded & loading segments
 	infos := loader.prepare(segmentType, segments...)
 	defer loader.unregister(infos...)
 
-	segmentNum := len(infos)
-	if segmentNum == 0 {
-		log.Info("no segment to load")
-		return nil, nil
-	}
-
-	log.Info("start loading...", zap.Int("segmentNum", segmentNum))
+	// continue to wait other task done
+	log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
 
 	// Check memory & storage limit
 	memUsage, diskUsage, concurrencyLevel, err := loader.requestResource(infos...)
@@ -210,12 +209,10 @@ func (loader *segmentLoader) Load(ctx context.Context,
 
 		log.Info("load segment done", zap.Int64("segmentID", segmentID))
 		metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
-
-		waitCh, ok := loader.loadingSegments.Get(segmentID)
-		if !ok {
-			return errors.New("segment was removed from the loading map early")
+		waitCh, ok := loader.loadingSegments.Get(loadInfo.GetSegmentID())
+		if ok {
+			close(waitCh)
 		}
-		close(waitCh)
 		return nil
 	}
 
@@ -223,9 +220,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
 	// Start to load,
 	// Make sure we can always benefit from concurrency, and not spawn too many idle goroutines
 	log.Info("start to load segments in parallel",
-		zap.Int("segmentNum", segmentNum),
+		zap.Int("segmentNum", len(infos)),
 		zap.Int("concurrencyLevel", concurrencyLevel))
-	err = funcutil.ProcessFuncParallel(segmentNum,
+	err = funcutil.ProcessFuncParallel(len(infos),
 		concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
 	if err != nil {
 		clearAll()
@@ -290,7 +287,14 @@ func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
 	loader.mut.Lock()
 	defer loader.mut.Unlock()
 	for i := range segments {
-		loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
+		waitCh, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
+		if ok {
+			select {
+			case <-waitCh:
+			default: // close wait channel for failed task
+				close(waitCh)
+			}
+		}
 	}
 }
 
@@ -302,26 +306,32 @@ func (loader *segmentLoader) requestResource(infos ...*querypb.SegmentLoadInfo)
 
 	concurrencyLevel := funcutil.Min(runtime.GOMAXPROCS(0), len(infos))
 
-	logNum := 0
-	for _, field := range infos[0].GetBinlogPaths() {
-		logNum += len(field.GetBinlogs())
-	}
-	if logNum > 0 {
-		// IO pool will be run out even with the new smaller level
-		concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1))
-	}
-
-	for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
-		_, _, err := loader.checkSegmentSize(infos, concurrencyLevel)
-		if err == nil {
-			break
+	var memUsage, diskUsage uint64
+	for _, info := range infos {
+		logNum := 0
+		for _, field := range info.GetBinlogPaths() {
+			logNum += len(field.GetBinlogs())
+		}
+		if logNum > 0 {
+			// IO pool will be run out even with the new smaller level
+			concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1))
 		}
-	}
 
-	memUsage, diskUsage, err := loader.checkSegmentSize(infos, concurrencyLevel)
-	if err != nil {
-		log.Warn("no sufficient resource to load segments", zap.Error(err))
-		return 0, 0, 0, err
+		for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
+			_, _, err := loader.checkSegmentSize(infos, concurrencyLevel)
+			if err == nil {
+				break
+			}
+		}
+
+		mu, du, err := loader.checkSegmentSize(infos, concurrencyLevel)
+		if err != nil {
+			log.Warn("no sufficient resource to load segments", zap.Error(err))
+			return 0, 0, 0, err
+		}
+
+		memUsage += mu
+		diskUsage += du
 	}
 
 	loader.committedMemSize += memUsage
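The most subtle part of this patch is the wait-channel lifecycle: a successful load closes the channel inside loadSegmentFunc, while the deferred unregister closes it for tasks that never completed, using a select/default guard to avoid closing an already-closed channel. Below is a minimal, self-contained sketch of that pattern; the waitChannels type and its register/markDone/unregister methods are hypothetical simplifications for illustration, not the real loader.loadingSegments concurrent-map API.

```go
package main

import (
	"fmt"
	"sync"
)

// waitChannels is a simplified stand-in for the loading-segments map:
// one wait channel per segment, closed when the segment is done loading
// or when the loading attempt is abandoned.
type waitChannels struct {
	mu sync.Mutex
	m  map[int64]chan struct{}
}

func newWaitChannels() *waitChannels {
	return &waitChannels{m: make(map[int64]chan struct{})}
}

// register creates a wait channel for a segment if none exists yet.
func (w *waitChannels) register(segmentID int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if _, ok := w.m[segmentID]; !ok {
		w.m[segmentID] = make(chan struct{})
	}
}

// markDone closes the wait channel of a successfully loaded segment.
// Like the `if ok { close(waitCh) }` branch in loadSegmentFunc, it
// assumes it runs at most once per segment.
func (w *waitChannels) markDone(segmentID int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if ch, ok := w.m[segmentID]; ok {
		close(ch)
	}
}

// unregister removes the entry and closes the channel only if the load
// never completed, so waiters of a failed task are not blocked forever.
func (w *waitChannels) unregister(segmentID int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch, ok := w.m[segmentID]
	if !ok {
		return
	}
	delete(w.m, segmentID)
	select {
	case <-ch: // already closed by a successful load, nothing to do
	default: // close wait channel for failed task
		close(ch)
	}
}

func main() {
	w := newWaitChannels()
	w.register(1)
	w.register(2)

	w.markDone(1)   // segment 1 loads successfully
	w.unregister(1) // no double close: select sees the closed channel
	w.unregister(2) // segment 2 failed; unregister closes its channel

	fmt.Println("all wait channels released")
}
```

In this sketch every close happens under the same mutex, so the check-then-close in unregister cannot race with markDone; in the patch itself, unregister runs via defer only after ProcessFuncParallel has returned, which serves the same purpose.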