fix: [2.5] Fix infinite loop in ResourceManager recovery process (#45563)

relate: https://github.com/milvus-io/milvus/issues/45557

Signed-off-by: lianyu.sun <lianyu.sun@ly.com>
Authored by 7y-9 on 2025-11-17 15:19:39 +08:00; committed via GitHub
parent b8f14d3992
commit a42e847678
2 changed files with 252 additions and 2 deletions


@@ -1009,8 +1009,8 @@ func (rm *ResourceManager) GetResourceGroupsJSON(ctx context.Context) string {
 }
 
 func (rm *ResourceManager) CheckNodesInResourceGroup(ctx context.Context) {
-	rm.rwmutex.RLock()
-	defer rm.rwmutex.RUnlock()
+	rm.rwmutex.Lock()
+	defer rm.rwmutex.Unlock()
 	// clean stopping/offline nodes
 	assignedNodes := typeutil.NewUniqueSet()
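
The substance of the fix is in the hunk above: CheckNodesInResourceGroup does not just read resource-group state, it removes stopping/offline nodes and assigns pending ones (as the tests below exercise), so it must take the exclusive write lock rather than the shared read lock. A minimal, self-contained sketch of that read-versus-write distinction; groupState and its fields are illustrative stand-ins, not the actual Milvus types:

package main

import (
	"fmt"
	"sync"
)

// groupState is an illustrative stand-in for the manager's shared bookkeeping.
type groupState struct {
	rwmutex   sync.RWMutex
	nodeIDMap map[int64]string // nodeID -> resource group name
}

// lookup only reads shared state, so the shared read lock is sufficient.
func (g *groupState) lookup(nodeID int64) (string, bool) {
	g.rwmutex.RLock()
	defer g.rwmutex.RUnlock()
	rg, ok := g.nodeIDMap[nodeID]
	return rg, ok
}

// reconcile removes offline nodes and assigns incoming ones, i.e. it mutates
// shared state, so it must hold the exclusive write lock; doing the same work
// under RLock becomes a data race once two callers run concurrently.
func (g *groupState) reconcile(offline []int64, incoming map[int64]string) {
	g.rwmutex.Lock()
	defer g.rwmutex.Unlock()
	for _, id := range offline {
		delete(g.nodeIDMap, id)
	}
	for id, rg := range incoming {
		g.nodeIDMap[id] = rg
	}
}

func main() {
	g := &groupState{nodeIDMap: map[int64]string{1: "rg1", 2: "rg1"}}
	g.reconcile([]int64{1}, map[int64]string{3: "rg2"})
	_, stillAssigned := g.lookup(1)
	fmt.Println("node 1 assigned:", stillAssigned) // false: removed under the write lock
}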


@@ -1221,3 +1221,253 @@ func TestResourceManager_CheckNodesInResourceGroup_AllNodesHealthy(t *testing.T)
	assert.Contains(t, finalNodes, int64(1002), "Healthy node should remain")
	assert.Equal(t, 2, len(finalNodes), "Should have exactly 2 nodes")
}

// TestResourceManager_CheckNodesInResourceGroup_WithMultipleResourceGroups tests CheckNodesInResourceGroup with multiple resource groups
func TestResourceManager_CheckNodesInResourceGroup_WithMultipleResourceGroups(t *testing.T) {
	// Arrange
	manager := createTestResourceManager(t)
	ctx := context.Background()

	// Create additional resource groups
	err := manager.AddResourceGroup(ctx, "rg1", newResourceGroupConfig(2, 2))
	assert.NoError(t, err)
	err = manager.AddResourceGroup(ctx, "rg2", newResourceGroupConfig(2, 2))
	assert.NoError(t, err)

	// Add nodes to node manager
	for i := 1; i <= 6; i++ {
		nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   int64(i),
			Address:  "localhost",
			Hostname: "localhost",
		})
		manager.nodeMgr.Add(nodeInfo)
		manager.handleNodeUp(ctx, int64(i))
	}

	// Verify initial state
	assert.Equal(t, 2, manager.GetResourceGroup(ctx, "rg1").NodeNum())
	assert.Equal(t, 2, manager.GetResourceGroup(ctx, "rg2").NodeNum())
	assert.Equal(t, 2, manager.GetResourceGroup(ctx, DefaultResourceGroupName).NodeNum())

	// Set node 1 as stopping
	nodeInfo1 := manager.nodeMgr.Get(1)
	nodeInfo1.SetState(session.NodeStateStopping)

	// Add a new node
	newNodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   7,
		Address:  "localhost",
		Hostname: "localhost",
	})
	manager.nodeMgr.Add(newNodeInfo)

	// Act
	manager.CheckNodesInResourceGroup(ctx)

	// Assert
	// Node 1 should be removed (stopping)
	allNodes, _ := manager.GetNodes(ctx, "rg1")
	allNodes2, _ := manager.GetNodes(ctx, "rg2")
	allNodes3, _ := manager.GetNodes(ctx, DefaultResourceGroupName)
	totalNodes := len(allNodes) + len(allNodes2) + len(allNodes3)

	// Total nodes should be 6 (removed 1 stopping node, added 1 new node)
	assert.Equal(t, 6, totalNodes, "Total nodes should be 6")

	// New node should be assigned
	allAssignedNodes := append(allNodes, allNodes2...)
	allAssignedNodes = append(allAssignedNodes, allNodes3...)
	assert.Contains(t, allAssignedNodes, int64(7), "New node should be assigned")
	assert.NotContains(t, allAssignedNodes, int64(1), "Stopping node should be removed")
}

// TestResourceManager_CheckNodesInResourceGroup_ConcurrentSafety tests that CheckNodesInResourceGroup is safe for concurrent access
func TestResourceManager_CheckNodesInResourceGroup_ConcurrentSafety(t *testing.T) {
	// Arrange
	manager := createTestResourceManager(t)
	ctx := context.Background()

	// Add some nodes
	for i := 1; i <= 5; i++ {
		nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   int64(i),
			Address:  "localhost",
			Hostname: "localhost",
		})
		manager.nodeMgr.Add(nodeInfo)
		manager.handleNodeUp(ctx, int64(i))
	}

	// Act - Call CheckNodesInResourceGroup multiple times concurrently
	done := make(chan bool, 3)
	for i := 0; i < 3; i++ {
		go func() {
			manager.CheckNodesInResourceGroup(ctx)
			done <- true
		}()
	}

	// Wait for all goroutines to complete
	for i := 0; i < 3; i++ {
		<-done
	}

	// Assert - Verify that the state is consistent
	finalNodes, err := manager.GetNodes(ctx, DefaultResourceGroupName)
	assert.NoError(t, err)
	assert.Equal(t, 5, len(finalNodes), "Should have all 5 nodes")
}

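A side note on the test above: with the pre-fix RLock, two of these goroutines could mutate the group maps at the same time, which go test -race would be expected to flag as a data race. The same fan-out can also be written with sync.WaitGroup; the following is a hedged sketch that reuses the createTestResourceManager helper from this file and assumes an extra "sync" import, not an existing test:

// sketch: the same concurrency check expressed with sync.WaitGroup.
func TestResourceManager_CheckNodesInResourceGroup_ConcurrentSafety_WaitGroup(t *testing.T) {
	manager := createTestResourceManager(t)
	ctx := context.Background()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent callers now serialize on the write lock instead of
			// racing on the shared maps under the read lock.
			manager.CheckNodesInResourceGroup(ctx)
		}()
	}
	wg.Wait()
}
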
// TestResourceManager_CheckNodesInResourceGroup_EmbeddedQueryNode tests removal of embedded query nodes
func TestResourceManager_CheckNodesInResourceGroup_EmbeddedQueryNode(t *testing.T) {
	// Arrange
	manager := createTestResourceManager(t)
	ctx := context.Background()

	// Add a normal node
	nodeInfo1 := session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   1001,
		Address:  "localhost:1001",
		Hostname: "localhost",
	})
	manager.nodeMgr.Add(nodeInfo1)
	manager.handleNodeUp(ctx, 1001)

	// Add an embedded query node
	nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   1002,
		Address:  "localhost:1002",
		Hostname: "localhost",
		Labels: map[string]string{
			sessionutil.LabelStreamingNodeEmbeddedQueryNode: "1",
		},
	})
	manager.nodeMgr.Add(nodeInfo2)
	manager.handleNodeUp(ctx, 1002)

	// Manually assign node 1002 to resource group (simulate dirty state)
	manager.rwmutex.Lock()
	manager.groups[DefaultResourceGroupName].nodes.Insert(1002)
	manager.nodeIDMap[1002] = DefaultResourceGroupName
	manager.rwmutex.Unlock()

	// Act
	manager.CheckNodesInResourceGroup(ctx)

	// Assert
	finalNodes, err := manager.GetNodes(ctx, DefaultResourceGroupName)
	assert.NoError(t, err)
	assert.Contains(t, finalNodes, int64(1001), "Normal node should remain")
	assert.NotContains(t, finalNodes, int64(1002), "Embedded query node should be removed")
}

// TestResourceManager_CheckNodesInResourceGroup_MixedStates tests CheckNodesInResourceGroup with nodes in various states
func TestResourceManager_CheckNodesInResourceGroup_MixedStates(t *testing.T) {
	// Arrange
	manager := createTestResourceManager(t)
	ctx := context.Background()

	// Add nodes in different states
	// Healthy node
	nodeInfo1 := session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   1001,
		Address:  "localhost:1001",
		Hostname: "localhost",
	})
	manager.nodeMgr.Add(nodeInfo1)
	manager.handleNodeUp(ctx, 1001)

	// Stopping node
	nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   1002,
		Address:  "localhost:1002",
		Hostname: "localhost",
	})
	manager.nodeMgr.Add(nodeInfo2)
	manager.handleNodeUp(ctx, 1002)
	nodeInfo2.SetState(session.NodeStateStopping)

	// Offline node (assigned but not in nodeMgr)
	manager.rwmutex.Lock()
	manager.groups[DefaultResourceGroupName].nodes.Insert(1003)
	manager.nodeIDMap[1003] = DefaultResourceGroupName
	manager.rwmutex.Unlock()

	// New node not yet assigned
	nodeInfo4 := session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   1004,
		Address:  "localhost:1004",
		Hostname: "localhost",
	})
	manager.nodeMgr.Add(nodeInfo4)

	// Act
	manager.CheckNodesInResourceGroup(ctx)

	// Assert
	finalNodes, err := manager.GetNodes(ctx, DefaultResourceGroupName)
	assert.NoError(t, err)
	// Healthy node should remain
	assert.Contains(t, finalNodes, int64(1001), "Healthy node should remain")
	// Stopping node should be removed
	assert.NotContains(t, finalNodes, int64(1002), "Stopping node should be removed")
	// Offline node should be removed
	assert.NotContains(t, finalNodes, int64(1003), "Offline node should be removed")
	// New node should be added
	assert.Contains(t, finalNodes, int64(1004), "New node should be added")
}

// TestResourceManager_CheckNodesInResourceGroup_NoInfiniteLoop tests that redundant node recovery doesn't cause infinite loops
func TestResourceManager_CheckNodesInResourceGroup_NoInfiniteLoop(t *testing.T) {
	// This test addresses the issue described in https://github.com/milvus-io/milvus/issues/45557
	// Arrange
	manager := createTestResourceManager(t)
	ctx := context.Background()

	// Create resource groups with specific limits
	err := manager.AddResourceGroup(ctx, "rg1", newResourceGroupConfig(2, 2))
	assert.NoError(t, err)

	// Add exactly enough nodes to fill the resource group
	for i := 1; i <= 2; i++ {
		nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   int64(i),
			Address:  "localhost",
			Hostname: "localhost",
		})
		manager.nodeMgr.Add(nodeInfo)
	}

	// Act - This should not cause an infinite loop
	// In the bug scenario, nodes would be transferred back and forth between resource groups
	manager.CheckNodesInResourceGroup(ctx)
	manager.AutoRecoverResourceGroup(ctx, "rg1")
	manager.AutoRecoverResourceGroup(ctx, DefaultResourceGroupName)

	// Do it multiple times to ensure stability
	for i := 0; i < 5; i++ {
		manager.CheckNodesInResourceGroup(ctx)
		manager.AutoRecoverResourceGroup(ctx, "rg1")
		manager.AutoRecoverResourceGroup(ctx, DefaultResourceGroupName)
	}

	// Assert - State should be stable
	rg1Nodes, _ := manager.GetNodes(ctx, "rg1")
	defaultNodes, _ := manager.GetNodes(ctx, DefaultResourceGroupName)
	assert.Equal(t, 2, len(rg1Nodes), "rg1 should have 2 nodes")
	assert.Equal(t, 0, len(defaultNodes), "default rg should have 0 nodes")

	// Verify no nodes are duplicated
	allNodes := append(rg1Nodes, defaultNodes...)
	uniqueNodes := make(map[int64]bool)
	for _, node := range allNodes {
		assert.False(t, uniqueNodes[node], "Node %d should not be duplicated", node)
		uniqueNodes[node] = true
	}
}
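
The NoInfiniteLoop test pins the expected final counts and repeats the recovery pair five times. The same stability property can be phrased as a fixed-point check that the full node-to-group assignment stops changing between passes. The helper below is a hedged sketch that uses only calls already exercised in this file; assertRecoveryStable itself is hypothetical, not an existing helper:

// sketch: assert that repeated recovery passes reach a fixed point instead of
// shuffling nodes back and forth between resource groups.
func assertRecoveryStable(t *testing.T, ctx context.Context, manager *ResourceManager, rgs []string) {
	// snapshot captures the current node-to-group assignment for the given groups.
	snapshot := func() map[int64]string {
		assignment := make(map[int64]string)
		for _, rg := range rgs {
			nodes, err := manager.GetNodes(ctx, rg)
			assert.NoError(t, err)
			for _, node := range nodes {
				assignment[node] = rg
			}
		}
		return assignment
	}

	// One warm-up pass so that pending nodes get assigned before checking stability.
	manager.CheckNodesInResourceGroup(ctx)
	for _, rg := range rgs {
		manager.AutoRecoverResourceGroup(ctx, rg)
	}

	prev := snapshot()
	for i := 0; i < 5; i++ {
		manager.CheckNodesInResourceGroup(ctx)
		for _, rg := range rgs {
			manager.AutoRecoverResourceGroup(ctx, rg)
		}
		assert.Equal(t, prev, snapshot(), "recovery pass %d changed the node assignment", i)
	}
}

In the test above it would be invoked as assertRecoveryStable(t, ctx, manager, []string{"rg1", DefaultResourceGroupName}).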