enhance: expose more metrics data (#39456)

issue: #36621 #39417
1. Adjust the server-side cache size.
2. Add source information for configurations.
3. Add node ID for compaction and indexing tasks.
4. Resolve localhost access issues to fix health check failures for
etcd.

Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2025-02-07 11:50:50 +08:00 committed by GitHub
parent a9e0e0a852
commit 8a4ac8cccd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 116 additions and 51 deletions

View File

@ -11,7 +11,7 @@ services:
- ETCD_SNAPSHOT_COUNT=50000 - ETCD_SNAPSHOT_COUNT=50000
volumes: volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
healthcheck: healthcheck:
test: ["CMD", "etcdctl", "endpoint", "health"] test: ["CMD", "etcdctl", "endpoint", "health"]
interval: 30s interval: 30s

View File

@ -52,6 +52,7 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
ResultSegments: lo.Map(task.ResultSegments, func(t int64, i int) string { ResultSegments: lo.Map(task.ResultSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10) return strconv.FormatInt(t, 10)
}), }),
NodeID: task.NodeID,
} }
} }
@ -70,7 +71,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal
ctx: ctx, ctx: ctx,
catalog: catalog, catalog: catalog,
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0), compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](32, nil, time.Minute*15), taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](512, nil, time.Minute*15),
} }
if err := csm.reloadFromKV(); err != nil { if err := csm.reloadFromKV(); err != nil {
return nil, err return nil, err

View File

@ -53,7 +53,7 @@ type importTasks struct {
func newImportTasks() *importTasks { func newImportTasks() *importTasks {
return &importTasks{ return &importTasks{
tasks: make(map[int64]ImportTask), tasks: make(map[int64]ImportTask),
taskStats: expirable.NewLRU[UniqueID, ImportTask](64, nil, time.Minute*30), taskStats: expirable.NewLRU[UniqueID, ImportTask](512, nil, time.Minute*30),
} }
} }

View File

@ -82,6 +82,7 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
IndexVersion: s.IndexVersion, IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000), CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000), FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
NodeID: s.NodeID,
} }
} }
@ -98,7 +99,7 @@ func newSegmentIndexBuildInfo() *segmentBuildInfo {
// build ID -> segment index // build ID -> segment index
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
// build ID -> task stats // build ID -> task stats
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.IndexTaskStats](64, nil, time.Minute*30), taskStats: expirable.NewLRU[UniqueID, *metricsinfo.IndexTaskStats](1024, nil, time.Minute*30),
} }
} }

View File

@ -104,7 +104,7 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
allocator: alloc, allocator: alloc,
tasks: make(map[int64]Task), tasks: make(map[int64]Task),
meta: mt, meta: mt,
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*5), taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5),
}, },
allocator: alloc, allocator: alloc,
} }

View File

@ -93,7 +93,7 @@ func newTaskScheduler(
handler: handler, handler: handler,
indexEngineVersionManager: indexEngineVersionManager, indexEngineVersionManager: indexEngineVersionManager,
allocator: allocator, allocator: allocator,
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15), taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*15),
compactionHandler: compactionHandler, compactionHandler: compactionHandler,
} }
ts.reloadFromMeta() ts.reloadFromMeta()

View File

@ -142,7 +142,7 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta
log.Warn("segment is contains by l0 compaction, skip stats", zap.Int64("taskID", st.taskID), log.Warn("segment is contains by l0 compaction, skip stats", zap.Int64("taskID", st.taskID),
zap.Int64("segmentID", st.segmentID)) zap.Int64("segmentID", st.segmentID))
st.SetState(indexpb.JobState_JobStateFailed, "segment is contains by l0 compaction") st.SetState(indexpb.JobState_JobStateFailed, "segment is contains by l0 compaction")
//reset compacting // reset compacting
meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false) meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false)
st.SetStartTime(time.Now()) st.SetStartTime(time.Now())
return fmt.Errorf("segment is contains by l0 compaction") return fmt.Errorf("segment is contains by l0 compaction")

View File

@ -72,7 +72,7 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
keyLockDispatcher: dispatcher, keyLockDispatcher: dispatcher,
chunkManager: chunkManager, chunkManager: chunkManager,
tasks: typeutil.NewConcurrentMap[string, Task](), tasks: typeutil.NewConcurrentMap[string, Task](),
taskStats: expirable.NewLRU[string, Task](16, nil, time.Minute*15), taskStats: expirable.NewLRU[string, Task](64, nil, time.Minute*15),
} }
// setup config update watcher // setup config update watcher
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)) params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler))

View File

@ -6753,7 +6753,7 @@ func DeregisterSubLabel(subLabel string) {
func (node *Proxy) RegisterRestRouter(router gin.IRouter) { func (node *Proxy) RegisterRestRouter(router gin.IRouter) {
// Cluster request that executed by proxy // Cluster request that executed by proxy
router.GET(http.ClusterInfoPath, getClusterInfo(node)) router.GET(http.ClusterInfoPath, getClusterInfo(node))
router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetAll())) router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetConfigsView()))
router.GET(http.ClusterClientsPath, getConnectedClients) router.GET(http.ClusterClientsPath, getConnectedClients)
router.GET(http.ClusterDependenciesPath, getDependencies) router.GET(http.ClusterDependenciesPath, getDependencies)

View File

@ -284,7 +284,7 @@ func NewScheduler(ctx context.Context,
channelTasks: NewConcurrentMap[replicaChannelIndex, Task](), channelTasks: NewConcurrentMap[replicaChannelIndex, Task](),
processQueue: newTaskQueue(), processQueue: newTaskQueue(),
waitQueue: newTaskQueue(), waitQueue: newTaskQueue(),
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15), taskStats: expirable.NewLRU[UniqueID, Task](256, nil, time.Minute*15),
segmentTaskDelta: NewExecutingTaskDelta(), segmentTaskDelta: NewExecutingTaskDelta(),
channelTaskDelta: NewExecutingTaskDelta(), channelTaskDelta: NewExecutingTaskDelta(),
} }

View File

@ -30,17 +30,17 @@ import (
func TestConfigFromEnv(t *testing.T) { func TestConfigFromEnv(t *testing.T) {
mgr, _ := Init() mgr, _ := Init()
_, err := mgr.GetConfig("test.env") _, _, err := mgr.GetConfig("test.env")
assert.ErrorIs(t, err, ErrKeyNotFound) assert.ErrorIs(t, err, ErrKeyNotFound)
t.Setenv("TEST_ENV", "value") t.Setenv("TEST_ENV", "value")
mgr, _ = Init(WithEnvSource(formatKey)) mgr, _ = Init(WithEnvSource(formatKey))
v, err := mgr.GetConfig("test.env") _, v, err := mgr.GetConfig("test.env")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "value", v) assert.Equal(t, "value", v)
v, err = mgr.GetConfig("TEST_ENV") _, v, err = mgr.GetConfig("TEST_ENV")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "value", v) assert.Equal(t, "value", v)
} }
@ -67,65 +67,65 @@ func TestConfigFromRemote(t *testing.T) {
ctx := context.Background() ctx := context.Background()
t.Run("origin is empty", func(t *testing.T) { t.Run("origin is empty", func(t *testing.T) {
_, err = mgr.GetConfig("test.etcd") _, _, err = mgr.GetConfig("test.etcd")
assert.ErrorIs(t, err, ErrKeyNotFound) assert.ErrorIs(t, err, ErrKeyNotFound)
client.KV.Put(ctx, "test/config/test/etcd", "value") client.KV.Put(ctx, "test/config/test/etcd", "value")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
v, err := mgr.GetConfig("test.etcd") _, v, err := mgr.GetConfig("test.etcd")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "value", v) assert.Equal(t, "value", v)
v, err = mgr.GetConfig("TEST_ETCD") _, v, err = mgr.GetConfig("TEST_ETCD")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "value", v) assert.Equal(t, "value", v)
client.KV.Delete(ctx, "test/config/test/etcd") client.KV.Delete(ctx, "test/config/test/etcd")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
_, err = mgr.GetConfig("TEST_ETCD") _, _, err = mgr.GetConfig("TEST_ETCD")
assert.ErrorIs(t, err, ErrKeyNotFound) assert.ErrorIs(t, err, ErrKeyNotFound)
}) })
t.Run("override origin value", func(t *testing.T) { t.Run("override origin value", func(t *testing.T) {
v, _ := mgr.GetConfig("tmp.key") _, v, _ := mgr.GetConfig("tmp.key")
assert.Equal(t, "1", v) assert.Equal(t, "1", v)
client.KV.Put(ctx, "test/config/tmp/key", "2") client.KV.Put(ctx, "test/config/tmp/key", "2")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
v, _ = mgr.GetConfig("tmp.key") _, v, _ = mgr.GetConfig("tmp.key")
assert.Equal(t, "2", v) assert.Equal(t, "2", v)
client.KV.Put(ctx, "test/config/tmp/key", "3") client.KV.Put(ctx, "test/config/tmp/key", "3")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
v, _ = mgr.GetConfig("tmp.key") _, v, _ = mgr.GetConfig("tmp.key")
assert.Equal(t, "3", v) assert.Equal(t, "3", v)
client.KV.Delete(ctx, "test/config/tmp/key") client.KV.Delete(ctx, "test/config/tmp/key")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
v, _ = mgr.GetConfig("tmp.key") _, v, _ = mgr.GetConfig("tmp.key")
assert.Equal(t, "1", v) assert.Equal(t, "1", v)
}) })
t.Run("multi priority", func(t *testing.T) { t.Run("multi priority", func(t *testing.T) {
v, _ := mgr.GetConfig("log.level") _, v, _ := mgr.GetConfig("log.level")
assert.Equal(t, "info", v) assert.Equal(t, "info", v)
client.KV.Put(ctx, "test/config/log/level", "error") client.KV.Put(ctx, "test/config/log/level", "error")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
v, _ = mgr.GetConfig("log.level") _, v, _ = mgr.GetConfig("log.level")
assert.Equal(t, "error", v) assert.Equal(t, "error", v)
client.KV.Delete(ctx, "test/config/log/level") client.KV.Delete(ctx, "test/config/log/level")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
v, _ = mgr.GetConfig("log.level") _, v, _ = mgr.GetConfig("log.level")
assert.Equal(t, "info", v) assert.Equal(t, "info", v)
}) })
@ -134,7 +134,7 @@ func TestConfigFromRemote(t *testing.T) {
client.KV.Put(ctx, "test/config/test/etcd", "value2") client.KV.Put(ctx, "test/config/test/etcd", "value2")
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, err = mgr.GetConfig("test.etcd") _, _, err = mgr.GetConfig("test.etcd")
return err != nil && errors.Is(err, ErrKeyNotFound) return err != nil && errors.Is(err, ErrKeyNotFound)
}, 300*time.Millisecond, 10*time.Millisecond) }, 300*time.Millisecond, 10*time.Millisecond)
}) })

View File

@ -29,6 +29,7 @@ import (
const ( const (
TombValue = "TOMB_VAULE" TombValue = "TOMB_VAULE"
RuntimeSource = "RuntimeSource"
) )
type Filter func(key string) (string, bool) type Filter func(key string) (string, bool)
@ -118,7 +119,7 @@ func (m *Manager) GetCachedValue(key string) (interface{}, bool) {
func (m *Manager) CASCachedValue(key string, origin string, value interface{}) bool { func (m *Manager) CASCachedValue(key string, origin string, value interface{}) bool {
m.cacheMutex.Lock() m.cacheMutex.Lock()
defer m.cacheMutex.Unlock() defer m.cacheMutex.Unlock()
current, err := m.GetConfig(key) _, current, err := m.GetConfig(key)
if errors.Is(err, ErrKeyNotFound) { if errors.Is(err, ErrKeyNotFound) {
m.configCache[key] = value m.configCache[key] = value
return true return true
@ -147,20 +148,21 @@ func (m *Manager) EvictCacheValueByFormat(keys ...string) {
clear(m.configCache) clear(m.configCache)
} }
func (m *Manager) GetConfig(key string) (string, error) { func (m *Manager) GetConfig(key string) (string, string, error) {
realKey := formatKey(key) realKey := formatKey(key)
v, ok := m.overlays.Get(realKey) v, ok := m.overlays.Get(realKey)
if ok { if ok {
if v == TombValue { if v == TombValue {
return "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found %s", key) return "", "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found %s", key)
} }
return v, nil return RuntimeSource, v, nil
} }
sourceName, ok := m.keySourceMap.Get(realKey) sourceName, ok := m.keySourceMap.Get(realKey)
if !ok { if !ok {
return "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found: %s", key) return "", "", errors.Wrap(ErrKeyNotFound, key) // fmt.Errorf("key not found: %s", key)
} }
return m.getConfigValueBySource(realKey, sourceName) v, err := m.getConfigValueBySource(realKey, sourceName)
return sourceName, v, err
} }
// GetConfigs returns all the key values // GetConfigs returns all the key values
@ -168,7 +170,7 @@ func (m *Manager) GetConfigs() map[string]string {
config := make(map[string]string) config := make(map[string]string)
m.keySourceMap.Range(func(key, value string) bool { m.keySourceMap.Range(func(key, value string) bool {
sValue, err := m.GetConfig(key) _, sValue, err := m.GetConfig(key)
if err != nil { if err != nil {
return true return true
} }
@ -185,15 +187,40 @@ func (m *Manager) GetConfigs() map[string]string {
return config return config
} }
func (m *Manager) GetConfigsView() map[string]string {
config := make(map[string]string)
valueFmt := func(source, value string) string {
return fmt.Sprintf("%s[%s]", value, source)
}
m.keySourceMap.Range(func(key, value string) bool {
source, sValue, err := m.GetConfig(key)
if err != nil {
return true
}
config[key] = valueFmt(source, sValue)
return true
})
m.overlays.Range(func(key, value string) bool {
config[key] = valueFmt(RuntimeSource, value)
return true
})
return config
}
func (m *Manager) GetBy(filters ...Filter) map[string]string { func (m *Manager) GetBy(filters ...Filter) map[string]string {
matchedConfig := make(map[string]string) matchedConfig := make(map[string]string)
m.keySourceMap.Range(func(key, value string) bool { m.keySourceMap.Range(func(key string, value string) bool {
newkey, ok := filterate(key, filters...) newkey, ok := filterate(key, filters...)
if !ok { if !ok {
return true return true
} }
sValue, err := m.GetConfig(key) _, sValue, err := m.GetConfig(key)
if err != nil { if err != nil {
return true return true
} }

View File

@ -49,12 +49,12 @@ func TestConfigChangeEvent(t *testing.T) {
mgr, _ := Init() mgr, _ := Init()
err := mgr.AddSource(fs) err := mgr.AddSource(fs)
assert.NoError(t, err) assert.NoError(t, err)
res, err := mgr.GetConfig("a.b") _, res, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, res, "3") assert.Equal(t, res, "3")
os.WriteFile(path.Join(dir, "user.yaml"), []byte("a.b: 6"), 0o600) os.WriteFile(path.Join(dir, "user.yaml"), []byte("a.b: 6"), 0o600)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
res, err = mgr.GetConfig("a.b") _, res, err = mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, res, "6") assert.Equal(t, res, "6")
} }
@ -78,10 +78,10 @@ func TestBasic(t *testing.T) {
// test set config // test set config
mgr.SetConfig("a.b", "aaa") mgr.SetConfig("a.b", "aaa")
value, err := mgr.GetConfig("a.b") _, value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "aaa") assert.Equal(t, value, "aaa")
_, err = mgr.GetConfig("a.a") _, _, err = mgr.GetConfig("a.a")
assert.Error(t, err) assert.Error(t, err)
// test delete config // test delete config
@ -105,7 +105,7 @@ func TestBasic(t *testing.T) {
Key: "ab", Key: "ab",
Value: "aaa", Value: "aaa",
}) })
value, err = mgr.GetConfig("a.b") _, value, err = mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "aaa") assert.Equal(t, value, "aaa")
@ -116,7 +116,7 @@ func TestBasic(t *testing.T) {
Key: "a.b", Key: "a.b",
Value: "bbb", Value: "bbb",
}) })
value, err = mgr.GetConfig("a.b") _, value, err = mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, value, "aaa") assert.Equal(t, value, "aaa")
@ -149,7 +149,7 @@ func TestOnEvent(t *testing.T) {
})) }))
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b") _, value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
return value == "aaa" return value == "aaa"
}, time.Second*5, time.Second) }, time.Second*5, time.Second)
@ -158,33 +158,62 @@ func TestOnEvent(t *testing.T) {
client.KV.Put(ctx, "test/config/a/b", "bbb") client.KV.Put(ctx, "test/config/a/b", "bbb")
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b") _, value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
return value == "bbb" return value == "bbb"
}, time.Second*5, time.Second) }, time.Second*5, time.Second)
client.KV.Put(ctx, "test/config/a/b", "ccc") client.KV.Put(ctx, "test/config/a/b", "ccc")
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b") _, value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
return value == "ccc" return value == "ccc"
}, time.Second*5, time.Second) }, time.Second*5, time.Second)
os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b") _, value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
return value == "ccc" return value == "ccc"
}, time.Second*5, time.Second) }, time.Second*5, time.Second)
client.KV.Delete(ctx, "test/config/a/b") client.KV.Delete(ctx, "test/config/a/b")
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b") _, value, err := mgr.GetConfig("a.b")
assert.NoError(t, err) assert.NoError(t, err)
return value == "ddd" return value == "ddd"
}, time.Second*5, time.Second) }, time.Second*5, time.Second)
} }
func TestGetConfigAndSource(t *testing.T) {
mgr, _ := Init()
envSource := NewEnvSource(formatKey)
err := mgr.AddSource(envSource)
assert.NoError(t, err)
envSource.configs.Insert("ab-key", "ab-value")
mgr.OnEvent(&Event{
EventSource: envSource.GetSourceName(),
EventType: CreateType,
Key: "ab-key",
})
mgr.SetConfig("ac-key", "ac-value")
_, value, err := mgr.GetConfig("ac-key")
assert.NoError(t, err)
assert.Equal(t, value, "ac-value")
// test get all configs
configs := mgr.GetConfigsView()
v, ok := configs["ab-key"]
assert.True(t, ok)
assert.Contains(t, v, "EnvironmentSource")
v, ok = configs["ac-key"]
assert.True(t, ok)
assert.Contains(t, v, RuntimeSource)
}
func TestDeadlock(t *testing.T) { func TestDeadlock(t *testing.T) {
mgr, _ := Init() mgr, _ := Init()

View File

@ -317,6 +317,7 @@ type IndexTaskStats struct {
IndexVersion int64 `json:"index_version,omitempty,string"` IndexVersion int64 `json:"index_version,omitempty,string"`
CreatedUTCTime string `json:"create_time,omitempty"` CreatedUTCTime string `json:"create_time,omitempty"`
FinishedUTCTime string `json:"finished_time,omitempty"` FinishedUTCTime string `json:"finished_time,omitempty"`
NodeID int64 `json:"node_id,omitempty,string"`
} }
type SyncTask struct { type SyncTask struct {
@ -394,6 +395,7 @@ type CompactionTask struct {
TotalRows int64 `json:"total_rows,omitempty,string"` TotalRows int64 `json:"total_rows,omitempty,string"`
InputSegments []string `json:"input_segments,omitempty"` InputSegments []string `json:"input_segments,omitempty"`
ResultSegments []string `json:"result_segments,omitempty"` ResultSegments []string `json:"result_segments,omitempty"`
NodeID int64 `json:"node_id,omitempty,string"`
} }
// RootCoordConfiguration records the configuration of RootCoord. // RootCoordConfiguration records the configuration of RootCoord.

View File

@ -26,7 +26,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
config "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
@ -252,7 +252,8 @@ func (bt *BaseTable) UpdateSourceOptions(opts ...config.Option) {
// Load loads an object with @key. // Load loads an object with @key.
func (bt *BaseTable) Load(key string) (string, error) { func (bt *BaseTable) Load(key string) (string, error) {
return bt.mgr.GetConfig(key) _, v, err := bt.mgr.GetConfig(key)
return v, err
} }
func (bt *BaseTable) Get(key string) string { func (bt *BaseTable) Get(key string) string {
@ -265,7 +266,7 @@ func (bt *BaseTable) GetWithDefault(key, defaultValue string) string {
return defaultValue return defaultValue
} }
str, err := bt.mgr.GetConfig(key) _, str, err := bt.mgr.GetConfig(key)
if err != nil { if err != nil {
return defaultValue return defaultValue
} }

View File

@ -175,6 +175,10 @@ func (p *ComponentParam) GetAll() map[string]string {
return p.baseTable.mgr.GetConfigs() return p.baseTable.mgr.GetConfigs()
} }
func (p *ComponentParam) GetConfigsView() map[string]string {
return p.baseTable.mgr.GetConfigsView()
}
func (p *ComponentParam) Watch(key string, watcher config.EventHandler) { func (p *ComponentParam) Watch(key string, watcher config.EventHandler) {
p.baseTable.mgr.Dispatcher.Register(key, watcher) p.baseTable.mgr.Dispatcher.Register(key, watcher)
} }

View File

@ -70,14 +70,14 @@ func (pi *ParamItem) getWithRaw() (result, raw string, err error) {
panic(fmt.Sprintf("manager is nil %s", pi.Key)) panic(fmt.Sprintf("manager is nil %s", pi.Key))
} }
// raw value set only once // raw value set only once
raw, err = pi.manager.GetConfig(pi.Key) _, raw, err = pi.manager.GetConfig(pi.Key)
if err != nil || raw == pi.DefaultValue { if err != nil || raw == pi.DefaultValue {
// try fallback if the entry is not exist or default value, // try fallback if the entry is not exist or default value,
// because default value may already defined in milvus.yaml // because default value may already defined in milvus.yaml
// and we don't want the fallback keys be overridden. // and we don't want the fallback keys be overridden.
for _, key := range pi.FallbackKeys { for _, key := range pi.FallbackKeys {
var fallbackRaw string var fallbackRaw string
fallbackRaw, err = pi.manager.GetConfig(key) _, fallbackRaw, err = pi.manager.GetConfig(key)
if err == nil { if err == nil {
raw = fallbackRaw raw = fallbackRaw
break break