fix: delay to start the metric server port (#36080)

- issue: #36083
/kind improvement

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-09-09 14:01:05 +08:00 committed by GitHub
parent 8787e65b1f
commit 99817953eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 36 additions and 4 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -385,6 +386,21 @@ func (mr *MilvusRoles) Run() {
defer streaming.Release() defer streaming.Release()
} }
enableComponents := []bool{
mr.EnableRootCoord,
mr.EnableProxy,
mr.EnableQueryCoord,
mr.EnableQueryNode,
mr.EnableDataCoord,
mr.EnableDataNode,
mr.EnableIndexCoord,
mr.EnableIndexNode,
}
enableComponents = lo.Filter(enableComponents, func(v bool, _ int) bool {
return v
})
healthz.SetComponentNum(len(enableComponents))
expr.Init() expr.Init()
expr.Register("param", paramtable.Get()) expr.Register("param", paramtable.Get())
mr.setupLogger() mr.setupLogger()

View File

@ -53,6 +53,7 @@ type HealthResponse struct {
type HealthHandler struct { type HealthHandler struct {
indicators []Indicator indicators []Indicator
indicatorNum int
// unregister role when call stop by restful api // unregister role when call stop by restful api
unregisterLock sync.RWMutex unregisterLock sync.RWMutex
@ -67,6 +68,10 @@ func Register(indicator Indicator) {
defaultHandler.indicators = append(defaultHandler.indicators, indicator) defaultHandler.indicators = append(defaultHandler.indicators, indicator)
} }
// SetComponentNum stores num in defaultHandler.indicatorNum.
//
// HealthHandler.ServeHTTP compares indicatorNum against the number of
// registered indicators that are either unregistered or report
// StateCode_Healthy / StateCode_StandBy; if the two differ, the response
// state is set to a "Not all components are healthy" message instead of "OK".
//
// NOTE(review): the field is assigned without taking any lock; presumably
// this is only called during startup/test setup before requests are served —
// confirm against callers.
func SetComponentNum(num int) {
defaultHandler.indicatorNum = num
}
func UnRegister(role string) { func UnRegister(role string) {
defaultHandler.unregisterLock.Lock() defaultHandler.unregisterLock.Lock()
defer defaultHandler.unregisterLock.Unlock() defer defaultHandler.unregisterLock.Unlock()
@ -86,11 +91,13 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
State: "OK", State: "OK",
} }
ctx := context.Background() ctx := context.Background()
healthNum := 0
for _, in := range handler.indicators { for _, in := range handler.indicators {
handler.unregisterLock.RLock() handler.unregisterLock.RLock()
_, unregistered := handler.unregisteredRoles[in.GetName()] _, unregistered := handler.unregisteredRoles[in.GetName()]
handler.unregisterLock.RUnlock() handler.unregisterLock.RUnlock()
if unregistered { if unregistered {
healthNum++
continue continue
} }
code := in.Health(ctx) code := in.Health(ctx)
@ -98,11 +105,15 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
Name: in.GetName(), Name: in.GetName(),
Code: code, Code: code,
}) })
if code != commonpb.StateCode_Healthy && code != commonpb.StateCode_StandBy { if code == commonpb.StateCode_Healthy || code == commonpb.StateCode_StandBy {
resp.State = fmt.Sprintf("component %s state is %s", in.GetName(), code.String()) healthNum++
} }
} }
if healthNum != handler.indicatorNum {
resp.State = fmt.Sprintf("Not all components are healthy, %d/%d", healthNum, handler.indicatorNum)
}
if resp.State == "OK" { if resp.State == "OK" {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} else { } else {

View File

@ -101,6 +101,7 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
url := "http://localhost:" + DefaultListenPort + "/healthz" url := "http://localhost:" + DefaultListenPort + "/healthz"
client := http.Client{} client := http.Client{}
healthz.SetComponentNum(1)
healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy}) healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy})
req, _ := http.NewRequest(http.MethodGet, url, nil) req, _ := http.NewRequest(http.MethodGet, url, nil)
@ -118,6 +119,7 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
body, _ = io.ReadAll(resp.Body) body, _ = io.ReadAll(resp.Body)
suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body)) suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body))
healthz.SetComponentNum(2)
healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal}) healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal})
req, _ = http.NewRequest(http.MethodGet, url, nil) req, _ = http.NewRequest(http.MethodGet, url, nil)
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
@ -125,7 +127,10 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
suite.Nil(err) suite.Nil(err)
defer resp.Body.Close() defer resp.Body.Close()
body, _ = io.ReadAll(resp.Body) body, _ = io.ReadAll(resp.Body)
suite.Equal("{\"state\":\"component m2 state is Abnormal\",\"detail\":[{\"name\":\"m1\",\"code\":1},{\"name\":\"m2\",\"code\":2}]}", string(body)) respObj := &healthz.HealthResponse{}
err = json.Unmarshal(body, respObj)
suite.NoError(err)
suite.NotEqual("OK", respObj.State)
} }
func (suite *HTTPServerTestSuite) TestEventlogHandler() { func (suite *HTTPServerTestSuite) TestEventlogHandler() {