mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: [2.4] dump pprof info if component stop progress timeout (#39764)
issue: https://github.com/milvus-io/milvus/issues/39735 pr: https://github.com/milvus-io/milvus/pull/39726 --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
0721afb5b2
commit
c9c9ef9a50
@ -2,20 +2,27 @@ package components
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime/pprof"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errStopTimeout = errors.New("stop timeout")
|
var errStopTimeout = errors.New("stop timeout")
|
||||||
|
|
||||||
// exitWhenStopTimeout stops a component with timeout and exit progress when timeout.
|
// exitWhenStopTimeout stops a component with timeout and exit progress when timeout.
|
||||||
func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
|
func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
|
||||||
err := stopWithTimeout(stop, timeout)
|
err := dumpPprof(func() error { return stopWithTimeout(stop, timeout) })
|
||||||
if errors.Is(err, errStopTimeout) {
|
if errors.Is(err, errStopTimeout) {
|
||||||
|
log.Info("stop progress timeout, force exit")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@ -27,7 +34,7 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
future := conc.Go(func() (struct{}, error) {
|
future := conc.Go(func() (struct{}, error) {
|
||||||
return struct{}{}, stop()
|
return struct{}{}, dumpPprof(stop)
|
||||||
})
|
})
|
||||||
select {
|
select {
|
||||||
case <-future.Inner():
|
case <-future.Inner():
|
||||||
@ -36,3 +43,125 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
|
|||||||
return errStopTimeout
|
return errStopTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// profileType describes a single profile handled by dumpPprof: a label, the
// file it is stored in, and the routine that writes it.
type profileType struct {
	// name is the short label of the profile; it is attached to log entries.
	name string
	// filename is the path of the file that receives the profile data.
	filename string
	// dump writes the profile into the given, already opened file.
	dump func(*os.File) error
}
|
||||||
|
|
||||||
|
// dumpPprof wraps the execution of a function with pprof profiling
|
||||||
|
// It collects various performance profiles only if the execution fails
|
||||||
|
func dumpPprof(exec func() error) error {
|
||||||
|
// Get pprof directory from configuration
|
||||||
|
pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue()
|
||||||
|
if err := os.MkdirAll(pprofDir, 0o755); err != nil {
|
||||||
|
log.Error("failed to create pprof directory", zap.Error(err))
|
||||||
|
return exec()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate base file path with timestamp
|
||||||
|
baseFilePath := filepath.Join(
|
||||||
|
pprofDir,
|
||||||
|
fmt.Sprintf("%s_pprof_%s",
|
||||||
|
paramtable.GetRole(),
|
||||||
|
time.Now().Format("20060102_150405"),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Define all profile types to be collected
|
||||||
|
profiles := []profileType{
|
||||||
|
{
|
||||||
|
name: "cpu",
|
||||||
|
filename: baseFilePath + "_cpu.prof",
|
||||||
|
dump: func(f *os.File) error {
|
||||||
|
// Ensure no other CPU profiling is active before starting a new one.
|
||||||
|
// This prevents the "cpu profiling already in use" error.
|
||||||
|
pprof.StopCPUProfile()
|
||||||
|
return pprof.StartCPUProfile(f)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "goroutine",
|
||||||
|
filename: baseFilePath + "_goroutine.prof",
|
||||||
|
dump: func(f *os.File) error {
|
||||||
|
return pprof.Lookup("goroutine").WriteTo(f, 0)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "heap",
|
||||||
|
filename: baseFilePath + "_heap.prof",
|
||||||
|
dump: func(f *os.File) error {
|
||||||
|
return pprof.WriteHeapProfile(f)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "block",
|
||||||
|
filename: baseFilePath + "_block.prof",
|
||||||
|
dump: func(f *os.File) error {
|
||||||
|
return pprof.Lookup("block").WriteTo(f, 0)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mutex",
|
||||||
|
filename: baseFilePath + "_mutex.prof",
|
||||||
|
dump: func(f *os.File) error {
|
||||||
|
return pprof.Lookup("mutex").WriteTo(f, 0)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create all profile files and store file handles
|
||||||
|
files := make(map[string]*os.File)
|
||||||
|
for _, p := range profiles {
|
||||||
|
f, err := os.Create(p.filename)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("could not create profile file",
|
||||||
|
zap.String("profile", p.name),
|
||||||
|
zap.Error(err))
|
||||||
|
for filename, f := range files {
|
||||||
|
f.Close()
|
||||||
|
os.Remove(filename)
|
||||||
|
}
|
||||||
|
return exec()
|
||||||
|
}
|
||||||
|
files[p.filename] = f
|
||||||
|
}
|
||||||
|
// Ensure all files are closed when function returns
|
||||||
|
defer func() {
|
||||||
|
for _, f := range files {
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start CPU profiling
|
||||||
|
cpuProfile := profiles[0]
|
||||||
|
if err := cpuProfile.dump(files[cpuProfile.filename]); err != nil {
|
||||||
|
log.Error("could not start CPU profiling", zap.Error(err))
|
||||||
|
return exec()
|
||||||
|
}
|
||||||
|
defer pprof.StopCPUProfile()
|
||||||
|
|
||||||
|
// Execute the target function
|
||||||
|
execErr := exec()
|
||||||
|
|
||||||
|
// Only save profiles and collect additional data if execution fails
|
||||||
|
if execErr != nil {
|
||||||
|
// Start from index 1 to skip CPU profile (already running)
|
||||||
|
for _, p := range profiles[1:] {
|
||||||
|
if err := p.dump(files[p.filename]); err != nil {
|
||||||
|
log.Error("could not write profile",
|
||||||
|
zap.String("profile", p.name),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Remove all files if execution succeeds
|
||||||
|
for _, p := range profiles {
|
||||||
|
os.Remove(p.filename)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return execErr
|
||||||
|
}
|
||||||
|
|||||||
@ -756,7 +756,7 @@ log:
|
|||||||
grpc:
|
grpc:
|
||||||
log:
|
log:
|
||||||
level: WARNING
|
level: WARNING
|
||||||
gracefulStopTimeout: 10 # second, time to wait graceful stop finish
|
gracefulStopTimeout: 3 # second, time to wait graceful stop finish
|
||||||
client:
|
client:
|
||||||
compressionEnabled: false
|
compressionEnabled: false
|
||||||
dialTimeout: 200
|
dialTimeout: 200
|
||||||
|
|||||||
@ -1191,7 +1191,7 @@ func Test_Service_GracefulStop(t *testing.T) {
|
|||||||
mockProxy.ExpectedCalls = nil
|
mockProxy.ExpectedCalls = nil
|
||||||
mockProxy.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Run(func(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) {
|
mockProxy.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Run(func(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) {
|
||||||
fmt.Println("rpc start")
|
fmt.Println("rpc start")
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
atomic.AddInt32(&count, 1)
|
atomic.AddInt32(&count, 1)
|
||||||
fmt.Println("rpc done")
|
fmt.Println("rpc done")
|
||||||
}).Return(&milvuspb.ComponentStates{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil)
|
}).Return(&milvuspb.ComponentStates{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil)
|
||||||
|
|||||||
@ -16,7 +16,7 @@ func GracefulStopGRPCServer(s *grpc.Server) {
|
|||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
log.Debug("try to graceful stop grpc server...")
|
log.Info("try to graceful stop grpc server...")
|
||||||
// will block until all rpc finished.
|
// will block until all rpc finished.
|
||||||
s.GracefulStop()
|
s.GracefulStop()
|
||||||
}()
|
}()
|
||||||
@ -24,7 +24,7 @@ func GracefulStopGRPCServer(s *grpc.Server) {
|
|||||||
case <-ch:
|
case <-ch:
|
||||||
case <-time.After(paramtable.Get().ProxyGrpcServerCfg.GracefulStopTimeout.GetAsDuration(time.Second)):
|
case <-time.After(paramtable.Get().ProxyGrpcServerCfg.GracefulStopTimeout.GetAsDuration(time.Second)):
|
||||||
// took too long, manually close grpc server
|
// took too long, manually close grpc server
|
||||||
log.Debug("stop grpc server...")
|
log.Info("force to stop grpc server...")
|
||||||
s.Stop()
|
s.Stop()
|
||||||
// concurrent GracefulStop should be interrupted
|
// concurrent GracefulStop should be interrupted
|
||||||
<-ch
|
<-ch
|
||||||
|
|||||||
@ -194,7 +194,7 @@ func (p *GrpcServerConfig) Init(domain string, base *BaseTable) {
|
|||||||
p.GracefulStopTimeout = ParamItem{
|
p.GracefulStopTimeout = ParamItem{
|
||||||
Key: "grpc.gracefulStopTimeout",
|
Key: "grpc.gracefulStopTimeout",
|
||||||
Version: "2.3.1",
|
Version: "2.3.1",
|
||||||
DefaultValue: "10",
|
DefaultValue: "3",
|
||||||
Doc: "second, time to wait graceful stop finish",
|
Doc: "second, time to wait graceful stop finish",
|
||||||
Export: true,
|
Export: true,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -62,8 +62,7 @@ func TestGrpcServerParams(t *testing.T) {
|
|||||||
base.Save("grpc.serverMaxSendSize", "a")
|
base.Save("grpc.serverMaxSendSize", "a")
|
||||||
assert.Equal(t, serverConfig.ServerMaxSendSize.GetAsInt(), DefaultServerMaxSendSize)
|
assert.Equal(t, serverConfig.ServerMaxSendSize.GetAsInt(), DefaultServerMaxSendSize)
|
||||||
|
|
||||||
base.Save(serverConfig.GracefulStopTimeout.Key, "1")
|
assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 3)
|
||||||
assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGrpcClientParams(t *testing.T) {
|
func TestGrpcClientParams(t *testing.T) {
|
||||||
|
|||||||
@ -51,6 +51,7 @@ type ServiceParam struct {
|
|||||||
RocksmqCfg RocksmqConfig
|
RocksmqCfg RocksmqConfig
|
||||||
NatsmqCfg NatsmqConfig
|
NatsmqCfg NatsmqConfig
|
||||||
MinioCfg MinioConfig
|
MinioCfg MinioConfig
|
||||||
|
ProfileCfg ProfileConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ServiceParam) init(bt *BaseTable) {
|
func (p *ServiceParam) init(bt *BaseTable) {
|
||||||
@ -64,6 +65,7 @@ func (p *ServiceParam) init(bt *BaseTable) {
|
|||||||
p.RocksmqCfg.Init(bt)
|
p.RocksmqCfg.Init(bt)
|
||||||
p.NatsmqCfg.Init(bt)
|
p.NatsmqCfg.Init(bt)
|
||||||
p.MinioCfg.Init(bt)
|
p.MinioCfg.Init(bt)
|
||||||
|
p.ProfileCfg.Init(bt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ServiceParam) RocksmqEnable() bool {
|
func (p *ServiceParam) RocksmqEnable() bool {
|
||||||
@ -1388,3 +1390,25 @@ Leave it empty if you want to use AWS default endpoint`,
|
|||||||
}
|
}
|
||||||
p.ListObjectsMaxKeys.Init(base.mgr)
|
p.ListObjectsMaxKeys.Init(base.mgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// profile config
|
||||||
|
type ProfileConfig struct {
|
||||||
|
PprofPath ParamItem `refreshable:"false"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ProfileConfig) Init(base *BaseTable) {
|
||||||
|
p.PprofPath = ParamItem{
|
||||||
|
Key: "profile.pprof.path",
|
||||||
|
Version: "2.5.5",
|
||||||
|
DefaultValue: "",
|
||||||
|
Doc: "The folder that storing pprof files, by default will use localStoragePath/pprof",
|
||||||
|
Formatter: func(v string) string {
|
||||||
|
if len(v) == 0 {
|
||||||
|
return path.Join(base.Get("localStorage.path"), "pprof")
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
},
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
p.PprofPath.Init(base.mgr)
|
||||||
|
}
|
||||||
|
|||||||
@ -225,4 +225,11 @@ func TestServiceParam(t *testing.T) {
|
|||||||
assert.Equal(t, 100000, Params.PaginationSize.GetAsInt())
|
assert.Equal(t, 100000, Params.PaginationSize.GetAsInt())
|
||||||
assert.Equal(t, 32, Params.ReadConcurrency.GetAsInt())
|
assert.Equal(t, 32, Params.ReadConcurrency.GetAsInt())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("test profile config", func(t *testing.T) {
|
||||||
|
params := &SParams.ProfileCfg
|
||||||
|
assert.Equal(t, "/var/lib/milvus/data/pprof", params.PprofPath.GetValue())
|
||||||
|
bt.Save(params.PprofPath.Key, "/tmp/pprof")
|
||||||
|
assert.Equal(t, "/tmp/pprof", params.PprofPath.GetValue())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -988,8 +989,11 @@ func (s *LoadTestSuite) TestLoadWithCompact() {
|
|||||||
s.releaseCollection(dbName, collName)
|
s.releaseCollection(dbName, collName)
|
||||||
|
|
||||||
stopInsertCh := make(chan struct{}, 1)
|
stopInsertCh := make(chan struct{}, 1)
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
// Start a goroutine to continuously insert data and trigger compaction
|
// Start a goroutine to continuously insert data and trigger compaction
|
||||||
go func() {
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stopInsertCh:
|
case <-stopInsertCh:
|
||||||
@ -1023,6 +1027,7 @@ func (s *LoadTestSuite) TestLoadWithCompact() {
|
|||||||
|
|
||||||
// Clean up
|
// Clean up
|
||||||
close(stopInsertCh)
|
close(stopInsertCh)
|
||||||
|
wg.Wait()
|
||||||
s.releaseCollection(dbName, collName)
|
s.releaseCollection(dbName, collName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user