diff --git a/internal/querynode/query_shard.go b/internal/querynode/query_shard.go index e477669503..dcfd204bb7 100644 --- a/internal/querynode/query_shard.go +++ b/internal/querynode/query_shard.go @@ -223,14 +223,21 @@ func (q *queryShard) getNewTSafe(tp tsType) (Timestamp, error) { } func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp, tp tsType) { + st := q.getServiceableTime(tp) + log.Debug("serviceable check start", zap.String("tsType", tp.String()), zap.Uint64("guarantee ts", guaranteeTs), zap.Uint64("serviceable ts", st), zap.String("channel", q.channel)) + serviceable := func() bool { + st = q.getServiceableTime(tp) + return st >= guaranteeTs + } + q.watcherCond.L.Lock() defer q.watcherCond.L.Unlock() - st := q.getServiceableTime(tp) - for guaranteeTs > st { + for !serviceable() { log.Debug("serviceable ts before guarantee ts", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel)) q.watcherCond.Wait() if err := ctx.Err(); err != nil { - log.Warn("waitUntialServiceable timeout", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel)) + log.Warn("waitUntilServiceable timeout", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel)) + // TODO: implement timeout logic return } st = q.getServiceableTime(tp) @@ -390,9 +397,8 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques go func() { defer wg.Done() - // hold request until guarantee timestamp >= service timestamp guaranteeTs := req.GetReq().GetGuaranteeTimestamp() - q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) + q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) // wait until guarantee timestamp >= service timestamp // shard leader queries its own streaming data // TODO add context sResults, _, _, sErr := q.streaming.search(searchRequests, collectionID, partitionIDs, req.DmlChannel, plan, timestamp) @@ -419,13 +425,13 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques MetricType: plan.getMetricType(), NumQueries: queryNum, TopK: topK, - SlicedBlob: nil, + SlicedBlob: nil, // placeholder for serialized streaming result SlicedOffset: 1, SlicedNumCount: 1, }) + // reduce streaming results and transform to blob if len(streamingResults) > 0 { - // reduce search results numSegment := int64(len(streamingResults)) err = reduceSearchResultsAndFillData(plan, streamingResults, numSegment) if err != nil { @@ -615,22 +621,10 @@ func reduceSearchResultData(searchResultData []*schemapb.SearchResultData, nq in } offsets[sel]++ } - - // if realTopK != -1 && realTopK != j { - // log.Warn("Proxy Reduce Search Result", zap.Error(errors.New("the length (topk) between all result of query is different"))) - // // return nil, errors.New("the length (topk) between all result of query is different") - // } ret.Topks = append(ret.Topks, j) } log.Debug("skip duplicated search result", zap.Int64("count", skipDupCnt), zap.Any("ret", ret)) - // ret.TopK = realTopK - - // if !distance.PositivelyRelated(metricType) { - // for k := range ret.Scores { - // ret.Scores[k] *= -1 - // } - // } - + // Note: query shard does not check whether the metricType is positively related, proxy will do the job return ret, nil } @@ -901,6 +895,7 @@ func mergeInternalRetrieveResults(retrieveResults []*internalpb.RetrieveResults) return ret, nil } +// printSearchResultData is for debug usage // func printSearchResultData(data *schemapb.SearchResultData, header string) { // size := len(data.Ids.GetIntId().Data) // if size != len(data.Scores) {