enhance: add rw/ro streaming query node replica management (#38677)
issue: #38399

- Embed the query node into the streaming node so that the delegator is available on the streaming node.
- The embedded query node carries a special server label `QUERYNODE_STREAMING-EMBEDDED`.
- Change the balance strategy to assign channels to streaming nodes as much as possible.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
8117d59f85
commit
c84a0748c4
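The core of the balance change in this commit can be summarized in a few lines. The sketch below is illustrative only, with simplified types and an assumed fallback policy; it is not the balancer code itself, which appears later in this diff: prefer the query node embedded in the streaming node that owns a channel's WAL, and fall back to the usual least-loaded pick only when that node is not a candidate.

// Hypothetical, simplified sketch of the WAL-first channel assignment preference.
// walLocated stands in for snmanager.StaticStreamingNodeManager.GetWALLocated,
// candidates for the nodes the balancer is allowed to use, and leastLoaded for
// whatever fallback policy the concrete balancer applies.
func assignWALFirst(channels []string, candidates map[int64]bool,
    walLocated func(channel string) int64, leastLoaded func() int64,
) map[string]int64 {
    plan := make(map[string]int64, len(channels))
    for _, ch := range channels {
        if node := walLocated(ch); candidates[node] {
            plan[ch] = node // prefer the node that hosts this channel's WAL
            continue
        }
        plan[ch] = leastLoaded() // assumed fallback: regular least-loaded assignment
    }
    return plan
}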
@@ -150,7 +150,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
        role.EnableIndexNode = true
    case typeutil.StreamingNodeRole:
        streamingutil.MustEnableStreamingService()
        streamingutil.EnableEmbededQueryNode()
        role.EnableStreamingNode = true
        role.EnableQueryNode = true
    case typeutil.StandaloneRole, typeutil.EmbeddedRole:
        role.EnableRootCoord = true
        role.EnableProxy = true
@@ -175,6 +177,10 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
        role.EnableIndexNode = enableIndexNode
        role.EnableProxy = enableProxy
        role.EnableStreamingNode = enableStreamingNode
        if enableStreamingNode && !enableQueryNode {
            role.EnableQueryNode = true
            streamingutil.EnableEmbededQueryNode()
        }
    default:
        fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
        os.Exit(-1)
internal/coordinator/snmanager/streaming_node_manager.go  (new file, 106 lines)
@@ -0,0 +1,106 @@
package snmanager

import (
    "context"
    "sync"

    "github.com/cockroachdb/errors"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/streaming/util/types"
    "github.com/milvus-io/milvus/pkg/util/funcutil"
    "github.com/milvus-io/milvus/pkg/util/syncutil"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

var StaticStreamingNodeManager = newStreamingNodeManager()

func newStreamingNodeManager() *StreamingNodeManager {
    snm := &StreamingNodeManager{
        notifier:            syncutil.NewAsyncTaskNotifier[struct{}](),
        balancer:            syncutil.NewFuture[balancer.Balancer](),
        cond:                syncutil.NewContextCond(&sync.Mutex{}),
        latestAssignments:   make(map[string]types.PChannelInfoAssigned),
        streamingNodes:      typeutil.NewUniqueSet(),
        nodeChangedNotifier: syncutil.NewVersionedNotifier(),
    }
    go snm.execute()
    return snm
}

// StreamingNodeManager manages the query nodes that are embedded into streaming nodes.
// StreamingNodeManager is exclusive with ResourceManager.
type StreamingNodeManager struct {
    notifier *syncutil.AsyncTaskNotifier[struct{}]
    balancer *syncutil.Future[balancer.Balancer]
    // The coord is merged after 2.6, so we don't need to make the distribution safe.
    cond                *syncutil.ContextCond
    latestAssignments   map[string]types.PChannelInfoAssigned // the latest assignment info received from the streaming coord balance module.
    streamingNodes      typeutil.UniqueSet
    nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that the nodes in the streaming node manager have changed.
}

// GetWALLocated returns the server id of the node that the wal of the vChannel is located on.
func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
    pchannel := funcutil.ToPhysicalChannel(vChannel)
    var targetServerID int64

    s.cond.L.Lock()
    for {
        if assignment, ok := s.latestAssignments[pchannel]; ok {
            targetServerID = assignment.Node.ServerID
            break
        }
        s.cond.Wait(context.Background())
    }
    s.cond.L.Unlock()
    return targetServerID
}

// GetStreamingQueryNodeIDs returns the server ids of the streaming query nodes.
func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet {
    s.cond.L.Lock()
    defer s.cond.L.Unlock()
    return s.streamingNodes.Clone()
}

// ListenNodeChanged returns a listener for node changed events.
func (s *StreamingNodeManager) ListenNodeChanged() *syncutil.VersionedListener {
    return s.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
}

// SetBalancerReady marks the balancer ready for the streaming node manager during streamingcoord initialization.
func (s *StreamingNodeManager) SetBalancerReady(b balancer.Balancer) {
    s.balancer.Set(b)
}

func (s *StreamingNodeManager) execute() (err error) {
    defer s.notifier.Finish(struct{}{})

    balancer, err := s.balancer.GetWithContext(s.notifier.Context())
    if err != nil {
        return errors.Wrap(err, "failed to wait balancer ready")
    }
    for {
        if err := balancer.WatchChannelAssignments(s.notifier.Context(), func(
            version typeutil.VersionInt64Pair,
            relations []types.PChannelInfoAssigned,
        ) error {
            s.cond.LockAndBroadcast()
            s.latestAssignments = make(map[string]types.PChannelInfoAssigned)
            s.streamingNodes = typeutil.NewUniqueSet()
            for _, relation := range relations {
                s.latestAssignments[relation.Channel.Name] = relation
                s.streamingNodes.Insert(relation.Node.ServerID)
            }
            s.nodeChangedNotifier.NotifyAll()
            log.Info("streaming node manager updated", zap.Any("assignments", s.latestAssignments), zap.Any("streamingNodes", s.streamingNodes))
            s.cond.L.Unlock()
            return nil
        }); err != nil {
            return err
        }
    }
}
@@ -0,0 +1,62 @@
package snmanager

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"

    "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
    "github.com/milvus-io/milvus/pkg/streaming/util/types"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

type pChannelInfoAssigned struct {
    version   typeutil.VersionInt64Pair
    pchannels []types.PChannelInfoAssigned
}

func TestStreamingNodeManager(t *testing.T) {
    m := newStreamingNodeManager()
    b := mock_balancer.NewMockBalancer(t)

    ch := make(chan pChannelInfoAssigned, 1)
    b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).Run(
        func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) {
            for {
                select {
                case <-ctx.Done():
                    return
                case p := <-ch:
                    cb(p.version, p.pchannels)
                }
            }
        })
    m.SetBalancerReady(b)

    streamingNodes := m.GetStreamingQueryNodeIDs()
    assert.Empty(t, streamingNodes)

    ch <- pChannelInfoAssigned{
        version: typeutil.VersionInt64Pair{
            Global: 1,
            Local:  1,
        },
        pchannels: []types.PChannelInfoAssigned{
            {
                Channel: types.PChannelInfo{Name: "a_test", Term: 1},
                Node:    types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
            },
        },
    }

    listener := m.ListenNodeChanged()
    err := listener.Wait(context.Background())
    assert.NoError(t, err)

    node := m.GetWALLocated("a_test")
    assert.Equal(t, node, int64(1))
    streamingNodes = m.GetStreamingQueryNodeIDs()
    assert.Equal(t, len(streamingNodes), 1)
}
@@ -27,6 +27,7 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

@@ -108,6 +109,8 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int
}

func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
    nodes = filterSQNIfStreamingServiceEnabled(nodes)

    // skip out suspend node and stopping node during assignment, but skip this check for manual balance
    if !forceAssign {
        versionRangeFilter := semver.MustParseRange(">2.3.x")
@@ -122,22 +125,29 @@ func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int
    if len(nodesInfo) == 0 {
        return nil
    }

    plans := make([]ChannelAssignPlan, 0)
    scoreDelta := make(map[int64]int)
    if streamingutil.IsStreamingServiceEnabled() {
        channels, plans, scoreDelta = assignChannelToWALLocatedFirstForNodeInfo(channels, nodesInfo)
    }

    sort.Slice(nodesInfo, func(i, j int) bool {
        cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
        id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
        delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
        delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1)+scoreDelta[id1], b.scheduler.GetChannelTaskDelta(id2, -1)+scoreDelta[id2]
        return cnt1+delta1 < cnt2+delta2
    })
    ret := make([]ChannelAssignPlan, 0, len(channels))

    for i, c := range channels {
        plan := ChannelAssignPlan{
            Channel: c,
            From:    -1,
            To:      nodesInfo[i%len(nodesInfo)].ID(),
        }
        ret = append(ret, plan)
        plans = append(plans, plan)
    }
    return ret
    return plans
}

func (b *RoundRobinBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
@@ -28,6 +28,7 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
    "github.com/milvus-io/milvus/pkg/util/typeutil"

@@ -67,6 +68,17 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
        }
    }()

    if streamingutil.IsStreamingServiceEnabled() {
        // Make a plan to rebalance the channels first.
        // The streaming query node doesn't participate in channel-level scoring, so just fall back to the ScoreBasedBalancer.
        stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()
        channelPlan := b.ScoreBasedBalancer.balanceChannels(ctx, br, replica, stoppingBalance)
        // If the channelPlan is not empty, execute it directly and skip the segment balance.
        if len(channelPlan) > 0 {
            return nil, channelPlan
        }
    }

    exclusiveMode := true
    channels := b.targetMgr.GetDmChannelsByCollection(ctx, replica.GetCollectionID(), meta.CurrentTarget)
    for channelName := range channels {
@@ -122,7 +134,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
            zap.Any("available nodes", rwNodes),
        )
        // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
        if b.permitBalanceChannel(replica.GetCollectionID()) {
        if b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() {
            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, channelName, rwNodes, roNodes)...)
        }

@@ -130,7 +142,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, channelName, rwNodes, roNodes)...)
        }
    } else {
        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() {
            channelPlans = append(channelPlans, b.genChannelPlan(ctx, replica, channelName, rwNodes)...)
        }
    }

@@ -15,6 +15,7 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/params"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
    "github.com/milvus-io/milvus/pkg/util/typeutil"

@@ -485,24 +486,53 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta.
        }
    }()

    if replica.NodesCount() == 0 {
        return nil, nil
    stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()

    channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
    if len(channelPlans) == 0 {
        segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
    }
    return
}

func (b *MultiTargetBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
    var rwNodes, roNodes []int64
    if streamingutil.IsStreamingServiceEnabled() {
        rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes()
    } else {
        rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
    }

    if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
        return nil
    }

    if len(roNodes) != 0 {
        if !stoppingBalance {
            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
            return nil
        }
        return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
    }

    if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
        return b.genChannelPlan(ctx, br, replica, rwNodes)
    }
    return nil
}

func (b *MultiTargetBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
    rwNodes := replica.GetRWNodes()
    roNodes := replica.GetRONodes()

    if len(rwNodes) == 0 {
        // no available nodes to balance
        return nil, nil
    if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
        return nil
    }

    // print current distribution before generating plans
    segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
    if len(roNodes) != 0 {
        if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
        if !stoppingBalance {
            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
            return nil, nil
            return nil
        }

        log.Info("Handle stopping nodes",
@@ -510,23 +540,9 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta.
            zap.Any("available nodes", rwNodes),
        )
        // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
        if b.permitBalanceChannel(replica.GetCollectionID()) {
            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
        }
        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
        }
    } else {
        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
            channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
        }

        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
            segmentPlans = b.genSegmentPlan(ctx, replica, rwNodes)
        }
        return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
    }

    return segmentPlans, channelPlans
    return b.genSegmentPlan(ctx, replica, rwNodes)
}

func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64) []SegmentAssignPlan {
@@ -26,9 +26,11 @@ import (
    "github.com/samber/lo"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

@@ -87,6 +89,8 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID
// AssignChannel: the row count based balancer assigns each channel to the node with the least global channel count,
// trying to give every query node an even channel count.
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
    nodes = filterSQNIfStreamingServiceEnabled(nodes)

    // skip out suspend node and stopping node during assignment, but skip this check for manual balance
    if !forceAssign {
        versionRangeFilter := semver.MustParseRange(">2.3.x")
@@ -99,19 +103,29 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID
    }

    nodeItems := b.convertToNodeItemsByChannel(nodes)
    nodeItems = lo.Shuffle(nodeItems)
    if len(nodeItems) == 0 {
        return nil
    }

    queue := newPriorityQueue()
    for _, item := range nodeItems {
        queue.push(item)
    }

    plans := make([]ChannelAssignPlan, 0, len(channels))
    plans := make([]ChannelAssignPlan, 0)
    for _, c := range channels {
        // pick the node with the least channel num and allocate to it.
        ni := queue.pop().(*nodeItem)
        var ni *nodeItem
        if streamingutil.IsStreamingServiceEnabled() {
            // When the streaming service is enabled, assign the channel to the node where its WAL is located.
            nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName())
            if item, ok := nodeItems[nodeID]; ok {
                ni = item
            }
        }
        if ni == nil {
            // pick the node with the least channel num and allocate to it.
            ni = queue.pop().(*nodeItem)
        }
        plan := ChannelAssignPlan{
            From: -1,
            To:   ni.nodeID,
@@ -151,8 +165,8 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*
    return ret
}

func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*nodeItem {
    ret := make([]*nodeItem, 0, len(nodeIDs))
func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) map[int64]*nodeItem {
    ret := make(map[int64]*nodeItem, len(nodeIDs))
    for _, node := range nodeIDs {
        channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(node))

@@ -161,7 +175,7 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*
        channelCount += b.scheduler.GetChannelTaskDelta(node, -1)
        // more channel num, less priority
        nodeItem := newNodeItem(channelCount, node)
        ret = append(ret, &nodeItem)
        ret[node] = &nodeItem
    }
    return ret
}
@@ -181,22 +195,53 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met
        log.Info("balance plan generated", zap.Stringers("report details", br.records))
    }
    }()
    if replica.NodesCount() == 0 {
        return nil, nil

    stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()

    channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
    if len(channelPlans) == 0 {
        segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
    }
    return
}

func (b *RowCountBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
    var rwNodes, roNodes []int64
    if streamingutil.IsStreamingServiceEnabled() {
        rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes()
    } else {
        rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
    }

    if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
        return nil
    }
    if len(roNodes) != 0 {
        if !stoppingBalance {
            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
            return nil
        }
        return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
    }

    if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
        return b.genChannelPlan(ctx, br, replica, rwNodes)
    }
    return nil
}

func (b *RowCountBasedBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
    rwNodes := replica.GetRWNodes()
    roNodes := replica.GetRONodes()
    if len(rwNodes) == 0 {
        // no available nodes to balance
        return nil, nil
    }

    segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
    if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
        return nil
    }
    // print current distribution before generating plans
    if len(roNodes) != 0 {
        if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
        if !stoppingBalance {
            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
            return nil, nil
            return nil
        }

        log.Info("Handle stopping nodes",
@@ -204,24 +249,9 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met
            zap.Any("available nodes", rwNodes),
        )
        // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
        if b.permitBalanceChannel(replica.GetCollectionID()) {
            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
        }

        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
        }
    } else {
        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
            channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
        }

        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
            segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, replica, rwNodes)...)
        }
        return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
    }

    return segmentPlans, channelPlans
    return b.genSegmentPlan(ctx, replica, rwNodes)
}

func (b *RowCountBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []SegmentAssignPlan {
@@ -25,10 +25,12 @@ import (
    "github.com/samber/lo"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/params"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

@@ -149,6 +151,8 @@ func (b *ScoreBasedBalancer) AssignChannel(ctx context.Context, collectionID int
}

func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
    nodes = filterSQNIfStreamingServiceEnabled(nodes)

    balanceBatchSize := math.MaxInt64
    if !forceAssign {
        nodes = lo.Filter(nodes, func(node int64, _ int) bool {
@@ -175,8 +179,18 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
    plans := make([]ChannelAssignPlan, 0, len(channels))
    for _, ch := range channels {
        func(ch *meta.DmChannel) {
            var targetNode *nodeItem
            if streamingutil.IsStreamingServiceEnabled() {
                // When streaming service is enabled, we need to assign channel to the node where WAL is located.
                nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(ch.GetChannelName())
                if item, ok := nodeItemsMap[nodeID]; ok {
                    targetNode = item
                }
            }
            // for each channel, pick the node with the least score
            targetNode := queue.pop().(*nodeItem)
            if targetNode == nil {
                targetNode = queue.pop().(*nodeItem)
            }
            // make sure candidate is always push back
            defer queue.push(targetNode)
            scoreChanges := b.calculateChannelScore(ch, collectionID)
@@ -439,52 +453,78 @@ func (b *ScoreBasedBalancer) BalanceReplica(ctx context.Context, replica *meta.R
        log.Info("balance plan generated", zap.Stringers("nodesInfo", br.NodesInfo()), zap.Stringers("report details", br.records))
    }
    }()

    if replica.NodesCount() == 0 {
        br.AddRecord(StrRecord("replica has no querynode"))
        return nil, nil
    }

    stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()

    channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
    if len(channelPlans) == 0 {
        segmentPlans = b.balanceSegments(ctx, br, replica, stoppingBalance)
    }
    return
}

func (b *ScoreBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
    var rwNodes []int64
    var roNodes []int64
    if streamingutil.IsStreamingServiceEnabled() {
        rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes()
    } else {
        rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
    }

    if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
        return nil
    }

    if len(roNodes) != 0 {
        if !stoppingBalance {
            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
            br.AddRecord(StrRecord("stopping balance is disabled"))
            return nil
        }

        br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes))
        return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
    }

    if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
        return b.genChannelPlan(ctx, br, replica, rwNodes)
    }
    return nil
}

func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
    rwNodes := replica.GetRWNodes()
    roNodes := replica.GetRONodes()

    if len(rwNodes) == 0 {
        // no available nodes to balance
        br.AddRecord(StrRecord("no rwNodes to balance"))
        return nil, nil
        return nil
    }
    if !b.permitBalanceSegment(replica.GetCollectionID()) {
        return nil
    }

    // print current distribution before generating plans
    segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
    if len(roNodes) != 0 {
        if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
        if !stoppingBalance {
            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
            br.AddRecord(StrRecord("stopping balance is disabled"))
            return nil, nil
            return nil
        }

        log.Info("Handle stopping nodes",
            zap.Any("stopping nodes", roNodes),
            zap.Any("available nodes", rwNodes),
        )
        br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes))
        // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
        if b.permitBalanceChannel(replica.GetCollectionID()) {
            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
        }
        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
        }
    } else {
        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
            channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
        }

        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
            segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, br, replica, rwNodes)...)
        }
        return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
    }

    return segmentPlans, channelPlans
    return b.genSegmentPlan(ctx, br, replica, rwNodes)
}

func (b *ScoreBasedBalancer) genStoppingChannelPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []ChannelAssignPlan {
@@ -0,0 +1,64 @@
package balance

import (
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/log"
)

func assignChannelToWALLocatedFirstForNodeInfo(
    channels []*meta.DmChannel,
    nodeItems []*session.NodeInfo,
) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan, scoreDelta map[int64]int) {
    plans = make([]ChannelAssignPlan, 0)
    notFoundChannels = make([]*meta.DmChannel, 0)
    scoreDelta = make(map[int64]int)
    for _, c := range channels {
        nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName())
        // Check if nodeID is in the list of nodeItems.
        // The nodeID may not be in nodeItems in multi-replica mode:
        // only one replica can be assigned to the node where the wal is located.
        found := false
        for _, item := range nodeItems {
            if item.ID() == nodeID {
                plans = append(plans, ChannelAssignPlan{
                    From:    -1,
                    To:      item.ID(),
                    Channel: c,
                })
                found = true
                scoreDelta[item.ID()] += 1
                break
            }
        }
        if !found {
            notFoundChannels = append(notFoundChannels, c)
        }
    }
    return notFoundChannels, plans, scoreDelta
}

// filterSQNIfStreamingServiceEnabled filters out the non-SQN query nodes when the streaming service is enabled.
func filterSQNIfStreamingServiceEnabled(nodes []int64) []int64 {
    if streamingutil.IsStreamingServiceEnabled() {
        sqns := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
        expectedSQNs := make([]int64, 0, len(nodes))
        unexpectedNodes := make([]int64, 0)
        for _, node := range nodes {
            if sqns.Contain(node) {
                expectedSQNs = append(expectedSQNs, node)
            } else {
                unexpectedNodes = append(unexpectedNodes, node)
            }
        }
        if len(unexpectedNodes) > 0 {
            log.Warn("unexpected streaming querynode found when enable streaming service", zap.Int64s("unexpectedNodes", unexpectedNodes))
        }
        return expectedSQNs
    }
    return nodes
}
@@ -0,0 +1,76 @@
package balance

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"

    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/pkg/proto/datapb"
    "github.com/milvus-io/milvus/pkg/streaming/util/types"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestAssignChannelToWALLocatedFirst(t *testing.T) {
    balancer := mock_balancer.NewMockBalancer(t)
    snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)

    balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
        versions := []typeutil.VersionInt64Pair{
            {Global: 1, Local: 2},
        }
        pchans := [][]types.PChannelInfoAssigned{
            {
                types.PChannelInfoAssigned{
                    Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
                    Node:    types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
                },
                types.PChannelInfoAssigned{
                    Channel: types.PChannelInfo{Name: "pchannel2", Term: 1},
                    Node:    types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"},
                },
                types.PChannelInfoAssigned{
                    Channel: types.PChannelInfo{Name: "pchannel3", Term: 1},
                    Node:    types.StreamingNodeInfo{ServerID: 3, Address: "localhost:1"},
                },
            },
        }
        for i := 0; i < len(versions); i++ {
            cb(versions[i], pchans[i])
        }
        <-ctx.Done()
        return context.Cause(ctx)
    })

    channels := []*meta.DmChannel{
        {VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel_v1"}},
        {VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel2_v2"}},
        {VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel3_v1"}},
    }

    var scoreDelta map[int64]int
    nodeInfos := []*session.NodeInfo{
        session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}),
        session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 2}),
    }

    notFounChannels, plans, scoreDelta := assignChannelToWALLocatedFirstForNodeInfo(channels, nodeInfos)
    assert.Len(t, notFounChannels, 1)
    assert.Equal(t, notFounChannels[0].GetChannelName(), "pchannel3_v1")
    assert.Len(t, plans, 2)
    assert.Len(t, scoreDelta, 2)
    for _, plan := range plans {
        if plan.Channel.GetChannelName() == "pchannel_v1" {
            assert.Equal(t, plan.To, int64(1))
            assert.Equal(t, scoreDelta[1], 1)
        } else {
            assert.Equal(t, plan.To, int64(2))
            assert.Equal(t, scoreDelta[2], 1)
        }
    }
}
@@ -30,6 +30,7 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/querycoordv2/utils"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/typeutil"

@@ -228,9 +229,13 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*meta.DmChannel, replica *meta.Replica) []task.Task {
    plans := make([]balance.ChannelAssignPlan, 0)
    for _, ch := range channels {
        rwNodes := replica.GetChannelRWNodes(ch.GetChannelName())
        if len(rwNodes) == 0 {
            rwNodes = replica.GetRWNodes()
        var rwNodes []int64
        if streamingutil.IsStreamingServiceEnabled() {
            rwNodes = replica.GetRWSQNodes()
        } else {
            if rwNodes = replica.GetChannelRWNodes(ch.GetChannelName()); len(rwNodes) == 0 {
                rwNodes = replica.GetRWNodes()
            }
        }
        plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, true)
        plans = append(plans, plan...)
@@ -18,13 +18,23 @@ var NilReplica = newReplica(&querypb.Replica{
// So only read only operations are allowed on these type.
type Replica struct {
    replicaPB *querypb.Replica
    rwNodes   typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field.
    // Nodes is the legacy querynode that is not embedded in the streamingnode, which can only load sealed segments.
    rwNodes typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field.
    // always keep consistent with replicaPB.Nodes.
    // mutually exclusive with roNodes.
    roNodes typeutil.UniqueSet // a helper field for manipulating replica's RO Nodes slice field.
    // always keep consistent with replicaPB.RoNodes.
    // node used by replica but cannot add more channel or segment on it.
    // node used by replica but cannot add segment on it.
    // include rebalance node or node out of resource group.

    // SQNodes is the querynode that is embedded in the streamingnode, which can only watch channels and load growing segments.
    rwSQNodes typeutil.UniqueSet // a helper field for manipulating replica's RW SQ Nodes slice field.
    // always keep consistent with replicaPB.RwSqNodes.
    // mutually exclusive with roSQNodes.
    roSQNodes typeutil.UniqueSet // a helper field for manipulating replica's RO SQ Nodes slice field.
    // always keep consistent with replicaPB.RoSqNodes.
    // node used by replica but cannot add more channel on it.
    // include the rebalance node.
}

// Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead.
@@ -44,6 +54,8 @@ func newReplica(replica *querypb.Replica) *Replica {
        replicaPB: proto.Clone(replica).(*querypb.Replica),
        rwNodes:   typeutil.NewUniqueSet(replica.Nodes...),
        roNodes:   typeutil.NewUniqueSet(replica.RoNodes...),
        rwSQNodes: typeutil.NewUniqueSet(replica.RwSqNodes...),
        roSQNodes: typeutil.NewUniqueSet(replica.RoSqNodes...),
    }
}

@@ -65,10 +77,12 @@ func (replica *Replica) GetResourceGroup() string {
// GetNodes returns the rw nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetNodes() []int64 {
    nodes := make([]int64, 0)
    nodes = append(nodes, replica.replicaPB.GetRoNodes()...)
    nodes = append(nodes, replica.replicaPB.GetNodes()...)
    return nodes
    nodes := typeutil.NewUniqueSet()
    nodes.Insert(replica.replicaPB.GetRoNodes()...)
    nodes.Insert(replica.replicaPB.GetNodes()...)
    nodes.Insert(replica.replicaPB.GetRwSqNodes()...)
    nodes.Insert(replica.replicaPB.GetRoSqNodes()...)
    return nodes.Collect()
}

// GetRONodes returns the ro nodes of the replica.
@@ -83,6 +97,18 @@ func (replica *Replica) GetRWNodes() []int64 {
    return replica.replicaPB.GetNodes()
}

// GetROSQNodes returns the ro sq nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetROSQNodes() []int64 {
    return replica.replicaPB.GetRoSqNodes()
}

// GetRWSQNodes returns the rw sq nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetRWSQNodes() []int64 {
    return replica.replicaPB.GetRwSqNodes()
}

// RangeOverRWNodes iterates over the read and write nodes of the replica.
func (replica *Replica) RangeOverRWNodes(f func(node int64) bool) {
    replica.rwNodes.Range(f)
@@ -93,6 +119,16 @@ func (replica *Replica) RangeOverRONodes(f func(node int64) bool) {
    replica.roNodes.Range(f)
}

// RangeOverRWSQNodes iterates over the read and write streaming query nodes of the replica.
func (replica *Replica) RangeOverRWSQNodes(f func(node int64) bool) {
    replica.rwSQNodes.Range(f)
}

// RangeOverROSQNodes iterates over the ro streaming query nodes of the replica.
func (replica *Replica) RangeOverROSQNodes(f func(node int64) bool) {
    replica.roSQNodes.Range(f)
}

// RWNodesCount returns the count of rw nodes of the replica.
func (replica *Replica) RWNodesCount() int {
    return replica.rwNodes.Len()
@@ -103,6 +139,16 @@ func (replica *Replica) RONodesCount() int {
    return replica.roNodes.Len()
}

// RWSQNodesCount returns the count of rw sq nodes of the replica.
func (replica *Replica) RWSQNodesCount() int {
    return replica.rwSQNodes.Len()
}

// ROSQNodesCount returns the count of ro sq nodes of the replica.
func (replica *Replica) ROSQNodesCount() int {
    return replica.roSQNodes.Len()
}

// NodesCount returns the count of rw nodes and ro nodes of the replica.
func (replica *Replica) NodesCount() int {
    return replica.rwNodes.Len() + replica.roNodes.Len()
@@ -110,7 +156,7 @@ func (replica *Replica) NodesCount() int {

// Contains checks if the node is in rw nodes of the replica.
func (replica *Replica) Contains(node int64) bool {
    return replica.ContainRONode(node) || replica.ContainRWNode(node)
    return replica.ContainRONode(node) || replica.ContainRWNode(node) || replica.ContainSQNode(node) || replica.ContainRWSQNode(node)
}

// ContainRONode checks if the node is in ro nodes of the replica.
@@ -123,6 +169,21 @@ func (replica *Replica) ContainRWNode(node int64) bool {
    return replica.rwNodes.Contain(node)
}

// ContainSQNode checks if the node is in the sq nodes (rw or ro) of the replica.
func (replica *Replica) ContainSQNode(node int64) bool {
    return replica.ContainROSQNode(node) || replica.ContainRWSQNode(node)
}

// ContainROSQNode checks if the node is in ro sq nodes of the replica.
func (replica *Replica) ContainROSQNode(node int64) bool {
    return replica.roSQNodes.Contain(node)
}

// ContainRWSQNode checks if the node is in rw sq nodes of the replica.
func (replica *Replica) ContainRWSQNode(node int64) bool {
    return replica.rwSQNodes.Contain(node)
}

// Deprecated: Warning, break the consistency of ReplicaManager, use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead.
// TODO: removed in future, only for old unittest now.
func (replica *Replica) AddRWNode(nodes ...int64) {
@@ -154,6 +215,8 @@ func (replica *Replica) CopyForWrite() *mutableReplica {
            replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica),
            rwNodes:   typeutil.NewUniqueSet(replica.replicaPB.Nodes...),
            roNodes:   typeutil.NewUniqueSet(replica.replicaPB.RoNodes...),
            rwSQNodes: typeutil.NewUniqueSet(replica.replicaPB.RwSqNodes...),
            roSQNodes: typeutil.NewUniqueSet(replica.replicaPB.RoSqNodes...),
        },
        exclusiveRWNodeToChannel: exclusiveRWNodeToChannel,
    }
@@ -209,6 +272,30 @@ func (replica *mutableReplica) RemoveNode(nodes ...int64) {
    replica.tryBalanceNodeForChannel()
}

// AddRWSQNode adds the node to rw sq nodes of the replica.
func (replica *mutableReplica) AddRWSQNode(nodes ...int64) {
    replica.roSQNodes.Remove(nodes...)
    replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect()
    replica.rwSQNodes.Insert(nodes...)
    replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect()
}

// AddROSQNode adds the node to ro sq nodes of the replica.
func (replica *mutableReplica) AddROSQNode(nodes ...int64) {
    replica.rwSQNodes.Remove(nodes...)
    replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect()
    replica.roSQNodes.Insert(nodes...)
    replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect()
}

// RemoveSQNode removes the node from both the rw sq nodes and the ro sq nodes of the replica.
func (replica *mutableReplica) RemoveSQNode(nodes ...int64) {
    replica.rwSQNodes.Remove(nodes...)
    replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect()
    replica.roSQNodes.Remove(nodes...)
    replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect()
}

func (replica *mutableReplica) removeChannelExclusiveNodes(nodes ...int64) {
    channelNodeMap := make(map[string][]int64)
    for _, nodeID := range nodes {
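The three SQ-node setters above define a small lifecycle for streaming query nodes inside a replica: unused -> rw (AddRWSQNode), rw -> ro (AddROSQNode), ro -> unused (RemoveSQNode). A rough usage sketch, written as if inside the meta package and not part of this diff (locking and persistence through ReplicaManager are omitted):

// Illustrative only: demote a streaming query node, then drop it from a replica.
func demoteAndDropSQNode(replica *Replica, node int64) *Replica {
    mutable := replica.CopyForWrite()
    mutable.AddROSQNode(node)  // rw -> ro: keeps serving, but receives no new channels
    mutable.RemoveSQNode(node) // ro -> unused: the node leaves the replica entirely
    return mutable.IntoReplica()
}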
@@ -497,6 +497,21 @@ func (m *ReplicaManager) RemoveNode(ctx context.Context, replicaID typeutil.Uniq
    return m.put(ctx, mutableReplica.IntoReplica())
}

// RemoveSQNode removes the given sq nodes from the replica.
func (m *ReplicaManager) RemoveSQNode(ctx context.Context, replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error {
    m.rwmutex.Lock()
    defer m.rwmutex.Unlock()

    replica, ok := m.replicas[replicaID]
    if !ok {
        return merr.WrapErrReplicaNotFound(replicaID)
    }

    mutableReplica := replica.CopyForWrite()
    mutableReplica.RemoveSQNode(nodes...) // ro -> unused
    return m.put(ctx, mutableReplica.IntoReplica())
}

func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, collection typeutil.UniqueID) typeutil.Set[string] {
    replicas := m.GetByCollection(ctx, collection)
    ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...)
@@ -542,3 +557,48 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string
    }
    return string(ret)
}

// RecoverSQNodesInCollection recovers all sq nodes in the collection with the latest node list.
// It promises that a node is assigned to only one replica of the same collection at a time.
// 1. Move rw nodes to ro nodes if the current replica uses too many sq nodes.
// 2. Add new incoming nodes into the replica if they are not ro nodes of other replicas in the same collection.
// 3. Replicas share the nodes in the resource group fairly.
func (m *ReplicaManager) RecoverSQNodesInCollection(ctx context.Context, collectionID int64, sqnNodeIDs typeutil.UniqueSet) error {
    m.rwmutex.Lock()
    defer m.rwmutex.Unlock()

    collReplicas, ok := m.coll2Replicas[collectionID]
    if !ok {
        return errors.Errorf("collection %d not loaded", collectionID)
    }

    helper := newReplicaSQNAssignmentHelper(collReplicas.replicas, sqnNodeIDs)
    helper.updateExpectedNodeCountForReplicas(len(sqnNodeIDs))

    modifiedReplicas := make([]*Replica, 0)
    // recover nodes by the given sqn node list.
    helper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) {
        roNodes := assignment.GetNewRONodes()
        recoverableNodes, incomingNodeCount := assignment.GetRecoverNodesAndIncomingNodeCount()
        // There may not be enough incoming nodes for the current replica.
        // Even after filtering out the nodes used by other replicas of the same collection in other resource groups,
        // the current replica's expected nodes may still be used by other replicas of the same collection in the same resource group.
        incomingNode := helper.AllocateIncomingNodes(incomingNodeCount)
        if len(roNodes) == 0 && len(recoverableNodes) == 0 && len(incomingNode) == 0 {
            // nothing to do.
            return
        }
        mutableReplica := m.replicas[assignment.GetReplicaID()].CopyForWrite()
        mutableReplica.AddROSQNode(roNodes...)           // rw -> ro
        mutableReplica.AddRWSQNode(recoverableNodes...)  // ro -> rw
        mutableReplica.AddRWSQNode(incomingNode...)      // unused -> rw
        log.Info(
            "new replica recovery streaming query node found",
            zap.Int64("replicaID", assignment.GetReplicaID()),
            zap.Int64s("newRONodes", roNodes),
            zap.Int64s("roToRWNodes", recoverableNodes),
            zap.Int64s("newIncomingNodes", incomingNode))
        modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica())
    })
    return m.put(ctx, modifiedReplicas...)
}
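RecoverSQNodesInCollection is driven with the current set of streaming query nodes, and the read-only SQ nodes it produces are cleaned up afterwards with RemoveSQNode, as the test changes later in this diff do. A hedged sketch of that reconciliation loop (the actual call site in QueryCoord is not part of this diff, and error handling is simplified):

// Hypothetical reconciliation step, assuming a *meta.ReplicaManager `mgr` and a loaded collection.
func reconcileSQNodes(ctx context.Context, mgr *meta.ReplicaManager, collectionID int64) error {
    sqns := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
    if err := mgr.RecoverSQNodesInCollection(ctx, collectionID, sqns); err != nil {
        return err
    }
    // Release SQ nodes that ended up read-only (mirrors the cleanup done in the ReplicaManagerV2Suite test).
    for _, replica := range mgr.GetByCollection(ctx, collectionID) {
        if err := mgr.RemoveSQNode(ctx, replica.GetID(), replica.GetROSQNodes()...); err != nil {
            return err
        }
    }
    return nil
}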
@@ -179,6 +179,40 @@ func newReplicaAssignmentInfo(replica *Replica, nodeInRG typeutil.UniqueSet) *re
    }
}

func newReplicaSQNAssignmentInfo(replica *Replica, nodes typeutil.UniqueSet) *replicaAssignmentInfo {
    // nodes in a replica can be split into 3 parts.
    rwNodes := make(typeutil.UniqueSet, replica.RWSQNodesCount())
    newRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount())
    unrecoverableRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount())
    recoverableRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount())

    replica.RangeOverRWSQNodes(func(nodeID int64) bool {
        if nodes.Contain(nodeID) {
            rwNodes.Insert(nodeID)
        } else {
            newRONodes.Insert(nodeID)
        }
        return true
    })

    replica.RangeOverROSQNodes(func(nodeID int64) bool {
        if nodes.Contain(nodeID) {
            recoverableRONodes.Insert(nodeID)
        } else {
            unrecoverableRONodes.Insert(nodeID)
        }
        return true
    })
    return &replicaAssignmentInfo{
        replicaID:            replica.GetID(),
        expectedNodeCount:    0,
        rwNodes:              rwNodes,
        newRONodes:           newRONodes,
        recoverableRONodes:   recoverableRONodes,
        unrecoverableRONodes: unrecoverableRONodes,
    }
}

type replicaAssignmentInfo struct {
    replicaID         typeutil.UniqueID
    expectedNodeCount int // expected node count for each replica.
@@ -236,6 +270,11 @@ func (s *replicaAssignmentInfo) GetRecoverNodesAndIncomingNodeCount() (recoverNo
    return recoverNodes, incomingNodeCount
}

// GetUnrecoverableNodes returns the unrecoverable ro nodes of the replica.
func (s *replicaAssignmentInfo) GetUnrecoverableNodes() []int64 {
    return s.unrecoverableRONodes.Collect()
}

// RangeOverAllNodes iterates over all nodes in the replica.
func (s *replicaAssignmentInfo) RangeOverAllNodes(f func(nodeID int64)) {
    ff := func(nodeID int64) bool {
@@ -270,3 +309,32 @@ func (s replicaAssignmentInfoSortByAvailableAndRecoverable) Less(i, j int) bool
    // Otherwise unstable assignment may cause unnecessary node transfer.
    return left < right || (left == right && s.replicaAssignmentInfoSorter[i].replicaID < s.replicaAssignmentInfoSorter[j].replicaID)
}

// newReplicaSQNAssignmentHelper creates a new replicaSQNAssignmentHelper.
func newReplicaSQNAssignmentHelper(
    replicas []*Replica,
    nodes typeutil.UniqueSet,
) *replicasInSameRGAssignmentHelper {
    // We use a fake resource group name to create a helper.
    assignmentInfos := make([]*replicaAssignmentInfo, 0, len(replicas))
    for _, replica := range replicas {
        assignmentInfos = append(assignmentInfos, newReplicaSQNAssignmentInfo(replica, nodes))
    }
    h := &replicasInSameRGAssignmentHelper{
        rgName:        "",
        nodesInRG:     nodes,
        incomingNodes: nodes.Clone(),
        replicas:      assignmentInfos,
    }
    // generate incoming nodes for the collection.
    h.RangeOverReplicas(func(assignment *replicaAssignmentInfo) {
        assignment.RangeOverAllNodes(func(nodeID int64) {
            if nodes.Contain(nodeID) {
                h.incomingNodes.Remove(nodeID)
            }
        })
    })
    // update expected node count for all replicas in the same resource group.
    h.updateExpectedNodeCountForReplicas(len(nodes))
    return h
}
@@ -154,7 +154,7 @@ func (suite *ReplicaManagerSuite) TestGet() {
        suite.Equal(collectionID, replica.GetCollectionID())
        suite.Equal(replica, mgr.Get(ctx, replica.GetID()))
        suite.Equal(len(replica.replicaPB.GetNodes()), replica.RWNodesCount())
        suite.Equal(replica.replicaPB.GetNodes(), replica.GetNodes())
        suite.ElementsMatch(replica.replicaPB.GetNodes(), replica.GetNodes())
        replicaNodes[replica.GetID()] = replica.GetNodes()
        nodes = append(nodes, replica.GetNodes()...)
    }
@@ -221,7 +221,7 @@ func (suite *ReplicaManagerSuite) TestRecover() {
    replica := mgr.Get(ctx, 2100)
    suite.NotNil(replica)
    suite.EqualValues(1000, replica.GetCollectionID())
    suite.EqualValues([]int64{1, 2, 3}, replica.GetNodes())
    suite.ElementsMatch([]int64{1, 2, 3}, replica.GetNodes())
    suite.Len(replica.GetNodes(), len(replica.GetNodes()))
    for _, node := range replica.GetNodes() {
        suite.True(replica.Contains(node))
@@ -332,12 +332,14 @@ func (suite *ReplicaManagerSuite) clearMemory() {
type ReplicaManagerV2Suite struct {
    suite.Suite

    rgs         map[string]typeutil.UniqueSet
    collections map[int64]collectionLoadConfig
    kv          kv.MetaKv
    catalog     metastore.QueryCoordCatalog
    mgr         *ReplicaManager
    ctx         context.Context
    rgs             map[string]typeutil.UniqueSet
    sqNodes         typeutil.UniqueSet
    outboundSQNodes []int64
    collections     map[int64]collectionLoadConfig
    kv              kv.MetaKv
    catalog         metastore.QueryCoordCatalog
    mgr             *ReplicaManager
    ctx             context.Context
}

func (suite *ReplicaManagerV2Suite) SetupSuite() {
@@ -350,6 +352,8 @@ func (suite *ReplicaManagerV2Suite) SetupSuite() {
        "RG4": typeutil.NewUniqueSet(7, 8, 9, 10),
        "RG5": typeutil.NewUniqueSet(11, 12, 13, 14, 15),
    }
    suite.sqNodes = typeutil.NewUniqueSet(16, 17, 18, 19, 20)
    suite.outboundSQNodes = []int64{}
    suite.collections = map[int64]collectionLoadConfig{
        1000: {
            spawnConfig: map[string]int{"RG1": 1},
@@ -406,6 +410,7 @@ func (suite *ReplicaManagerV2Suite) TestSpawn() {
            rgsOfCollection[rg] = suite.rgs[rg]
        }
        mgr.RecoverNodesInCollection(ctx, id, rgsOfCollection)
        mgr.RecoverSQNodesInCollection(ctx, id, suite.sqNodes)
        for rg := range cfg.spawnConfig {
            for _, node := range suite.rgs[rg].Collect() {
                replica := mgr.GetByCollectionAndNode(ctx, id, node)
@@ -428,6 +433,10 @@ func (suite *ReplicaManagerV2Suite) testIfBalanced() {
    for _, r := range replicas {
        rgToReplica[r.GetResourceGroup()] = append(rgToReplica[r.GetResourceGroup()], r)
    }

    maximumSQNodes := -1
    minimumSQNodes := -1
    sqNodes := make([]int64, 0)
    for _, replicas := range rgToReplica {
        maximumNodes := -1
        minimumNodes := -1
@@ -440,7 +449,15 @@ func (suite *ReplicaManagerV2Suite) testIfBalanced() {
            if minimumNodes == -1 || r.RWNodesCount() < minimumNodes {
                minimumNodes = r.RWNodesCount()
            }
            nodes = append(nodes, r.GetNodes()...)
            if maximumSQNodes == -1 || r.RWSQNodesCount() > maximumSQNodes {
                maximumSQNodes = r.RWSQNodesCount()
            }
            if minimumSQNodes == -1 || r.RWSQNodesCount() < minimumSQNodes {
                minimumSQNodes = r.RWSQNodesCount()
            }
            nodes = append(nodes, r.GetRWNodes()...)
            nodes = append(nodes, r.GetRONodes()...)
            sqNodes = append(sqNodes, r.GetRWSQNodes()...)
            r.RangeOverRONodes(func(node int64) bool {
                if availableNodes.Contain(node) {
                    nodes = append(nodes, node)
@@ -451,6 +468,10 @@ func (suite *ReplicaManagerV2Suite) testIfBalanced() {
        suite.ElementsMatch(nodes, suite.rgs[replicas[0].GetResourceGroup()].Collect())
        suite.True(maximumNodes-minimumNodes <= 1)
    }
    availableSQNodes := suite.sqNodes.Clone()
    availableSQNodes.Remove(suite.outboundSQNodes...)
    suite.ElementsMatch(availableSQNodes.Collect(), sqNodes)
    suite.True(maximumSQNodes-minimumSQNodes <= 1)
}
}

@@ -475,6 +496,7 @@ func (suite *ReplicaManagerV2Suite) TestTransferReplicaAndAddNode() {
    suite.mgr.TransferReplica(ctx, 1005, "RG4", "RG5", 1)
    suite.recoverReplica(1, false)
    suite.rgs["RG5"].Insert(16, 17, 18)
    suite.sqNodes.Insert(20, 21, 22)
    suite.recoverReplica(2, true)
    suite.testIfBalanced()
}
@@ -482,6 +504,7 @@
func (suite *ReplicaManagerV2Suite) TestTransferNode() {
    suite.rgs["RG4"].Remove(7)
    suite.rgs["RG5"].Insert(7)
    suite.outboundSQNodes = []int64{16, 17, 18}
    suite.recoverReplica(2, true)
    suite.testIfBalanced()
}
@@ -497,7 +520,10 @@ func (suite *ReplicaManagerV2Suite) recoverReplica(k int, clearOutbound bool) {
        for rg := range cfg.spawnConfig {
            rgsOfCollection[rg] = suite.rgs[rg]
        }
        sqNodes := suite.sqNodes.Clone()
        sqNodes.Remove(suite.outboundSQNodes...)
        suite.mgr.RecoverNodesInCollection(ctx, id, rgsOfCollection)
        suite.mgr.RecoverSQNodesInCollection(ctx, id, sqNodes)
    }

    // clear all outbound nodes
@@ -507,6 +533,7 @@ func (suite *ReplicaManagerV2Suite) recoverReplica(k int, clearOutbound bool) {
        for _, r := range replicas {
            outboundNodes := r.GetRONodes()
            suite.mgr.RemoveNode(ctx, r.GetID(), outboundNodes...)
            suite.mgr.RemoveSQNode(ctx, r.GetID(), r.GetROSQNodes()...)
        }
    }
}
@@ -26,6 +26,55 @@ func (suite *ReplicaSuite) SetupSuite() {
}
}

func (suite *ReplicaSuite) TestSNNodes() {
replicaPB := &querypb.Replica{
ID: 1,
CollectionID: 2,
Nodes: []int64{1, 2, 3},
ResourceGroup: DefaultResourceGroupName,
RoNodes: []int64{4},
RwSqNodes: []int64{6, 7, 8, 2},
RoSqNodes: []int64{5},
}
r := newReplica(replicaPB)
suite.Len(r.GetNodes(), 8)
suite.Len(r.GetROSQNodes(), r.ROSQNodesCount())
suite.Len(r.GetRWSQNodes(), r.RWSQNodesCount())
cnt := 0
r.RangeOverRWSQNodes(func(nodeID int64) bool {
cnt++
return true
})
suite.Equal(r.RWSQNodesCount(), cnt)

cnt = 0
r.RangeOverROSQNodes(func(nodeID int64) bool {
cnt++
return true
})
suite.Equal(r.RONodesCount(), cnt)

suite.Len(r.GetChannelRWNodes("channel1"), 0)

copiedR := r.CopyForWrite()
copiedR.AddRWSQNode(9, 5)
r2 := copiedR.IntoReplica()
suite.Equal(6, r2.RWSQNodesCount())
suite.Equal(0, r2.ROSQNodesCount())

copiedR = r.CopyForWrite()
copiedR.AddROSQNode(7, 8)
r2 = copiedR.IntoReplica()
suite.Equal(2, r2.RWSQNodesCount())
suite.Equal(3, r2.ROSQNodesCount())

copiedR = r.CopyForWrite()
copiedR.RemoveSQNode(5, 8)
r2 = copiedR.IntoReplica()
suite.Equal(3, r2.RWSQNodesCount())
suite.Equal(0, r2.ROSQNodesCount())
}

func (suite *ReplicaSuite) TestReadOperations() {
r := newReplica(suite.replicaPB)
suite.testRead(r)

@@ -23,11 +23,14 @@ import (

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// check replicas: find read-only nodes and remove them from the replica once all segments/channels have been moved away
@@ -55,6 +58,10 @@ func (ob *ReplicaObserver) Start() {

ob.wg.Add(1)
go ob.schedule(ctx)
if streamingutil.IsStreamingServiceEnabled() {
ob.wg.Add(1)
go ob.scheduleStreamingQN(ctx)
}
})
}

@@ -85,12 +92,74 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
}
}

// scheduleStreamingQN checks the streaming query nodes in replicas.
func (ob *ReplicaObserver) scheduleStreamingQN(ctx context.Context) {
defer ob.wg.Done()
log.Info("Start streaming query node check replica loop")

listener := snmanager.StaticStreamingNodeManager.ListenNodeChanged()
for {
ob.waitNodeChangedOrTimeout(ctx, listener)
if ctx.Err() != nil {
log.Info("Stop streaming query node check replica observer")
return
}

ids := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
ob.checkStreamingQueryNodesInReplica(ids)
}
}

func (ob *ReplicaObserver) waitNodeChangedOrTimeout(ctx context.Context, listener *syncutil.VersionedListener) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second))
defer cancel()
listener.Wait(ctxWithTimeout)
}

func (ob *ReplicaObserver) checkStreamingQueryNodesInReplica(sqNodeIDs typeutil.UniqueSet) {
ctx := context.Background()
log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60)
collections := ob.meta.GetAll(context.Background())

for _, collectionID := range collections {
ob.meta.RecoverSQNodesInCollection(context.Background(), collectionID, sqNodeIDs)
}

for _, collectionID := range collections {
replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID)
for _, replica := range replicas {
roSQNodes := replica.GetROSQNodes()
rwSQNodes := replica.GetRWSQNodes()
if len(roSQNodes) == 0 {
continue
}
removeNodes := make([]int64, 0, len(roSQNodes))
for _, node := range roSQNodes {
channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))
segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node))
if len(channels) == 0 && len(segments) == 0 {
removeNodes = append(removeNodes, node)
}
}
if len(removeNodes) == 0 {
continue
}
logger := log.With(
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("removedNodes", removeNodes),
zap.Int64s("roNodes", roSQNodes),
zap.Int64s("rwNodes", rwSQNodes),
)
if err := ob.meta.ReplicaManager.RemoveSQNode(ctx, replica.GetID(), removeNodes...); err != nil {
logger.Warn("fail to remove streaming query node from replica", zap.Error(err))
continue
}
logger.Info("all segment/channel has been removed from ro streaming query node, remove it from replica")
}
}
}

func (ob *ReplicaObserver) checkNodesInReplica() {
ctx := context.Background()
log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60)
@@ -135,7 +204,7 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
logger.Warn("fail to remove node from replica", zap.Error(err))
continue
}
logger.Info("all segment/channel has been removed from ro node, try to remove it from replica")
logger.Info("all segment/channel has been removed from ro node, remove it from replica")
}
}
}

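The observer loop above waits for either a streaming-node change notification or the CheckNodeInReplicaInterval timeout before re-checking replicas, and a read-only streaming query node is removed only once it no longer holds any channel or segment. A minimal, stdlib-only sketch of that wait-then-recheck shape, with a buffered channel standing in for the versioned listener (the channel and the helper name are assumptions of this sketch, not Milvus APIs):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitChangedOrTimeout returns when a change notification arrives, the check
// interval elapses, or the outer context is cancelled.
func waitChangedOrTimeout(ctx context.Context, changed <-chan struct{}, interval time.Duration) {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, interval)
	defer cancel()
	select {
	case <-changed:
	case <-ctxWithTimeout.Done():
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	changed := make(chan struct{}, 1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		changed <- struct{}{} // simulate a streaming query node joining or leaving
	}()

	for round := 0; round < 3; round++ {
		waitChangedOrTimeout(ctx, changed, 200*time.Millisecond)
		if ctx.Err() != nil {
			return
		}
		// In the real observer this is where checkStreamingQueryNodesInReplica runs.
		fmt.Println("re-check streaming query nodes in replicas, round", round)
	}
}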
@@ -20,16 +20,21 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -52,6 +57,7 @@ type ReplicaObserverSuite struct {
}

func (suite *ReplicaObserverSuite) SetupSuite() {
streamingutil.SetStreamingServiceEnabled()
paramtable.Init()
paramtable.Get().Save(Params.QueryCoordCfg.CheckNodeInReplicaInterval.Key, "1")
}
@@ -196,9 +202,105 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
}, 30*time.Second, 2*time.Second)
}

func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() {
balancer := mock_balancer.NewMockBalancer(suite.T())
snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)

change := make(chan struct{})
balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
versions := []typeutil.VersionInt64Pair{
{Global: 1, Local: 2},
{Global: 1, Local: 3},
}
pchans := [][]types.PChannelInfoAssigned{
{
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
},
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel2", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"},
},
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel3", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 3, Address: "localhost:1"},
},
},
{
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
},
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel2", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"},
},
types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "pchannel3", Term: 2},
Node: types.StreamingNodeInfo{ServerID: 2, Address: "localhost:1"},
},
},
}
for i := 0; i < len(versions); i++ {
cb(versions[i], pchans[i])
<-change
}
<-ctx.Done()
return context.Cause(ctx)
})

ctx := context.Background()
err := suite.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(suite.collectionID, 2))
suite.NoError(err)
replicas, err := suite.meta.Spawn(ctx, suite.collectionID, map[string]int{
"rg1": 1,
"rg2": 1,
}, nil)
suite.NoError(err)
suite.Equal(2, len(replicas))

suite.Eventually(func() bool {
replica := suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID)
total := 0
for _, r := range replica {
total += r.RWSQNodesCount()
}
return total == 3
}, 6*time.Second, 2*time.Second)
replica := suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID)
nodes := typeutil.NewUniqueSet()
for _, r := range replica {
suite.LessOrEqual(r.RWSQNodesCount(), 2)
suite.Equal(r.ROSQNodesCount(), 0)
nodes.Insert(r.GetRWSQNodes()...)
}
suite.Equal(nodes.Len(), 3)

close(change)

suite.Eventually(func() bool {
replica := suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID)
total := 0
for _, r := range replica {
total += r.RWSQNodesCount()
}
return total == 2
}, 6*time.Second, 2*time.Second)
replica = suite.meta.ReplicaManager.GetByCollection(ctx, suite.collectionID)
nodes = typeutil.NewUniqueSet()
for _, r := range replica {
suite.Equal(r.RWSQNodesCount(), 1)
suite.Equal(r.ROSQNodesCount(), 0)
nodes.Insert(r.GetRWSQNodes()...)
}
suite.Equal(nodes.Len(), 2)
}

func (suite *ReplicaObserverSuite) TearDownSuite() {
suite.kv.Close()
suite.observer.Stop()
streamingutil.UnsetStreamingServiceEnabled()
}

func TestReplicaObserver(t *testing.T) {

@@ -24,9 +24,11 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -227,12 +229,18 @@ func (s *Server) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest)
return merr.Status(errors.Wrap(err, errMsg)), nil
}

if s.nodeMgr.Get(req.GetNodeID()) == nil {
info := s.nodeMgr.Get(req.GetNodeID())
if info == nil {
err := merr.WrapErrNodeNotFound(req.GetNodeID(), errMsg)
log.Warn(errMsg, zap.Error(err))
return merr.Status(err), nil
}

if info.IsEmbeddedQueryNodeInStreamingNode() {
return merr.Status(
merr.WrapErrParameterInvalidMsg("embedded query node in streaming node can't be resumed")), nil
}

s.meta.ResourceManager.HandleNodeUp(ctx, req.GetNodeID())

return merr.Success(), nil
@@ -274,6 +282,13 @@ func (s *Server) TransferSegment(ctx context.Context, req *querypb.TransferSegme
err := merr.WrapErrNodeNotAvailable(srcNode, "the target node is invalid")
return merr.Status(err), nil
}
if streamingutil.IsStreamingServiceEnabled() {
sqn := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
if sqn.Contain(req.GetTargetNodeID()) {
return merr.Status(
merr.WrapErrParameterInvalidMsg("embedded query node in streaming node can't be the destination of transfer segment")), nil
}
}
dstNodeSet.Insert(req.GetTargetNodeID())
}
dstNodeSet.Remove(srcNode)
@@ -339,7 +354,11 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann
// when no dst node specified, default to use all other nodes in same
dstNodeSet := typeutil.NewUniqueSet()
if req.GetToAllNodes() {
dstNodeSet.Insert(replica.GetRWNodes()...)
if streamingutil.IsStreamingServiceEnabled() {
dstNodeSet.Insert(replica.GetRWSQNodes()...)
} else {
dstNodeSet.Insert(replica.GetRWNodes()...)
}
} else {
// check whether dstNode is healthy
if err := s.isStoppingNode(ctx, req.GetTargetNodeID()); err != nil {

@@ -859,8 +859,16 @@ func (s *Server) tryHandleNodeUp() {
}

func (s *Server) handleNodeUp(node int64) {
nodeInfo := s.nodeMgr.Get(node)
if nodeInfo == nil {
return
}
s.taskScheduler.AddExecutor(node)
s.distController.StartDistInstance(s.ctx, node)
if nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
// The querynode embedded in the streaming node is not managed by the resource manager, so don't assign it to a resource group or replica here.
return
}
// need assign to new rg and replica
s.meta.ResourceManager.HandleNodeUp(s.ctx, node)
}

@@ -24,6 +24,7 @@ import (
"github.com/blang/semver/v4"
"go.uber.org/atomic"

"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/metrics"
)

@@ -152,6 +153,10 @@ func (n *NodeInfo) Labels() map[string]string {
return n.immutableInfo.Labels
}

func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool {
return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1"
}

func (n *NodeInfo) SegmentCnt() int {
n.mu.RLock()
defer n.mu.RUnlock()

@@ -18,13 +18,16 @@ package utils

import (
"context"
"fmt"
"strings"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -109,6 +112,13 @@ func AssignReplica(ctx context.Context, m *meta.Meta, resourceGroups []string, r
"replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ","))
}

if streamingutil.IsStreamingServiceEnabled() {
streamingNodeCount := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs().Len()
if replicaNumber > int32(streamingNodeCount) {
return nil, merr.WrapErrStreamingNodeNotEnough(streamingNodeCount, int(replicaNumber), fmt.Sprintf("when load %d replica count", replicaNumber))
}
}

replicaNumInRG := make(map[string]int)
if len(resourceGroups) == 0 {
// All replicas should be spawned in default resource group.
@@ -160,6 +170,9 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
}
// Actively recover it.
RecoverReplicaOfCollection(ctx, m, collection)
if streamingutil.IsStreamingServiceEnabled() {
m.RecoverSQNodesInCollection(ctx, collection, snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs())
}
return replicas, nil
}

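AssignReplica above now refuses to spawn more replicas than there are streaming query nodes, and SpawnReplicasWithRG recovers the SQ node assignment right after spawning. A small sketch of just that guard; the node-count parameter stands in for GetStreamingQueryNodeIDs().Len(), and the plain error stands in for merr.WrapErrStreamingNodeNotEnough:

package main

import (
	"fmt"
)

// checkStreamingNodeCount mirrors the guard added to AssignReplica: with the
// streaming service enabled, the requested replica number must not exceed the
// number of streaming query nodes.
func checkStreamingNodeCount(replicaNumber int32, streamingNodeCount int) error {
	if replicaNumber > int32(streamingNodeCount) {
		// The real code returns merr.WrapErrStreamingNodeNotEnough(streamingNodeCount, int(replicaNumber), ...).
		return fmt.Errorf("streaming node not enough: current=%d, expected=%d", streamingNodeCount, replicaNumber)
	}
	return nil
}

func main() {
	fmt.Println(checkStreamingNodeCount(2, 3)) // <nil>
	fmt.Println(checkStreamingNodeCount(4, 3)) // streaming node not enough: current=3, expected=4
}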
@@ -3,11 +3,16 @@ package balancer

import (
"context"

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ Balancer = (*balancerImpl)(nil)
var (
_ Balancer = (*balancerImpl)(nil)
ErrBalancerClosed = errors.New("balancer is closed")
)

// Balancer is a load balancer to balance the load of log node.
// Given the balance result to assign or remove channels to corresponding log node.

@@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -30,7 +31,10 @@ func RecoverBalancer(
if err != nil {
return nil, errors.Wrap(err, "fail to recover channel manager")
}
ctx, cancel := context.WithCancelCause(context.Background())
b := &balancerImpl{
ctx: ctx,
cancel: cancel,
lifetime: typeutil.NewLifetime(),
logger: resource.Resource().Logger().With(log.FieldComponent("balancer"), zap.String("policy", policy)),
channelMetaManager: manager,
@@ -44,6 +48,8 @@ func RecoverBalancer(

// balancerImpl is an implementation of Balancer.
type balancerImpl struct {
ctx context.Context
cancel context.CancelCauseFunc
lifetime *typeutil.Lifetime
logger *log.MLogger
channelMetaManager *channel.ChannelManager
@@ -58,6 +64,8 @@ func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(vers
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()

ctx, _ = contextutil.MergeContext(ctx, b.ctx)
return b.channelMetaManager.WatchAssignmentResult(ctx, cb)
}

@@ -93,6 +101,8 @@ func (b *balancerImpl) sendRequestAndWaitFinish(ctx context.Context, newReq *req
// Close closes the balancer.
func (b *balancerImpl) Close() {
b.lifetime.SetState(typeutil.LifetimeStateStopped)
// cancel all watch operations by context.
b.cancel(ErrBalancerClosed)
b.lifetime.Wait()

b.backgroundTaskNotifier.Cancel()

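RecoverBalancer now keeps a cancellable root context, and Close cancels it with ErrBalancerClosed, so a caller blocked in WatchChannelAssignments returns with that cause instead of hanging. A stdlib-only sketch of the same shutdown pattern (the merge of the caller context with the balancer context is simplified to a plain select here, which is an assumption of this sketch):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errClosed stands in for ErrBalancerClosed.
var errClosed = errors.New("balancer is closed")

type watcher struct {
	ctx    context.Context
	cancel context.CancelCauseFunc
}

func newWatcher() *watcher {
	ctx, cancel := context.WithCancelCause(context.Background())
	return &watcher{ctx: ctx, cancel: cancel}
}

// Watch blocks until either the caller context or the watcher's root context
// is done, then reports the corresponding cause.
func (w *watcher) Watch(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	case <-w.ctx.Done():
		return context.Cause(w.ctx)
	}
}

// Close cancels every pending Watch with a dedicated cause.
func (w *watcher) Close() {
	w.cancel(errClosed)
}

func main() {
	w := newWatcher()
	done := make(chan error, 1)
	go func() { done <- w.Watch(context.Background()) }()

	time.Sleep(20 * time.Millisecond)
	w.Close()
	fmt.Println(errors.Is(<-done, errClosed)) // true
}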
@@ -3,6 +3,7 @@ package balancer_test
import (
"context"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
@@ -16,6 +17,7 @@ import (
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -91,7 +93,6 @@ func TestBalancer(t *testing.T) {
b, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair")
assert.NoError(t, err)
assert.NotNil(t, b)
defer b.Close()

b.MarkAsUnavailable(ctx, []types.PChannelInfo{{
Name: "test-channel-1",
@@ -113,4 +114,18 @@ func TestBalancer(t *testing.T) {
return nil
})
assert.ErrorIs(t, err, doneErr)

// create an infinitely blocking watcher that can be interrupted by closing the balancer.
f := syncutil.NewFuture[error]()
go func() {
err := b.WatchChannelAssignments(context.Background(), func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
return nil
})
f.Set(err)
}()
time.Sleep(20 * time.Millisecond)
assert.False(t, f.Ready())

b.Close()
assert.ErrorIs(t, f.Get(), balancer.ErrBalancerClosed)
}

@@ -6,6 +6,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
_ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster"
@@ -62,6 +63,7 @@ func (s *Server) initBasicComponent(ctx context.Context) (err error) {
return struct{}{}, err
}
s.balancer.Set(balancer)
snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)
s.logger.Info("recover balancer done")
return struct{}{}, nil
}))

@@ -141,7 +141,7 @@ func (c *channelLifetime) Run() error {
func() { go func() { c.Cancel() }() },
)
if err != nil {
handler.Close()
scanner.Close()
return err
}
ds.Start()

@@ -48,8 +48,9 @@ const (
// DefaultServiceRoot default root path used in kv by Session
DefaultServiceRoot = "session/"
// DefaultIDKey default id key for Session
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
LabelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED"
)

// SessionEventType session event type

@@ -1,6 +1,10 @@
package streamingutil

import "os"
import (
"os"

"github.com/milvus-io/milvus/internal/util/sessionutil"
)

const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED"

@@ -16,3 +20,14 @@ func MustEnableStreamingService() {
panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1")
}
}

// EnableEmbededQueryNode sets the server label for the embedded query node.
func EnableEmbededQueryNode() {
MustEnableStreamingService()
os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode, "1")
}

// IsEmbeddedQueryNode returns whether the current node is an embedded query node in streaming node.
func IsEmbeddedQueryNode() bool {
return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode) == "1"
}

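The embedded query node is advertised through an environment variable that follows the MILVUS_SERVER_LABEL_ convention above; judging from these constants and the NodeInfo check earlier in this diff, the session layer strips that prefix and exposes the remainder as a server label. A minimal sketch of that prefix-to-label mapping; the collectLabels helper is illustrative, only the two constant values come from the diff:

package main

import (
	"fmt"
	"os"
	"strings"
)

const (
	supportedLabelPrefix                = "MILVUS_SERVER_LABEL_"
	labelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED"
)

// collectLabels gathers every MILVUS_SERVER_LABEL_* environment variable into
// a label map, the way a session could publish its server labels.
func collectLabels() map[string]string {
	labels := make(map[string]string)
	for _, kv := range os.Environ() {
		k, v, ok := strings.Cut(kv, "=")
		if !ok || !strings.HasPrefix(k, supportedLabelPrefix) {
			continue
		}
		labels[strings.TrimPrefix(k, supportedLabelPrefix)] = v
	}
	return labels
}

func main() {
	// What EnableEmbededQueryNode does: mark this process as an embedded query node.
	os.Setenv(supportedLabelPrefix+labelStreamingNodeEmbeddedQueryNode, "1")

	labels := collectLabels()
	// Mirrors the NodeInfo.IsEmbeddedQueryNodeInStreamingNode check from the diff.
	fmt.Println("embedded query node:", labels[labelStreamingNodeEmbeddedQueryNode] == "1") // true
}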
@@ -689,11 +689,18 @@ message ChannelNodeInfo {
message Replica {
int64 ID = 1;
int64 collectionID = 2;
// nodes and ro_nodes can only load sealed segment.
// only manage the legacy querynode that is not embedded in the streamingnode.
repeated int64 nodes = 3; // all (read and write) nodes. mutual exclusive with ro_nodes.
string resource_group = 4;
repeated int64 ro_nodes = 5; // the in-using node but should not be assigned to these replica.
// can not load new channel or segment on it anymore.
map<string, ChannelNodeInfo> channel_node_infos = 6;
// cannot load segment on it anymore.
map<string, ChannelNodeInfo> channel_node_infos = 6;
// rw_sq_nodes and ro_sq_nodes can only watch channel and assign segment, will be removed in 3.0.
// only manage the querynode embedded in the streamingnode.
repeated int64 rw_sq_nodes = 7; // all (read and write) nodes. mutual exclusive with ro_sq_nodes.
repeated int64 ro_sq_nodes = 8; // the in-using node but should not be assigned to these replica.
// cannot watch channel on it anymore.
}

enum SyncType {

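The extended Replica message keeps the legacy query nodes and the streaming-embedded query nodes in separate RW/RO pairs. A short sketch mirroring the TestSNNodes fixture earlier in this diff, showing how the four node lists coexist on one replica (the resource group string is an illustrative literal):

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/proto/querypb"
)

func main() {
	replica := &querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: "__default_resource_group", // illustrative literal
		Nodes:         []int64{1, 2, 3},           // legacy RW query nodes (sealed segments)
		RoNodes:       []int64{4},                 // legacy RO query node, being drained
		RwSqNodes:     []int64{6, 7, 8, 2},        // RW query nodes embedded in streaming nodes (channels)
		RoSqNodes:     []int64{5},                 // RO embedded query node, being drained
	}
	fmt.Println(replica.GetRwSqNodes(), replica.GetRoSqNodes())
}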
@@ -4710,13 +4710,19 @@ type Replica struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
// nodes and ro_nodes can only load sealed segment.
// only manage the legacy querynode that is not embedded in the streamingnode.
Nodes []int64 `protobuf:"varint,3,rep,packed,name=nodes,proto3" json:"nodes,omitempty"` // all (read and write) nodes. mutual exclusive with ro_nodes.
ResourceGroup string `protobuf:"bytes,4,opt,name=resource_group,json=resourceGroup,proto3" json:"resource_group,omitempty"`
RoNodes []int64 `protobuf:"varint,5,rep,packed,name=ro_nodes,json=roNodes,proto3" json:"ro_nodes,omitempty"` // the in-using node but should not be assigned to these replica.
// can not load new channel or segment on it anymore.
// cannot load segment on it anymore.
ChannelNodeInfos map[string]*ChannelNodeInfo `protobuf:"bytes,6,rep,name=channel_node_infos,json=channelNodeInfos,proto3" json:"channel_node_infos,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// rw_sq_nodes and ro_sq_nodes can only watch channel and assign segment, will be removed in 3.0.
// only manage the querynode embedded in the streamingnode.
RwSqNodes []int64 `protobuf:"varint,7,rep,packed,name=rw_sq_nodes,json=rwSqNodes,proto3" json:"rw_sq_nodes,omitempty"` // all (read and write) nodes. mutual exclusive with ro_sq_nodes.
RoSqNodes []int64 `protobuf:"varint,8,rep,packed,name=ro_sq_nodes,json=roSqNodes,proto3" json:"ro_sq_nodes,omitempty"` // the in-using node but should not be assigned to these replica.
}

func (x *Replica) Reset() {
@@ -4793,6 +4799,20 @@ func (x *Replica) GetChannelNodeInfos() map[string]*ChannelNodeInfo {
return nil
}

func (x *Replica) GetRwSqNodes() []int64 {
if x != nil {
return x.RwSqNodes
}
return nil
}

func (x *Replica) GetRoSqNodes() []int64 {
if x != nil {
return x.RoSqNodes
}
return nil
}

type SyncAction struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -8023,7 +8043,7 @@ var file_query_coord_proto_rawDesc = []byte{
0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x2c, 0x0a, 0x0f, 0x43,
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x19,
0x0a, 0x08, 0x72, 0x77, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x03,
0x52, 0x07, 0x72, 0x77, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x22, 0xe0, 0x02, 0x0a, 0x07, 0x52, 0x65,
0x52, 0x07, 0x72, 0x77, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x22, 0xa0, 0x03, 0x0a, 0x07, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28,
0x03, 0x52, 0x02, 0x49, 0x44, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c,
@@ -8039,7 +8059,11 @@ var file_query_coord_proto_rawDesc = []byte{
0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e,
0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79,
0x52, 0x10, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66,
0x6f, 0x73, 0x1a, 0x68, 0x0a, 0x15, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64,
0x6f, 0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x72, 0x77, 0x5f, 0x73, 0x71, 0x5f, 0x6e, 0x6f, 0x64, 0x65,
0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x03, 0x52, 0x09, 0x72, 0x77, 0x53, 0x71, 0x4e, 0x6f, 0x64,
0x65, 0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x72, 0x6f, 0x5f, 0x73, 0x71, 0x5f, 0x6e, 0x6f, 0x64, 0x65,
0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x03, 0x52, 0x09, 0x72, 0x6f, 0x53, 0x71, 0x4e, 0x6f, 0x64,
0x65, 0x73, 0x1a, 0x68, 0x0a, 0x15, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x6f, 0x64,
0x65, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b,
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x39, 0x0a,
0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x6d,

@@ -653,6 +653,15 @@ func WrapErrResourceGroupIllegalConfig(rg any, cfg any, msg ...string) error {
return err
}

// WrapErrStreamingNodeNotEnough creates an error indicating that there are not enough streaming nodes.
func WrapErrStreamingNodeNotEnough(current int, expected int, msg ...string) error {
err := wrapFields(ErrServiceResourceInsufficient, value("currentStreamingNode", current), value("expectedStreamingNode", expected))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}

// go:deprecated
// WrapErrResourceGroupNodeNotEnough wraps ErrResourceGroupNodeNotEnough with resource group
func WrapErrResourceGroupNodeNotEnough(rg any, current any, expected any, msg ...string) error {
