diff --git a/cmd/components/util.go b/cmd/components/util.go
index d731bb6e86..a3b3843a4c 100644
--- a/cmd/components/util.go
+++ b/cmd/components/util.go
@@ -2,20 +2,27 @@ package components
 
 import (
 	"context"
+	"fmt"
 	"os"
+	"path/filepath"
+	"runtime/pprof"
 	"time"
 
 	"github.com/cockroachdb/errors"
+	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 var errStopTimeout = errors.New("stop timeout")
 
 // exitWhenStopTimeout stops a component with timeout and exit progress when timeout.
 func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
-	err := stopWithTimeout(stop, timeout)
+	err := dumpPprof(func() error { return stopWithTimeout(stop, timeout) })
 	if errors.Is(err, errStopTimeout) {
+		log.Info("stop progress timeout, force exit")
 		os.Exit(1)
 	}
 	return err
@@ -27,7 +34,7 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
 	defer cancel()
 
 	future := conc.Go(func() (struct{}, error) {
-		return struct{}{}, stop()
+		return struct{}{}, dumpPprof(stop)
 	})
 	select {
 	case <-future.Inner():
@@ -36,3 +43,125 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
 		return errStopTimeout
 	}
 }
+
+// profileType defines the structure for each type of profile to be collected
+type profileType struct {
+	name     string               // Name of the profile type
+	filename string               // File path for the profile
+	dump     func(*os.File) error // Function to dump the profile data
+}
+
+// dumpPprof wraps the execution of a function with pprof profiling
+// It collects various performance profiles only if the execution fails
+func dumpPprof(exec func() error) error {
+	// Get pprof directory from configuration
+	pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue()
+	if err := os.MkdirAll(pprofDir, 0o755); err != nil {
+		log.Error("failed to create pprof directory", zap.Error(err))
+		return exec()
+	}
+
+	// Generate base file path with timestamp
+	baseFilePath := filepath.Join(
+		pprofDir,
+		fmt.Sprintf("%s_pprof_%s",
+			paramtable.GetRole(),
+			time.Now().Format("20060102_150405"),
+		),
+	)
+
+	// Define all profile types to be collected
+	profiles := []profileType{
+		{
+			name:     "cpu",
+			filename: baseFilePath + "_cpu.prof",
+			dump: func(f *os.File) error {
+				// Ensure no other CPU profiling is active before starting a new one.
+				// This prevents the "cpu profiling already in use" error.
+				pprof.StopCPUProfile()
+				return pprof.StartCPUProfile(f)
+			},
+		},
+		{
+			name:     "goroutine",
+			filename: baseFilePath + "_goroutine.prof",
+			dump: func(f *os.File) error {
+				return pprof.Lookup("goroutine").WriteTo(f, 0)
+			},
+		},
+		{
+			name:     "heap",
+			filename: baseFilePath + "_heap.prof",
+			dump: func(f *os.File) error {
+				return pprof.WriteHeapProfile(f)
+			},
+		},
+		{
+			name:     "block",
+			filename: baseFilePath + "_block.prof",
+			dump: func(f *os.File) error {
+				return pprof.Lookup("block").WriteTo(f, 0)
+			},
+		},
+		{
+			name:     "mutex",
+			filename: baseFilePath + "_mutex.prof",
+			dump: func(f *os.File) error {
+				return pprof.Lookup("mutex").WriteTo(f, 0)
+			},
+		},
+	}
+
+	// Create all profile files and store file handles
+	files := make(map[string]*os.File)
+	for _, p := range profiles {
+		f, err := os.Create(p.filename)
+		if err != nil {
+			log.Error("could not create profile file",
+				zap.String("profile", p.name),
+				zap.Error(err))
+			for filename, f := range files {
+				f.Close()
+				os.Remove(filename)
+			}
+			return exec()
+		}
+		files[p.filename] = f
+	}
+	// Ensure all files are closed when function returns
+	defer func() {
+		for _, f := range files {
+			f.Close()
+		}
+	}()
+
+	// Start CPU profiling
+	cpuProfile := profiles[0]
+	if err := cpuProfile.dump(files[cpuProfile.filename]); err != nil {
+		log.Error("could not start CPU profiling", zap.Error(err))
+		return exec()
+	}
+	defer pprof.StopCPUProfile()
+
+	// Execute the target function
+	execErr := exec()
+
+	// Only save profiles and collect additional data if execution fails
+	if execErr != nil {
+		// Start from index 1 to skip CPU profile (already running)
+		for _, p := range profiles[1:] {
+			if err := p.dump(files[p.filename]); err != nil {
+				log.Error("could not write profile",
+					zap.String("profile", p.name),
+					zap.Error(err))
+			}
+		}
+	} else {
+		// Remove all files if execution succeeds
+		for _, p := range profiles {
+			os.Remove(p.filename)
+		}
+	}
+
+	return execErr
+}
diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 4dec280ed9..963657badd 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -756,7 +756,7 @@ log:
 grpc:
   log:
     level: WARNING
-  gracefulStopTimeout: 10 # second, time to wait graceful stop finish
+  gracefulStopTimeout: 3 # second, time to wait graceful stop finish
   client:
     compressionEnabled: false
     dialTimeout: 200
diff --git a/internal/distributed/proxy/service_test.go b/internal/distributed/proxy/service_test.go
index bb73295921..7fedc04287 100644
--- a/internal/distributed/proxy/service_test.go
+++ b/internal/distributed/proxy/service_test.go
@@ -1191,7 +1191,7 @@ func Test_Service_GracefulStop(t *testing.T) {
 	mockProxy.ExpectedCalls = nil
 	mockProxy.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Run(func(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) {
 		fmt.Println("rpc start")
-		time.Sleep(10 * time.Second)
+		time.Sleep(3 * time.Second)
 		atomic.AddInt32(&count, 1)
 		fmt.Println("rpc done")
 	}).Return(&milvuspb.ComponentStates{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil)
diff --git a/internal/distributed/utils/util.go b/internal/distributed/utils/util.go
index f2cc161ead..5f31495918 100644
--- a/internal/distributed/utils/util.go
+++ b/internal/distributed/utils/util.go
@@ -16,7 +16,7 @@ func GracefulStopGRPCServer(s *grpc.Server) {
 	ch := make(chan struct{})
 	go func() {
 		defer close(ch)
-		log.Debug("try to graceful stop grpc server...")
+		log.Info("try to graceful stop grpc server...")
 		// will block until all rpc finished.
 		s.GracefulStop()
 	}()
@@ -24,7 +24,7 @@ func GracefulStopGRPCServer(s *grpc.Server) {
 	case <-ch:
 	case <-time.After(paramtable.Get().ProxyGrpcServerCfg.GracefulStopTimeout.GetAsDuration(time.Second)):
 		// took too long, manually close grpc server
-		log.Debug("stop grpc server...")
+		log.Info("force to stop grpc server...")
 		s.Stop() // concurrent GracefulStop should be interrupted
 		<-ch
 	}
diff --git a/pkg/util/paramtable/grpc_param.go b/pkg/util/paramtable/grpc_param.go
index e9240855f3..b8282eda77 100644
--- a/pkg/util/paramtable/grpc_param.go
+++ b/pkg/util/paramtable/grpc_param.go
@@ -194,7 +194,7 @@ func (p *GrpcServerConfig) Init(domain string, base *BaseTable) {
 	p.GracefulStopTimeout = ParamItem{
 		Key:          "grpc.gracefulStopTimeout",
 		Version:      "2.3.1",
-		DefaultValue: "10",
+		DefaultValue: "3",
 		Doc:          "second, time to wait graceful stop finish",
 		Export:       true,
 	}
diff --git a/pkg/util/paramtable/grpc_param_test.go b/pkg/util/paramtable/grpc_param_test.go
index 1e9c30ac00..8acec7317f 100644
--- a/pkg/util/paramtable/grpc_param_test.go
+++ b/pkg/util/paramtable/grpc_param_test.go
@@ -62,8 +62,7 @@ func TestGrpcServerParams(t *testing.T) {
 	base.Save("grpc.serverMaxSendSize", "a")
 	assert.Equal(t, serverConfig.ServerMaxSendSize.GetAsInt(), DefaultServerMaxSendSize)
 
-	base.Save(serverConfig.GracefulStopTimeout.Key, "1")
-	assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 1)
+	assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 3)
 }
 
 func TestGrpcClientParams(t *testing.T) {
diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go
index 4c1f069a1a..e496e30a58 100644
--- a/pkg/util/paramtable/service_param.go
+++ b/pkg/util/paramtable/service_param.go
@@ -51,6 +51,7 @@ type ServiceParam struct {
 	RocksmqCfg RocksmqConfig
 	NatsmqCfg  NatsmqConfig
 	MinioCfg   MinioConfig
+	ProfileCfg ProfileConfig
 }
 
 func (p *ServiceParam) init(bt *BaseTable) {
@@ -64,6 +65,7 @@ func (p *ServiceParam) init(bt *BaseTable) {
 	p.RocksmqCfg.Init(bt)
 	p.NatsmqCfg.Init(bt)
 	p.MinioCfg.Init(bt)
+	p.ProfileCfg.Init(bt)
 }
 
 func (p *ServiceParam) RocksmqEnable() bool {
@@ -1388,3 +1390,25 @@ Leave it empty if you want to use AWS default endpoint`,
 	}
 	p.ListObjectsMaxKeys.Init(base.mgr)
 }
+
+// profile config
+type ProfileConfig struct {
+	PprofPath ParamItem `refreshable:"false"`
+}
+
+func (p *ProfileConfig) Init(base *BaseTable) {
+	p.PprofPath = ParamItem{
+		Key:          "profile.pprof.path",
+		Version:      "2.5.5",
+		DefaultValue: "",
+		Doc:          "The folder that storing pprof files, by default will use localStoragePath/pprof",
+		Formatter: func(v string) string {
+			if len(v) == 0 {
+				return path.Join(base.Get("localStorage.path"), "pprof")
+			}
+			return v
+		},
+		Export: true,
+	}
+	p.PprofPath.Init(base.mgr)
+}
diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go
index 6b23d11af0..c54bbfd783 100644
--- a/pkg/util/paramtable/service_param_test.go
+++ b/pkg/util/paramtable/service_param_test.go
@@ -225,4 +225,11 @@ func TestServiceParam(t *testing.T) {
 		assert.Equal(t, 100000, Params.PaginationSize.GetAsInt())
 		assert.Equal(t, 32, Params.ReadConcurrency.GetAsInt())
 	})
+
+	t.Run("test profile config", func(t *testing.T) {
+		params := &SParams.ProfileCfg
+		assert.Equal(t, "/var/lib/milvus/data/pprof", params.PprofPath.GetValue())
+		bt.Save(params.PprofPath.Key, "/tmp/pprof")
+		assert.Equal(t, "/tmp/pprof", params.PprofPath.GetValue())
+	})
 }
diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go
index 564a6dcc6b..8346b30be5 100644
--- a/tests/integration/replicas/load/load_test.go
+++ b/tests/integration/replicas/load/load_test.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"path"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -988,8 +989,11 @@ func (s *LoadTestSuite) TestLoadWithCompact() {
 	s.releaseCollection(dbName, collName)
 
 	stopInsertCh := make(chan struct{}, 1)
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
 	// Start a goroutine to continuously insert data and trigger compaction
 	go func() {
+		defer wg.Done()
 		for {
 			select {
 			case <-stopInsertCh:
@@ -1023,6 +1027,7 @@ func (s *LoadTestSuite) TestLoadWithCompact() {
 
 	// Clean up
 	close(stopInsertCh)
+	wg.Wait()
 	s.releaseCollection(dbName, collName)
 }
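
For context (not part of the patch), here is a minimal standalone Go sketch of the pattern that `dumpPprof` in `cmd/components/util.go` implements: wrap a function, write pprof data only when the wrapped call fails, and discard the profile file on success. The file path, error, and `dumpOnFailure` name are illustrative only and do not exist in the Milvus codebase.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"runtime/pprof"
)

// dumpOnFailure runs exec and keeps a goroutine profile only if exec
// returns an error; on success the profile file is removed again.
// This mirrors the "profile only on failed/stuck stop" idea of dumpPprof.
func dumpOnFailure(exec func() error, path string) error {
	f, err := os.Create(path)
	if err != nil {
		// Profiling is best-effort; still run the wrapped function.
		return exec()
	}
	defer f.Close()

	execErr := exec()
	if execErr != nil {
		// Capture the goroutine profile to help diagnose what is stuck.
		_ = pprof.Lookup("goroutine").WriteTo(f, 0)
		return execErr
	}
	// Success: no need to keep the profile file.
	os.Remove(path)
	return nil
}

func main() {
	// Hypothetical usage: simulate a stop routine that timed out.
	err := dumpOnFailure(func() error {
		return errors.New("stop timeout")
	}, "/tmp/example_goroutine.prof")
	fmt.Println("wrapped call returned:", err)
}
```

In the patch itself the same pattern is extended to CPU, heap, block, and mutex profiles, and the output directory comes from the new `profile.pprof.path` parameter (defaulting to `localStorage.path`/pprof).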