diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 111c675205..5af7543f3f 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -10,6 +10,7 @@ import ( "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/parser/planparserv2" + "github.com/milvus-io/milvus/internal/querynode" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -488,7 +489,16 @@ func (t *searchTask) searchShard(ctx context.Context, nodeID int64, qn types.Que DmlChannels: channelIDs, Scope: querypb.DataScope_All, } - result, err := qn.Search(ctx, req) + + queryNode := querynode.GetQueryNode() + var result *internalpb.SearchResults + var err error + + if queryNode != nil && queryNode.IsStandAlone { + result, err = queryNode.Search(ctx, req) + } else { + result, err = qn.Search(ctx, req) + } if err != nil { log.Ctx(ctx).Warn("QueryNode search return error", zap.Int64("nodeID", nodeID), diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index d4bb0540dc..ec2c198d93 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -731,7 +731,7 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64 // Search performs replica search tasks. func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { - if req.GetReq().GetBase().GetTargetID() != paramtable.GetNodeID() { + if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != paramtable.GetNodeID() { return &internalpb.SearchResults{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_NodeIDNotMatch, diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index c3e18fd074..47b0ef0b9d 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -123,22 +123,32 @@ type QueryNode struct { // pool for load/release channel taskPool *concurrency.Pool + + IsStandAlone bool +} + +var queryNode *QueryNode = nil + +func GetQueryNode() *QueryNode { + return queryNode } // NewQueryNode will return a QueryNode with abnormal state. func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode { ctx1, cancel := context.WithCancel(ctx) - node := &QueryNode{ + + queryNode = &QueryNode{ queryNodeLoopCtx: ctx1, queryNodeLoopCancel: cancel, factory: factory, + IsStandAlone: os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode, } - node.tSafeReplica = newTSafeReplica() - node.scheduler = newTaskScheduler(ctx1, node.tSafeReplica) - node.UpdateStateCode(commonpb.StateCode_Abnormal) + queryNode.tSafeReplica = newTSafeReplica() + queryNode.scheduler = newTaskScheduler(ctx1, queryNode.tSafeReplica) + queryNode.UpdateStateCode(commonpb.StateCode_Abnormal) - return node + return queryNode } func (node *QueryNode) initSession() error { diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 44171828b4..72aa8a2464 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -977,7 +977,17 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest, wg.Add(1) go func() { defer wg.Done() - partialResult, nodeErr := node.client.Search(reqCtx, nodeReq) + + queryNode := GetQueryNode() + var partialResult *internalpb.SearchResults + var nodeErr error + + if queryNode != nil && queryNode.IsStandAlone { + partialResult, nodeErr = queryNode.Search(reqCtx, nodeReq) + } else { + partialResult, nodeErr = node.client.Search(reqCtx, nodeReq) + } + resultMut.Lock() defer resultMut.Unlock() if nodeErr != nil || partialResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {