diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index c22c47b72b..6e4aa2b7b7 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -383,7 +383,7 @@ func wrapperProxyWithLimit(ctx context.Context, ginCtx *gin.Context, req any, ch } forwardHandler := func(reqCtx context.Context, req any) (any, error) { - interceptor := streaming.ForwardDMLToLegacyProxyUnaryServerInterceptor() + interceptor := streaming.ForwardLegacyProxyUnaryServerInterceptor() return interceptor(reqCtx, req, &grpc.UnaryServerInfo{FullMethod: fullMethod}, func(ctx context.Context, req any) (interface{}, error) { return handler(ctx, req) }) diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 385027a6c4..f1ff869b13 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -240,7 +240,7 @@ func (s *Server) startExternalGrpc(errChan chan error) { var unaryServerOption grpc.ServerOption if enableCustomInterceptor { unaryServerOption = grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - streaming.ForwardDMLToLegacyProxyUnaryServerInterceptor(), + streaming.ForwardLegacyProxyUnaryServerInterceptor(), proxy.DatabaseInterceptor(), UnaryRequestStatsInterceptor, accesslog.UnaryAccessLogInterceptor, diff --git a/internal/distributed/streaming/forward.go b/internal/distributed/streaming/forward.go index 26007d1094..a9f7b91563 100644 --- a/internal/distributed/streaming/forward.go +++ b/internal/distributed/streaming/forward.go @@ -60,7 +60,7 @@ func newForwardService(streamingCoordClient client.Client) *forwardServiceImpl { } type ForwardService interface { - ForwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) + ForwardLegacyProxy(ctx context.Context, request any) (any, error) } // forwardServiceImpl is the implementation of FallbackService. @@ -74,13 +74,13 @@ type forwardServiceImpl struct { rb resolver.Builder } -// ForwardDMLToLegacyProxy forwards the DML request to the legacy proxy. -func (fs *forwardServiceImpl) ForwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) { +// ForwardLegacyProxy forwards the request to the legacy proxy. +func (fs *forwardServiceImpl) ForwardLegacyProxy(ctx context.Context, request any) (any, error) { if err := fs.checkIfForwardDisabledWithLock(ctx); err != nil { return nil, err } - return fs.forwardDMLToLegacyProxy(ctx, request) + return fs.forwardLegacyProxy(ctx, request) } // checkIfForwardDisabledWithLock checks if the forward is disabled with lock. @@ -91,8 +91,8 @@ func (fs *forwardServiceImpl) checkIfForwardDisabledWithLock(ctx context.Context return fs.checkIfForwardDisabled(ctx) } -// forwardDMLToLegacyProxy forwards the DML request to the legacy proxy. -func (fs *forwardServiceImpl) forwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) { +// forwardLegacyProxy forwards the request to the legacy proxy. +func (fs *forwardServiceImpl) forwardLegacyProxy(ctx context.Context, request any) (any, error) { s, err := fs.getLegacyProxyService(ctx) if err != nil { return nil, err @@ -106,6 +106,12 @@ func (fs *forwardServiceImpl) forwardDMLToLegacyProxy(ctx context.Context, reque result, err = s.Delete(ctx, req) case *milvuspb.UpsertRequest: result, err = s.Upsert(ctx, req) + case *milvuspb.SearchRequest: + result, err = s.Search(ctx, req) + case *milvuspb.HybridSearchRequest: + result, err = s.HybridSearch(ctx, req) + case *milvuspb.QueryRequest: + result, err = s.Query(ctx, req) default: panic(fmt.Sprintf("unsupported request type: %T", request)) } @@ -178,7 +184,7 @@ func (fs *forwardServiceImpl) initLegacyProxy() { }) fs.legacyProxy = lazygrpc.WithServiceCreator(conn, milvuspb.NewMilvusServiceClient) fs.rb = rb - fs.Logger().Info("streaming service is not ready, legacy proxy is initiated to forward DML request", zap.Int("proxyPort", port)) + fs.Logger().Info("streaming service is not ready, legacy proxy is initiated to forward request", zap.Int("proxyPort", port)) } // getDialOptions returns the dial options for the legacy proxy. @@ -236,21 +242,24 @@ func (fs *forwardServiceImpl) markForwardDisabled() { } } -// ForwardDMLToLegacyProxyUnaryServerInterceptor forwards the DML request to the legacy proxy. +// ForwardLegacyProxyUnaryServerInterceptor forwards the request to the legacy proxy. // When upgrading from 2.5.x to 2.6.x, the streaming service is not ready yet, // the dml cannot be executed at new 2.6.x proxy until all 2.5.x proxies are down. // // so we need to forward the request to the 2.5.x proxy. -func ForwardDMLToLegacyProxyUnaryServerInterceptor() grpc.UnaryServerInterceptor { +func ForwardLegacyProxyUnaryServerInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if info.FullMethod != milvuspb.MilvusService_Insert_FullMethodName && info.FullMethod != milvuspb.MilvusService_Delete_FullMethodName && - info.FullMethod != milvuspb.MilvusService_Upsert_FullMethodName { + info.FullMethod != milvuspb.MilvusService_Upsert_FullMethodName && + info.FullMethod != milvuspb.MilvusService_Search_FullMethodName && + info.FullMethod != milvuspb.MilvusService_HybridSearch_FullMethodName && + info.FullMethod != milvuspb.MilvusService_Query_FullMethodName { return handler(ctx, req) } // try to forward the request to the legacy proxy. - resp, err := WAL().ForwardService().ForwardDMLToLegacyProxy(ctx, req) + resp, err := WAL().ForwardService().ForwardLegacyProxy(ctx, req) if err == nil { return resp, nil } diff --git a/internal/distributed/streaming/forward_test.go b/internal/distributed/streaming/forward_test.go index e907d1b335..a36df4f0ee 100644 --- a/internal/distributed/streaming/forward_test.go +++ b/internal/distributed/streaming/forward_test.go @@ -66,13 +66,19 @@ func TestForwardDMLToLegacyProxy(t *testing.T) { &milvuspb.DeleteRequest{}, &milvuspb.InsertRequest{}, &milvuspb.UpsertRequest{}, + &milvuspb.SearchRequest{}, + &milvuspb.HybridSearchRequest{}, + &milvuspb.QueryRequest{}, } methods := []string{ milvuspb.MilvusService_Delete_FullMethodName, milvuspb.MilvusService_Insert_FullMethodName, milvuspb.MilvusService_Upsert_FullMethodName, + milvuspb.MilvusService_Search_FullMethodName, + milvuspb.MilvusService_HybridSearch_FullMethodName, + milvuspb.MilvusService_Query_FullMethodName, } - interceptor := ForwardDMLToLegacyProxyUnaryServerInterceptor() + interceptor := ForwardLegacyProxyUnaryServerInterceptor() for idx, req := range reqs { method := methods[idx] diff --git a/internal/distributed/streaming/test_streaming.go b/internal/distributed/streaming/test_streaming.go index 71c8db85d0..9ba76a0527 100644 --- a/internal/distributed/streaming/test_streaming.go +++ b/internal/distributed/streaming/test_streaming.go @@ -244,7 +244,7 @@ func (n *noopWALAccesser) ForwardService() ForwardService { type noopForwardService struct{} -func (n *noopForwardService) ForwardDMLToLegacyProxy(ctx context.Context, request any) (any, error) { +func (n *noopForwardService) ForwardLegacyProxy(ctx context.Context, request any) (any, error) { return nil, ErrForwardDisabled }