diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index 06ede0c82f..63220a8122 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -283,18 +283,22 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID) if err != nil { log.Warn("failed to get worker", - zap.Int64("nodeID", paramtable.GetNodeID()), + zap.Int64("nodeID", entry.NodeID), zap.Error(err), ) // skip if node down // delete will be processed after loaded again continue } + // forward to non level0 segment only + segments := lo.Filter(entry.Segments, func(segmentEntry SegmentEntry, _ int) bool { + return segmentEntry.Level != datapb.SegmentLevel_L0 + }) eg.Go(func() error { offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) { return deleteData, true - }, entry.Segments, querypb.DataScope_Historical)...) + }, segments, querypb.DataScope_Historical)...) return nil }) }