diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 480ad2da33..8caa1f9933 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -722,12 +722,6 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64
 
 // Search performs replica search tasks.
 func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
-	log.Ctx(ctx).Debug("Received SearchRequest",
-		zap.Strings("vChannels", req.GetDmlChannels()),
-		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
-		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
-		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
-
 	if req.GetReq().GetBase().GetTargetID() != paramtable.GetNodeID() {
 		return &internalpb.SearchResults{
 			Status: &commonpb.Status{
@@ -744,6 +738,16 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 			ErrorCode: commonpb.ErrorCode_Success,
 		},
 	}
+
+	tr := timerecord.NewTimeRecorder("Search")
+	if !req.GetFromShardLeader() {
+		log.Ctx(ctx).Debug("Received SearchRequest",
+			zap.Strings("vChannels", req.GetDmlChannels()),
+			zap.Int64s("segmentIDs", req.GetSegmentIDs()),
+			zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
+			zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
+	}
+
 	toReduceResults := make([]*internalpb.SearchResults, 0)
 	runningGp, runningCtx := errgroup.WithContext(ctx)
 	mu := &sync.Mutex{}
@@ -777,6 +781,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 	if err := runningGp.Wait(); err != nil {
 		return failRet, nil
 	}
+
 	ret, err := reduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
 	if err != nil {
 		failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@@ -785,6 +790,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 	}
 
 	if !req.FromShardLeader {
+		tr.CtxElapse(ctx, "search done in all shards")
 		rateCol.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
 		rateCol.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
 		metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(proto.Size(req)))
@@ -812,14 +818,6 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 	node.wg.Add(1)
 	defer node.wg.Done()
 
-	msgID := req.GetReq().GetBase().GetMsgID()
-	log.Ctx(ctx).Debug("Received SearchRequest",
-		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
-		zap.String("vChannel", dmlChannel),
-		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
-		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
-		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
-
 	if node.queryShardService == nil {
 		failRet.Status.Reason = "queryShardService is nil"
 		return failRet, nil
@@ -835,13 +833,13 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 		return failRet, nil
 	}
 
-	log.Ctx(ctx).Debug("start do search",
-		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
-		zap.String("vChannel", dmlChannel),
-		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
-	tr := timerecord.NewTimeRecorder("")
-
 	if req.FromShardLeader {
+		tr := timerecord.NewTimeRecorder("SubSearch")
+		log.Ctx(ctx).Debug("start do subsearch",
+			zap.Bool("fromShardLeader", req.GetFromShardLeader()),
+			zap.String("vChannel", dmlChannel),
+			zap.Int64s("segmentIDs", req.GetSegmentIDs()))
+
 		historicalTask, err2 := newSearchTask(ctx, req)
 		if err2 != nil {
 			failRet.Status.Reason = err2.Error()
@@ -861,8 +859,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 			return failRet, nil
 		}
 
-		tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
-			msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
+		tr.CtxElapse(ctx, fmt.Sprintf("do subsearch done, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
 
 		failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
@@ -876,6 +873,11 @@
 	}
 
 	//from Proxy
+	tr := timerecord.NewTimeRecorder("SearchShard")
+	log.Ctx(ctx).Debug("start do search",
+		zap.String("vChannel", dmlChannel),
+		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
+
 	cluster, ok := qs.clusterService.getShardCluster(dmlChannel)
 	if !ok {
 		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
@@ -883,12 +885,14 @@
 		return failRet, nil
 	}
 
-	searchCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	var (
+		searchCtx, cancel = context.WithCancel(ctx)
 
-	var results []*internalpb.SearchResults
-	var streamingResult *internalpb.SearchResults
-	var errCluster error
+		results         []*internalpb.SearchResults
+		streamingResult *internalpb.SearchResults
+		errCluster      error
+	)
+	defer cancel()
 
 	withStreaming := func(ctx context.Context) error {
 		streamingTask, err := newSearchTask(searchCtx, req)
@@ -916,13 +920,12 @@
 	// shard leader dispatches request to its shard cluster
 	results, errCluster = cluster.Search(searchCtx, req, withStreaming)
 	if errCluster != nil {
-		log.Ctx(ctx).Warn("search cluster failed", zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
+		log.Ctx(ctx).Warn("search shard cluster failed", zap.String("vChannel", dmlChannel), zap.Error(errCluster))
 		failRet.Status.Reason = errCluster.Error()
 		return failRet, nil
 	}
 
-	tr.CtxElapse(ctx, fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
-		msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
+	tr.CtxElapse(ctx, fmt.Sprintf("do search done in shard cluster, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
 
 	results = append(results, streamingResult)
 	ret, err2 := reduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
@@ -931,12 +934,10 @@
 		return failRet, nil
 	}
 
-	tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
-		msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
+	tr.CtxElapse(ctx, fmt.Sprintf("do reduce done in shard cluster, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
 
 	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
-	latency := tr.ElapseSpan()
-	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
+	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(tr.ElapseSpan().Milliseconds()))
 	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
 	metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetNq()))
 	metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetTopk()))
@@ -1237,7 +1238,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
 // GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
 func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
 	if !node.isHealthyOrStopping() {
-		log.Warn("QueryNode.GetMetrics failed",
+		log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
 			zap.Int64("nodeId", paramtable.GetNodeID()),
 			zap.String("req", req.Request),
 			zap.Error(errQueryNodeIsUnhealthy(paramtable.GetNodeID())))
@@ -1255,7 +1256,7 @@
 
 	metricType, err := metricsinfo.ParseMetricType(req.Request)
 	if err != nil {
-		log.Warn("QueryNode.GetMetrics failed to parse metric type",
+		log.Ctx(ctx).Warn("QueryNode.GetMetrics failed to parse metric type",
 			zap.Int64("nodeId", paramtable.GetNodeID()),
 			zap.String("req", req.Request),
 			zap.Error(err))
@@ -1271,7 +1272,7 @@
 	if metricType == metricsinfo.SystemInfoMetrics {
 		queryNodeMetrics, err := getSystemInfoMetrics(ctx, req, node)
 		if err != nil {
-			log.Warn("QueryNode.GetMetrics failed",
+			log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
 				zap.Int64("nodeId", paramtable.GetNodeID()),
 				zap.String("req", req.Request),
 				zap.String("metricType", metricType),
@@ -1283,16 +1284,10 @@
 				},
 			}, nil
 		}
-		log.Debug("QueryNode.GetMetrics",
-			zap.Int64("node_id", paramtable.GetNodeID()),
-			zap.String("req", req.Request),
-			zap.String("metric_type", metricType),
-			zap.Any("queryNodeMetrics", queryNodeMetrics))
-
 		return queryNodeMetrics, nil
 	}
-	log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
+	log.Ctx(ctx).Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
 		zap.Int64("nodeId", paramtable.GetNodeID()),
 		zap.String("req", req.Request),
 		zap.String("metricType", metricType))
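The reworked `Search` handler above fans out sub-searches (the request carries the DML channels and segment IDs), collects the partial results under a mutex via `errgroup.WithContext`, and reduces them after `Wait`, recording one `tr.CtxElapse` checkpoint when all shards are done. A minimal standalone sketch of that fan-out/collect/reduce pattern; the channel names and the `searchOneChannel` helper are placeholders, not Milvus APIs:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// searchOneChannel is a stand-in for searchWithDmlChannel; it returns a fake per-channel result.
func searchOneChannel(ctx context.Context, channel string) (int, error) {
	return len(channel), nil
}

func main() {
	channels := []string{"dml-ch-0", "dml-ch-1"}

	var (
		mu      sync.Mutex
		partial []int
	)
	g, gctx := errgroup.WithContext(context.Background())
	for _, ch := range channels {
		ch := ch // capture the loop variable for the goroutine
		g.Go(func() error {
			res, err := searchOneChannel(gctx, ch)
			if err != nil {
				return err // first error cancels gctx for the sibling goroutines
			}
			mu.Lock()
			partial = append(partial, res)
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("search failed:", err)
		return
	}

	// "Reduce" step: here just a sum of the per-channel results.
	total := 0
	for _, r := range partial {
		total += r
	}
	fmt.Println("reduced result:", total)
}
```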
diff --git a/internal/querynode/reduce_test.go b/internal/querynode/reduce_test.go
index 4177f5a8f2..b030d5bcdd 100644
--- a/internal/querynode/reduce_test.go
+++ b/internal/querynode/reduce_test.go
@@ -95,7 +95,7 @@ func TestReduce_AllFunc(t *testing.T) {
 	searchReq.timestamp = 0
 	assert.NoError(t, err)
 
-	searchResult, err := segment.search(searchReq)
+	searchResult, err := segment.search(ctx, searchReq)
 	assert.NoError(t, err)
 
 	err = checkSearchResult(nq, plan, searchResult)
diff --git a/internal/querynode/result.go b/internal/querynode/result.go
index ffa328b9cf..41b6f2f7ce 100644
--- a/internal/querynode/result.go
+++ b/internal/querynode/result.go
@@ -80,26 +80,20 @@ func reduceStatisticResponse(results []*internalpb.GetStatisticsResponse) (*inte
 func reduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, nq int64, topk int64, metricType string) (*internalpb.SearchResults, error) {
 	searchResultData, err := decodeSearchResults(results)
 	if err != nil {
-		log.Ctx(ctx).Warn("shard leader decode search results errors", zap.Error(err))
+		log.Ctx(ctx).Warn("decode search results errors", zap.Error(err))
 		return nil, err
 	}
-	log.Ctx(ctx).Debug("shard leader get valid search results", zap.Int("numbers", len(searchResultData)))
-
-	for i, sData := range searchResultData {
-		log.Ctx(ctx).Debug("reduceSearchResultData",
-			zap.Int("result No.", i),
-			zap.Int64("nq", sData.NumQueries),
-			zap.Int64("topk", sData.TopK))
-	}
+	log.Ctx(ctx).Debug("reduceSearchResultData",
+		zap.Int("numbers", len(searchResultData)), zap.Int64("targetNq", nq), zap.Int64("targetTopk", topk))
 
 	reducedResultData, err := reduceSearchResultData(ctx, searchResultData, nq, topk)
 	if err != nil {
-		log.Ctx(ctx).Warn("shard leader reduce errors", zap.Error(err))
+		log.Ctx(ctx).Warn("reduce search results error", zap.Error(err))
 		return nil, err
 	}
 	searchResults, err := encodeSearchResultData(reducedResultData, nq, topk, metricType)
 	if err != nil {
-		log.Warn("shard leader encode search result errors", zap.Error(err))
+		log.Ctx(ctx).Warn("encode search results error", zap.Error(err))
 		return nil, err
 	}
 	//if searchResults.SlicedBlob == nil {
@@ -178,7 +172,10 @@ func reduceSearchResultData(ctx context.Context, searchResultData []*schemapb.Se
 		//	}
 		ret.Topks = append(ret.Topks, j)
 	}
-	log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
+
+	if skipDupCnt > 0 {
+		log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
+	}
 	return ret, nil
 }
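`reduceSearchResults` above decodes the per-shard results, merges them per query with `reduceSearchResultData` (which now logs `skipDupCnt` only when duplicates were actually dropped), and re-encodes the merged top-k. A simplified sketch of that merge-with-dedup idea, using an illustrative `hit` type rather than the real `schemapb.SearchResultData` layout:

```go
package main

import (
	"fmt"
	"sort"
)

// hit is a simplified stand-in for one entry of a per-segment search result.
type hit struct {
	ID    int64
	Score float32
}

// mergeTopK merges per-segment hits for a single query, keeps only the first
// (best-scored) occurrence of each ID, and returns at most topk results.
// skipped counts the dropped duplicates, mirroring the skipDupCnt bookkeeping above.
func mergeTopK(perSegment [][]hit, topk int) (merged []hit, skipped int) {
	seen := make(map[int64]bool)
	var all []hit
	for _, hits := range perSegment {
		all = append(all, hits...)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].Score > all[j].Score })
	for _, h := range all {
		if seen[h.ID] {
			skipped++
			continue
		}
		seen[h.ID] = true
		merged = append(merged, h)
		if len(merged) == topk {
			break
		}
	}
	return merged, skipped
}

func main() {
	segA := []hit{{ID: 1, Score: 0.9}, {ID: 2, Score: 0.5}}
	segB := []hit{{ID: 1, Score: 0.8}, {ID: 3, Score: 0.7}}
	merged, skipped := mergeTopK([][]hit{segA, segB}, 3)
	fmt.Println(merged, "duplicates skipped:", skipped)
}
```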
diff --git a/internal/querynode/search.go b/internal/querynode/search.go
index c2e66c69f6..80c32b32fc 100644
--- a/internal/querynode/search.go
+++ b/internal/querynode/search.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"sync"
 
+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
@@ -32,16 +34,23 @@ import (
 // searchOnSegments performs search on listed segments
 // all segment ids are validated before calling this function
 func searchSegments(ctx context.Context, replica ReplicaInterface, segType segmentType, searchReq *searchRequest, segIDs []UniqueID) ([]*SearchResult, error) {
-	// results variables
-	resultCh := make(chan *SearchResult, len(segIDs))
-	errs := make([]error, len(segIDs))
+	var (
+		// results variables
+		resultCh = make(chan *SearchResult, len(segIDs))
+		errs     = make([]error, len(segIDs))
+		wg       sync.WaitGroup
+
+		// For log only
+		mu                   sync.Mutex
+		segmentsWithoutIndex []UniqueID
+	)
+
 	searchLabel := metrics.SealedSegmentLabel
 	if segType == commonpb.SegmentState_Growing {
 		searchLabel = metrics.GrowingSegmentLabel
 	}
 
 	// calling segment search in goroutines
-	var wg sync.WaitGroup
 	for i, segID := range segIDs {
 		wg.Add(1)
 		go func(segID UniqueID, i int) {
@@ -54,9 +63,15 @@ func searchSegments(ctx context.Context, replica ReplicaInterface, segType segme
 				log.Error(err.Error()) // should not happen but still ignore it since the result is still correct
 				return
 			}
+
+			if !seg.hasLoadIndexForIndexedField(searchReq.searchFieldID) {
+				mu.Lock()
+				segmentsWithoutIndex = append(segmentsWithoutIndex, segID)
+				mu.Unlock()
+			}
 			// record search time
 			tr := timerecord.NewTimeRecorder("searchOnSegments")
-			searchResult, err := seg.search(searchReq)
+			searchResult, err := seg.search(ctx, searchReq)
 			errs[i] = err
 			resultCh <- searchResult
 			// update metrics
@@ -79,6 +94,10 @@
 		}
 	}
 
+	if len(segmentsWithoutIndex) > 0 {
+		log.Ctx(ctx).Info("search growing/sealed segments without indexes", zap.Int64s("segmentIDs", segmentsWithoutIndex))
+	}
+
 	return searchResults, nil
 }
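`searchSegments` above launches one goroutine per segment, records errors by index in a pre-sized slice, and pushes results through a channel buffered to `len(segIDs)` so no goroutine blocks on send. A self-contained sketch of that structure, with a stand-in `fakeSearch` instead of `Segment.search` and without the metrics/index bookkeeping:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fakeSearch stands in for Segment.search; it just echoes the segment ID.
func fakeSearch(ctx context.Context, segID int64) (int64, error) {
	return segID, nil
}

// searchSegmentsSketch mirrors the shape of searchSegments: one goroutine per
// segment, errors recorded by index, results collected through a buffered channel.
func searchSegmentsSketch(ctx context.Context, segIDs []int64) ([]int64, error) {
	resultCh := make(chan int64, len(segIDs))
	errs := make([]error, len(segIDs))

	var wg sync.WaitGroup
	for i, segID := range segIDs {
		wg.Add(1)
		go func(i int, segID int64) {
			defer wg.Done()
			res, err := fakeSearch(ctx, segID)
			errs[i] = err
			if err == nil {
				resultCh <- res
			}
		}(i, segID)
	}
	wg.Wait()
	close(resultCh)

	// Fail the whole call if any segment failed, as the original does.
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	results := make([]int64, 0, len(segIDs))
	for r := range resultCh {
		results = append(results, r)
	}
	return results, nil
}

func main() {
	res, err := searchSegmentsSketch(context.Background(), []int64{101, 102, 103})
	fmt.Println(res, err)
}
```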
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index b9b3706849..eeecd24d8a 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -310,7 +310,7 @@ func (s *Segment) getMemSize() int64 {
 	return int64(memoryUsageInBytes)
 }
 
-func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
+func (s *Segment) search(ctx context.Context, searchReq *searchRequest) (*SearchResult, error) {
 	/*
 		CStatus
 		Search(void* plan,
@@ -332,7 +332,7 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
 	loadIndex := s.hasLoadIndexForIndexedField(searchReq.searchFieldID)
 	var searchResult SearchResult
 
-	log.Debug("start do search on segment",
+	log.Ctx(ctx).Debug("start do search on segment",
 		zap.Int64("msgID", searchReq.msgID),
 		zap.Int64("segmentID", s.segmentID),
 		zap.String("segmentType", s.segmentType.String()),
@@ -345,7 +345,7 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
 	if err := HandleCStatus(&status, "Search failed"); err != nil {
 		return nil, err
 	}
-	log.Debug("do search on segment done",
+	log.Ctx(ctx).Debug("do search on segment done",
 		zap.Int64("msgID", searchReq.msgID),
 		zap.Int64("segmentID", s.segmentID),
 		zap.String("segmentType", s.segmentType.String()),
diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go
index 15a17a2412..3b38a618f5 100644
--- a/internal/querynode/segment_test.go
+++ b/internal/querynode/segment_test.go
@@ -425,7 +425,7 @@ func TestSegment_segmentSearch(t *testing.T) {
 	req, err := parseSearchRequest(plan, placeGroupByte)
 	assert.NoError(t, err)
 
-	searchResult, err := segment.search(req)
+	searchResult, err := segment.search(ctx, req)
 	assert.NoError(t, err)
 
 	err = checkSearchResult(nq, plan, searchResult)
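`Segment.search` now takes a `context.Context` so its logs can go through `log.Ctx(ctx)` and inherit request-scoped fields such as a trace ID. A rough stand-in for that helper (the real `internal/log` package is not shown here), only to illustrate why threading ctx down to the segment level matters:

```go
package main

import (
	"context"

	"go.uber.org/zap"
)

type loggerKey struct{}

// withLogger stores a request-scoped logger in the context; ctxLog reads it
// back, falling back to the global logger. Both are illustrative stand-ins,
// not the Milvus log.Ctx implementation.
func withLogger(ctx context.Context, l *zap.Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, l)
}

func ctxLog(ctx context.Context) *zap.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*zap.Logger); ok {
		return l
	}
	return zap.L()
}

func searchOnSegment(ctx context.Context, segmentID int64) {
	// Because ctx is passed in, this line inherits the request's traceID field.
	ctxLog(ctx).Debug("start do search on segment", zap.Int64("segmentID", segmentID))
}

func main() {
	base, _ := zap.NewDevelopment()
	ctx := withLogger(context.Background(), base.With(zap.String("traceID", "demo-trace")))
	searchOnSegment(ctx, 42)
}
```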
diff --git a/internal/querynode/validate.go b/internal/querynode/validate.go
index 37d40aa9e1..e87c08ccaf 100644
--- a/internal/querynode/validate.go
+++ b/internal/querynode/validate.go
@@ -47,7 +47,7 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
 		}
 	}
 
-	log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
+	log.Ctx(ctx).Debug("read target partitions on historical replica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
 	col, err2 := replica.getCollectionByID(collectionID)
 	if err2 != nil {
 		return searchPartIDs, segmentIDs, err2
@@ -55,8 +55,8 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
 	// all partitions have been released
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
-		return searchPartIDs, segmentIDs, errors.New("partitions have been released , collectionID = " +
-			fmt.Sprintln(collectionID) + "target partitionIDs = " + fmt.Sprintln(searchPartIDs))
+		return searchPartIDs, segmentIDs,
+			fmt.Errorf("partitions have been released , collectionID = %d, target partitionID = %v", collectionID, searchPartIDs)
 	}
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
 		return searchPartIDs, segmentIDs, nil
 	}
@@ -112,7 +112,7 @@ func validateOnStreamReplica(ctx context.Context, replica ReplicaInterface, coll
 		}
 	}
 
-	log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
+	log.Ctx(ctx).Debug("read target partitions on stream replica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
 	col, err2 := replica.getCollectionByID(collectionID)
 	if err2 != nil {
 		return searchPartIDs, segmentIDs, err2
@@ -120,19 +120,13 @@ func validateOnStreamReplica(ctx context.Context, replica ReplicaInterface, coll
 
 	// all partitions have been released
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
-		return searchPartIDs, segmentIDs, errors.New("partitions have been released , collectionID = " +
-			fmt.Sprintln(collectionID) + "target partitionIDs = " + fmt.Sprintln(searchPartIDs))
+		return searchPartIDs, segmentIDs,
+			fmt.Errorf("partitions have been released , collectionID = %d, target partitionIDs = %v", collectionID, searchPartIDs)
 	}
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
 		return searchPartIDs, segmentIDs, nil
 	}
 
 	segmentIDs, err = replica.getSegmentIDsByVChannel(searchPartIDs, vChannel, segmentTypeGrowing)
-	log.Ctx(ctx).Debug("validateOnStreamReplica getSegmentIDsByVChannel",
-		zap.Any("collectionID", collectionID),
-		zap.Any("vChannel", vChannel),
-		zap.Any("partitionIDs", searchPartIDs),
-		zap.Any("segmentIDs", segmentIDs),
-		zap.Error(err))
-	return searchPartIDs, segmentIDs, nil
+	return searchPartIDs, segmentIDs, err
 }
diff --git a/internal/util/timerecord/time_recorder.go b/internal/util/timerecord/time_recorder.go
index e744ae5bb0..2a023d8548 100644
--- a/internal/util/timerecord/time_recorder.go
+++ b/internal/util/timerecord/time_recorder.go
@@ -91,7 +91,7 @@ func (tr *TimeRecorder) CtxElapse(ctx context.Context, msg string) time.Duration
 
 func (tr *TimeRecorder) printTimeRecord(ctx context.Context, msg string, span time.Duration) {
 	log.Ctx(ctx).WithOptions(zap.AddCallerSkip(2)).
-		Debug(fmt.Sprintf("timerecorder %s: record elapsed duration", tr.header),
+		Debug(fmt.Sprintf("tr/%s", tr.header),
 			zap.String("msg", msg),
 			zap.Duration("duration", span),
 		)
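The `tr.CtxElapse` calls added throughout this patch rely on the `TimeRecorder` checkpointing pattern: record a timestamp, then on each call log how long the step since the previous checkpoint took, now under the shortened `tr/<header>` message. The sketch below is a minimal stand-in that mimics that record-and-log pattern, not the real `internal/util/timerecord` type:

```go
package main

import (
	"fmt"
	"time"
)

// timeRecorder keeps the time of the last checkpoint and reports the span
// since then on each call; a simplified stand-in for TimeRecorder.
type timeRecorder struct {
	header string
	last   time.Time
}

func newTimeRecorder(header string) *timeRecorder {
	return &timeRecorder{header: header, last: time.Now()}
}

// elapse logs the duration since the previous checkpoint and resets it,
// echoing the "tr/<header>" message format introduced in the diff.
func (tr *timeRecorder) elapse(msg string) time.Duration {
	now := time.Now()
	span := now.Sub(tr.last)
	tr.last = now
	fmt.Printf("tr/%s: %s, duration=%s\n", tr.header, msg, span)
	return span
}

func main() {
	tr := newTimeRecorder("SearchShard")
	time.Sleep(10 * time.Millisecond) // pretend to search the shard cluster
	tr.elapse("do search done in shard cluster")
	time.Sleep(5 * time.Millisecond) // pretend to reduce
	tr.elapse("do reduce done in shard cluster")
}
```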