package components import ( "context" "fmt" "os" "path/filepath" "runtime/pprof" "time" "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) var errStopTimeout = errors.New("stop timeout") // exitWhenStopTimeout stops a component with timeout and exit progress when timeout. func exitWhenStopTimeout(stop func() error, timeout time.Duration) error { err := stopWithTimeout(stop, timeout) if errors.Is(err, errStopTimeout) { start := time.Now() dumpPprof() log.Info("stop progress timeout, force exit", zap.String("component", paramtable.GetRole()), zap.Duration("cost", time.Since(start)), zap.Error(err)) os.Exit(1) } return err } // stopWithTimeout stops a component with timeout. func stopWithTimeout(stop func() error, timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() future := conc.Go(func() (struct{}, error) { return struct{}{}, stop() }) select { case <-future.Inner(): return errors.Wrap(future.Err(), "failed to stop component") case <-ctx.Done(): return errStopTimeout } } // profileType defines the structure for each type of profile to be collected type profileType struct { name string // Name of the profile type filename string // File path for the profile dump func(*os.File) error // Function to dump the profile data } // dumpPprof collects various performance profiles func dumpPprof() { // Get pprof directory from configuration pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue() // Clean existing directory if not empty if pprofDir != "" { if err := os.RemoveAll(pprofDir); err != nil { log.Error("failed to clean pprof directory", zap.String("path", pprofDir), zap.Error(err)) } } // Recreate directory with proper permissions if err := os.MkdirAll(pprofDir, 0o755); err != nil { log.Error("failed to create pprof directory", zap.String("path", pprofDir), zap.Error(err)) return } // Generate base file path with timestamp baseFilePath := filepath.Join( pprofDir, fmt.Sprintf("%s_pprof_%s", paramtable.GetRole(), time.Now().Format("20060102_150405"), ), ) // Define all profile types to be collected profiles := []profileType{ { name: "goroutine", filename: baseFilePath + "_goroutine.prof", dump: func(f *os.File) error { return pprof.Lookup("goroutine").WriteTo(f, 0) }, }, { name: "heap", filename: baseFilePath + "_heap.prof", dump: func(f *os.File) error { return pprof.WriteHeapProfile(f) }, }, { name: "block", filename: baseFilePath + "_block.prof", dump: func(f *os.File) error { return pprof.Lookup("block").WriteTo(f, 0) }, }, { name: "mutex", filename: baseFilePath + "_mutex.prof", dump: func(f *os.File) error { return pprof.Lookup("mutex").WriteTo(f, 0) }, }, } // Create all profile files and store file handles files := make(map[string]*os.File) for _, p := range profiles { f, err := os.Create(p.filename) if err != nil { log.Error("could not create profile file", zap.String("profile", p.name), zap.Error(err)) for filename, f := range files { f.Close() os.Remove(filename) } return } files[p.filename] = f } // Ensure all files are closed when function returns defer func() { for _, f := range files { f.Close() } }() for _, p := range profiles { if err := p.dump(files[p.filename]); err != nil { log.Error("could not write profile", zap.String("profile", p.name), zap.Error(err)) } } }