mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
fix sync distribution with wrong version (#28130)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
a21042dde7
commit
7485eeb689
@ -185,7 +185,8 @@ func (suite *CollectionObserverSuite) SetupTest() {
|
|||||||
|
|
||||||
// Dependencies
|
// Dependencies
|
||||||
suite.dist = meta.NewDistributionManager()
|
suite.dist = meta.NewDistributionManager()
|
||||||
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, session.NewNodeManager())
|
nodeMgr := session.NewNodeManager()
|
||||||
|
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr)
|
||||||
suite.broker = meta.NewMockBroker(suite.T())
|
suite.broker = meta.NewMockBroker(suite.T())
|
||||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||||
suite.targetObserver = NewTargetObserver(suite.meta,
|
suite.targetObserver = NewTargetObserver(suite.meta,
|
||||||
@ -196,7 +197,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
|
|||||||
suite.checkerController = &checkers.CheckerController{}
|
suite.checkerController = &checkers.CheckerController{}
|
||||||
|
|
||||||
mockCluster := session.NewMockCluster(suite.T())
|
mockCluster := session.NewMockCluster(suite.T())
|
||||||
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster)
|
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster, nodeMgr)
|
||||||
mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
|
mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
|
||||||
|
|
||||||
// Test object
|
// Test object
|
||||||
|
|||||||
@ -48,6 +48,7 @@ type LeaderObserver struct {
|
|||||||
target *meta.TargetManager
|
target *meta.TargetManager
|
||||||
broker meta.Broker
|
broker meta.Broker
|
||||||
cluster session.Cluster
|
cluster session.Cluster
|
||||||
|
nodeMgr *session.NodeManager
|
||||||
|
|
||||||
dispatcher *taskDispatcher[int64]
|
dispatcher *taskDispatcher[int64]
|
||||||
|
|
||||||
@ -118,6 +119,11 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
|
|||||||
for _, replica := range replicas {
|
for _, replica := range replicas {
|
||||||
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
|
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
|
||||||
for ch, leaderID := range leaders {
|
for ch, leaderID := range leaders {
|
||||||
|
if ok, _ := o.nodeMgr.IsStoppingNode(leaderID); ok {
|
||||||
|
// no need to correct leader's view which is loaded on stopping node
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
|
leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
|
||||||
if leaderView == nil {
|
if leaderView == nil {
|
||||||
continue
|
continue
|
||||||
@ -326,6 +332,7 @@ func NewLeaderObserver(
|
|||||||
targetMgr *meta.TargetManager,
|
targetMgr *meta.TargetManager,
|
||||||
broker meta.Broker,
|
broker meta.Broker,
|
||||||
cluster session.Cluster,
|
cluster session.Cluster,
|
||||||
|
nodeMgr *session.NodeManager,
|
||||||
) *LeaderObserver {
|
) *LeaderObserver {
|
||||||
ob := &LeaderObserver{
|
ob := &LeaderObserver{
|
||||||
dist: dist,
|
dist: dist,
|
||||||
@ -333,6 +340,7 @@ func NewLeaderObserver(
|
|||||||
target: targetMgr,
|
target: targetMgr,
|
||||||
broker: broker,
|
broker: broker,
|
||||||
cluster: cluster,
|
cluster: cluster,
|
||||||
|
nodeMgr: nodeMgr,
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher := newTaskDispatcher[int64](ob.observeCollection)
|
dispatcher := newTaskDispatcher[int64](ob.observeCollection)
|
||||||
|
|||||||
@ -72,7 +72,8 @@ func (suite *LeaderObserverTestSuite) SetupTest() {
|
|||||||
// meta
|
// meta
|
||||||
store := querycoord.NewCatalog(suite.kv)
|
store := querycoord.NewCatalog(suite.kv)
|
||||||
idAllocator := RandomIncrementIDAllocator()
|
idAllocator := RandomIncrementIDAllocator()
|
||||||
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())
|
nodeMgr := session.NewNodeManager()
|
||||||
|
suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
|
||||||
suite.broker = meta.NewMockBroker(suite.T())
|
suite.broker = meta.NewMockBroker(suite.T())
|
||||||
|
|
||||||
suite.mockCluster = session.NewMockCluster(suite.T())
|
suite.mockCluster = session.NewMockCluster(suite.T())
|
||||||
@ -81,7 +82,7 @@ func (suite *LeaderObserverTestSuite) SetupTest() {
|
|||||||
// }, nil).Maybe()
|
// }, nil).Maybe()
|
||||||
distManager := meta.NewDistributionManager()
|
distManager := meta.NewDistributionManager()
|
||||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||||
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster)
|
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster, nodeMgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *LeaderObserverTestSuite) TearDownTest() {
|
func (suite *LeaderObserverTestSuite) TearDownTest() {
|
||||||
|
|||||||
@ -362,6 +362,7 @@ func (s *Server) initObserver() {
|
|||||||
s.targetMgr,
|
s.targetMgr,
|
||||||
s.broker,
|
s.broker,
|
||||||
s.cluster,
|
s.cluster,
|
||||||
|
s.nodeMgr,
|
||||||
)
|
)
|
||||||
s.targetObserver = observers.NewTargetObserver(
|
s.targetObserver = observers.NewTargetObserver(
|
||||||
s.meta,
|
s.meta,
|
||||||
|
|||||||
@ -1317,7 +1317,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
|||||||
|
|
||||||
// translate segment action
|
// translate segment action
|
||||||
removeActions := make([]*querypb.SyncAction, 0)
|
removeActions := make([]*querypb.SyncAction, 0)
|
||||||
addSegments := make(map[int64][]*querypb.SegmentLoadInfo)
|
group, ctx := errgroup.WithContext(ctx)
|
||||||
for _, action := range req.GetActions() {
|
for _, action := range req.GetActions() {
|
||||||
log := log.With(zap.String("Action",
|
log := log.With(zap.String("Action",
|
||||||
action.GetType().String()))
|
action.GetType().String()))
|
||||||
@ -1331,7 +1331,26 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
|||||||
log.Warn("sync request from legacy querycoord without load info, skip")
|
log.Warn("sync request from legacy querycoord without load info, skip")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
addSegments[action.GetNodeID()] = append(addSegments[action.GetNodeID()], action.GetInfo())
|
|
||||||
|
// to pass segment'version, we call load segment one by one
|
||||||
|
action := action
|
||||||
|
group.Go(func() error {
|
||||||
|
return shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
||||||
|
Base: commonpbutil.NewMsgBase(
|
||||||
|
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
|
||||||
|
commonpbutil.WithMsgID(req.Base.GetMsgID()),
|
||||||
|
),
|
||||||
|
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
|
||||||
|
Schema: req.GetSchema(),
|
||||||
|
LoadMeta: req.GetLoadMeta(),
|
||||||
|
CollectionID: req.GetCollectionID(),
|
||||||
|
ReplicaID: req.GetReplicaID(),
|
||||||
|
DstNodeID: action.GetNodeID(),
|
||||||
|
Version: action.GetVersion(),
|
||||||
|
NeedTransfer: false,
|
||||||
|
LoadScope: querypb.LoadScope_Delta,
|
||||||
|
})
|
||||||
|
})
|
||||||
case querypb.SyncType_UpdateVersion:
|
case querypb.SyncType_UpdateVersion:
|
||||||
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
|
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
|
||||||
pipeline := node.pipelineManager.Get(req.GetChannel())
|
pipeline := node.pipelineManager.Get(req.GetChannel())
|
||||||
@ -1353,25 +1372,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for nodeID, infos := range addSegments {
|
err := group.Wait()
|
||||||
err := shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
if err != nil {
|
||||||
Base: commonpbutil.NewMsgBase(
|
log.Warn("failed to sync distribution", zap.Error(err))
|
||||||
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
|
return merr.Status(err), nil
|
||||||
commonpbutil.WithMsgID(req.Base.GetMsgID()),
|
|
||||||
),
|
|
||||||
Infos: infos,
|
|
||||||
Schema: req.GetSchema(),
|
|
||||||
LoadMeta: req.GetLoadMeta(),
|
|
||||||
CollectionID: req.GetCollectionID(),
|
|
||||||
ReplicaID: req.GetReplicaID(),
|
|
||||||
DstNodeID: nodeID,
|
|
||||||
Version: req.GetVersion(),
|
|
||||||
NeedTransfer: false,
|
|
||||||
LoadScope: querypb.LoadScope_Delta,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return merr.Status(err), nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, action := range removeActions {
|
for _, action := range removeActions {
|
||||||
|
|||||||
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
@ -45,6 +46,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||||
"github.com/milvus-io/milvus/internal/util/streamrpc"
|
"github.com/milvus-io/milvus/internal/util/streamrpc"
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||||
@ -1765,6 +1767,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
|
|||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
||||||
|
|
||||||
|
// test sync targte version
|
||||||
syncVersionAction := &querypb.SyncAction{
|
syncVersionAction := &querypb.SyncAction{
|
||||||
Type: querypb.SyncType_UpdateVersion,
|
Type: querypb.SyncType_UpdateVersion,
|
||||||
SealedInTarget: []int64{3},
|
SealedInTarget: []int64{3},
|
||||||
@ -1777,6 +1780,37 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
|
|||||||
status, err = suite.node.SyncDistribution(ctx, req)
|
status, err = suite.node.SyncDistribution(ctx, req)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
||||||
|
|
||||||
|
// test sync segments
|
||||||
|
segmentVersion := int64(111)
|
||||||
|
syncSegmentVersion := &querypb.SyncAction{
|
||||||
|
Type: querypb.SyncType_Set,
|
||||||
|
SegmentID: suite.validSegmentIDs[0],
|
||||||
|
NodeID: 0,
|
||||||
|
PartitionID: suite.partitionIDs[0],
|
||||||
|
Info: &querypb.SegmentLoadInfo{},
|
||||||
|
Version: segmentVersion,
|
||||||
|
}
|
||||||
|
req.Actions = []*querypb.SyncAction{syncSegmentVersion}
|
||||||
|
|
||||||
|
testChannel := "test_sync_segment"
|
||||||
|
req.Channel = testChannel
|
||||||
|
|
||||||
|
// expected call load segment with right segment version
|
||||||
|
var versionMatch bool
|
||||||
|
mockDelegator := delegator.NewMockShardDelegator(suite.T())
|
||||||
|
mockDelegator.EXPECT().LoadSegments(mock.Anything, mock.Anything).
|
||||||
|
RunAndReturn(func(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
|
||||||
|
log.Info("version", zap.Int64("versionInload", req.GetVersion()))
|
||||||
|
versionMatch = req.GetVersion() == segmentVersion
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
suite.node.delegators.Insert(testChannel, mockDelegator)
|
||||||
|
|
||||||
|
status, err = suite.node.SyncDistribution(ctx, req)
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
|
||||||
|
suite.True(versionMatch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
|
func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user