Make allocateNode run async in case of block offline event (#17185)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-05-25 18:53:59 +08:00 committed by GitHub
parent 3a8099f68d
commit 7409bfc56d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 368 additions and 43 deletions

View File

@ -49,7 +49,7 @@ func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
ret = append(ret, &balancePlan{
nodeID: nodeID,
sourceReplica: -1,
sourceReplica: invalidReplicaID,
targetReplica: replicas[0].GetReplicaID(),
})
}

View File

@ -104,6 +104,7 @@ type Meta interface {
getReplicaByID(replicaID int64) (*milvuspb.ReplicaInfo, error)
getReplicasByCollectionID(collectionID int64) ([]*milvuspb.ReplicaInfo, error)
getReplicasByNodeID(nodeID int64) ([]*milvuspb.ReplicaInfo, error)
applyReplicaBalancePlan(p *balancePlan) error
}
// MetaReplica records the current load information on all querynodes
@ -1282,6 +1283,11 @@ func (m *MetaReplica) getReplicasByCollectionID(collectionID int64) ([]*milvuspb
return replicas, nil
}
// applyReplicaBalancePlan applies replica balance plan to replica info.
func (m *MetaReplica) applyReplicaBalancePlan(p *balancePlan) error {
return m.replicas.ApplyBalancePlan(p, m.getKvClient())
}
//func (m *MetaReplica) printMeta() {
// m.RLock()
// defer m.RUnlock()

View File

@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
@ -387,7 +386,7 @@ func (qc *QueryCoord) allocateNode(nodeID int64) error {
return err
}
for _, p := range plans {
if err := qc.applyBalancePlan(p); err != nil {
if err := qc.meta.applyReplicaBalancePlan(p); err != nil {
log.Warn("failed to apply balance plan", zap.Error(err), zap.Any("plan", p))
}
}
@ -409,42 +408,6 @@ func (qc *QueryCoord) getUnallocatedNodes() []int64 {
return ret
}
func (qc *QueryCoord) applyBalancePlan(p *balancePlan) error {
if p.sourceReplica != -1 {
replica, err := qc.meta.getReplicaByID(p.sourceReplica)
if err != nil {
return err
}
replica = removeNodeFromReplica(replica, p.nodeID)
if err := qc.meta.setReplicaInfo(replica); err != nil {
return err
}
}
if p.targetReplica != -1 {
replica, err := qc.meta.getReplicaByID(p.targetReplica)
if err != nil {
return err
}
replica.NodeIds = append(replica.NodeIds, p.nodeID)
if err := qc.meta.setReplicaInfo(replica); err != nil {
return err
}
}
return nil
}
func removeNodeFromReplica(replica *milvuspb.ReplicaInfo, nodeID int64) *milvuspb.ReplicaInfo {
for i := 0; i < len(replica.NodeIds); i++ {
if replica.NodeIds[i] != nodeID {
continue
}
replica.NodeIds = append(replica.NodeIds[:i], replica.NodeIds[i+1:]...)
return replica
}
return replica
}
func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
for {
select {
@ -471,9 +434,11 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
continue
}
if err := qc.allocateNode(serverID); err != nil {
log.Error("unable to allcoate node", zap.Int64("nodeID", serverID), zap.Error(err))
}
go func(serverID int64) {
if err := qc.allocateNode(serverID); err != nil {
log.Error("unable to allcoate node", zap.Int64("nodeID", serverID), zap.Error(err))
}
}(serverID)
qc.metricsCacheManager.InvalidateSystemInfoMetrics()
case sessionutil.SessionDelEvent:

View File

@ -17,14 +17,22 @@
package querycoord
import (
"errors"
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
const (
invalidReplicaID int64 = -1
)
type replicaSlice = []*milvuspb.ReplicaInfo
// ReplicaInfos maintains replica related meta information.
type ReplicaInfos struct {
globalGuard sync.RWMutex // We have to make sure atomically update replicas and index
@ -35,6 +43,7 @@ type ReplicaInfos struct {
nodeIndex map[UniqueID]map[UniqueID]*milvuspb.ReplicaInfo // nodeID, replicaID -> []*ReplicaInfo
}
// NewReplicaInfos creates a ReplicaInfos instance with internal map created.
func NewReplicaInfos() *ReplicaInfos {
return &ReplicaInfos{
globalGuard: sync.RWMutex{},
@ -43,20 +52,37 @@ func NewReplicaInfos() *ReplicaInfos {
}
}
// Get returns the ReplicaInfo with provided replicaID.
// If the ReplicaInfo does not exist, nil will be returned
func (rep *ReplicaInfos) Get(replicaID UniqueID) (*milvuspb.ReplicaInfo, bool) {
rep.globalGuard.RLock()
defer rep.globalGuard.RUnlock()
return rep.get(replicaID)
}
// get is the internal common util function to get replica with provided replicaID.
// the lock shall be accquired first
// NO outer invocation is allowed
func (rep *ReplicaInfos) get(replicaID UniqueID) (*milvuspb.ReplicaInfo, bool) {
info, ok := rep.replicas[replicaID]
clone := proto.Clone(info).(*milvuspb.ReplicaInfo)
return clone, ok
}
// Make sure atomically update replica and index
// Insert atomically updates replica and node index.
func (rep *ReplicaInfos) Insert(info *milvuspb.ReplicaInfo) {
rep.globalGuard.Lock()
defer rep.globalGuard.Unlock()
rep.upsert(info)
}
// upsert is the internal common util function to upsert replica info.
// it also update the related nodeIndex information.
// the lock shall be accquired first
// NO outer invocation is allowed
func (rep *ReplicaInfos) upsert(info *milvuspb.ReplicaInfo) {
old, ok := rep.replicas[info.ReplicaID]
info = proto.Clone(info).(*milvuspb.ReplicaInfo)
@ -81,6 +107,7 @@ func (rep *ReplicaInfos) Insert(info *milvuspb.ReplicaInfo) {
}
}
// GetReplicasByNodeID returns the replicas associated with provided node id.
func (rep *ReplicaInfos) GetReplicasByNodeID(nodeID UniqueID) []*milvuspb.ReplicaInfo {
rep.globalGuard.RLock()
defer rep.globalGuard.RUnlock()
@ -99,6 +126,7 @@ func (rep *ReplicaInfos) GetReplicasByNodeID(nodeID UniqueID) []*milvuspb.Replic
return clones
}
// Remove deletes provided replica ids from meta.
func (rep *ReplicaInfos) Remove(replicaIds ...UniqueID) {
rep.globalGuard.Lock()
defer rep.globalGuard.Unlock()
@ -112,3 +140,83 @@ func (rep *ReplicaInfos) Remove(replicaIds ...UniqueID) {
}
}
}
// ApplyBalancePlan applies balancePlan to replica nodes.
func (rep *ReplicaInfos) ApplyBalancePlan(p *balancePlan, kv kv.MetaKv) error {
rep.globalGuard.Lock()
defer rep.globalGuard.Unlock()
var sourceReplica, targetReplica *milvuspb.ReplicaInfo
var ok bool
// check source and target replica ids are valid
if p.sourceReplica != invalidReplicaID {
sourceReplica, ok = rep.get(p.sourceReplica)
if !ok {
return errors.New("replica not found")
}
}
if p.targetReplica != invalidReplicaID {
targetReplica, ok = rep.replicas[p.targetReplica]
if !ok {
return errors.New("replica not found")
}
}
var replicasChanged []*milvuspb.ReplicaInfo
// generate ReplicaInfo to save to MetaKv
if sourceReplica != nil {
// remove node from replica node list
removeNodeFromReplica(sourceReplica, p.nodeID)
replicasChanged = append(replicasChanged, sourceReplica)
}
if targetReplica != nil {
// add node to replica
targetReplica.NodeIds = append(targetReplica.NodeIds, p.nodeID)
replicasChanged = append(replicasChanged, targetReplica)
}
// save to etcd first
if len(replicasChanged) > 0 {
data := make(map[string]string)
for _, info := range replicasChanged {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", ReplicaMetaPrefix, info.ReplicaID)
data[key] = string(infoBytes)
}
err := kv.MultiSave(data)
if err != nil {
return err
}
}
// apply change to in-memory meta
if sourceReplica != nil {
rep.upsert(sourceReplica)
}
if targetReplica != nil {
rep.upsert(targetReplica)
}
return nil
}
// removeNodeFromReplica helper function to remove nodeID from replica NodeIds list.
func removeNodeFromReplica(replica *milvuspb.ReplicaInfo, nodeID int64) *milvuspb.ReplicaInfo {
for i := 0; i < len(replica.NodeIds); i++ {
if replica.NodeIds[i] != nodeID {
continue
}
replica.NodeIds = append(replica.NodeIds[:i], replica.NodeIds[i+1:]...)
return replica
}
return replica
}

View File

@ -0,0 +1,246 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)
type mockMetaKV struct {
mock.Mock
}
func (m *mockMetaKV) Load(key string) (string, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) MultiLoad(keys []string) ([]string, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) LoadWithPrefix(key string) ([]string, []string, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) Save(key string, value string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) MultiSave(kvs map[string]string) error {
args := m.Called(kvs)
return args.Error(0)
}
func (m *mockMetaKV) Remove(key string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) MultiRemove(keys []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) RemoveWithPrefix(key string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) Close() {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) MultiRemoveWithPrefix(keys []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) GetPath(key string) string {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) Watch(key string) clientv3.WatchChan {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) WatchWithPrefix(key string) clientv3.WatchChan {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) SaveWithLease(key string, value string, id clientv3.LeaseID) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) SaveWithIgnoreLease(key string, value string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) error {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error {
panic("not implemented") // TODO: Implement
}
func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
kv := &mockMetaKV{}
kv.On("MultiSave", mock.AnythingOfType("map[string]string")).Return(nil)
t.Run("source replica not exist", func(t *testing.T) {
replicas := NewReplicaInfos()
err := replicas.ApplyBalancePlan(&balancePlan{
nodeID: 1,
sourceReplica: 1,
targetReplica: invalidReplicaID,
}, kv)
assert.Error(t, err)
})
t.Run("target replica not exist", func(t *testing.T) {
replicas := NewReplicaInfos()
err := replicas.ApplyBalancePlan(&balancePlan{
nodeID: 1,
sourceReplica: invalidReplicaID,
targetReplica: 1,
}, kv)
assert.Error(t, err)
})
t.Run("add node to replica", func(t *testing.T) {
replicas := NewReplicaInfos()
replicas.Insert(&milvuspb.ReplicaInfo{
ReplicaID: 1,
NodeIds: []int64{1},
})
err := replicas.ApplyBalancePlan(&balancePlan{
nodeID: 2,
sourceReplica: invalidReplicaID,
targetReplica: 1,
}, kv)
assert.NoError(t, err)
info, has := replicas.Get(1)
require.True(t, has)
require.NotNil(t, info)
assert.Contains(t, info.GetNodeIds(), int64(2))
result := replicas.GetReplicasByNodeID(2)
assert.Equal(t, 1, len(result))
kv.AssertCalled(t, "MultiSave", mock.AnythingOfType("map[string]string"))
})
t.Run("remove node from replica", func(t *testing.T) {
replicas := NewReplicaInfos()
replicas.Insert(&milvuspb.ReplicaInfo{
ReplicaID: 1,
NodeIds: []int64{1},
})
err := replicas.ApplyBalancePlan(&balancePlan{
nodeID: 1,
sourceReplica: 1,
targetReplica: invalidReplicaID,
}, kv)
assert.NoError(t, err)
info, has := replicas.Get(1)
require.True(t, has)
require.NotNil(t, info)
assert.NotContains(t, info.GetNodeIds(), int64(1))
result := replicas.GetReplicasByNodeID(1)
assert.Equal(t, 0, len(result))
kv.AssertCalled(t, "MultiSave", mock.AnythingOfType("map[string]string"))
})
t.Run("remove non-existing node from replica", func(t *testing.T) {
replicas := NewReplicaInfos()
replicas.Insert(&milvuspb.ReplicaInfo{
ReplicaID: 1,
NodeIds: []int64{1},
})
err := replicas.ApplyBalancePlan(&balancePlan{
nodeID: 2,
sourceReplica: 1,
targetReplica: invalidReplicaID,
}, kv)
assert.NoError(t, err)
})
t.Run("save to etcd fail", func(t *testing.T) {
kv := &mockMetaKV{}
kv.On("MultiSave", mock.AnythingOfType("map[string]string")).Return(errors.New("mocked error"))
replicas := NewReplicaInfos()
replicas.Insert(&milvuspb.ReplicaInfo{
ReplicaID: 1,
NodeIds: []int64{1},
})
err := replicas.ApplyBalancePlan(&balancePlan{
nodeID: 2,
sourceReplica: invalidReplicaID,
targetReplica: 1,
}, kv)
kv.AssertCalled(t, "MultiSave", mock.AnythingOfType("map[string]string"))
assert.Error(t, err)
})
}