wei liu 4a3a1b7cc7
enhance: Remove CPU profile to prevent blocking stop progress (#40459)
pr: #40460
- Removed CPU profile dump from util.go's pprof collection
- Avoid potential blocking in StopCPUProfile() during shutdown
- Maintain goroutine/heap/block/mutex profiles for diagnostics
- Ensure safe shutdown timeout handling without profile stalls

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2025-03-13 14:40:11 +08:00

153 lines
3.7 KiB
Go

package components
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime/pprof"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
var errStopTimeout = errors.New("stop timeout")
// exitWhenStopTimeout stops a component with timeout and exit progress when timeout.
func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
err := stopWithTimeout(stop, timeout)
if errors.Is(err, errStopTimeout) {
start := time.Now()
dumpPprof()
log.Info("stop progress timeout, force exit",
zap.String("component", paramtable.GetRole()),
zap.Duration("cost", time.Since(start)),
zap.Error(err))
os.Exit(1)
}
return err
}
// stopWithTimeout stops a component with timeout.
func stopWithTimeout(stop func() error, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
future := conc.Go(func() (struct{}, error) {
return struct{}{}, stop()
})
select {
case <-future.Inner():
return errors.Wrap(future.Err(), "failed to stop component")
case <-ctx.Done():
return errStopTimeout
}
}
// profileType defines the structure for each type of profile to be collected
type profileType struct {
name string // Name of the profile type
filename string // File path for the profile
dump func(*os.File) error // Function to dump the profile data
}
// dumpPprof collects various performance profiles
func dumpPprof() {
// Get pprof directory from configuration
pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue()
// Clean existing directory if not empty
if pprofDir != "" {
if err := os.RemoveAll(pprofDir); err != nil {
log.Error("failed to clean pprof directory",
zap.String("path", pprofDir),
zap.Error(err))
}
}
// Recreate directory with proper permissions
if err := os.MkdirAll(pprofDir, 0o755); err != nil {
log.Error("failed to create pprof directory",
zap.String("path", pprofDir),
zap.Error(err))
return
}
// Generate base file path with timestamp
baseFilePath := filepath.Join(
pprofDir,
fmt.Sprintf("%s_pprof_%s",
paramtable.GetRole(),
time.Now().Format("20060102_150405"),
),
)
// Define all profile types to be collected
profiles := []profileType{
{
name: "goroutine",
filename: baseFilePath + "_goroutine.prof",
dump: func(f *os.File) error {
return pprof.Lookup("goroutine").WriteTo(f, 0)
},
},
{
name: "heap",
filename: baseFilePath + "_heap.prof",
dump: func(f *os.File) error {
return pprof.WriteHeapProfile(f)
},
},
{
name: "block",
filename: baseFilePath + "_block.prof",
dump: func(f *os.File) error {
return pprof.Lookup("block").WriteTo(f, 0)
},
},
{
name: "mutex",
filename: baseFilePath + "_mutex.prof",
dump: func(f *os.File) error {
return pprof.Lookup("mutex").WriteTo(f, 0)
},
},
}
// Create all profile files and store file handles
files := make(map[string]*os.File)
for _, p := range profiles {
f, err := os.Create(p.filename)
if err != nil {
log.Error("could not create profile file",
zap.String("profile", p.name),
zap.Error(err))
for filename, f := range files {
f.Close()
os.Remove(filename)
}
return
}
files[p.filename] = f
}
// Ensure all files are closed when function returns
defer func() {
for _, f := range files {
f.Close()
}
}()
for _, p := range profiles {
if err := p.dump(files[p.filename]); err != nil {
log.Error("could not write profile",
zap.String("profile", p.name),
zap.Error(err))
}
}
}