fix: Return io error other than NotExist refreshing config (#38924)

Related to #38923

This PR:

- Check whether `os.Stat` config file error is io.ErrNotExist
- Panic when get config return error during Milvus initialization

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-01-08 12:00:56 +08:00 committed by GitHub
parent 134952b6c5
commit f076898761
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 52 additions and 28 deletions

View File

@ -18,10 +18,12 @@ package config
import ( import (
"fmt" "fmt"
"log"
"strings" "strings"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/spf13/cast" "github.com/spf13/cast"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
@ -44,7 +46,10 @@ func Init(opts ...Option) (*Manager, error) {
sourceManager := NewManager() sourceManager := NewManager()
if o.FileInfo != nil { if o.FileInfo != nil {
s := NewFileSource(o.FileInfo) s := NewFileSource(o.FileInfo)
sourceManager.AddSource(s) err := sourceManager.AddSource(s)
if err != nil {
log.Fatal("failed to add FileSource config", zap.Error(err))
}
} }
if o.EnvKeyFormatter != nil { if o.EnvKeyFormatter != nil {
sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter)) sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter))

View File

@ -123,29 +123,38 @@ func (fs *FileSource) loadFromFile() error {
configFiles = fs.files configFiles = fs.files
fs.RUnlock() fs.RUnlock()
notExistsNum := 0
for _, configFile := range configFiles { for _, configFile := range configFiles {
if _, err := os.Stat(configFile); err != nil { if _, err := os.Stat(configFile); err != nil {
if os.IsNotExist(err) {
notExistsNum++
continue continue
} }
return err
}
ext := filepath.Ext(configFile) ext := filepath.Ext(configFile)
if len(ext) == 0 || (ext[1:] != "yaml" && ext[1:] != "yml") { if len(ext) == 0 || (ext[1:] != "yaml" && ext[1:] != "yml") {
return fmt.Errorf("Unsupported Config Type: " + ext) return fmt.Errorf("Unsupported Config Type: %s", ext)
} }
data, err := os.ReadFile(configFile) data, err := os.ReadFile(configFile)
if err != nil { if err != nil {
return errors.Wrap(err, "Read config failed: "+configFile) return errors.Wrapf(err, "Read config failed: %s", configFile)
} }
var config map[string]interface{} var config map[string]interface{}
err = yaml.Unmarshal(data, &config) err = yaml.Unmarshal(data, &config)
if err != nil { if err != nil {
return errors.Wrap(err, "unmarshal yaml file "+configFile+" failed") return errors.Wrapf(err, "unmarshal yaml file %s failed", configFile)
} }
flattenAndMergeMap("", config, newConfig) flattenAndMergeMap("", config, newConfig)
} }
// not allow all config files missing, return error for this case
if notExistsNum == len(configFiles) {
return errors.Newf("all config files not exists, files: %v", configFiles)
}
return fs.update(newConfig) return fs.update(newConfig)
} }

View File

@ -136,6 +136,7 @@ func TestOnEvent(t *testing.T) {
dir, _ := os.MkdirTemp("", "milvus") dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml") yamlFile := path.Join(dir, "milvus.yaml")
os.WriteFile(yamlFile, []byte("a.b: \"\""), 0o600)
mgr, _ := Init(WithEnvSource(formatKey), mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{ WithFilesSource(&FileInfo{
Files: []string{yamlFile}, Files: []string{yamlFile},
@ -147,31 +148,41 @@ func TestOnEvent(t *testing.T) {
RefreshInterval: 10 * time.Millisecond, RefreshInterval: 10 * time.Millisecond,
})) }))
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second) assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b") value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "aaa") return value == "aaa"
}, time.Second*5, time.Second)
ctx := context.Background() ctx := context.Background()
client.KV.Put(ctx, "test/config/a/b", "bbb") client.KV.Put(ctx, "test/config/a/b", "bbb")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b") assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "bbb") return value == "bbb"
}, time.Second*5, time.Second)
client.KV.Put(ctx, "test/config/a/b", "ccc") client.KV.Put(ctx, "test/config/a/b", "ccc")
time.Sleep(time.Second) assert.Eventually(t, func() bool {
value, err = mgr.GetConfig("a.b") value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "ccc") return value == "ccc"
}, time.Second*5, time.Second)
os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
time.Sleep(time.Second) assert.Eventually(t, func() bool {
value, err = mgr.GetConfig("a.b") value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "ccc") return value == "ccc"
}, time.Second*5, time.Second)
client.KV.Delete(ctx, "test/config/a/b") client.KV.Delete(ctx, "test/config/a/b")
time.Sleep(time.Second) assert.Eventually(t, func() bool {
value, err = mgr.GetConfig("a.b") value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "ddd") return value == "ddd"
}, time.Second*5, time.Second)
} }
func TestDeadlock(t *testing.T) { func TestDeadlock(t *testing.T) {
@ -206,6 +217,7 @@ func TestCachedConfig(t *testing.T) {
dir, _ := os.MkdirTemp("", "milvus") dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml") yamlFile := path.Join(dir, "milvus.yaml")
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
mgr, _ := Init(WithEnvSource(formatKey), mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{ WithFilesSource(&FileInfo{
Files: []string{yamlFile}, Files: []string{yamlFile},
@ -218,7 +230,6 @@ func TestCachedConfig(t *testing.T) {
})) }))
// test get cached value from file // test get cached value from file
{ {
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second) time.Sleep(time.Second)
_, exist := mgr.GetCachedValue("a.b") _, exist := mgr.GetCachedValue("a.b")
assert.False(t, exist) assert.False(t, exist)

View File

@ -72,8 +72,7 @@ func (r *refresher) refreshPeriodically(name string) {
case <-ticker.C: case <-ticker.C:
err := r.fetchFunc() err := r.fetchFunc()
if err != nil { if err != nil {
log.Error("can not pull configs", zap.Error(err)) log.WithRateGroup("refresher", 1, 60).RatedWarn(60, "can not pull configs", zap.Error(err))
r.stop()
} }
case <-r.intervalDone: case <-r.intervalDone:
log.Info("stop refreshing configurations", zap.String("source", name)) log.Info("stop refreshing configurations", zap.String("source", name))