mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
enhance: support proxy DQL forward (#46036)
issue: #45812 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
354ab2f55e
commit
c22cdbbf9a
@ -383,7 +383,7 @@ func wrapperProxyWithLimit(ctx context.Context, ginCtx *gin.Context, req any, ch
|
||||
}
|
||||
|
||||
forwardHandler := func(reqCtx context.Context, req any) (any, error) {
|
||||
interceptor := streaming.ForwardDMLToLegacyProxyUnaryServerInterceptor()
|
||||
interceptor := streaming.ForwardLegacyProxyUnaryServerInterceptor()
|
||||
return interceptor(reqCtx, req, &grpc.UnaryServerInfo{FullMethod: fullMethod}, func(ctx context.Context, req any) (interface{}, error) {
|
||||
return handler(ctx, req)
|
||||
})
|
||||
|
||||
@ -240,7 +240,7 @@ func (s *Server) startExternalGrpc(errChan chan error) {
|
||||
var unaryServerOption grpc.ServerOption
|
||||
if enableCustomInterceptor {
|
||||
unaryServerOption = grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
|
||||
streaming.ForwardDMLToLegacyProxyUnaryServerInterceptor(),
|
||||
streaming.ForwardLegacyProxyUnaryServerInterceptor(),
|
||||
proxy.DatabaseInterceptor(),
|
||||
UnaryRequestStatsInterceptor,
|
||||
accesslog.UnaryAccessLogInterceptor,
|
||||
|
||||
@ -60,7 +60,7 @@ func newForwardService(streamingCoordClient client.Client) *forwardServiceImpl {
|
||||
}
|
||||
|
||||
type ForwardService interface {
|
||||
ForwardDMLToLegacyProxy(ctx context.Context, request any) (any, error)
|
||||
ForwardLegacyProxy(ctx context.Context, request any) (any, error)
|
||||
}
|
||||
|
||||
// forwardServiceImpl is the implementation of FallbackService.
|
||||
@ -74,13 +74,13 @@ type forwardServiceImpl struct {
|
||||
rb resolver.Builder
|
||||
}
|
||||
|
||||
// ForwardDMLToLegacyProxy forwards the DML request to the legacy proxy.
|
||||
func (fs *forwardServiceImpl) ForwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) {
|
||||
// ForwardLegacyProxy forwards the request to the legacy proxy.
|
||||
func (fs *forwardServiceImpl) ForwardLegacyProxy(ctx context.Context, request any) (any, error) {
|
||||
if err := fs.checkIfForwardDisabledWithLock(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return fs.forwardDMLToLegacyProxy(ctx, request)
|
||||
return fs.forwardLegacyProxy(ctx, request)
|
||||
}
|
||||
|
||||
// checkIfForwardDisabledWithLock checks if the forward is disabled with lock.
|
||||
@ -91,8 +91,8 @@ func (fs *forwardServiceImpl) checkIfForwardDisabledWithLock(ctx context.Context
|
||||
return fs.checkIfForwardDisabled(ctx)
|
||||
}
|
||||
|
||||
// forwardDMLToLegacyProxy forwards the DML request to the legacy proxy.
|
||||
func (fs *forwardServiceImpl) forwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) {
|
||||
// forwardLegacyProxy forwards the request to the legacy proxy.
|
||||
func (fs *forwardServiceImpl) forwardLegacyProxy(ctx context.Context, request any) (any, error) {
|
||||
s, err := fs.getLegacyProxyService(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -106,6 +106,12 @@ func (fs *forwardServiceImpl) forwardDMLToLegacyProxy(ctx context.Context, reque
|
||||
result, err = s.Delete(ctx, req)
|
||||
case *milvuspb.UpsertRequest:
|
||||
result, err = s.Upsert(ctx, req)
|
||||
case *milvuspb.SearchRequest:
|
||||
result, err = s.Search(ctx, req)
|
||||
case *milvuspb.HybridSearchRequest:
|
||||
result, err = s.HybridSearch(ctx, req)
|
||||
case *milvuspb.QueryRequest:
|
||||
result, err = s.Query(ctx, req)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported request type: %T", request))
|
||||
}
|
||||
@ -178,7 +184,7 @@ func (fs *forwardServiceImpl) initLegacyProxy() {
|
||||
})
|
||||
fs.legacyProxy = lazygrpc.WithServiceCreator(conn, milvuspb.NewMilvusServiceClient)
|
||||
fs.rb = rb
|
||||
fs.Logger().Info("streaming service is not ready, legacy proxy is initiated to forward DML request", zap.Int("proxyPort", port))
|
||||
fs.Logger().Info("streaming service is not ready, legacy proxy is initiated to forward request", zap.Int("proxyPort", port))
|
||||
}
|
||||
|
||||
// getDialOptions returns the dial options for the legacy proxy.
|
||||
@ -236,21 +242,24 @@ func (fs *forwardServiceImpl) markForwardDisabled() {
|
||||
}
|
||||
}
|
||||
|
||||
// ForwardDMLToLegacyProxyUnaryServerInterceptor forwards the DML request to the legacy proxy.
|
||||
// ForwardLegacyProxyUnaryServerInterceptor forwards the request to the legacy proxy.
|
||||
// When upgrading from 2.5.x to 2.6.x, the streaming service is not ready yet,
|
||||
// the dml cannot be executed at new 2.6.x proxy until all 2.5.x proxies are down.
|
||||
//
|
||||
// so we need to forward the request to the 2.5.x proxy.
|
||||
func ForwardDMLToLegacyProxyUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
func ForwardLegacyProxyUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if info.FullMethod != milvuspb.MilvusService_Insert_FullMethodName &&
|
||||
info.FullMethod != milvuspb.MilvusService_Delete_FullMethodName &&
|
||||
info.FullMethod != milvuspb.MilvusService_Upsert_FullMethodName {
|
||||
info.FullMethod != milvuspb.MilvusService_Upsert_FullMethodName &&
|
||||
info.FullMethod != milvuspb.MilvusService_Search_FullMethodName &&
|
||||
info.FullMethod != milvuspb.MilvusService_HybridSearch_FullMethodName &&
|
||||
info.FullMethod != milvuspb.MilvusService_Query_FullMethodName {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
// try to forward the request to the legacy proxy.
|
||||
resp, err := WAL().ForwardService().ForwardDMLToLegacyProxy(ctx, req)
|
||||
resp, err := WAL().ForwardService().ForwardLegacyProxy(ctx, req)
|
||||
if err == nil {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -66,13 +66,19 @@ func TestForwardDMLToLegacyProxy(t *testing.T) {
|
||||
&milvuspb.DeleteRequest{},
|
||||
&milvuspb.InsertRequest{},
|
||||
&milvuspb.UpsertRequest{},
|
||||
&milvuspb.SearchRequest{},
|
||||
&milvuspb.HybridSearchRequest{},
|
||||
&milvuspb.QueryRequest{},
|
||||
}
|
||||
methods := []string{
|
||||
milvuspb.MilvusService_Delete_FullMethodName,
|
||||
milvuspb.MilvusService_Insert_FullMethodName,
|
||||
milvuspb.MilvusService_Upsert_FullMethodName,
|
||||
milvuspb.MilvusService_Search_FullMethodName,
|
||||
milvuspb.MilvusService_HybridSearch_FullMethodName,
|
||||
milvuspb.MilvusService_Query_FullMethodName,
|
||||
}
|
||||
interceptor := ForwardDMLToLegacyProxyUnaryServerInterceptor()
|
||||
interceptor := ForwardLegacyProxyUnaryServerInterceptor()
|
||||
|
||||
for idx, req := range reqs {
|
||||
method := methods[idx]
|
||||
|
||||
@ -244,7 +244,7 @@ func (n *noopWALAccesser) ForwardService() ForwardService {
|
||||
|
||||
type noopForwardService struct{}
|
||||
|
||||
func (n *noopForwardService) ForwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) {
|
||||
func (n *noopForwardService) ForwardLegacyProxy(ctx context.Context, request any) (any, error) {
|
||||
return nil, ErrForwardDisabled
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user