fix: remove redundant replica recover (#32985)

issue: #22288 

- replica recover should be only triggered by the replica observer

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
chyezh 2024-05-13 15:25:32 +08:00 committed by GitHub
parent f6777267e3
commit 293f14a8b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 21 additions and 19 deletions

View File

@ -356,6 +356,7 @@ func (m *ReplicaManager) RecoverNodesInCollection(collectionID typeutil.UniqueID
mutableReplica.AddRWNode(incomingNode...) // unused -> rw
log.Info(
"new replica recovery found",
zap.Int64("replicaID", assignment.GetReplicaID()),
zap.Int64s("newRONodes", roNodes),
zap.Int64s("roToRWNodes", recoverableNodes),
zap.Int64s("newIncomingNodes", incomingNode))

View File

@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
@ -123,8 +122,5 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
}
}
}
if enableRGAutoRecover {
utils.RecoverAllCollection(ob.meta)
}
log.Debug("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
}

View File

@ -996,8 +996,6 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
log.Warn("failed to transfer node", zap.Error(err))
return merr.Status(err), nil
}
// Recover all replica on the source and target resource group.
utils.RecoverAllCollection(s.meta)
return merr.Success(), nil
}

View File

@ -526,7 +526,10 @@ func (suite *ServiceSuite) TestTransferNode() {
server.resourceObserver = observers.NewResourceObserver(server.meta)
server.resourceObserver.Start()
server.replicaObserver = observers.NewReplicaObserver(server.meta, server.dist)
server.replicaObserver.Start()
defer server.resourceObserver.Stop()
defer server.replicaObserver.Stop()
err := server.meta.ResourceManager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 0},
@ -556,13 +559,15 @@ func (suite *ServiceSuite) TestTransferNode() {
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
time.Sleep(100 * time.Millisecond)
nodes, err := server.meta.ResourceManager.GetNodes("rg1")
suite.NoError(err)
suite.Len(nodes, 1)
nodesInReplica := server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodesInReplica, 1)
suite.Eventually(func() bool {
nodes, err := server.meta.ResourceManager.GetNodes("rg1")
if err != nil || len(nodes) != 1 {
return false
}
nodesInReplica := server.meta.ReplicaManager.Get(1).GetNodes()
return len(nodesInReplica) == 1
}, 5*time.Second, 100*time.Millisecond)
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
@ -632,14 +637,16 @@ func (suite *ServiceSuite) TestTransferNode() {
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
time.Sleep(100 * time.Millisecond)
nodes, err = server.meta.ResourceManager.GetNodes("rg3")
suite.NoError(err)
suite.Len(nodes, 1)
nodes, err = server.meta.ResourceManager.GetNodes("rg4")
suite.NoError(err)
suite.Len(nodes, 3)
suite.Eventually(func() bool {
nodes, err := server.meta.ResourceManager.GetNodes("rg3")
if err != nil || len(nodes) != 1 {
return false
}
nodes, err = server.meta.ResourceManager.GetNodes("rg4")
return err == nil && len(nodes) == 3
}, 5*time.Second, 100*time.Millisecond)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg3",
TargetResourceGroup: "rg4",