milvus/internal/datacoord/cluster_store.go
sunby ee38d14f5d
Fix channel lost after data node crash (#6545)
If we start up 2 data nodes and one of them crashes. We expect that all
channels of crashed node will be allcoated to the alive node. But now we
discover that these channels are lost after data node crash. The reason
is we pass a NodeInfo with empty channel info. We fix it and improve log
print.

issue: #6501
Signed-off-by: sunby <bingyi.sun@zilliz.com>
2021-07-15 16:38:31 +08:00

188 lines
4.3 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"context"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
)
type ClusterStore interface {
GetNodes() []*NodeInfo
SetNode(nodeID UniqueID, node *NodeInfo)
DeleteNode(nodeID UniqueID)
GetNode(nodeID UniqueID) *NodeInfo
SetClient(nodeID UniqueID, client types.DataNode)
SetWatched(nodeID UniqueID, channelsName []string)
}
type NodeInfo struct {
Info *datapb.DataNodeInfo
eventCh chan *NodeEvent
client types.DataNode
ctx context.Context
cancel context.CancelFunc
}
const eventChBuffer = 1024
type NodeEvent struct {
Type NodeEventType
Req interface{}
}
func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo {
ctx, cancel := context.WithCancel(ctx)
return &NodeInfo{
Info: info,
eventCh: make(chan *NodeEvent, eventChBuffer),
ctx: ctx,
cancel: cancel,
}
}
func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo {
cloned := &NodeInfo{
Info: n.Info,
eventCh: n.eventCh,
client: n.client,
ctx: n.ctx,
cancel: n.cancel,
}
for _, opt := range opts {
opt(cloned)
}
return cloned
}
func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo {
info := proto.Clone(n.Info).(*datapb.DataNodeInfo)
cloned := &NodeInfo{
Info: info,
eventCh: n.eventCh,
client: n.client,
ctx: n.ctx,
cancel: n.cancel,
}
for _, opt := range opts {
opt(cloned)
}
return cloned
}
func (n *NodeInfo) GetEventChannel() chan *NodeEvent {
return n.eventCh
}
func (n *NodeInfo) GetClient() types.DataNode {
return n.client
}
func (n *NodeInfo) Dispose() {
defer n.cancel()
if n.client != nil {
n.client.Stop()
}
}
type NodesInfo struct {
nodes map[UniqueID]*NodeInfo
}
func NewNodesInfo() *NodesInfo {
c := &NodesInfo{
nodes: make(map[UniqueID]*NodeInfo),
}
return c
}
func (c *NodesInfo) GetNodes() []*NodeInfo {
nodes := make([]*NodeInfo, 0, len(c.nodes))
for _, node := range c.nodes {
nodes = append(nodes, node)
}
return nodes
}
func (c *NodesInfo) SetNode(nodeID UniqueID, node *NodeInfo) {
c.nodes[nodeID] = node
metrics.DataCoordDataNodeList.WithLabelValues("online").Inc()
metrics.DataCoordDataNodeList.WithLabelValues("offline").Dec()
}
func (c *NodesInfo) DeleteNode(nodeID UniqueID) {
delete(c.nodes, nodeID)
metrics.DataCoordDataNodeList.WithLabelValues("online").Dec()
metrics.DataCoordDataNodeList.WithLabelValues("offline").Inc()
}
func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo {
node, ok := c.nodes[nodeID]
if !ok {
return nil
}
return node
}
func (c *NodesInfo) SetClient(nodeID UniqueID, client types.DataNode) {
if node, ok := c.nodes[nodeID]; ok {
c.nodes[nodeID] = node.ShadowClone(SetClient(client))
}
}
func (c *NodesInfo) SetWatched(nodeID UniqueID, channelsName []string) {
if node, ok := c.nodes[nodeID]; ok {
c.nodes[nodeID] = node.Clone(SetWatched(channelsName))
}
}
type NodeOpt func(n *NodeInfo)
func SetWatched(channelsName []string) NodeOpt {
return func(n *NodeInfo) {
channelsMap := make(map[string]struct{})
for _, channelName := range channelsName {
channelsMap[channelName] = struct{}{}
}
for _, ch := range n.Info.Channels {
_, ok := channelsMap[ch.GetName()]
if !ok {
continue
}
if ch.State == datapb.ChannelWatchState_Uncomplete {
ch.State = datapb.ChannelWatchState_Complete
}
}
}
}
func SetClient(client types.DataNode) NodeOpt {
return func(n *NodeInfo) {
n.client = client
}
}
func AddChannels(channels []*datapb.ChannelStatus) NodeOpt {
return func(n *NodeInfo) {
n.Info.Channels = append(n.Info.Channels, channels...)
}
}
func SetChannels(channels []*datapb.ChannelStatus) NodeOpt {
return func(n *NodeInfo) {
n.Info.Channels = channels
}
}