Add candidate management for datacoord dn cluster (#6196)

* Add candidate management for datacoord dn cluster

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Fix test cases

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2021-06-30 10:20:15 +08:00 committed by GitHub
parent b90b4f2058
commit ae072b4f1d
7 changed files with 245 additions and 31 deletions

View File

@ -8,6 +8,7 @@
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
@ -23,11 +24,12 @@ import (
)
type cluster struct {
mu sync.RWMutex
ctx context.Context
dataManager *clusterNodeManager
sessionManager sessionManager
posProvider positionProvider
mu sync.RWMutex
ctx context.Context
dataManager *clusterNodeManager
sessionManager sessionManager
candidateManager *candidateManager
posProvider positionProvider
startupPolicy clusterStartupPolicy
registerPolicy dataNodeRegisterPolicy
@ -92,13 +94,16 @@ func newCluster(ctx context.Context, dataManager *clusterNodeManager,
unregisterPolicy: defaultUnregisterPolicy(),
assignPolicy: defaultAssignPolicy(),
}
for _, opt := range opts {
opt.apply(c)
}
c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode)
return c
}
// startup applies the startup policy
func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
deltaChange := c.dataManager.updateCluster(dataNodes)
nodes, chanBuffer := c.dataManager.getDataNodes(false)
@ -117,18 +122,68 @@ func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
// refresh roughly refreshes datanode status after an event is received
func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
c.mu.Lock()
defer c.mu.Unlock()
deltaChange := c.dataManager.updateCluster(dataNodes)
nodes, chanBuffer := c.dataManager.getDataNodes(false)
var rets []*datapb.DataNodeInfo
var err error
rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
c.dataManager.updateDataNodes(rets, chanBuffer)
rets, err = c.watch(rets)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err))
// does not trigger another refresh; the pending event will do that
log.Debug("refresh delta", zap.Any("new", deltaChange.newNodes),
zap.Any("restart", deltaChange.restarts),
zap.Any("offline", deltaChange.offlines))
// cannot use the startup policy directly; the work is separated into three parts:
// 1. add new nodes into the candidate list
for _, dn := range dataNodes {
for _, newAddr := range deltaChange.newNodes {
if dn.Address == newAddr {
c.candidateManager.add(dn)
}
}
}
c.dataManager.updateDataNodes(rets, chanBuffer) // even if some watch operations failed, the status should still be synced to etcd
// 2. restarted nodes try to watch again
restartNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.restarts))
for _, node := range deltaChange.restarts {
info, ok := c.dataManager.dataNodes[node]
if ok {
restartNodes = append(restartNodes, info.info)
for _, cs := range info.info.Channels {
cs.State = datapb.ChannelWatchState_Uncomplete
}
}
}
_, buffer := c.dataManager.getDataNodes(true)
c.updateNodeWatch(restartNodes, buffer)
// 3. offline nodes get unregistered
unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // node infos to unregister, if any
for _, node := range deltaChange.offlines {
info := c.dataManager.unregister(node)
if info != nil {
unregisterNodes = append(unregisterNodes, info)
}
}
for _, node := range unregisterNodes {
cluster, buffer := c.dataManager.getDataNodes(true)
if len(cluster) > 0 { // cluster has online nodes, migrate channels
ret := c.unregisterPolicy.apply(cluster, node)
c.updateNodeWatch(ret, buffer)
} else {
// no online node, put all watched channels to buffer
buffer = append(buffer, node.Channels...)
c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
}
}
return nil
}
// updateNodeWatch saves the nodes' incomplete status, tries to watch the channels that are not watched yet, and saves the execution result
func (c *cluster) updateNodeWatch(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
c.dataManager.updateDataNodes(nodes, buffer)
rets, err := c.watch(nodes)
if err != nil {
log.Warn("Failed to watch all the status change", zap.Error(err)) //
}
c.dataManager.updateDataNodes(rets, buffer)
return err
}
@ -158,13 +213,29 @@ func paraRun(works []func(), maxRunner int) {
close(ch)
}
func (c *cluster) validateDataNode(dn *datapb.DataNodeInfo) error {
log.Warn("[CM] start validate candidate", zap.String("addr", dn.Address))
_, err := c.sessionManager.getOrCreateSession(dn.Address) // this might take time if address went offline
log.Warn("[CM] candidate validation finished", zap.String("addr", dn.Address), zap.Error(err))
if err != nil {
return err
}
return nil
}
func (c *cluster) enableDataNode(dn *datapb.DataNodeInfo) error {
log.Warn("[CM] enabling candidate", zap.String("addr", dn.Address))
c.register(dn)
return nil
}
func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
works := make([]func(), 0, len(nodes))
mut := sync.Mutex{}
errs := make([]error, 0, len(nodes))
for _, n := range nodes {
works = append(works, func() {
logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
logMsg := fmt.Sprintf("Begin to watch channels for node %s, channels:", n.Address)
uncompletes := make([]vchannel, 0, len(n.Channels))
for _, ch := range n.Channels {
if ch.State == datapb.ChannelWatchState_Uncomplete {
@ -260,7 +331,7 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) {
defer c.mu.Unlock()
c.sessionManager.releaseSession(n.Address)
oldNode := c.dataManager.unregister(n)
oldNode := c.dataManager.unregister(n.Address)
if oldNode != nil {
n = oldNode
}
@ -338,7 +409,7 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
if !ok {
continue
}
cli, err := c.sessionManager.getOrCreateSession(node)
cli, err := c.sessionManager.getSession(node)
if err != nil {
log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
continue
@ -370,4 +441,5 @@ func (c *cluster) releaseSessions() {
c.mu.Lock()
defer c.mu.Unlock()
c.sessionManager.release()
c.candidateManager.dispose()
}
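
Taken together, the cluster-side changes above mean that refresh no longer registers newly discovered nodes directly: new addresses go into the candidate manager, which first validates connectivity (validateDataNode calls getOrCreateSession) and only then enables the node (enableDataNode calls register). Below is a minimal, self-contained sketch of that validate-then-enable hand-off; the nodeInfo type, the TCP probe, and the registered map are illustrative stand-ins for this commit's datapb / sessionManager types, not the real API.

package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// nodeInfo is a hypothetical stand-in for datapb.DataNodeInfo.
type nodeInfo struct {
	Address string
}

// validate mimics the role of cluster.validateDataNode: it only checks that a
// session (here, a plain TCP connection) can be established within a short timeout.
func validate(n *nodeInfo) error {
	conn, err := net.DialTimeout("tcp", n.Address, 200*time.Millisecond)
	if err != nil {
		return fmt.Errorf("candidate %s unreachable: %w", n.Address, err)
	}
	_ = conn.Close()
	return nil
}

// enable mimics the role of cluster.enableDataNode: only validated nodes reach
// the registered set, so stale etcd leftovers never block the cluster.
func enable(n *nodeInfo, registered map[string]*nodeInfo) error {
	if n == nil {
		return errors.New("nil node")
	}
	registered[n.Address] = n
	return nil
}

func main() {
	registered := map[string]*nodeInfo{}
	for _, n := range []*nodeInfo{{Address: "localhost:1"}, {Address: "localhost:2"}} {
		if err := validate(n); err != nil {
			fmt.Println("skip candidate:", err) // stays out of the cluster
			continue
		}
		_ = enable(n, registered)
	}
	fmt.Println("registered nodes:", len(registered))
}

The design choice this illustrates: validation may block for the full dial timeout, so it is pushed into the candidate manager's workers instead of being done inline in refresh.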

View File

@ -0,0 +1,124 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
// candidateManager manages data node candidates
type candidateManager struct {
candidatePool sync.Map // candidates currently being processed
taskQueue chan candidate // task queue to notify workers
cancel func() // global cancel func
validate func(*datapb.DataNodeInfo) error // candidate validation
enable func(*datapb.DataNodeInfo) error // enable operation, invoked when a candidate passes validation
}
// candidate stands for the datanode info read from etcd
// it needs to be validated before being put into the cluster
// since the etcd key has a lease timeout of 10 seconds
type candidate struct {
key string // key to specify candidate, usually candidate address
node *datapb.DataNodeInfo // node info
ctx context.Context // context object to control the validation process
cancel func() // cancel func to cancel single candidate
}
// newCandidateManager creates a candidateManager with the specified worker number
func newCandidateManager(wn int, validate, enable func(*datapb.DataNodeInfo) error) *candidateManager {
if wn <= 0 {
wn = 20
}
ctx, cancel := context.WithCancel(context.Background())
cm := &candidateManager{
candidatePool: sync.Map{},
cancel: cancel,
taskQueue: make(chan candidate, wn), // buffered with wn slots, drained by wn workers
validate: validate,
enable: enable,
}
for i := 0; i < wn; i++ {
// start a worker
go cm.work(ctx)
}
return cm
}
// work processes candidates from the task queue
// each task can be canceled via the candidate's context or the global cancel func
func (cm *candidateManager) work(ctx context.Context) {
for {
select {
case cand := <-cm.taskQueue:
ch := make(chan struct{}, 1) // buffered so the validation goroutine cannot leak if the candidate is canceled first
var err error
go func() {
err = cm.validate(cand.node)
ch <- struct{}{}
}()
select {
case <-ch:
if err == nil {
cm.enable(cand.node) // success, enable candidate
} else {
log.Warn("[CM] candidate failed", zap.String("addr", cand.node.Address))
}
case <-cand.ctx.Done():
}
cm.candidatePool.Delete(cand.key) // remove from candidatePool
case <-ctx.Done():
return
}
}
}
// add puts a datanode into the candidate pool
// the operation is non-blocking
func (cm *candidateManager) add(dn *datapb.DataNodeInfo) {
log.Warn("[CM]add new candidate", zap.String("addr", dn.Address))
key := dn.Address
ctx, cancel := context.WithCancel(context.Background())
cand := candidate{
key: key,
node: dn,
ctx: ctx,
cancel: cancel,
}
_, loaded := cm.candidatePool.LoadOrStore(key, cand)
if !loaded {
go func() { // enqueue in a goroutine so add never blocks
cm.taskQueue <- cand
}()
}
}
// stop cancels the candidate validation for key if it is still in the pool
func (cm *candidateManager) stop(key string) {
val, loaded := cm.candidatePool.LoadAndDelete(key)
if loaded {
cand, ok := val.(candidate)
if ok {
cand.cancel()
}
}
}
// dispose shuts the manager down when the application is stopping
func (cm *candidateManager) dispose() {
cm.cancel()
}
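
For readers less familiar with this concurrency pattern, here is a minimal usage-style sketch of what candidate_manager.go builds: a sync.Map to de-duplicate candidates per address, a buffered channel feeding a fixed worker pool, and a per-candidate context so a slow validation can be abandoned. All names, durations, and the two hard-coded addresses below are illustrative, not the real datacoord API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cand is a trimmed-down stand-in for the candidate struct above.
type cand struct {
	addr   string
	ctx    context.Context
	cancel context.CancelFunc
}

func main() {
	var pool sync.Map           // de-duplicates candidates by address, like candidatePool
	queue := make(chan cand, 2) // buffered task queue feeding the workers

	// fixed worker pool, mirroring the wn goroutines started in newCandidateManager
	for i := 0; i < 2; i++ {
		go func() {
			for c := range queue {
				select {
				case <-time.After(100 * time.Millisecond): // stands in for validate()
					fmt.Println("enabled", c.addr) // stands in for enable()
				case <-c.ctx.Done():
					fmt.Println("canceled", c.addr)
				}
				pool.Delete(c.addr)
			}
		}()
	}

	add := func(addr string) cand {
		ctx, cancel := context.WithCancel(context.Background())
		c := cand{addr: addr, ctx: ctx, cancel: cancel}
		if _, loaded := pool.LoadOrStore(addr, c); !loaded {
			go func() { queue <- c }() // non-blocking enqueue, as in add()
		}
		return c
	}

	add("dn-1:9000")
	stale := add("dn-2:9000")
	stale.cancel() // roughly what stop("dn-2:9000") would do

	time.Sleep(200 * time.Millisecond) // let the demo finish before main exits
}

Sizing the queue capacity to the worker count means an enqueue only has to spin up a goroutine when every worker is busy and the buffer is full, which keeps add cheap in the common case.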

View File

@ -126,6 +126,7 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl
}
}
// updateDataNodes updates the input dataNodes, merged with the existing cluster state and buffer
func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
for _, node := range dataNodes {
c.dataNodes[node.Address].info = node
@ -134,6 +135,7 @@ func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, b
return c.txnSaveNodes(dataNodes, buffer)
}
// getDataNodes returns the currently synced data nodes together with the buffered channels
func (c *clusterNodeManager) getDataNodes(onlyOnline bool) (map[string]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
ret := make(map[string]*datapb.DataNodeInfo)
for k, v := range c.dataNodes {
@ -158,8 +160,9 @@ func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) {
c.updateMetrics()
}
func (c *clusterNodeManager) unregister(n *datapb.DataNodeInfo) *datapb.DataNodeInfo {
node, ok := c.dataNodes[n.Address]
// unregister removes the node with the specified address and returns its info if it exists
func (c *clusterNodeManager) unregister(addr string) *datapb.DataNodeInfo {
node, ok := c.dataNodes[addr]
if !ok {
return nil
}

View File

@ -12,14 +12,16 @@ package datacoord
import (
"context"
"errors"
"sync"
"github.com/milvus-io/milvus/internal/types"
)
const retryTimes = 2
type sessionManager interface {
// getSession returns an existing session for addr without creating one (and without retry)
getSession(addr string) (types.DataNode, error)
// getOrCreateSession returns the session for addr, creating one if it does not exist
getOrCreateSession(addr string) (types.DataNode, error)
releaseSession(addr string)
release()
@ -40,7 +42,17 @@ func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCre
}
}
// lock acquired
// getSession, without creation if not found
func (m *clusterSessionManager) getSession(addr string) (types.DataNode, error) {
m.RLock()
defer m.RUnlock()
cli, has := m.sessions[addr]
if has {
return cli, nil
}
return nil, errors.New("not found")
}
func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) {
cli, err := m.dataClientCreator(m.ctx, addr)
if err != nil {
@ -52,7 +64,9 @@ func (m *clusterSessionManager) createSession(addr string) (types.DataNode, erro
if err := cli.Start(); err != nil {
return nil, err
}
m.Lock()
m.sessions[addr] = cli
m.Unlock()
return cli, nil
}
@ -64,12 +78,7 @@ func (m *clusterSessionManager) getOrCreateSession(addr string) (types.DataNode,
if has {
return dn, nil
}
m.Lock()
defer m.Unlock()
dn, has = m.sessions[addr]
if has {
return dn, nil
}
// no double-checked locking is needed: callers are already de-duplicated per addr by an outer sync.Map (the candidate pool)
dn, err := m.createSession(addr)
return dn, err
}
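
The interface split matters for callers: flush now uses getSession so it never dials an address that has no live session, while candidate validation keeps using getOrCreateSession to probe new addresses. A small sketch of the two lookup behaviours over a plain string map; this is a toy stand-in, not the real clusterSessionManager, which stores types.DataNode clients created by dataClientCreator.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// sessions is a toy stand-in for clusterSessionManager's session map.
type sessions struct {
	mu sync.RWMutex
	m  map[string]string
}

// getSession never creates: callers like flush should not pay a dial timeout
// for nodes that were never validated.
func (s *sessions) getSession(addr string) (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cli, ok := s.m[addr]
	if !ok {
		return "", errors.New("not found")
	}
	return cli, nil
}

// getOrCreateSession "dials" (simulated here) and caches the client; in this
// patch that is the path candidate validation relies on.
func (s *sessions) getOrCreateSession(addr string) (string, error) {
	if cli, err := s.getSession(addr); err == nil {
		return cli, nil
	}
	cli := "client-for-" + addr // stands in for dataClientCreator + Start()
	s.mu.Lock()
	s.m[addr] = cli
	s.mu.Unlock()
	return cli, nil
}

func main() {
	s := &sessions{m: map[string]string{}}
	if _, err := s.getSession("dn-1:9000"); err != nil {
		fmt.Println("flush path skips unknown node:", err)
	}
	cli, _ := s.getOrCreateSession("dn-1:9000")
	fmt.Println("validator created:", cli)
	cli, _ = s.getSession("dn-1:9000")
	fmt.Println("flush can now reuse:", cli)
}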

View File

@ -317,6 +317,10 @@ func newMockSessionManager(ch chan interface{}) sessionManager {
}
}
func (m *mockSessionManager) getSession(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, m.ch)
}
func (m *mockSessionManager) getOrCreateSession(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, m.ch)
}

View File

@ -504,7 +504,7 @@ func TestDataNodeTtChannel(t *testing.T) {
},
}
}
svr.cluster.sessionManager.getOrCreateSession("localhost:7777") // trigger session creation manually
svr.cluster.register(&datapb.DataNodeInfo{
Address: "localhost:7777",
Version: 0,

View File

@ -67,7 +67,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
connectGrpcFunc := func() error {
opts := trace.GetInterceptorOpts()
log.Debug("DataNode connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
ctx, cancel := context.WithTimeout(c.ctx, time.Millisecond*200)
defer cancel()
conn, err := grpc.DialContext(ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(200*time.Millisecond),
grpc.WithDisableRetry(),
grpc.WithUnaryInterceptor(