Force cluster refresh for each dn change event (#6161)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in: parent a5f74c4a6c · commit b27e6b52bf
@@ -17,6 +17,7 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/util/retry"
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 )
@@ -102,68 +103,130 @@ func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
 	deltaChange := c.dataManager.updateCluster(dataNodes)
 	nodes, chanBuffer := c.dataManager.getDataNodes(false)
 	var rets []*datapb.DataNodeInfo
+	var err error
 	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status change", zap.Error(err))
+		// does not trigger another refresh; the pending event will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 	return nil
 }

-func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo {
-	for _, n := range nodes {
-		logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
-		uncompletes := make([]vchannel, 0, len(n.Channels))
-		for _, ch := range n.Channels {
-			if ch.State == datapb.ChannelWatchState_Uncomplete {
-				if len(uncompletes) == 0 {
-					logMsg += ch.Name
-				} else {
-					logMsg += "," + ch.Name
-				}
-				uncompletes = append(uncompletes, vchannel{
-					CollectionID: ch.CollectionID,
-					DmlChannel:   ch.Name,
-				})
-			}
-		}
-
-		if len(uncompletes) == 0 {
-			continue
-		}
-		log.Debug(logMsg)
-
-		vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
-		if err != nil {
-			log.Warn("get vchannel position failed", zap.Error(err))
-			continue
-		}
-		cli, err := c.sessionManager.getOrCreateSession(n.Address)
-		if err != nil {
-			log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
-			continue
-		}
-		req := &datapb.WatchDmChannelsRequest{
-			Base: &commonpb.MsgBase{
-				SourceID: Params.NodeID,
-			},
-			Vchannels: vchanInfos,
-		}
-		resp, err := cli.WatchDmChannels(c.ctx, req)
-		if err != nil {
-			log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
-			continue
-		}
-		if resp.ErrorCode != commonpb.ErrorCode_Success {
-			log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
-			continue
-		}
-		for _, ch := range n.Channels {
-			if ch.State == datapb.ChannelWatchState_Uncomplete {
-				ch.State = datapb.ChannelWatchState_Complete
-			}
-		}
-	}
-	return nodes
-}
+// refresh roughly refreshes datanode status after an event is received
+func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
+	deltaChange := c.dataManager.updateCluster(dataNodes)
+	nodes, chanBuffer := c.dataManager.getDataNodes(false)
+	var rets []*datapb.DataNodeInfo
+	var err error
+	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
+	c.dataManager.updateDataNodes(rets, chanBuffer)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status change", zap.Error(err))
+		// does not trigger another refresh; the pending event will handle it
+	}
+	c.dataManager.updateDataNodes(rets, chanBuffer) // even if some watch failed, status should sync into etcd
+	return err
+}
+
+// parraRun runs works in parallel, with a limit of maxRunner concurrent runners
+func parraRun(works []func(), maxRunner int) {
+	wg := sync.WaitGroup{}
+	ch := make(chan func())
+	wg.Add(len(works))
+
+	for i := 0; i < maxRunner; i++ {
+		go func() {
+			for {
+				work, ok := <-ch
+				if !ok {
+					return
+				}
+				work()
+				wg.Done()
+			}
+		}()
+	}
+	for _, work := range works {
+		ch <- work
+	}
+	wg.Wait()
+	close(ch)
+}
+
+func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
+	works := make([]func(), 0, len(nodes))
+	mut := sync.Mutex{}
+	errs := make([]error, 0, len(nodes))
+	for _, n := range nodes {
+		n := n // capture the loop variable for the closure (pre-Go 1.22 semantics)
+		works = append(works, func() {
+			logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
+			uncompletes := make([]vchannel, 0, len(n.Channels))
+			for _, ch := range n.Channels {
+				if ch.State == datapb.ChannelWatchState_Uncomplete {
+					if len(uncompletes) == 0 {
+						logMsg += ch.Name
+					} else {
+						logMsg += "," + ch.Name
+					}
+					uncompletes = append(uncompletes, vchannel{
+						CollectionID: ch.CollectionID,
+						DmlChannel:   ch.Name,
+					})
+				}
+			}
+
+			if len(uncompletes) == 0 {
+				return // all set, just return
+			}
+			log.Debug(logMsg)
+
+			vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
+			if err != nil {
+				log.Warn("get vchannel position failed", zap.Error(err))
+				mut.Lock()
+				errs = append(errs, err)
+				mut.Unlock()
+				return
+			}
+			cli, err := c.sessionManager.getOrCreateSession(n.Address) // this might take time if address went offline
+			if err != nil {
+				log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
+				mut.Lock()
+				errs = append(errs, err)
+				mut.Unlock()
+				return
+			}
+			req := &datapb.WatchDmChannelsRequest{
+				Base: &commonpb.MsgBase{
+					SourceID: Params.NodeID,
+				},
+				Vchannels: vchanInfos,
+			}
+			resp, err := cli.WatchDmChannels(c.ctx, req)
+			if err != nil {
+				log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
+				mut.Lock()
+				errs = append(errs, err)
+				mut.Unlock()
+				return // resp is nil on error, so bail out before inspecting it
+			}
+			if resp.ErrorCode != commonpb.ErrorCode_Success {
+				log.Warn("watch channels failed", zap.String("address", n.Address), zap.String("reason", resp.Reason))
+				mut.Lock()
+				errs = append(errs, fmt.Errorf("watch fail with stat %v, msg:%s", resp.ErrorCode, resp.Reason))
+				mut.Unlock()
+				return
+			}
+			for _, ch := range n.Channels {
+				if ch.State == datapb.ChannelWatchState_Uncomplete {
+					ch.State = datapb.ChannelWatchState_Complete
+				}
+			}
+		})
+	}
+	parraRun(works, 3)
+	return nodes, retry.ErrorList(errs)
+}
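
A note on the helper used above: parraRun fans the work closures out over a fixed pool of maxRunner goroutines, so at most that many WatchDmChannels calls are in flight at once. A minimal usage sketch (a hypothetical example, not part of this commit):

	// Hypothetical illustration of parraRun semantics: five jobs,
	// at most two running at any moment.
	jobs := make([]func(), 0, 5)
	for i := 0; i < 5; i++ {
		i := i // capture the loop variable for the closure
		jobs = append(jobs, func() {
			log.Debug("job finished", zap.Int("id", i))
		})
	}
	parraRun(jobs, 2) // blocks until all five jobs have run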

@@ -172,11 +235,16 @@ func (c *cluster) register(n *datapb.DataNodeInfo) {
 	c.dataManager.register(n)
 	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
 	var rets []*datapb.DataNodeInfo
+	var err error
 	log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
 	rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
 	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status change", zap.Error(err))
+		// does not trigger another refresh; the pending event will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 }

@@ -192,6 +260,7 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) {
 	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
 	log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
 	var rets []*datapb.DataNodeInfo
+	var err error
 	if len(cNodes) == 0 {
 		for _, chStat := range n.Channels {
 			chStat.State = datapb.ChannelWatchState_Uncomplete
@@ -202,7 +271,11 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) {
 	}
 	log.Debug("after unregister policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status change", zap.Error(err))
+		// does not trigger another refresh; the pending event will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 }

@@ -211,6 +284,7 @@ func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
 	defer c.mu.Unlock()
 	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
 	var rets []*datapb.DataNodeInfo
+	var err error
 	if len(cNodes) == 0 { // no nodes to assign, put into buffer
 		chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
 			Name: channel,
@@ -221,7 +295,11 @@ func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
 		rets = c.assignPolicy.apply(cNodes, channel, collectionID)
 	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status change", zap.Error(err))
+		// does not trigger another refresh; the pending event will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
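
All four call sites now funnel per-node watch failures into a single error value via retry.ErrorList from internal/util/retry. A rough sketch of what such an aggregator can look like (an assumed shape for illustration, not the actual Milvus implementation):

	// Assumed shape of an error-list aggregator, for illustration only:
	// it bundles many errors into one value satisfying the error interface.
	type ErrorList []error

	func (el ErrorList) Error() string {
		var sb strings.Builder
		for i, err := range el {
			sb.WriteString(fmt.Sprintf("attempt #%d: %v; ", i+1, err))
		}
		return sb.String()
	}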

@@ -192,6 +192,27 @@ func (s *Server) initServiceDiscovery() error {
 	return nil
 }
 
+func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
+	if s.session == nil {
+		log.Warn("load data nodes but session is nil")
+		return []*datapb.DataNodeInfo{}
+	}
+	sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole)
+	if err != nil {
+		log.Warn("load data nodes failed", zap.Error(err))
+		return []*datapb.DataNodeInfo{}
+	}
+	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
+	for _, session := range sessions {
+		datanodes = append(datanodes, &datapb.DataNodeInfo{
+			Address:  session.Address,
+			Version:  session.ServerID,
+			Channels: []*datapb.ChannelStatus{},
+		})
+	}
+	return datanodes
+}
+
 func (s *Server) startSegmentManager() {
 	helper := createNewSegmentHelper(s.segmentInfoStream)
 	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
@@ -368,12 +389,14 @@ func (s *Server) startWatchService(ctx context.Context) {
 			log.Info("Received datanode register",
 				zap.String("address", datanode.Address),
 				zap.Int64("serverID", datanode.Version))
-			s.cluster.register(datanode)
+			//s.cluster.register(datanode)
+			s.cluster.refresh(s.loadDataNodes())
 		case sessionutil.SessionDelEvent:
 			log.Info("Received datanode unregister",
 				zap.String("address", datanode.Address),
 				zap.Int64("serverID", datanode.Version))
-			s.cluster.unregister(datanode)
+			//s.cluster.unregister(datanode)
+			s.cluster.refresh(s.loadDataNodes())
 		default:
 			log.Warn("receive unknown service event type",
 				zap.Any("type", event.EventType))
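
Taken together, the change replaces incremental register/unregister bookkeeping with full resynchronization: every DataNode session event rebuilds cluster state from the live session list. A condensed sketch of the resulting loop (names taken from the diff; eventCh is a hypothetical stand-in for the real watch channel):

	// Simplified view of the new event handling: any add or delete event
	// forces a refresh from the complete session list.
	for event := range eventCh {
		switch event.EventType {
		case sessionutil.SessionAddEvent, sessionutil.SessionDelEvent:
			// reload all DataNode sessions and recompute channel assignments
			s.cluster.refresh(s.loadDataNodes())
		default:
			log.Warn("receive unknown service event type", zap.Any("type", event.EventType))
		}
	}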