diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go
index ff04daa8bb..d561594915 100644
--- a/internal/querycoord/querynode.go
+++ b/internal/querycoord/querynode.go
@@ -369,7 +369,7 @@ func (qn *queryNode) getNodeInfo() (Node, error) {
 
 func (qn *queryNode) syncReplicaSegments(ctx context.Context, in *querypb.SyncReplicaSegmentsRequest) error {
 	if !qn.isOnline() {
-		return errors.New("ReleaseSegments: queryNode is offline")
+		return errors.New("SyncSegments: queryNode is offline")
 	}
 
 	status, err := qn.client.SyncReplicaSegments(ctx, in)
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index 694a6e540b..fd99d7ca0f 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -2407,19 +2407,6 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		zap.Int("segmentNum", len(segments)))
 
 	wg := errgroup.Group{}
-	// Remove offline nodes from replica
-	for replicaID := range replicas {
-		replicaID := replicaID
-		wg.Go(func() error {
-			log.Debug("remove offline nodes from replica",
-				zap.Int64("taskID", lbt.taskID),
-				zap.Int64("replicaID", replicaID),
-				zap.Int64s("offlineNodes", lbt.SourceNodeIDs))
-
-			return lbt.meta.applyReplicaBalancePlan(
-				NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
-		})
-	}
 
 	// Remove offline nodes from dmChannels
 	for _, dmChannel := range dmChannels {
@@ -2439,7 +2426,7 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 			log.Info("remove offline nodes from dmChannel",
 				zap.Int64("taskID", lbt.getTaskID()),
 				zap.String("dmChannel", dmChannel.DmChannel),
-				zap.Int64s("nodeIds", dmChannel.NodeIds))
+				zap.Int64s("left nodeIds", dmChannel.NodeIds))
 
 			return nil
 		})
@@ -2485,11 +2472,11 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		}
 	}
 
-	err := wg.Wait()
-	if err != nil {
+	if err := wg.Wait(); err != nil {
 		return err
 	}
 
+	// sync segment
 	for replicaID := range replicas {
 		err := syncReplicaSegments(lbt.ctx, lbt.meta, lbt.cluster, replicaID)
 		if err != nil {
@@ -2501,6 +2488,24 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		}
 	}
 
+	// Remove offline nodes from replica
+	for replicaID := range replicas {
+		replicaID := replicaID
+		wg.Go(func() error {
+			log.Debug("remove offline nodes from replica",
+				zap.Int64("taskID", lbt.taskID),
+				zap.Int64("replicaID", replicaID),
+				zap.Int64s("offlineNodes", lbt.SourceNodeIDs))
+
+			return lbt.meta.applyReplicaBalancePlan(
+				NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
+		})
+	}
+
+	if err := wg.Wait(); err != nil {
+		return err
+	}
+
 	for _, offlineNodeID := range lbt.SourceNodeIDs {
 		err := lbt.cluster.RemoveNodeInfo(offlineNodeID)
 		if err != nil {
diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go
index d73a4c48aa..7a86b92cfe 100644
--- a/internal/querynode/shard_cluster.go
+++ b/internal/querynode/shard_cluster.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"runtime"
 	"sync"
 
 	"go.uber.org/atomic"
@@ -179,7 +180,7 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
 func (sc *ShardCluster) Close() {
 	log.Info("Close shard cluster")
 	sc.closeOnce.Do(func() {
-		sc.state.Store(int32(unavailable))
+		sc.updateShardClusterState(unavailable)
 		close(sc.closeCh)
 	})
 }
@@ -242,7 +243,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) {
 		if segment.nodeID == evt.nodeID {
 			segment.state = segmentStateOffline
 			sc.segments[id] = segment
-			sc.state.Store(int32(unavailable))
+			sc.updateShardClusterState(unavailable)
 		}
 	}
 	// ignore leader process here
@@ -424,15 +425,28 @@ func (sc *ShardCluster) selectNodeInReplica(nodeIDs []int64) (int64, bool) {
 	return 0, false
 }
 
+func (sc *ShardCluster) updateShardClusterState(state shardClusterState) {
+	old := sc.state.Load()
+	sc.state.Store(int32(state))
+
+	pc, _, _, _ := runtime.Caller(1)
+	callerName := runtime.FuncForPC(pc).Name()
+
+	log.Info("Shard Cluster update state", zap.Int64("collectionID", sc.collectionID),
+		zap.Int64("replicaID", sc.replicaID), zap.String("channel", sc.vchannelName),
+		zap.Int32("old state", old), zap.Int32("new state", int32(state)),
+		zap.String("caller", callerName))
+}
+
 // healthCheck iterate all segments to to check cluster could provide service.
 func (sc *ShardCluster) healthCheck() {
 	for _, segment := range sc.segments {
 		if segment.state != segmentStateLoaded { // TODO check hand-off or load balance
-			sc.state.Store(int32(unavailable))
+			sc.updateShardClusterState(unavailable)
 			return
 		}
 	}
-	sc.state.Store(int32(available))
+	sc.updateShardClusterState(available)
 }
 
 // watchNodes handles node events.
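
Note on the shard_cluster.go change: updateShardClusterState records which function triggered a state transition by inspecting the call stack with runtime.Caller. Below is a minimal, self-contained sketch of that logging pattern; the tracker type, its methods, and the use of the standard library's atomic.Int32 (Go 1.19+) and fmt instead of Milvus' go.uber.org/atomic and zap-based logger are illustrative assumptions, not part of this patch.

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// tracker is a hypothetical stand-in for ShardCluster: it holds a state
// value atomically and logs every transition together with its caller.
type tracker struct {
	state atomic.Int32
}

func (t *tracker) updateState(newState int32) {
	// Swap stores the new state and returns the previous one in a single step.
	old := t.state.Swap(newState)

	// runtime.Caller(1) skips updateState's own frame and reports the function
	// that invoked it, the same trick updateShardClusterState relies on.
	caller := "unknown"
	if pc, _, _, ok := runtime.Caller(1); ok {
		if fn := runtime.FuncForPC(pc); fn != nil {
			caller = fn.Name()
		}
	}

	fmt.Printf("state changed: old=%d new=%d caller=%s\n", old, newState, caller)
}

func (t *tracker) markUnavailable() { t.updateState(1) }

func main() {
	t := &tracker{}
	t.markUnavailable() // prints main.(*tracker).markUnavailable as the caller
}

Logging the caller name this way makes the state-transition log line self-explanatory without threading an extra "reason" argument through every call site.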