diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 9273f405eb..192e78ba03 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -702,13 +702,6 @@ func (node *QueryNode) isHealthy() bool {
 
 // Search performs replica search tasks.
 func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
-	log.Debug("Received SearchRequest",
-		zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
-		zap.Strings("vChannels", req.GetDmlChannels()),
-		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
-		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
-		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
-
 	if req.GetReq().GetBase().GetTargetID() != node.session.ServerID {
 		return &internalpb.SearchResults{
 			Status: &commonpb.Status{
@@ -725,6 +718,16 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 			ErrorCode: commonpb.ErrorCode_Success,
 		},
 	}
+
+	tr := timerecord.NewTimeRecorder("Search")
+	if !req.GetFromShardLeader() {
+		log.Ctx(ctx).Debug("Received SearchRequest",
+			zap.Strings("vChannels", req.GetDmlChannels()),
+			zap.Int64s("segmentIDs", req.GetSegmentIDs()),
+			zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
+			zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
+	}
+
 	toReduceResults := make([]*internalpb.SearchResults, 0)
 	runningGp, runningCtx := errgroup.WithContext(ctx)
 	mu := &sync.Mutex{}
@@ -758,6 +761,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 	if err := runningGp.Wait(); err != nil {
 		return failRet, nil
 	}
+
 	ret, err := reduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
 	if err != nil {
 		failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@@ -766,6 +770,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 	if !req.FromShardLeader {
+		tr.CtxElapse(ctx, "search done in all shards")
 		rateCol.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
 		rateCol.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
 		metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(Params.QueryNodeCfg.GetNodeID(), 10), metrics.SearchLabel).Add(float64(proto.Size(req)))
@@ -791,15 +796,6 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 		return failRet, nil
 	}
 
-	msgID := req.GetReq().GetBase().GetMsgID()
-	log.Ctx(ctx).Debug("Received SearchRequest",
-		zap.Int64("msgID", msgID),
-		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
-		zap.String("vChannel", dmlChannel),
-		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
-		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
-		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
-
 	if node.queryShardService == nil {
 		failRet.Status.Reason = "queryShardService is nil"
 		return failRet, nil
@@ -808,22 +804,20 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 	qs, err := node.queryShardService.getQueryShard(dmlChannel)
 	if err != nil {
 		log.Ctx(ctx).Warn("Search failed, failed to get query shard",
-			zap.Int64("msgID", msgID),
-			zap.String("dml channel", dmlChannel),
+			zap.String("vChannel", dmlChannel),
 			zap.Error(err))
 		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
 		failRet.Status.Reason = err.Error()
 		return failRet, nil
 	}
 
-	log.Ctx(ctx).Debug("start do search",
-		zap.Int64("msgID", msgID),
-		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
-		zap.String("vChannel", dmlChannel),
-		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
-	tr := timerecord.NewTimeRecorder("")
-
 	if req.FromShardLeader {
+		tr := timerecord.NewTimeRecorder("SubSearch")
+		log.Ctx(ctx).Debug("start do subsearch",
+			zap.Bool("fromShardLeader", req.GetFromShardLeader()),
+			zap.String("vChannel", dmlChannel),
+			zap.Int64s("segmentIDs", req.GetSegmentIDs()))
+
 		historicalTask, err2 := newSearchTask(ctx, req)
 		if err2 != nil {
 			failRet.Status.Reason = err2.Error()
@@ -843,8 +837,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 			return failRet, nil
 		}
 
-		tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
-			msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
+		tr.CtxElapse(ctx, fmt.Sprintf("do subsearch done, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
 
 		failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
@@ -858,6 +851,11 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 	}
 
 	//from Proxy
+	tr := timerecord.NewTimeRecorder("SearchShard")
+	log.Ctx(ctx).Debug("start do search",
+		zap.String("vChannel", dmlChannel),
+		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
+
 	cluster, ok := qs.clusterService.getShardCluster(dmlChannel)
 	if !ok {
 		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
@@ -865,12 +863,14 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 		return failRet, nil
 	}
 
-	searchCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	var (
+		searchCtx, cancel = context.WithCancel(ctx)
 
-	var results []*internalpb.SearchResults
-	var streamingResult *internalpb.SearchResults
-	var errCluster error
+		results         []*internalpb.SearchResults
+		streamingResult *internalpb.SearchResults
+		errCluster      error
+	)
+	defer cancel()
 
 	withStreaming := func(ctx context.Context) error {
 		streamingTask, err := newSearchTask(searchCtx, req)
@@ -898,13 +898,12 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 	// shard leader dispatches request to its shard cluster
 	results, errCluster = cluster.Search(searchCtx, req, withStreaming)
 	if errCluster != nil {
-		log.Ctx(ctx).Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
+		log.Ctx(ctx).Warn("search shard cluster failed", zap.String("vChannel", dmlChannel), zap.Error(errCluster))
 		failRet.Status.Reason = errCluster.Error()
 		return failRet, nil
 	}
 
-	tr.CtxElapse(ctx, fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
-		msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
+	tr.CtxElapse(ctx, fmt.Sprintf("do search done in shard cluster, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
 
 	results = append(results, streamingResult)
 	ret, err2 := reduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
@@ -913,8 +912,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
 		return failRet, nil
 	}
 
-	tr.CtxElapse(ctx, fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
-		msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
+	tr.CtxElapse(ctx, fmt.Sprintf("do reduce done in shard cluster, vChannel = %s, segmentIDs = %v", dmlChannel, req.GetSegmentIDs()))
segmentIDs = %v", dmlChannel, req.GetSegmentIDs())) failRet.Status.ErrorCode = commonpb.ErrorCode_Success latency := tr.ElapseSpan() @@ -1196,7 +1194,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S // GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ... func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if !node.isHealthy() { - log.Warn("QueryNode.GetMetrics failed", + log.Ctx(ctx).Warn("QueryNode.GetMetrics failed", zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()))) @@ -1212,7 +1210,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR metricType, err := metricsinfo.ParseMetricType(req.Request) if err != nil { - log.Warn("QueryNode.GetMetrics failed to parse metric type", + log.Ctx(ctx).Warn("QueryNode.GetMetrics failed to parse metric type", zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.Error(err)) @@ -1228,8 +1226,8 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR if metricType == metricsinfo.SystemInfoMetrics { queryNodeMetrics, err := getSystemInfoMetrics(ctx, req, node) if err != nil { - log.Warn("QueryNode.GetMetrics failed", - zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()), + log.Ctx(ctx).Warn("QueryNode.GetMetrics failed", + zap.Int64("NodeId", Params.QueryNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.String("metricType", metricType), zap.Error(err)) @@ -1240,17 +1238,11 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR }, }, nil } - log.Debug("QueryNode.GetMetrics", - zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()), - zap.String("req", req.Request), - zap.String("metric_type", metricType), - zap.Any("queryNodeMetrics", queryNodeMetrics)) - return queryNodeMetrics, nil } - log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet", - zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()), + log.Ctx(ctx).Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet", + zap.Int64("NodeId", Params.QueryNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.String("metricType", metricType)) diff --git a/internal/querynode/reduce_test.go b/internal/querynode/reduce_test.go index f3b355e3ea..344769bc3c 100644 --- a/internal/querynode/reduce_test.go +++ b/internal/querynode/reduce_test.go @@ -93,7 +93,7 @@ func TestReduce_AllFunc(t *testing.T) { searchReq.timestamp = 0 assert.NoError(t, err) - searchResult, err := segment.search(searchReq) + searchResult, err := segment.search(ctx, searchReq) assert.NoError(t, err) err = checkSearchResult(nq, plan, searchResult) diff --git a/internal/querynode/result.go b/internal/querynode/result.go index d95e8c3019..a2524b8180 100644 --- a/internal/querynode/result.go +++ b/internal/querynode/result.go @@ -80,26 +80,20 @@ func reduceStatisticResponse(results []*internalpb.GetStatisticsResponse) (*inte func reduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, nq int64, topk int64, metricType string) (*internalpb.SearchResults, error) { searchResultData, err := decodeSearchResults(results) if err != nil { - log.Ctx(ctx).Warn("shard leader decode search results errors", zap.Error(err)) + log.Ctx(ctx).Warn("decode search results errors", zap.Error(err)) return 
 		return nil, err
 	}
-	log.Ctx(ctx).Debug("shard leader get valid search results", zap.Int("numbers", len(searchResultData)))
-
-	for i, sData := range searchResultData {
-		log.Ctx(ctx).Debug("reduceSearchResultData",
-			zap.Int("result No.", i),
-			zap.Int64("nq", sData.NumQueries),
-			zap.Int64("topk", sData.TopK))
-	}
+	log.Ctx(ctx).Debug("reduceSearchResultData",
+		zap.Int("numbers", len(searchResultData)), zap.Int64("targetNq", nq), zap.Int64("targetTopk", topk))
 
 	reducedResultData, err := reduceSearchResultData(ctx, searchResultData, nq, topk)
 	if err != nil {
-		log.Ctx(ctx).Warn("shard leader reduce errors", zap.Error(err))
+		log.Ctx(ctx).Warn("reduce search results error", zap.Error(err))
 		return nil, err
 	}
 	searchResults, err := encodeSearchResultData(reducedResultData, nq, topk, metricType)
 	if err != nil {
-		log.Warn("shard leader encode search result errors", zap.Error(err))
+		log.Ctx(ctx).Warn("encode search results error", zap.Error(err))
 		return nil, err
 	}
 	//if searchResults.SlicedBlob == nil {
@@ -178,7 +172,10 @@ func reduceSearchResultData(ctx context.Context, searchResultData []*schemapb.Se
 		// }
 		ret.Topks = append(ret.Topks, j)
 	}
-	log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
+
+	if skipDupCnt > 0 {
+		log.Ctx(ctx).Debug("skip duplicated search result", zap.Int64("count", skipDupCnt))
+	}
 	return ret, nil
 }
diff --git a/internal/querynode/search.go b/internal/querynode/search.go
index e885e8dd8f..27ea26e264 100644
--- a/internal/querynode/search.go
+++ b/internal/querynode/search.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"sync"
 
+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
@@ -31,16 +33,23 @@ import (
 // searchOnSegments performs search on listed segments
 // all segment ids are validated before calling this function
 func searchSegments(ctx context.Context, replica ReplicaInterface, segType segmentType, searchReq *searchRequest, segIDs []UniqueID) ([]*SearchResult, error) {
-	// results variables
-	resultCh := make(chan *SearchResult, len(segIDs))
-	errs := make([]error, len(segIDs))
+	var (
+		// results variables
+		resultCh = make(chan *SearchResult, len(segIDs))
+		errs     = make([]error, len(segIDs))
+		wg       sync.WaitGroup
+
+		// For log only
+		mu                   sync.Mutex
+		segmentsWithoutIndex []UniqueID
+	)
+
 	searchLabel := metrics.SealedSegmentLabel
 	if segType == commonpb.SegmentState_Growing {
 		searchLabel = metrics.GrowingSegmentLabel
 	}
 
 	// calling segment search in goroutines
-	var wg sync.WaitGroup
 	for i, segID := range segIDs {
 		wg.Add(1)
 		go func(segID UniqueID, i int) {
@@ -53,9 +62,15 @@ func searchSegments(ctx context.Context, replica ReplicaInterface, segType segme
 				log.Error(err.Error()) // should not happen but still ignore it since the result is still correct
 				return
 			}
+
+			if !seg.hasLoadIndexForIndexedField(searchReq.searchFieldID) {
+				mu.Lock()
+				segmentsWithoutIndex = append(segmentsWithoutIndex, segID)
+				mu.Unlock()
+			}
 			// record search time
 			tr := timerecord.NewTimeRecorder("searchOnSegments")
-			searchResult, err := seg.search(searchReq)
+			searchResult, err := seg.search(ctx, searchReq)
 			errs[i] = err
 			resultCh <- searchResult
 			// update metrics
@@ -78,6 +93,10 @@ func searchSegments(ctx context.Context, replica ReplicaInterface, segType segme
 		}
 	}
 
+	if len(segmentsWithoutIndex) > 0 {
+		log.Ctx(ctx).Info("search growing/sealed segments without indexes", zap.Int64s("segmentIDs", segmentsWithoutIndex))
+	}
+
 	return searchResults, nil
 }
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 47827854f3..f229c0cb3c 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -305,7 +305,7 @@ func (s *Segment) getMemSize() int64 {
 	return int64(memoryUsageInBytes)
 }
 
-func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
+func (s *Segment) search(ctx context.Context, searchReq *searchRequest) (*SearchResult, error) {
 	/*
 		CStatus
 		Search(void* plan,
@@ -327,7 +327,7 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
 	loadIndex := s.hasLoadIndexForIndexedField(searchReq.searchFieldID)
 	var searchResult SearchResult
 
-	log.Debug("start do search on segment",
+	log.Ctx(ctx).Debug("start do search on segment",
 		zap.Int64("msgID", searchReq.msgID),
 		zap.Int64("segmentID", s.segmentID),
 		zap.String("segmentType", s.segmentType.String()),
@@ -341,7 +341,7 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
 	if err := HandleCStatus(&status, "Search failed"); err != nil {
 		return nil, err
 	}
-	log.Debug("do search on segment done",
+	log.Ctx(ctx).Debug("do search on segment done",
 		zap.Int64("msgID", searchReq.msgID),
 		zap.Int64("segmentID", s.segmentID),
 		zap.String("segmentType", s.segmentType.String()),
diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go
index c977210fe7..22665d23b4 100644
--- a/internal/querynode/segment_test.go
+++ b/internal/querynode/segment_test.go
@@ -424,7 +424,7 @@ func TestSegment_segmentSearch(t *testing.T) {
 	req, err := parseSearchRequest(plan, placeGroupByte)
 	assert.NoError(t, err)
 
-	searchResult, err := segment.search(req)
+	searchResult, err := segment.search(ctx, req)
 	assert.NoError(t, err)
 
 	err = checkSearchResult(nq, plan, searchResult)
diff --git a/internal/querynode/validate.go b/internal/querynode/validate.go
index 37d40aa9e1..e87c08ccaf 100644
--- a/internal/querynode/validate.go
+++ b/internal/querynode/validate.go
@@ -47,7 +47,7 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
 		}
 	}
 
-	log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
+	log.Ctx(ctx).Debug("read target partitions on historical replica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
 	col, err2 := replica.getCollectionByID(collectionID)
 	if err2 != nil {
 		return searchPartIDs, segmentIDs, err2
@@ -55,8 +55,8 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
 	// all partitions have been released
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
-		return searchPartIDs, segmentIDs, errors.New("partitions have been released , collectionID = " +
-			fmt.Sprintln(collectionID) + "target partitionIDs = " + fmt.Sprintln(searchPartIDs))
+		return searchPartIDs, segmentIDs,
+			fmt.Errorf("partitions have been released , collectionID = %d, target partitionID = %v", collectionID, searchPartIDs)
 	}
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
 		return searchPartIDs, segmentIDs, nil
 	}
@@ -112,7 +112,7 @@ func validateOnStreamReplica(ctx context.Context, replica ReplicaInterface, coll
 		}
 	}
 
-	log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
+	log.Ctx(ctx).Debug("read target partitions on stream replica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
 	col, err2 := replica.getCollectionByID(collectionID)
 	if err2 != nil {
 		return searchPartIDs, segmentIDs, err2
@@ -120,19 +120,13 @@ func validateOnStreamReplica(ctx context.Context, replica ReplicaInterface, coll
 	// all partitions have been released
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
-		return searchPartIDs, segmentIDs, errors.New("partitions have been released , collectionID = " +
-			fmt.Sprintln(collectionID) + "target partitionIDs = " + fmt.Sprintln(searchPartIDs))
+		return searchPartIDs, segmentIDs,
+			fmt.Errorf("partitions have been released , collectionID = %d, target partitionIDs = %v", collectionID, searchPartIDs)
 	}
 	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
 		return searchPartIDs, segmentIDs, nil
 	}
 
 	segmentIDs, err = replica.getSegmentIDsByVChannel(searchPartIDs, vChannel, segmentTypeGrowing)
-	log.Ctx(ctx).Debug("validateOnStreamReplica getSegmentIDsByVChannel",
-		zap.Any("collectionID", collectionID),
-		zap.Any("vChannel", vChannel),
-		zap.Any("partitionIDs", searchPartIDs),
-		zap.Any("segmentIDs", segmentIDs),
-		zap.Error(err))
-	return searchPartIDs, segmentIDs, nil
+	return searchPartIDs, segmentIDs, err
 }
diff --git a/internal/util/timerecord/time_recorder.go b/internal/util/timerecord/time_recorder.go
index e744ae5bb0..2a023d8548 100644
--- a/internal/util/timerecord/time_recorder.go
+++ b/internal/util/timerecord/time_recorder.go
@@ -91,7 +91,7 @@ func (tr *TimeRecorder) CtxElapse(ctx context.Context, msg string) time.Duration
 
 func (tr *TimeRecorder) printTimeRecord(ctx context.Context, msg string, span time.Duration) {
 	log.Ctx(ctx).WithOptions(zap.AddCallerSkip(2)).
-		Debug(fmt.Sprintf("timerecorder %s: record elapsed duration", tr.header),
+		Debug(fmt.Sprintf("tr/%s", tr.header),
 			zap.String("msg", msg),
 			zap.Duration("duration", span),
 		)