mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
issue: #32910 see also: #32911 When channel exclusive mode is enabled, the replica records channel node info in the meta store. If the balance policy changes such that channel exclusive mode is disabled, we should clean up the channel node info in the meta store and stop balancing nodes between channels. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
242 lines
6.0 KiB
Go
242 lines
6.0 KiB
Go
package meta
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
)
|
|
|
|
type ReplicaSuite struct {
|
|
suite.Suite
|
|
|
|
replicaPB *querypb.Replica
|
|
}
|
|
|
|
func (suite *ReplicaSuite) SetupSuite() {
|
|
paramtable.Init()
|
|
suite.replicaPB = &querypb.Replica{
|
|
ID: 1,
|
|
CollectionID: 2,
|
|
Nodes: []int64{1, 2, 3},
|
|
ResourceGroup: DefaultResourceGroupName,
|
|
RoNodes: []int64{4},
|
|
}
|
|
}
|
|
|
|
func (suite *ReplicaSuite) TestReadOperations() {
|
|
r := newReplica(suite.replicaPB)
|
|
suite.testRead(r)
|
|
// keep same after clone.
|
|
mutableReplica := r.copyForWrite()
|
|
suite.testRead(mutableReplica.IntoReplica())
|
|
}
|
|
|
|
func (suite *ReplicaSuite) TestClone() {
|
|
r := newReplica(suite.replicaPB)
|
|
r2 := r.copyForWrite()
|
|
suite.testRead(r)
|
|
|
|
// after apply write operation on copy, the original should not be affected.
|
|
r2.AddRWNode(5, 6)
|
|
r2.AddRONode(1, 2)
|
|
r2.RemoveNode(3)
|
|
suite.testRead(r)
|
|
}
|
|
|
|
func (suite *ReplicaSuite) TestRange() {
|
|
count := 0
|
|
r := newReplica(suite.replicaPB)
|
|
r.RangeOverRWNodes(func(nodeID int64) bool {
|
|
count++
|
|
return true
|
|
})
|
|
suite.Equal(3, count)
|
|
count = 0
|
|
r.RangeOverRONodes(func(nodeID int64) bool {
|
|
count++
|
|
return true
|
|
})
|
|
suite.Equal(1, count)
|
|
|
|
count = 0
|
|
r.RangeOverRWNodes(func(nodeID int64) bool {
|
|
count++
|
|
return false
|
|
})
|
|
suite.Equal(1, count)
|
|
|
|
mr := r.copyForWrite()
|
|
mr.AddRONode(1)
|
|
|
|
count = 0
|
|
mr.RangeOverRWNodes(func(nodeID int64) bool {
|
|
count++
|
|
return false
|
|
})
|
|
suite.Equal(1, count)
|
|
}
|
|
|
|
func (suite *ReplicaSuite) TestWriteOperation() {
|
|
r := newReplica(suite.replicaPB)
|
|
mr := r.copyForWrite()
|
|
|
|
// test add available node.
|
|
suite.False(mr.Contains(5))
|
|
suite.False(mr.Contains(6))
|
|
mr.AddRWNode(5, 6)
|
|
suite.Equal(3, r.RWNodesCount())
|
|
suite.Equal(1, r.RONodesCount())
|
|
suite.Equal(4, r.NodesCount())
|
|
suite.Equal(5, mr.RWNodesCount())
|
|
suite.Equal(1, mr.RONodesCount())
|
|
suite.Equal(6, mr.NodesCount())
|
|
suite.True(mr.Contains(5))
|
|
suite.True(mr.Contains(5))
|
|
suite.True(mr.Contains(6))
|
|
|
|
// test add ro node.
|
|
suite.False(mr.ContainRWNode(4))
|
|
suite.False(mr.ContainRWNode(7))
|
|
mr.AddRWNode(4, 7)
|
|
suite.Equal(3, r.RWNodesCount())
|
|
suite.Equal(1, r.RONodesCount())
|
|
suite.Equal(4, r.NodesCount())
|
|
suite.Equal(7, mr.RWNodesCount())
|
|
suite.Equal(0, mr.RONodesCount())
|
|
suite.Equal(7, mr.NodesCount())
|
|
suite.True(mr.Contains(4))
|
|
suite.True(mr.Contains(7))
|
|
|
|
// test remove node to ro.
|
|
mr.AddRONode(4, 7)
|
|
suite.Equal(3, r.RWNodesCount())
|
|
suite.Equal(1, r.RONodesCount())
|
|
suite.Equal(4, r.NodesCount())
|
|
suite.Equal(5, mr.RWNodesCount())
|
|
suite.Equal(2, mr.RONodesCount())
|
|
suite.Equal(7, mr.NodesCount())
|
|
suite.False(mr.ContainRWNode(4))
|
|
suite.False(mr.ContainRWNode(7))
|
|
suite.True(mr.ContainRONode(4))
|
|
suite.True(mr.ContainRONode(7))
|
|
|
|
// test remove node.
|
|
mr.RemoveNode(4, 5, 7, 8)
|
|
suite.Equal(3, r.RWNodesCount())
|
|
suite.Equal(1, r.RONodesCount())
|
|
suite.Equal(4, r.NodesCount())
|
|
suite.Equal(4, mr.RWNodesCount())
|
|
suite.Equal(0, mr.RONodesCount())
|
|
suite.Equal(4, mr.NodesCount())
|
|
suite.False(mr.Contains(4))
|
|
suite.False(mr.Contains(5))
|
|
suite.False(mr.Contains(7))
|
|
|
|
// test set resource group.
|
|
mr.SetResourceGroup("rg1")
|
|
suite.Equal(r.GetResourceGroup(), DefaultResourceGroupName)
|
|
suite.Equal("rg1", mr.GetResourceGroup())
|
|
|
|
// should panic after IntoReplica.
|
|
mr.IntoReplica()
|
|
suite.Panics(func() {
|
|
mr.SetResourceGroup("newResourceGroup")
|
|
})
|
|
}
|
|
|
|
func (suite *ReplicaSuite) testRead(r *Replica) {
|
|
// Test GetID()
|
|
suite.Equal(suite.replicaPB.GetID(), r.GetID())
|
|
|
|
// Test GetCollectionID()
|
|
suite.Equal(suite.replicaPB.GetCollectionID(), r.GetCollectionID())
|
|
|
|
// Test GetResourceGroup()
|
|
suite.Equal(suite.replicaPB.GetResourceGroup(), r.GetResourceGroup())
|
|
|
|
// Test GetNodes()
|
|
suite.ElementsMatch(suite.replicaPB.GetNodes(), r.GetNodes())
|
|
|
|
// Test GetRONodes()
|
|
suite.ElementsMatch(suite.replicaPB.GetRoNodes(), r.GetRONodes())
|
|
|
|
// Test AvailableNodesCount()
|
|
suite.Equal(len(suite.replicaPB.GetNodes()), r.RWNodesCount())
|
|
|
|
// Test Contains()
|
|
suite.True(r.Contains(1))
|
|
suite.True(r.Contains(4))
|
|
|
|
// Test ContainRONode()
|
|
suite.False(r.ContainRONode(1))
|
|
suite.True(r.ContainRONode(4))
|
|
|
|
// Test ContainsRWNode()
|
|
suite.True(r.ContainRWNode(1))
|
|
suite.False(r.ContainRWNode(4))
|
|
}
|
|
|
|
func (suite *ReplicaSuite) TestChannelExclusiveMode() {
|
|
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName)
|
|
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key)
|
|
|
|
r := newReplica(&querypb.Replica{
|
|
ID: 1,
|
|
CollectionID: 2,
|
|
ResourceGroup: DefaultResourceGroupName,
|
|
ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
|
|
"channel1": {},
|
|
"channel2": {},
|
|
"channel3": {},
|
|
"channel4": {},
|
|
},
|
|
})
|
|
|
|
mutableReplica := r.copyForWrite()
|
|
// add 10 rw nodes, exclusive mode is false.
|
|
for i := 0; i < 10; i++ {
|
|
mutableReplica.AddRWNode(int64(i))
|
|
}
|
|
r = mutableReplica.IntoReplica()
|
|
for _, channelNodeInfo := range r.replicaPB.GetChannelNodeInfos() {
|
|
suite.Equal(0, len(channelNodeInfo.GetRwNodes()))
|
|
}
|
|
|
|
mutableReplica = r.copyForWrite()
|
|
// add 10 rw nodes, exclusive mode is true.
|
|
for i := 10; i < 20; i++ {
|
|
mutableReplica.AddRWNode(int64(i))
|
|
}
|
|
r = mutableReplica.IntoReplica()
|
|
for _, channelNodeInfo := range r.replicaPB.GetChannelNodeInfos() {
|
|
suite.Equal(5, len(channelNodeInfo.GetRwNodes()))
|
|
}
|
|
|
|
// 4 node become read only, exclusive mode still be true
|
|
mutableReplica = r.copyForWrite()
|
|
for i := 0; i < 4; i++ {
|
|
mutableReplica.AddRONode(int64(i))
|
|
}
|
|
r = mutableReplica.IntoReplica()
|
|
for _, channelNodeInfo := range r.replicaPB.GetChannelNodeInfos() {
|
|
suite.Equal(4, len(channelNodeInfo.GetRwNodes()))
|
|
}
|
|
|
|
// 4 node has been removed, exclusive mode back to false
|
|
mutableReplica = r.copyForWrite()
|
|
for i := 4; i < 8; i++ {
|
|
mutableReplica.RemoveNode(int64(i))
|
|
}
|
|
r = mutableReplica.IntoReplica()
|
|
for _, channelNodeInfo := range r.replicaPB.GetChannelNodeInfos() {
|
|
suite.Equal(0, len(channelNodeInfo.GetRwNodes()))
|
|
}
|
|
}
|
|
|
|
func TestReplica(t *testing.T) {
|
|
suite.Run(t, new(ReplicaSuite))
|
|
}
|