mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add the offlines nodes previously crashed to offline nodes channel (#17663)
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
9ad67e99dd
commit
1e87b54542
@ -89,8 +89,9 @@ type QueryCoord struct {
|
||||
indexCoordClient types.IndexCoord
|
||||
broker *globalMetaBroker
|
||||
|
||||
session *sessionutil.Session
|
||||
eventChan <-chan *sessionutil.SessionEvent
|
||||
session *sessionutil.Session
|
||||
eventChan <-chan *sessionutil.SessionEvent
|
||||
offlineNodesChan chan UniqueID
|
||||
|
||||
stateCode atomic.Value
|
||||
|
||||
@ -288,10 +289,11 @@ func NewQueryCoord(ctx context.Context, factory dependency.Factory) (*QueryCoord
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
service := &QueryCoord{
|
||||
loopCtx: ctx1,
|
||||
loopCancel: cancel,
|
||||
factory: factory,
|
||||
newNodeFn: newQueryNode,
|
||||
loopCtx: ctx1,
|
||||
loopCancel: cancel,
|
||||
factory: factory,
|
||||
newNodeFn: newQueryNode,
|
||||
offlineNodesChan: make(chan UniqueID, 100),
|
||||
}
|
||||
|
||||
service.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
@ -346,28 +348,9 @@ func (qc *QueryCoord) watchNodeLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
offlineNodeIDs := qc.cluster.OfflineNodeIDs()
|
||||
if len(offlineNodeIDs) != 0 {
|
||||
loadBalanceSegment := &querypb.LoadBalanceRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_LoadBalanceSegments,
|
||||
SourceID: qc.session.ServerID,
|
||||
},
|
||||
BalanceReason: querypb.TriggerCondition_NodeDown,
|
||||
SourceNodeIDs: offlineNodeIDs,
|
||||
}
|
||||
|
||||
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_NodeDown)
|
||||
loadBalanceTask := &loadBalanceTask{
|
||||
baseTask: baseTask,
|
||||
LoadBalanceRequest: loadBalanceSegment,
|
||||
broker: qc.broker,
|
||||
cluster: qc.cluster,
|
||||
meta: qc.meta,
|
||||
}
|
||||
//TODO::deal enqueue error
|
||||
qc.scheduler.Enqueue(loadBalanceTask)
|
||||
log.Info("start a loadBalance task", zap.Any("task", loadBalanceTask))
|
||||
go qc.loadBalanceNodeLoop(ctx)
|
||||
for _, nodeID := range qc.cluster.OfflineNodeIDs() {
|
||||
qc.offlineNodesChan <- nodeID
|
||||
}
|
||||
|
||||
// TODO silverxia add Rewatch logic
|
||||
@ -405,9 +388,6 @@ func (qc *QueryCoord) getUnallocatedNodes() []int64 {
|
||||
}
|
||||
|
||||
func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
|
||||
offlineNodeCh := make(chan UniqueID, 100)
|
||||
go qc.loadBalanceNodeLoop(ctx, offlineNodeCh)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -452,19 +432,19 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
|
||||
}
|
||||
|
||||
qc.cluster.StopNode(serverID)
|
||||
offlineNodeCh <- serverID
|
||||
qc.offlineNodesChan <- serverID
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context, offlineNodeCh chan UniqueID) {
|
||||
func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case node := <-offlineNodeCh:
|
||||
case node := <-qc.offlineNodesChan:
|
||||
loadBalanceSegment := &querypb.LoadBalanceRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_LoadBalanceSegments,
|
||||
@ -489,7 +469,7 @@ func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context, offlineNodeCh cha
|
||||
log.Warn("failed to enqueue LoadBalance task into the scheduler",
|
||||
zap.Int64("nodeID", node),
|
||||
zap.Error(err))
|
||||
offlineNodeCh <- node
|
||||
qc.offlineNodesChan <- node
|
||||
continue
|
||||
}
|
||||
|
||||
@ -502,7 +482,7 @@ func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context, offlineNodeCh cha
|
||||
log.Warn("failed to process LoadBalance task",
|
||||
zap.Int64("nodeID", node),
|
||||
zap.Error(err))
|
||||
offlineNodeCh <- node
|
||||
qc.offlineNodesChan <- node
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user