milvus/internal/datacoord/index_engine_version_manager.go
wei liu 3e9e830074
enhance: Implement rewatch mechanism for etcd failure scenarios (#43829)
issue: #43828
Implement robust rewatch mechanism to handle etcd connection failures
and node reconnection scenarios in DataCoord and QueryCoord, along with
heartbeat lag monitoring capabilities.

Changes include:
- Implement rewatchDataNodes/rewatchQueryNodes callbacks for etcd
reconnection scenarios
- Add idempotent rewatchNodes method to handle etcd session recovery
gracefully
- Add QueryCoordLastHeartbeatTimeStamp metric for monitoring node
heartbeat lag
- Clean up heartbeat metrics when nodes go down to prevent metric leaks

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2025-08-14 10:31:44 +08:00

188 lines
4.8 KiB
Go

package datacoord
import (
"math"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
)
// IndexEngineVersionManager tracks the index engine versions (vector and
// scalar) and the IndexNonEncoding flag that nodes report through their
// sessions, and merges them into single values usable by the caller.
type IndexEngineVersionManager interface {
	// Startup reconciles the tracked nodes with a full snapshot of sessions:
	// nodes absent from the snapshot are forgotten, the rest are (re)recorded.
	Startup(sessions map[string]*sessionutil.Session)
	// AddNode starts tracking the node described by session.
	AddNode(session *sessionutil.Session)
	// Update refreshes the values recorded for the node described by session.
	Update(session *sessionutil.Session)
	// RemoveNode stops tracking the node described by session.
	RemoveNode(session *sessionutil.Session)

	// GetCurrentIndexEngineVersion returns the lowest current index engine
	// version among tracked nodes, or 0 when no node is tracked.
	GetCurrentIndexEngineVersion() int32
	// GetMinimalIndexEngineVersion returns the highest minimal index engine
	// version among tracked nodes, or 0 when no node is tracked.
	GetMinimalIndexEngineVersion() int32
	// GetCurrentScalarIndexEngineVersion is GetCurrentIndexEngineVersion for
	// the scalar index engine versions.
	GetCurrentScalarIndexEngineVersion() int32
	// GetMinimalScalarIndexEngineVersion is GetMinimalIndexEngineVersion for
	// the scalar index engine versions.
	GetMinimalScalarIndexEngineVersion() int32
	// GetIndexNonEncoding reports true only if every tracked node reported
	// IndexNonEncoding; it reports false when no node is tracked.
	GetIndexNonEncoding() bool
}
// versionManagerImpl is the mutex-guarded implementation of
// IndexEngineVersionManager. Every map is keyed by the reporting node's
// session ServerID and holds the value that node last reported.
type versionManagerImpl struct {
	// mu guards all three maps below.
	mu lock.Mutex
	// versions holds each node's IndexEngineVersion (the non-scalar
	// counterpart of scalarIndexVersions).
	versions map[int64]sessionutil.IndexEngineVersion
	// scalarIndexVersions holds each node's ScalarIndexEngineVersion.
	scalarIndexVersions map[int64]sessionutil.IndexEngineVersion
	// indexNonEncoding holds each node's IndexNonEncoding flag.
	indexNonEncoding map[int64]bool
}
// newIndexEngineVersionManager returns an empty version manager that tracks
// no nodes. All per-node maps are allocated up front so later writes never hit
// a nil map; the zero-value mutex is ready to use.
func newIndexEngineVersionManager() IndexEngineVersionManager {
	mgr := new(versionManagerImpl)
	mgr.versions = make(map[int64]sessionutil.IndexEngineVersion)
	mgr.scalarIndexVersions = make(map[int64]sessionutil.IndexEngineVersion)
	mgr.indexNonEncoding = make(map[int64]bool)
	return mgr
}
// Startup reconciles the tracked nodes with the given session snapshot.
// Any node whose ServerID does not appear in sessions is forgotten, and every
// session in the snapshot is recorded (overwriting older values). The map keys
// of sessions are not used; only each session's ServerID matters.
func (m *versionManagerImpl) Startup(sessions map[string]*sessionutil.Session) {
	m.mu.Lock()
	defer m.mu.Unlock()

	online := lo.MapKeys(sessions, func(s *sessionutil.Session, _ string) int64 {
		return s.ServerID
	})

	// Sweep nodes that are no longer part of the snapshot. Deleting map
	// entries while ranging over the same map is well-defined in Go.
	for nodeID := range m.versions {
		if _, stillOnline := online[nodeID]; stillOnline {
			continue
		}
		m.removeNodeByID(nodeID)
	}

	// Record (or refresh) every node present in the snapshot.
	for _, s := range sessions {
		m.addOrUpdate(s)
	}
}
// AddNode starts tracking the node described by session. Adding a node that is
// already tracked simply overwrites its recorded values, which is exactly what
// Update does (lock, then addOrUpdate), so the work is delegated there.
func (m *versionManagerImpl) AddNode(session *sessionutil.Session) {
	m.Update(session)
}
// RemoveNode stops tracking the node identified by session.ServerID.
// Removing an unknown node is a no-op.
func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) {
	nodeID := session.ServerID

	m.mu.Lock()
	defer m.mu.Unlock()
	m.removeNodeByID(nodeID)
}

// removeNodeByID drops every value recorded for sessionID. The caller must
// hold m.mu. Deleting a missing key is a no-op, so this is safe to call for
// nodes that were never added.
func (m *versionManagerImpl) removeNodeByID(sessionID int64) {
	delete(m.indexNonEncoding, sessionID)
	delete(m.scalarIndexVersions, sessionID)
	delete(m.versions, sessionID)
}
// Update refreshes the values recorded for the node described by session,
// starting to track it if it was not tracked yet.
func (m *versionManagerImpl) Update(session *sessionutil.Session) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.addOrUpdate(session)
}

// addOrUpdate records the index engine version, the scalar index engine
// version and the IndexNonEncoding flag carried by session under
// session.ServerID, overwriting any previous entry. The caller must hold m.mu.
func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) {
	// Log every value that feeds the merged results, not only the vector
	// versions, so an unexpected scalar version or encoding decision can be
	// traced back to the node that reported it.
	log.Info("addOrUpdate version",
		zap.Int64("nodeId", session.ServerID),
		zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion),
		zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion),
		zap.Int32("scalarMinimal", session.ScalarIndexEngineVersion.MinimalIndexVersion),
		zap.Int32("scalarCurrent", session.ScalarIndexEngineVersion.CurrentIndexVersion),
		zap.Bool("indexNonEncoding", session.IndexNonEncoding),
	)
	m.versions[session.ServerID] = session.IndexEngineVersion
	m.scalarIndexVersions[session.ServerID] = session.ScalarIndexEngineVersion
	m.indexNonEncoding[session.ServerID] = session.IndexNonEncoding
}
// GetCurrentIndexEngineVersion returns the lowest CurrentIndexVersion reported
// by any tracked node, or 0 when no node is tracked.
// NOTE(review): taking the minimum presumably picks a version every online
// node supports — confirm against the index build/load paths.
func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.versions) == 0 {
		log.Info("index versions is empty")
		return 0
	}
	current := lowestCurrentIndexVersion(m.versions)
	log.Info("Merged current version", zap.Int32("current", current))
	return current
}

// GetMinimalIndexEngineVersion returns the highest MinimalIndexVersion
// reported by any tracked node (never below 0), or 0 when no node is tracked.
func (m *versionManagerImpl) GetMinimalIndexEngineVersion() int32 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.versions) == 0 {
		log.Info("index versions is empty")
		return 0
	}
	minimal := highestMinimalIndexVersion(m.versions)
	log.Info("Merged minimal version", zap.Int32("minimal", minimal))
	return minimal
}

// GetCurrentScalarIndexEngineVersion is GetCurrentIndexEngineVersion applied to
// the scalar index engine versions.
func (m *versionManagerImpl) GetCurrentScalarIndexEngineVersion() int32 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.scalarIndexVersions) == 0 {
		log.Info("scalar index versions is empty")
		return 0
	}
	current := lowestCurrentIndexVersion(m.scalarIndexVersions)
	log.Info("Merged current scalar index version", zap.Int32("current", current))
	return current
}

// GetMinimalScalarIndexEngineVersion is GetMinimalIndexEngineVersion applied to
// the scalar index engine versions.
func (m *versionManagerImpl) GetMinimalScalarIndexEngineVersion() int32 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.scalarIndexVersions) == 0 {
		log.Info("scalar index versions is empty")
		return 0
	}
	minimal := highestMinimalIndexVersion(m.scalarIndexVersions)
	log.Info("Merged minimal scalar index version", zap.Int32("minimal", minimal))
	return minimal
}

// lowestCurrentIndexVersion returns the smallest CurrentIndexVersion in
// versions, or math.MaxInt32 if versions is empty. It does no locking:
// callers pass maps guarded by m.mu and handle the empty case themselves.
func lowestCurrentIndexVersion(versions map[int64]sessionutil.IndexEngineVersion) int32 {
	lowest := int32(math.MaxInt32)
	for _, v := range versions {
		if v.CurrentIndexVersion < lowest {
			lowest = v.CurrentIndexVersion
		}
	}
	return lowest
}

// highestMinimalIndexVersion returns the largest MinimalIndexVersion in
// versions, starting from 0 (so the result is never negative). It does no
// locking: callers pass maps guarded by m.mu.
func highestMinimalIndexVersion(versions map[int64]sessionutil.IndexEngineVersion) int32 {
	highest := int32(0)
	for _, v := range versions {
		if v.MinimalIndexVersion > highest {
			highest = v.MinimalIndexVersion
		}
	}
	return highest
}
// GetIndexNonEncoding reports whether every tracked node reported
// IndexNonEncoding as true. A single node reporting false makes the answer
// false, and an empty node set also yields false.
func (m *versionManagerImpl) GetIndexNonEncoding() bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	if len(m.indexNonEncoding) == 0 {
		log.Info("indexNonEncoding map is empty")
		// No node information yet: fall back to the old index format for safety.
		return false
	}

	for _, nonEncoding := range m.indexNonEncoding {
		if !nonEncoding {
			return false
		}
	}
	return true
}