From 3bc7d63be91003f2e97c4455ec611cab23e56477 Mon Sep 17 00:00:00 2001 From: congqixia Date: Sat, 14 Sep 2024 10:27:08 +0800 Subject: [PATCH] fix: overwrite correct selection when pk duplicated (#35826) Related to #35505 --------- Signed-off-by: Congqi Xia --- internal/querynodev2/segments/result.go | 76 +++++++++++++++---------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/internal/querynodev2/segments/result.go b/internal/querynodev2/segments/result.go index 70c3b28a22..fac6ba23c6 100644 --- a/internal/querynodev2/segments/result.go +++ b/internal/querynodev2/segments/result.go @@ -390,8 +390,6 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore validRetrieveResults := []*TimestampedRetrieveResult[*segcorepb.RetrieveResults]{} validSegments := make([]Segment, 0, len(segments)) - selectedOffsets := make([][]int64, 0, len(retrieveResults)) - selectedIndexes := make([][]int64, 0, len(retrieveResults)) hasMoreResult := false for i, r := range retrieveResults { size := typeutil.GetSizeOfIDs(r.GetIds()) @@ -408,8 +406,6 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore if plan.ignoreNonPk { validSegments = append(validSegments, segments[i]) } - selectedOffsets = append(selectedOffsets, make([]int64, 0, len(r.GetOffset()))) - selectedIndexes = append(selectedIndexes, make([]int64, 0, len(r.GetOffset()))) loopEnd += size hasMoreResult = r.GetHasMoreResult() || hasMoreResult } @@ -419,8 +415,6 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore return ret, nil } - selected := make([]int, 0, ret.GetAllRetrieveCount()) - var limit int = -1 if param.limit != typeutil.Unlimited && !param.mergeStopForBest { limit = int(param.limit) @@ -433,6 +427,15 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore var availableCount int var retSize int64 maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() + + type selection struct { + batchIndex int // index of validate retrieve results + resultIndex int64 // index of selection in selected result item + offset int64 // offset of the result + } + + var selections []selection + for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ { sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors) if sel == -1 || (param.mergeStopForBest && drainOneResult) { @@ -443,9 +446,11 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore ts := validRetrieveResults[sel].Timestamps[cursors[sel]] if _, ok := idTsMap[pk]; !ok { typeutil.AppendPKs(ret.Ids, pk) - selected = append(selected, sel) - selectedOffsets[sel] = append(selectedOffsets[sel], validRetrieveResults[sel].Result.GetOffset()[cursors[sel]]) - selectedIndexes[sel] = append(selectedIndexes[sel], cursors[sel]) + selections = append(selections, selection{ + batchIndex: sel, + resultIndex: cursors[sel], + offset: validRetrieveResults[sel].Result.GetOffset()[cursors[sel]], + }) idTsMap[pk] = ts availableCount++ } else { @@ -453,8 +458,21 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore skipDupCnt++ if ts != 0 && ts > idTsMap[pk] { idTsMap[pk] = ts - selectedOffsets[sel][len(selectedOffsets[sel])-1] = validRetrieveResults[sel].Result.GetOffset()[cursors[sel]] - selectedIndexes[sel][len(selectedIndexes[sel])-1] = cursors[sel] + idx := len(selections) - 1 + for ; idx >= 0; idx-- { + selection := selections[idx] + pkValue := typeutil.GetPK(validRetrieveResults[selection.batchIndex].GetIds(), selection.resultIndex) + if pk == pkValue { + break + } + } + if idx >= 0 { + selections[idx] = selection{ + batchIndex: sel, + resultIndex: cursors[sel], + offset: validRetrieveResults[sel].Result.GetOffset()[cursors[sel]], + } + } } } @@ -470,31 +488,28 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore // judge the `!plan.ignoreNonPk` condition. _, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData") defer span2.End() - ret.FieldsData = typeutil.PrepareResultFieldData(validRetrieveResults[0].Result.GetFieldsData(), int64(len(selected))) - cursors = make([]int64, len(validRetrieveResults)) - for _, sel := range selected { + ret.FieldsData = typeutil.PrepareResultFieldData(validRetrieveResults[0].Result.GetFieldsData(), int64(len(selections))) + // cursors = make([]int64, len(validRetrieveResults)) + for _, selection := range selections { // cannot use `cursors[sel]` directly, since some of them may be skipped. - retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[sel].Result.GetFieldsData(), selectedIndexes[sel][cursors[sel]]) + retSize += typeutil.AppendFieldData(ret.FieldsData, validRetrieveResults[selection.batchIndex].Result.GetFieldsData(), selection.resultIndex) // limit retrieve result to avoid oom if retSize > maxOutputSize { return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize) } - - cursors[sel]++ } } else { // target entry not retrieved. ctx, span2 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-RetrieveByOffsets-AppendFieldData") defer span2.End() segmentResults := make([]*segcorepb.RetrieveResults, len(validRetrieveResults)) - futures := make([]*conc.Future[any], 0, len(validRetrieveResults)) - for i, offsets := range selectedOffsets { - if len(offsets) == 0 { - log.Ctx(ctx).Debug("skip empty retrieve results", zap.Int64("segment", validSegments[i].ID())) - continue - } - idx, theOffsets := i, offsets + groups := lo.GroupBy(selections, func(sel selection) int { + return sel.batchIndex + }) + futures := make([]*conc.Future[any], 0, len(groups)) + for i, selections := range groups { + idx, theOffsets := i, lo.Map(selections, func(sel selection, _ int) int64 { return sel.offset }) future := GetSQPool().Submit(func() (any, error) { var r *segcorepb.RetrieveResults var err error @@ -515,23 +530,22 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore for _, r := range segmentResults { if len(r.GetFieldsData()) != 0 { - ret.FieldsData = typeutil.PrepareResultFieldData(r.GetFieldsData(), int64(len(selected))) + ret.FieldsData = typeutil.PrepareResultFieldData(r.GetFieldsData(), int64(len(selections))) break } } _, span3 := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "MergeSegcoreResults-AppendFieldData") defer span3.End() - cursors = make([]int64, len(segmentResults)) - for _, sel := range selected { - retSize += typeutil.AppendFieldData(ret.FieldsData, segmentResults[sel].GetFieldsData(), cursors[sel]) - + // retrieve result is compacted, use 0,1,2...end + segmentResOffset := make([]int64, len(segmentResults)) + for _, selection := range selections { + retSize += typeutil.AppendFieldData(ret.FieldsData, segmentResults[selection.batchIndex].GetFieldsData(), segmentResOffset[selection.batchIndex]) + segmentResOffset[selection.batchIndex]++ // limit retrieve result to avoid oom if retSize > maxOutputSize { return nil, fmt.Errorf("query results exceed the maxOutputSize Limit %d", maxOutputSize) } - - cursors[sel]++ } }