mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Skip search GRPC call for standalon (#21383)
Signed-off-by: Li Liu <li.liu@zilliz.com>
This commit is contained in:
parent
2146af1fb2
commit
cc5ecabac6
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/common"
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
"github.com/milvus-io/milvus/internal/parser/planparserv2"
|
"github.com/milvus-io/milvus/internal/parser/planparserv2"
|
||||||
|
"github.com/milvus-io/milvus/internal/querynode"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -488,7 +489,16 @@ func (t *searchTask) searchShard(ctx context.Context, nodeID int64, qn types.Que
|
|||||||
DmlChannels: channelIDs,
|
DmlChannels: channelIDs,
|
||||||
Scope: querypb.DataScope_All,
|
Scope: querypb.DataScope_All,
|
||||||
}
|
}
|
||||||
result, err := qn.Search(ctx, req)
|
|
||||||
|
queryNode := querynode.GetQueryNode()
|
||||||
|
var result *internalpb.SearchResults
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if queryNode != nil && queryNode.IsStandAlone {
|
||||||
|
result, err = queryNode.Search(ctx, req)
|
||||||
|
} else {
|
||||||
|
result, err = qn.Search(ctx, req)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Ctx(ctx).Warn("QueryNode search return error",
|
log.Ctx(ctx).Warn("QueryNode search return error",
|
||||||
zap.Int64("nodeID", nodeID),
|
zap.Int64("nodeID", nodeID),
|
||||||
|
|||||||
@ -731,7 +731,7 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64
|
|||||||
|
|
||||||
// Search performs replica search tasks.
|
// Search performs replica search tasks.
|
||||||
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
|
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
|
||||||
if req.GetReq().GetBase().GetTargetID() != paramtable.GetNodeID() {
|
if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != paramtable.GetNodeID() {
|
||||||
return &internalpb.SearchResults{
|
return &internalpb.SearchResults{
|
||||||
Status: &commonpb.Status{
|
Status: &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
|
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
|
||||||
|
|||||||
@ -123,22 +123,32 @@ type QueryNode struct {
|
|||||||
|
|
||||||
// pool for load/release channel
|
// pool for load/release channel
|
||||||
taskPool *concurrency.Pool
|
taskPool *concurrency.Pool
|
||||||
|
|
||||||
|
IsStandAlone bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var queryNode *QueryNode = nil
|
||||||
|
|
||||||
|
func GetQueryNode() *QueryNode {
|
||||||
|
return queryNode
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueryNode will return a QueryNode with abnormal state.
|
// NewQueryNode will return a QueryNode with abnormal state.
|
||||||
func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
|
func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
|
||||||
ctx1, cancel := context.WithCancel(ctx)
|
ctx1, cancel := context.WithCancel(ctx)
|
||||||
node := &QueryNode{
|
|
||||||
|
queryNode = &QueryNode{
|
||||||
queryNodeLoopCtx: ctx1,
|
queryNodeLoopCtx: ctx1,
|
||||||
queryNodeLoopCancel: cancel,
|
queryNodeLoopCancel: cancel,
|
||||||
factory: factory,
|
factory: factory,
|
||||||
|
IsStandAlone: os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
node.tSafeReplica = newTSafeReplica()
|
queryNode.tSafeReplica = newTSafeReplica()
|
||||||
node.scheduler = newTaskScheduler(ctx1, node.tSafeReplica)
|
queryNode.scheduler = newTaskScheduler(ctx1, queryNode.tSafeReplica)
|
||||||
node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
queryNode.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||||
|
|
||||||
return node
|
return queryNode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (node *QueryNode) initSession() error {
|
func (node *QueryNode) initSession() error {
|
||||||
|
|||||||
@ -977,7 +977,17 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest,
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
partialResult, nodeErr := node.client.Search(reqCtx, nodeReq)
|
|
||||||
|
queryNode := GetQueryNode()
|
||||||
|
var partialResult *internalpb.SearchResults
|
||||||
|
var nodeErr error
|
||||||
|
|
||||||
|
if queryNode != nil && queryNode.IsStandAlone {
|
||||||
|
partialResult, nodeErr = queryNode.Search(reqCtx, nodeReq)
|
||||||
|
} else {
|
||||||
|
partialResult, nodeErr = node.client.Search(reqCtx, nodeReq)
|
||||||
|
}
|
||||||
|
|
||||||
resultMut.Lock()
|
resultMut.Lock()
|
||||||
defer resultMut.Unlock()
|
defer resultMut.Unlock()
|
||||||
if nodeErr != nil || partialResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
if nodeErr != nil || partialResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user