RESTful api use same port as metrics & healthcheck (#17732)

Signed-off-by: shaoyue.chen <shaoyue.chen@zilliz.com>
This commit is contained in:
shaoyue 2022-06-24 15:34:14 +08:00 committed by GitHub
parent 05273309e8
commit 96b97f7f58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 40 additions and 83 deletions

View File

@ -133,9 +133,6 @@ proxy:
http: http:
enabled: true # Whether to enable the http server enabled: true # Whether to enable the http server
debug_mode: false # Whether to enable http server debug mode debug_mode: false # Whether to enable http server debug mode
port: 8080 # Whether to enable the http server
readTimeout: 30000 # 30000 ms http read timeout
writeTimeout: 30000 # 30000 ms http write timeout
timeTickInterval: 200 # ms, the interval that proxy synchronize the time tick timeTickInterval: 200 # ms, the interval that proxy synchronize the time tick
msgStream: msgStream:

View File

@ -73,8 +73,12 @@ var HTTPParams paramtable.HTTPConfig
var ( var (
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata") errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token") errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
// registerHTTPHandlerOnce avoid register http handler multiple times
registerHTTPHandlerOnce sync.Once
) )
const apiPathPrefix = "/api/v1"
// Server is the Proxy Server // Server is the Proxy Server
type Server struct { type Server struct {
ctx context.Context ctx context.Context
@ -82,9 +86,6 @@ type Server struct {
proxy types.ProxyComponent proxy types.ProxyComponent
grpcInternalServer *grpc.Server grpcInternalServer *grpc.Server
grpcExternalServer *grpc.Server grpcExternalServer *grpc.Server
httpServer *http.Server
// avoid race
httpServerMtx sync.Mutex
etcdCli *clientv3.Client etcdCli *clientv3.Client
rootCoordClient types.RootCoord rootCoordClient types.RootCoord
@ -111,9 +112,8 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
return server, err return server, err
} }
// startHTTPServer starts the http server, panic when failed // registerHTTPServer register the http server, panic when failed
func (s *Server) startHTTPServer(port int) { func (s *Server) registerHTTPServer() {
defer s.wg.Done()
// (Embedded Milvus Only) Discard gin logs if logging is disabled. // (Embedded Milvus Only) Discard gin logs if logging is disabled.
// We might need to put these logs in some files in the further. // We might need to put these logs in some files in the further.
// But we don't care about these logs now, at least not in embedded Milvus. // But we don't care about these logs now, at least not in embedded Milvus.
@ -125,21 +125,9 @@ func (s *Server) startHTTPServer(port int) {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }
ginHandler := gin.Default() ginHandler := gin.Default()
apiv1 := ginHandler.Group("/api/v1") apiv1 := ginHandler.Group(apiPathPrefix)
httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1) httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1)
s.httpServerMtx.Lock() http.Handle("/", ginHandler)
s.httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: ginHandler,
ReadTimeout: HTTPParams.ReadTimeout,
WriteTimeout: HTTPParams.WriteTimeout,
}
s.httpServerMtx.Unlock()
if err := s.httpServer.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
panic("failed to start http server: " + err.Error())
}
}
} }
func (s *Server) startInternalRPCServer(grpcInternalPort int, errChan chan error) { func (s *Server) startInternalRPCServer(grpcInternalPort int, errChan chan error) {
@ -358,9 +346,10 @@ func (s *Server) init() error {
} }
if HTTPParams.Enabled { if HTTPParams.Enabled {
log.Info("start http server of proxy", zap.Int("port", HTTPParams.Port)) registerHTTPHandlerOnce.Do(func() {
s.wg.Add(1) log.Info("register http server of proxy")
go s.startHTTPServer(HTTPParams.Port) s.registerHTTPServer()
})
} }
if s.rootCoordClient == nil { if s.rootCoordClient == nil {
@ -523,16 +512,6 @@ func (s *Server) Stop() error {
} }
gracefulWg := sync.WaitGroup{} gracefulWg := sync.WaitGroup{}
gracefulWg.Add(1)
go func() {
defer gracefulWg.Done()
s.httpServerMtx.Lock()
defer s.httpServerMtx.Unlock()
if s.httpServer != nil {
log.Debug("Graceful stop http server...")
s.httpServer.Shutdown(context.TODO())
}
}()
gracefulWg.Add(1) gracefulWg.Add(1)
go func() { go func() {

View File

@ -1327,7 +1327,7 @@ func TestServer_Watch(t *testing.T) {
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status) assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
} }
func Test_NewServer_HTTPServerDisabled(t *testing.T) { func Test_NewServer_HTTPServer_Enabled(t *testing.T) {
ctx := context.Background() ctx := context.Background()
server, err := NewServer(ctx, nil) server, err := NewServer(ctx, nil)
assert.NotNil(t, server) assert.NotNil(t, server)
@ -1340,13 +1340,21 @@ func Test_NewServer_HTTPServerDisabled(t *testing.T) {
server.dataCoordClient = &MockDataCoord{} server.dataCoordClient = &MockDataCoord{}
HTTPParams.InitOnce() HTTPParams.InitOnce()
HTTPParams.Enabled = false HTTPParams.Enabled = true
err = runAndWaitForServerReady(server) err = runAndWaitForServerReady(server)
assert.Nil(t, err) assert.Nil(t, err)
assert.Nil(t, server.httpServer)
err = server.Stop() err = server.Stop()
assert.Nil(t, err) assert.Nil(t, err)
defer func() {
e := recover()
if e == nil {
t.Fatalf("test should have panicked but did not")
}
}()
// if disable workds path not registered, so it shall not panic
server.registerHTTPServer()
} }
func getServer(t *testing.T) *Server { func getServer(t *testing.T) *Server {
@ -1371,6 +1379,7 @@ func Test_NewServer_TLS_TwoWay(t *testing.T) {
Params.ServerPemPath = "../../../configs/cert/server.pem" Params.ServerPemPath = "../../../configs/cert/server.pem"
Params.ServerKeyPath = "../../../configs/cert/server.key" Params.ServerKeyPath = "../../../configs/cert/server.key"
Params.CaPemPath = "../../../configs/cert/ca.pem" Params.CaPemPath = "../../../configs/cert/ca.pem"
HTTPParams.Enabled = false
err := runAndWaitForServerReady(server) err := runAndWaitForServerReady(server)
assert.Nil(t, err) assert.Nil(t, err)
@ -1386,6 +1395,7 @@ func Test_NewServer_TLS_OneWay(t *testing.T) {
Params.TLSMode = 1 Params.TLSMode = 1
Params.ServerPemPath = "../../../configs/cert/server.pem" Params.ServerPemPath = "../../../configs/cert/server.pem"
Params.ServerKeyPath = "../../../configs/cert/server.key" Params.ServerKeyPath = "../../../configs/cert/server.key"
HTTPParams.Enabled = false
err := runAndWaitForServerReady(server) err := runAndWaitForServerReady(server)
assert.Nil(t, err) assert.Nil(t, err)
@ -1401,6 +1411,7 @@ func Test_NewServer_TLS_FileNotExisted(t *testing.T) {
Params.TLSMode = 1 Params.TLSMode = 1
Params.ServerPemPath = "../not/existed/server.pem" Params.ServerPemPath = "../not/existed/server.pem"
Params.ServerKeyPath = "../../../configs/cert/server.key" Params.ServerKeyPath = "../../../configs/cert/server.key"
HTTPParams.Enabled = false
err := runAndWaitForServerReady(server) err := runAndWaitForServerReady(server)
assert.NotNil(t, err) assert.NotNil(t, err)
server.Stop() server.Stop()

View File

@ -2,18 +2,14 @@ package paramtable
import ( import (
"sync" "sync"
"time"
) )
type HTTPConfig struct { type HTTPConfig struct {
BaseTable BaseTable
once sync.Once once sync.Once
Enabled bool Enabled bool
DebugMode bool DebugMode bool
Port int
ReadTimeout time.Duration
WriteTimeout time.Duration
} }
// InitOnce initialize HTTPConfig // InitOnce initialize HTTPConfig
@ -28,9 +24,6 @@ func (p *HTTPConfig) init() {
p.initHTTPEnabled() p.initHTTPEnabled()
p.initHTTPDebugMode() p.initHTTPDebugMode()
p.initHTTPPort()
p.initHTTPReadTimeout()
p.initHTTPWriteTimeout()
} }
func (p *HTTPConfig) initHTTPEnabled() { func (p *HTTPConfig) initHTTPEnabled() {
@ -40,17 +33,3 @@ func (p *HTTPConfig) initHTTPEnabled() {
func (p *HTTPConfig) initHTTPDebugMode() { func (p *HTTPConfig) initHTTPDebugMode() {
p.DebugMode = p.ParseBool("proxy.http.debug_mode", false) p.DebugMode = p.ParseBool("proxy.http.debug_mode", false)
} }
func (p *HTTPConfig) initHTTPPort() {
p.Port = p.ParseIntWithDefault("proxy.http.port", 8080)
}
func (p *HTTPConfig) initHTTPReadTimeout() {
interval := p.ParseIntWithDefault("proxy.http.readTimeout", 30000)
p.ReadTimeout = time.Duration(interval) * time.Millisecond
}
func (p *HTTPConfig) initHTTPWriteTimeout() {
interval := p.ParseIntWithDefault("proxy.http.writeTimeout", 30000)
p.WriteTimeout = time.Duration(interval) * time.Millisecond
}

View File

@ -2,7 +2,6 @@ package paramtable
import ( import (
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -12,7 +11,4 @@ func TestHTTPConfig_Init(t *testing.T) {
cf.InitOnce() cf.InitOnce()
assert.Equal(t, cf.Enabled, true) assert.Equal(t, cf.Enabled, true)
assert.Equal(t, cf.DebugMode, false) assert.Equal(t, cf.DebugMode, false)
assert.Equal(t, cf.Port, 8080)
assert.Equal(t, cf.ReadTimeout, time.Second*30)
assert.Equal(t, cf.WriteTimeout, time.Second*30)
} }

View File

@ -18,23 +18,18 @@ DATA_PATH="${ROOT}/tests/scripts/restful-data/"
MILVUS_CLUSTER_ENABLED="${MILVUS_CLUSTER_ENABLED:-false}" MILVUS_CLUSTER_ENABLED="${MILVUS_CLUSTER_ENABLED:-false}"
# TODO: use service instead of podIP when milvus-helm supports MILVUS_SERVICE_ADDRESS="${MILVUS_HELM_RELEASE_NAME}-milvus.${MILVUS_HELM_NAMESPACE}:9091"
if [[ "${MILVUS_CLUSTER_ENABLED}" == "false" ]]; then
MILVUS_SERVICE_NAME=$(kubectl -n ${MILVUS_HELM_NAMESPACE} get pods -l app.kubernetes.io/name=milvus -l component=standalone -l app.kubernetes.io/instance=${MILVUS_HELM_RELEASE_NAME} -o=jsonpath='{.items[0].status.podIP}')
else
MILVUS_SERVICE_NAME=$(kubectl -n ${MILVUS_HELM_NAMESPACE} get pods -l app.kubernetes.io/name=milvus -l component=proxy -l app.kubernetes.io/instance=${MILVUS_HELM_RELEASE_NAME} -o=jsonpath='{.items[0].status.podIP}')
fi
# Create a collection # Create a collection
curl -X 'POST' \ curl -X 'POST' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/collection" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/collection" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d @${DATA_PATH}/create-collection.json -d @${DATA_PATH}/create-collection.json
# Has collection # Has collection
curl -X 'GET' \ curl -X 'GET' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/collection/existence" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/collection/existence" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
@ -43,7 +38,7 @@ curl -X 'GET' \
# Check collection details # Check collection details
curl -X 'GET' \ curl -X 'GET' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/collection" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/collection" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
@ -52,7 +47,7 @@ curl -X 'GET' \
# Load collection # Load collection
curl -X 'POST' \ curl -X 'POST' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/collection/load" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/collection/load" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
@ -62,14 +57,14 @@ curl -X 'POST' \
### Data ### Data
# Insert Data # Insert Data
curl -X 'POST' \ curl -X 'POST' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/entities" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/entities" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d @${DATA_PATH}/insert-data.json -d @${DATA_PATH}/insert-data.json
# Build Index # Build Index
curl -X 'POST' \ curl -X 'POST' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/index" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/index" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
@ -84,14 +79,14 @@ curl -X 'POST' \
# KNN Search # KNN Search
curl -X 'POST' \ curl -X 'POST' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/search" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/search" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d @${DATA_PATH}/search.json -d @${DATA_PATH}/search.json
# Drop Index # Drop Index
curl -X 'DELETE' \ curl -X 'DELETE' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/index" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/index" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
@ -101,7 +96,7 @@ curl -X 'DELETE' \
# Release collection # Release collection
curl -X 'DELETE' \ curl -X 'DELETE' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/collection/load" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/collection/load" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
@ -110,7 +105,7 @@ curl -X 'DELETE' \
# Drop collection # Drop collection
curl -X 'DELETE' \ curl -X 'DELETE' \
"http://${MILVUS_SERVICE_NAME}:8080/api/v1/collection" \ "http://${MILVUS_SERVICE_ADDRESS}/api/v1/collection" \
-H 'accept: application/json' \ -H 'accept: application/json' \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{