diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index 125b1556cd..2191ad69e5 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -8,6 +8,7 @@
 // Unless required by applicable law or agreed to in writing, software distributed under the License
 // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 // or implied. See the License for the specific language governing permissions and limitations under the License.
+
 package datacoord
 
 import (
@@ -23,11 +24,12 @@ import (
 )
 
 type cluster struct {
-	mu             sync.RWMutex
-	ctx            context.Context
-	dataManager    *clusterNodeManager
-	sessionManager sessionManager
-	posProvider    positionProvider
+	mu               sync.RWMutex
+	ctx              context.Context
+	dataManager      *clusterNodeManager
+	sessionManager   sessionManager
+	candidateManager *candidateManager
+	posProvider      positionProvider
 
 	startupPolicy  clusterStartupPolicy
 	registerPolicy dataNodeRegisterPolicy
@@ -92,13 +94,16 @@ func newCluster(ctx context.Context, dataManager *clusterNodeManager,
 		unregisterPolicy: defaultUnregisterPolicy(),
 		assignPolicy:     defaultAssignPolicy(),
 	}
+
 	for _, opt := range opts {
 		opt.apply(c)
 	}
+	c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode)
 	return c
 }
 
+// startup applies the startup policy
 func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
 	deltaChange := c.dataManager.updateCluster(dataNodes)
 	nodes, chanBuffer := c.dataManager.getDataNodes(false)
@@ -117,18 +122,68 @@ func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
 
 // refresh rough refresh datanode status after event received
 func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	deltaChange := c.dataManager.updateCluster(dataNodes)
-	nodes, chanBuffer := c.dataManager.getDataNodes(false)
-	var rets []*datapb.DataNodeInfo
-	var err error
-	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
-	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets, err = c.watch(rets)
-	if err != nil {
-		log.Warn("Failed to watch all the status change", zap.Error(err))
-		//does not trigger new another refresh, pending evt will do
+	log.Debug("refresh delta", zap.Any("new", deltaChange.newNodes),
+		zap.Any("restart", deltaChange.restarts),
+		zap.Any("offline", deltaChange.offlines))
+
+	// the startup policy cannot be used directly here; the work is split into three parts:
+	// 1. add new nodes into the candidates list
+	for _, dn := range dataNodes {
+		for _, newAddr := range deltaChange.newNodes {
+			if dn.Address == newAddr {
+				c.candidateManager.add(dn)
+			}
+		}
 	}
-	c.dataManager.updateDataNodes(rets, chanBuffer) // even if some watch failed, status should sync into etcd
+
+	// 2. restarted nodes: mark their channels Uncomplete and try to watch them again
+	restartNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.restarts))
+	for _, node := range deltaChange.restarts {
+		info, ok := c.dataManager.dataNodes[node]
+		if ok {
+			restartNodes = append(restartNodes, info.info)
+			for _, cs := range info.info.Channels {
+				cs.State = datapb.ChannelWatchState_Uncomplete
+			}
+		}
+	}
+	_, buffer := c.dataManager.getDataNodes(true)
+	c.updateNodeWatch(restartNodes, buffer)
+
+	// 3. offline nodes: unregister them
+	unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // node infos that may need to be unregistered
+	for _, node := range deltaChange.offlines {
+		info := c.dataManager.unregister(node)
+		if info != nil {
+			unregisterNodes = append(unregisterNodes, info)
+		}
+	}
+	for _, node := range unregisterNodes {
+		cluster, buffer := c.dataManager.getDataNodes(true)
+		if len(cluster) > 0 { // cluster has online nodes, migrate channels
+			ret := c.unregisterPolicy.apply(cluster, node)
+			c.updateNodeWatch(ret, buffer)
+		} else {
+			// no online node, put all watched channels into the buffer
+			buffer = append(buffer, node.Channels...)
+			c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
+		}
+	}
+
+	return nil
+}
+
+// updateNodeWatch saves the nodes' Uncomplete status, tries to watch the channels that are not yet watched, and stores the execution result
+func (c *cluster) updateNodeWatch(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
+	c.dataManager.updateDataNodes(nodes, buffer)
+	rets, err := c.watch(nodes)
+	if err != nil {
+		log.Warn("Failed to watch all the status change", zap.Error(err))
+	}
+	c.dataManager.updateDataNodes(rets, buffer) // even if some watches failed, the status should sync into etcd
 	return err
 }
 
@@ -158,13 +213,29 @@ func paraRun(works []func(), maxRunner int) {
 	close(ch)
 }
 
+// validateDataNode checks that the candidate datanode is reachable by creating a session to it
+func (c *cluster) validateDataNode(dn *datapb.DataNodeInfo) error {
+	log.Warn("[CM] start validate candidate", zap.String("addr", dn.Address))
+	_, err := c.sessionManager.getOrCreateSession(dn.Address) // this might take a while if the address has gone offline
+	log.Warn("[CM] candidate validation finished", zap.String("addr", dn.Address), zap.Error(err))
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// enableDataNode registers a validated candidate into the cluster
+func (c *cluster) enableDataNode(dn *datapb.DataNodeInfo) error {
+	log.Warn("[CM] enabling candidate", zap.String("addr", dn.Address))
+	c.register(dn)
+	return nil
+}
+
 func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
 	works := make([]func(), 0, len(nodes))
 	mut := sync.Mutex{}
 	errs := make([]error, 0, len(nodes))
 	for _, n := range nodes {
 		works = append(works, func() {
-			logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
+			logMsg := fmt.Sprintf("Begin to watch channels for node %s, channels:", n.Address)
 			uncompletes := make([]vchannel, 0, len(n.Channels))
 			for _, ch := range n.Channels {
 				if ch.State == datapb.ChannelWatchState_Uncomplete {
@@ -260,7 +331,7 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) {
 	defer c.mu.Unlock()
 	c.sessionManager.releaseSession(n.Address)
 
-	oldNode := c.dataManager.unregister(n)
+	oldNode := c.dataManager.unregister(n.Address)
 	if oldNode != nil {
 		n = oldNode
 	}
@@ -338,7 +409,7 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
 		if !ok {
 			continue
 		}
-		cli, err := c.sessionManager.getOrCreateSession(node)
+		cli, err := c.sessionManager.getSession(node)
 		if err != nil {
 			log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
 			continue
@@ -370,4 +441,5 @@ func (c *cluster) releaseSessions() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	c.sessionManager.release()
+	c.candidateManager.dispose()
 }
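
Note: the rewritten refresh above branches on the node delta (new / restarted / offline) produced by clusterNodeManager.updateCluster. As a self-contained illustration of that bucketing idea only (a toy helper with hypothetical names, not the actual clusterNodeManager code), such a delta could be derived from two addr -> version snapshots like this:

    package main

    import "fmt"

    type delta struct {
        newNodes, restarts, offlines []string
    }

    // computeDelta buckets each address into new / restarted / offline by comparing
    // the previously known nodes (addr -> version) with the current snapshot.
    func computeDelta(prev, curr map[string]int64) delta {
        var d delta
        for addr, ver := range curr {
            old, ok := prev[addr]
            switch {
            case !ok:
                d.newNodes = append(d.newNodes, addr)
            case ver != old: // same address came back with a different incarnation
                d.restarts = append(d.restarts, addr)
            }
        }
        for addr := range prev {
            if _, ok := curr[addr]; !ok {
                d.offlines = append(d.offlines, addr)
            }
        }
        return d
    }

    func main() {
        prev := map[string]int64{"a:3000": 1, "b:3000": 1}
        curr := map[string]int64{"b:3000": 2, "c:3000": 1}
        fmt.Printf("%+v\n", computeDelta(prev, curr)) // {newNodes:[c:3000] restarts:[b:3000] offlines:[a:3000]}
    }
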
diff --git a/internal/datacoord/cluster_candidate.go b/internal/datacoord/cluster_candidate.go
new file mode 100644
index 0000000000..da8644fad2
--- /dev/null
+++ b/internal/datacoord/cluster_candidate.go
@@ -0,0 +1,124 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package datacoord
+
+import (
+	"context"
+	"sync"
+
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"go.uber.org/zap"
+)
+
+// candidateManager manages datanode candidates
+type candidateManager struct {
+	candidatePool sync.Map                         // candidates currently being processed
+	taskQueue     chan candidate                   // task queue to notify workers
+	cancel        func()                           // global cancel func
+	validate      func(*datapb.DataNodeInfo) error // candidate validation
+	enable        func(*datapb.DataNodeInfo) error // enable operation, run once a candidate passes validation
+}
+
+// candidate stands for a datanode info read from etcd;
+// it needs to be validated before being put into the cluster,
+// since the etcd key has a lease timeout of 10 seconds
+type candidate struct {
+	key    string               // key to identify the candidate, usually the candidate address
+	node   *datapb.DataNodeInfo // node info
+	ctx    context.Context      // context to control the validation process
+	cancel func()               // cancel func to cancel a single candidate
+}
+
+// newCandidateManager creates a candidateManager with the specified worker number
+func newCandidateManager(wn int, validate, enable func(*datapb.DataNodeInfo) error) *candidateManager {
+	if wn <= 0 {
+		wn = 20
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	cm := &candidateManager{
+		candidatePool: sync.Map{},
+		cancel:        cancel,
+		taskQueue:     make(chan candidate, wn), // wn workers plus a buffer of wn pending tasks
+		validate:      validate,
+		enable:        enable,
+	}
+	for i := 0; i < wn; i++ {
+		// start worker
+		go cm.work(ctx)
+	}
+	return cm
+}
+
+// work processes candidates from the task queue;
+// each task can be canceled through its candidate context or through the global cancel func
+func (cm *candidateManager) work(ctx context.Context) {
+	for {
+		select {
+		case cand := <-cm.taskQueue:
+			ch := make(chan struct{})
+			var err error
+			go func() {
+				err = cm.validate(cand.node)
+				ch <- struct{}{}
+			}()
+			select {
+			case <-ch:
+				if err == nil {
+					cm.enable(cand.node) // validation succeeded, enable the candidate
+				} else {
+					log.Warn("[CM] candidate failed", zap.String("addr", cand.node.Address))
+				}
+			case <-cand.ctx.Done():
+			}
+			cm.candidatePool.Delete(cand.key) // remove from candidatePool
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// add puts a datanode into the candidate pool;
+// the operation is non-blocking
+func (cm *candidateManager) add(dn *datapb.DataNodeInfo) {
+	log.Warn("[CM] add new candidate", zap.String("addr", dn.Address))
+	key := dn.Address
+	ctx, cancel := context.WithCancel(context.Background())
+	cand := candidate{
+		key:    key,
+		node:   dn,
+		ctx:    ctx,
+		cancel: cancel,
+	}
+	_, loaded := cm.candidatePool.LoadOrStore(key, cand)
+	if !loaded {
+		go func() { // enqueue in a goroutine so the caller is not blocked
+			cm.taskQueue <- cand
+		}()
+	}
+}
+
+// stop cancels the candidate validation process if the key exists in the pool
+func (cm *candidateManager) stop(key string) {
+	val, loaded := cm.candidatePool.LoadAndDelete(key)
+	if loaded {
+		cand, ok := val.(candidate)
+		if ok {
+			cand.cancel()
+		}
+	}
+}
+
+// dispose releases the manager when the application is stopping
+func (cm *candidateManager) dispose() {
+	cm.cancel()
+}
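
Note: candidateManager is essentially a fixed-size worker pool whose tasks can be canceled individually (per-candidate context) or all at once (manager context). A minimal standalone sketch of that pattern, with hypothetical names and simplified types, not the code above:

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    // task carries its own context so a single task can be canceled
    // independently of the pool that executes it.
    type task struct {
        key string
        ctx context.Context
        run func(context.Context) error
    }

    // startPool launches a fixed number of workers draining queue until ctx is canceled.
    func startPool(ctx context.Context, workers int, queue <-chan task) *sync.WaitGroup {
        wg := &sync.WaitGroup{}
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for {
                    select {
                    case t := <-queue:
                        done := make(chan error, 1) // buffered so the runner never blocks on send
                        go func() { done <- t.run(t.ctx) }()
                        select {
                        case err := <-done:
                            fmt.Println(t.key, "finished:", err)
                        case <-t.ctx.Done(): // this task was canceled individually
                            fmt.Println(t.key, "canceled")
                        }
                    case <-ctx.Done(): // the whole pool was disposed
                        return
                    }
                }
            }()
        }
        return wg
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        queue := make(chan task, 4)
        wg := startPool(ctx, 2, queue)

        tctx, tcancel := context.WithCancel(context.Background())
        defer tcancel()
        queue <- task{key: "node-1", ctx: tctx, run: func(context.Context) error {
            time.Sleep(10 * time.Millisecond) // stand-in for a session dial / validation
            return nil
        }}

        time.Sleep(50 * time.Millisecond)
        cancel() // dispose the pool
        wg.Wait()
    }

The sketch uses a buffered done channel so the runner goroutine can always finish its send even when the per-task context wins the race; that is a design choice of the sketch, not a claim about the patch above.
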
diff --git a/internal/datacoord/cluster_data_manager.go b/internal/datacoord/cluster_data_manager.go
index 5952215e97..c55908e0c3 100644
--- a/internal/datacoord/cluster_data_manager.go
+++ b/internal/datacoord/cluster_data_manager.go
@@ -126,6 +126,7 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl
 	}
 }
 
+// updateDataNodes merges the input dataNodes with the existing cluster nodes and the channel buffer
 func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
 	for _, node := range dataNodes {
 		c.dataNodes[node.Address].info = node
@@ -134,6 +135,7 @@ func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, b
 	return c.txnSaveNodes(dataNodes, buffer)
 }
 
+// getDataNodes gets the currently synced data nodes together with the buffered channels
 func (c *clusterNodeManager) getDataNodes(onlyOnline bool) (map[string]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
 	ret := make(map[string]*datapb.DataNodeInfo)
 	for k, v := range c.dataNodes {
@@ -158,8 +160,9 @@ func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) {
 	c.updateMetrics()
 }
 
-func (c *clusterNodeManager) unregister(n *datapb.DataNodeInfo) *datapb.DataNodeInfo {
-	node, ok := c.dataNodes[n.Address]
+// unregister removes the node with the specified address and returns its node info if it exists
+func (c *clusterNodeManager) unregister(addr string) *datapb.DataNodeInfo {
+	node, ok := c.dataNodes[addr]
 	if !ok {
 		return nil
 	}
diff --git a/internal/datacoord/cluster_session_manager.go b/internal/datacoord/cluster_session_manager.go
index 125a7e69b4..f0423479cf 100644
--- a/internal/datacoord/cluster_session_manager.go
+++ b/internal/datacoord/cluster_session_manager.go
@@ -12,14 +12,16 @@ package datacoord
 
 import (
 	"context"
+	"errors"
 	"sync"
 
 	"github.com/milvus-io/milvus/internal/types"
 )
 
-const retryTimes = 2
-
 type sessionManager interface {
+	// getSession tries to get an existing session, without retry
+	getSession(addr string) (types.DataNode, error)
+	// getOrCreateSession tries to get the session for addr from the manager, creating one if it does not exist
 	getOrCreateSession(addr string) (types.DataNode, error)
 	releaseSession(addr string)
 	release()
@@ -40,7 +42,17 @@ func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCre
 	}
 }
 
-// lock acquired
+// getSession returns the session for addr without creating one if it is not found
+func (m *clusterSessionManager) getSession(addr string) (types.DataNode, error) {
+	m.RLock()
+	defer m.RUnlock()
+	cli, has := m.sessions[addr]
+	if has {
+		return cli, nil
+	}
+	return nil, errors.New("not found")
+}
+
 func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) {
 	cli, err := m.dataClientCreator(m.ctx, addr)
 	if err != nil {
@@ -52,7 +64,9 @@ func (m *clusterSessionManager) createSession(addr string) (types.DataNode, erro
 	if err := cli.Start(); err != nil {
 		return nil, err
 	}
+	m.Lock()
 	m.sessions[addr] = cli
+	m.Unlock()
 	return cli, nil
 }
 
@@ -64,12 +78,7 @@ func (m *clusterSessionManager) getOrCreateSession(addr string) (types.DataNode,
 	if has {
 		return dn, nil
 	}
-	m.Lock()
-	defer m.Unlock()
-	dn, has = m.sessions[addr]
-	if has {
-		return dn, nil
-	}
+	// no double-checked locking needed here: callers are serialized per addr by the outer sync.Map
 	dn, err := m.createSession(addr)
 	return dn, err
 }
diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go
index 05293c367a..448e3ce13b 100644
--- a/internal/datacoord/mock_test.go
+++ b/internal/datacoord/mock_test.go
@@ -317,6 +317,10 @@ func newMockSessionManager(ch chan interface{}) sessionManager {
 	}
 }
 
+func (m *mockSessionManager) getSession(addr string) (types.DataNode, error) {
+	return newMockDataNodeClient(0, m.ch)
+}
+
 func (m *mockSessionManager) getOrCreateSession(addr string) (types.DataNode, error) {
 	return newMockDataNodeClient(0, m.ch)
 }
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 3372ed3d25..70c0aa1a06 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -504,7 +504,7 @@ func TestDataNodeTtChannel(t *testing.T) {
 			},
 		}
 	}
-
+	svr.cluster.sessionManager.getOrCreateSession("localhost:7777") // trigger session creation manually
 	svr.cluster.register(&datapb.DataNodeInfo{
 		Address: "localhost:7777",
 		Version: 0,
diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go
index 1f29c758c4..89b1fba330 100644
--- a/internal/distributed/datanode/client/client.go
+++ b/internal/distributed/datanode/client/client.go
@@ -67,7 +67,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
 	connectGrpcFunc := func() error {
 		opts := trace.GetInterceptorOpts()
 		log.Debug("DataNode connect ", zap.String("address", c.addr))
-		conn, err := grpc.DialContext(c.ctx, c.addr,
+		ctx, cancel := context.WithTimeout(c.ctx, time.Millisecond*200)
+		defer cancel()
+		conn, err := grpc.DialContext(ctx, c.addr,
 			grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(200*time.Millisecond),
 			grpc.WithDisableRetry(),
 			grpc.WithUnaryInterceptor(
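
Note: the client.go hunk above bounds the blocking dial with a context deadline rather than relying solely on grpc.WithTimeout, which recent grpc-go releases deprecate in favor of DialContext plus context.WithTimeout. A minimal sketch of that pattern, assuming a hypothetical address and with TLS and interceptors omitted:

    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
    )

    // dialWithDeadline bounds the blocking dial so a dead address fails fast
    // instead of hanging the caller.
    func dialWithDeadline(parent context.Context, addr string) (*grpc.ClientConn, error) {
        ctx, cancel := context.WithTimeout(parent, 200*time.Millisecond)
        defer cancel()
        return grpc.DialContext(ctx, addr,
            grpc.WithInsecure(), // plaintext, matching the existing client options; TLS omitted in this sketch
            grpc.WithBlock(),
        )
    }

    func main() {
        conn, err := dialWithDeadline(context.Background(), "localhost:21124") // hypothetical datanode address
        if err != nil {
            log.Println("dial failed:", err)
            return
        }
        defer conn.Close()
        log.Println("connected")
    }
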