From 7409bfc56d984eae68dd05948fa5636a7e96102e Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 25 May 2022 18:53:59 +0800 Subject: [PATCH] Make allocateNode run async in case of block offline event (#17185) Signed-off-by: Congqi Xia --- internal/querycoord/group_balance.go | 2 +- internal/querycoord/meta.go | 6 + internal/querycoord/query_coord.go | 47 +---- internal/querycoord/replica.go | 110 +++++++++++- internal/querycoord/replica_test.go | 246 +++++++++++++++++++++++++++ 5 files changed, 368 insertions(+), 43 deletions(-) create mode 100644 internal/querycoord/replica_test.go diff --git a/internal/querycoord/group_balance.go b/internal/querycoord/group_balance.go index 32e6e0c3f5..cb9c535d50 100644 --- a/internal/querycoord/group_balance.go +++ b/internal/querycoord/group_balance.go @@ -49,7 +49,7 @@ func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) { ret = append(ret, &balancePlan{ nodeID: nodeID, - sourceReplica: -1, + sourceReplica: invalidReplicaID, targetReplica: replicas[0].GetReplicaID(), }) } diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 5002525478..c1de1095ab 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -104,6 +104,7 @@ type Meta interface { getReplicaByID(replicaID int64) (*milvuspb.ReplicaInfo, error) getReplicasByCollectionID(collectionID int64) ([]*milvuspb.ReplicaInfo, error) getReplicasByNodeID(nodeID int64) ([]*milvuspb.ReplicaInfo, error) + applyReplicaBalancePlan(p *balancePlan) error } // MetaReplica records the current load information on all querynodes @@ -1282,6 +1283,11 @@ func (m *MetaReplica) getReplicasByCollectionID(collectionID int64) ([]*milvuspb return replicas, nil } +// applyReplicaBalancePlan applies replica balance plan to replica info. +func (m *MetaReplica) applyReplicaBalancePlan(p *balancePlan) error { + return m.replicas.ApplyBalancePlan(p, m.getKvClient()) +} + //func (m *MetaReplica) printMeta() { // m.RLock() // defer m.RUnlock() diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 5617367137..edbf24e8d0 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -39,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" @@ -387,7 +386,7 @@ func (qc *QueryCoord) allocateNode(nodeID int64) error { return err } for _, p := range plans { - if err := qc.applyBalancePlan(p); err != nil { + if err := qc.meta.applyReplicaBalancePlan(p); err != nil { log.Warn("failed to apply balance plan", zap.Error(err), zap.Any("plan", p)) } } @@ -409,42 +408,6 @@ func (qc *QueryCoord) getUnallocatedNodes() []int64 { return ret } -func (qc *QueryCoord) applyBalancePlan(p *balancePlan) error { - if p.sourceReplica != -1 { - replica, err := qc.meta.getReplicaByID(p.sourceReplica) - if err != nil { - return err - } - replica = removeNodeFromReplica(replica, p.nodeID) - if err := qc.meta.setReplicaInfo(replica); err != nil { - return err - } - } - if p.targetReplica != -1 { - replica, err := qc.meta.getReplicaByID(p.targetReplica) - if err != nil { - return err - } - - replica.NodeIds = append(replica.NodeIds, p.nodeID) - if err := qc.meta.setReplicaInfo(replica); err != nil { - return err - } - } - return nil -} - -func removeNodeFromReplica(replica *milvuspb.ReplicaInfo, nodeID int64) *milvuspb.ReplicaInfo { - for i := 0; i < len(replica.NodeIds); i++ { - if replica.NodeIds[i] != nodeID { - continue - } - replica.NodeIds = append(replica.NodeIds[:i], replica.NodeIds[i+1:]...) - return replica - } - return replica -} - func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { for { select { @@ -471,9 +434,11 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error())) continue } - if err := qc.allocateNode(serverID); err != nil { - log.Error("unable to allcoate node", zap.Int64("nodeID", serverID), zap.Error(err)) - } + go func(serverID int64) { + if err := qc.allocateNode(serverID); err != nil { + log.Error("unable to allcoate node", zap.Int64("nodeID", serverID), zap.Error(err)) + } + }(serverID) qc.metricsCacheManager.InvalidateSystemInfoMetrics() case sessionutil.SessionDelEvent: diff --git a/internal/querycoord/replica.go b/internal/querycoord/replica.go index 237339c494..3381d800ae 100644 --- a/internal/querycoord/replica.go +++ b/internal/querycoord/replica.go @@ -17,14 +17,22 @@ package querycoord import ( + "errors" + "fmt" "sync" "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/milvuspb" ) +const ( + invalidReplicaID int64 = -1 +) + type replicaSlice = []*milvuspb.ReplicaInfo +// ReplicaInfos maintains replica related meta information. type ReplicaInfos struct { globalGuard sync.RWMutex // We have to make sure atomically update replicas and index @@ -35,6 +43,7 @@ type ReplicaInfos struct { nodeIndex map[UniqueID]map[UniqueID]*milvuspb.ReplicaInfo // nodeID, replicaID -> []*ReplicaInfo } +// NewReplicaInfos creates a ReplicaInfos instance with internal map created. func NewReplicaInfos() *ReplicaInfos { return &ReplicaInfos{ globalGuard: sync.RWMutex{}, @@ -43,20 +52,37 @@ func NewReplicaInfos() *ReplicaInfos { } } +// Get returns the ReplicaInfo with provided replicaID. +// If the ReplicaInfo does not exist, nil will be returned func (rep *ReplicaInfos) Get(replicaID UniqueID) (*milvuspb.ReplicaInfo, bool) { rep.globalGuard.RLock() defer rep.globalGuard.RUnlock() + return rep.get(replicaID) +} + +// get is the internal common util function to get replica with provided replicaID. +// the lock shall be accquired first +// NO outer invocation is allowed +func (rep *ReplicaInfos) get(replicaID UniqueID) (*milvuspb.ReplicaInfo, bool) { info, ok := rep.replicas[replicaID] clone := proto.Clone(info).(*milvuspb.ReplicaInfo) return clone, ok } -// Make sure atomically update replica and index +// Insert atomically updates replica and node index. func (rep *ReplicaInfos) Insert(info *milvuspb.ReplicaInfo) { rep.globalGuard.Lock() defer rep.globalGuard.Unlock() + rep.upsert(info) +} + +// upsert is the internal common util function to upsert replica info. +// it also update the related nodeIndex information. +// the lock shall be accquired first +// NO outer invocation is allowed +func (rep *ReplicaInfos) upsert(info *milvuspb.ReplicaInfo) { old, ok := rep.replicas[info.ReplicaID] info = proto.Clone(info).(*milvuspb.ReplicaInfo) @@ -81,6 +107,7 @@ func (rep *ReplicaInfos) Insert(info *milvuspb.ReplicaInfo) { } } +// GetReplicasByNodeID returns the replicas associated with provided node id. func (rep *ReplicaInfos) GetReplicasByNodeID(nodeID UniqueID) []*milvuspb.ReplicaInfo { rep.globalGuard.RLock() defer rep.globalGuard.RUnlock() @@ -99,6 +126,7 @@ func (rep *ReplicaInfos) GetReplicasByNodeID(nodeID UniqueID) []*milvuspb.Replic return clones } +// Remove deletes provided replica ids from meta. func (rep *ReplicaInfos) Remove(replicaIds ...UniqueID) { rep.globalGuard.Lock() defer rep.globalGuard.Unlock() @@ -112,3 +140,83 @@ func (rep *ReplicaInfos) Remove(replicaIds ...UniqueID) { } } } + +// ApplyBalancePlan applies balancePlan to replica nodes. +func (rep *ReplicaInfos) ApplyBalancePlan(p *balancePlan, kv kv.MetaKv) error { + rep.globalGuard.Lock() + defer rep.globalGuard.Unlock() + + var sourceReplica, targetReplica *milvuspb.ReplicaInfo + var ok bool + + // check source and target replica ids are valid + if p.sourceReplica != invalidReplicaID { + sourceReplica, ok = rep.get(p.sourceReplica) + if !ok { + return errors.New("replica not found") + } + } + + if p.targetReplica != invalidReplicaID { + targetReplica, ok = rep.replicas[p.targetReplica] + if !ok { + return errors.New("replica not found") + } + } + + var replicasChanged []*milvuspb.ReplicaInfo + + // generate ReplicaInfo to save to MetaKv + if sourceReplica != nil { + // remove node from replica node list + removeNodeFromReplica(sourceReplica, p.nodeID) + replicasChanged = append(replicasChanged, sourceReplica) + } + if targetReplica != nil { + // add node to replica + targetReplica.NodeIds = append(targetReplica.NodeIds, p.nodeID) + replicasChanged = append(replicasChanged, targetReplica) + } + + // save to etcd first + if len(replicasChanged) > 0 { + data := make(map[string]string) + + for _, info := range replicasChanged { + infoBytes, err := proto.Marshal(info) + if err != nil { + return err + } + + key := fmt.Sprintf("%s/%d", ReplicaMetaPrefix, info.ReplicaID) + data[key] = string(infoBytes) + } + err := kv.MultiSave(data) + if err != nil { + return err + } + } + + // apply change to in-memory meta + if sourceReplica != nil { + rep.upsert(sourceReplica) + } + + if targetReplica != nil { + rep.upsert(targetReplica) + } + + return nil +} + +// removeNodeFromReplica helper function to remove nodeID from replica NodeIds list. +func removeNodeFromReplica(replica *milvuspb.ReplicaInfo, nodeID int64) *milvuspb.ReplicaInfo { + for i := 0; i < len(replica.NodeIds); i++ { + if replica.NodeIds[i] != nodeID { + continue + } + replica.NodeIds = append(replica.NodeIds[:i], replica.NodeIds[i+1:]...) + return replica + } + return replica +} diff --git a/internal/querycoord/replica_test.go b/internal/querycoord/replica_test.go new file mode 100644 index 0000000000..8cde1cf9ac --- /dev/null +++ b/internal/querycoord/replica_test.go @@ -0,0 +1,246 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycoord + +import ( + "errors" + "testing" + + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +type mockMetaKV struct { + mock.Mock +} + +func (m *mockMetaKV) Load(key string) (string, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) MultiLoad(keys []string) ([]string, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) LoadWithPrefix(key string) ([]string, []string, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) Save(key string, value string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) MultiSave(kvs map[string]string) error { + args := m.Called(kvs) + return args.Error(0) +} + +func (m *mockMetaKV) Remove(key string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) MultiRemove(keys []string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) RemoveWithPrefix(key string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) Close() { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) MultiRemoveWithPrefix(keys []string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) GetPath(key string) string { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) Watch(key string) clientv3.WatchChan { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) WatchWithPrefix(key string) clientv3.WatchChan { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) SaveWithLease(key string, value string, id clientv3.LeaseID) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) SaveWithIgnoreLease(key string, value string) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) error { + panic("not implemented") // TODO: Implement +} + +func (m *mockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error { + panic("not implemented") // TODO: Implement +} + +func TestReplicaInfos_ApplyBalancePlan(t *testing.T) { + kv := &mockMetaKV{} + kv.On("MultiSave", mock.AnythingOfType("map[string]string")).Return(nil) + t.Run("source replica not exist", func(t *testing.T) { + replicas := NewReplicaInfos() + err := replicas.ApplyBalancePlan(&balancePlan{ + nodeID: 1, + sourceReplica: 1, + targetReplica: invalidReplicaID, + }, kv) + assert.Error(t, err) + }) + + t.Run("target replica not exist", func(t *testing.T) { + replicas := NewReplicaInfos() + err := replicas.ApplyBalancePlan(&balancePlan{ + nodeID: 1, + sourceReplica: invalidReplicaID, + targetReplica: 1, + }, kv) + + assert.Error(t, err) + }) + + t.Run("add node to replica", func(t *testing.T) { + replicas := NewReplicaInfos() + + replicas.Insert(&milvuspb.ReplicaInfo{ + ReplicaID: 1, + NodeIds: []int64{1}, + }) + + err := replicas.ApplyBalancePlan(&balancePlan{ + nodeID: 2, + sourceReplica: invalidReplicaID, + targetReplica: 1, + }, kv) + + assert.NoError(t, err) + + info, has := replicas.Get(1) + require.True(t, has) + require.NotNil(t, info) + + assert.Contains(t, info.GetNodeIds(), int64(2)) + + result := replicas.GetReplicasByNodeID(2) + assert.Equal(t, 1, len(result)) + kv.AssertCalled(t, "MultiSave", mock.AnythingOfType("map[string]string")) + }) + + t.Run("remove node from replica", func(t *testing.T) { + replicas := NewReplicaInfos() + + replicas.Insert(&milvuspb.ReplicaInfo{ + ReplicaID: 1, + NodeIds: []int64{1}, + }) + + err := replicas.ApplyBalancePlan(&balancePlan{ + nodeID: 1, + sourceReplica: 1, + targetReplica: invalidReplicaID, + }, kv) + + assert.NoError(t, err) + + info, has := replicas.Get(1) + require.True(t, has) + require.NotNil(t, info) + + assert.NotContains(t, info.GetNodeIds(), int64(1)) + + result := replicas.GetReplicasByNodeID(1) + assert.Equal(t, 0, len(result)) + kv.AssertCalled(t, "MultiSave", mock.AnythingOfType("map[string]string")) + }) + + t.Run("remove non-existing node from replica", func(t *testing.T) { + replicas := NewReplicaInfos() + + replicas.Insert(&milvuspb.ReplicaInfo{ + ReplicaID: 1, + NodeIds: []int64{1}, + }) + + err := replicas.ApplyBalancePlan(&balancePlan{ + nodeID: 2, + sourceReplica: 1, + targetReplica: invalidReplicaID, + }, kv) + + assert.NoError(t, err) + }) + + t.Run("save to etcd fail", func(t *testing.T) { + kv := &mockMetaKV{} + kv.On("MultiSave", mock.AnythingOfType("map[string]string")).Return(errors.New("mocked error")) + replicas := NewReplicaInfos() + + replicas.Insert(&milvuspb.ReplicaInfo{ + ReplicaID: 1, + NodeIds: []int64{1}, + }) + + err := replicas.ApplyBalancePlan(&balancePlan{ + nodeID: 2, + sourceReplica: invalidReplicaID, + targetReplica: 1, + }, kv) + + kv.AssertCalled(t, "MultiSave", mock.AnythingOfType("map[string]string")) + assert.Error(t, err) + }) +}