issue: #37679 pr: #37694
PR #36549 introduced a logic error that updated the current target when only part of the channels were ready. This PR fixes that logic error and makes the dist handler keep pulling distribution from the querynode until all delegators become serviceable.
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent e222289038
commit 1bd502b585
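To make the bug concrete, here is a minimal, self-contained Go sketch (simplified types and hypothetical numbers, not the actual Milvus code): with two channels and replicaNum = 2, summing ready delegators across the whole collection can reach the replica count even though one channel has no ready delegator at all, so a collection-level check lets the current target advance too early, while a per-channel check correctly reports the unready channel.

package main

import "fmt"

func main() {
	// Hypothetical data: channel "dml_0" has both replicas ready, "dml_1" has none.
	readyLeadersPerChannel := map[string]int{
		"dml_0": 2,
		"dml_1": 0,
	}
	const replicaNum = 2

	// Old, collection-level check: sums ready leaders across all channels.
	total := 0
	for _, n := range readyLeadersPerChannel {
		total += n
	}
	fmt.Println("collection-level check passes:", total >= replicaNum) // true, although dml_1 is not ready

	// Fixed, per-channel check: every channel must have enough ready leaders.
	allReady := true
	for ch, n := range readyLeadersPerChannel {
		if n < replicaNum {
			fmt.Println("channel not ready:", ch)
			allReady = false
		}
	}
	fmt.Println("per-channel check passes:", allReady) // false
}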
internal/querycoordv2/dist/dist_handler.go (14 changes)
@@ -38,6 +38,7 @@ import (
	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type TriggerUpdateTargetVersion = func(collectionID int64)
@@ -194,6 +195,8 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
	channels := lo.SliceToMap(resp.GetChannels(), func(channel *querypb.ChannelVersionInfo) (string, *querypb.ChannelVersionInfo) {
		return channel.GetChannel(), channel
	})

	collectionsToSync := typeutil.NewUniqueSet()
	for _, lview := range resp.GetLeaderViews() {
		segments := make(map[int64]*meta.Segment)
@@ -246,9 +249,10 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
			err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
				lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))

			// segment and channel already loaded, trigger target observer to check target version
			dh.syncTargetVersionFn(lview.GetCollection())
			view.UnServiceableError = err
			// make dist handler pull next distribution until all delegator is serviceable
			dh.lastUpdateTs = 0
			collectionsToSync.Insert(lview.Collection)
			log.Info("leader is not available due to target version not ready",
				zap.Int64("collectionID", view.CollectionID),
				zap.Int64("nodeID", view.ID),
@@ -258,6 +262,12 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
	}

	dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)

	// segment and channel already loaded, trigger target observer to update
	collectionsToSync.Range(func(collection int64) bool {
		dh.syncTargetVersionFn(collection)
		return true
	})
}

func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) {
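The dist_handler.go hunks above combine two ideas: keep the handler pulling distribution (by zeroing lastUpdateTs) while any delegator is still unserviceable, and collect the affected collection IDs into a set so the target observer is triggered once per collection after the leader views are updated. A minimal sketch of that pattern, with simplified types and a plain map standing in for typeutil.NewUniqueSet():

package main

import (
	"fmt"
	"time"
)

// Simplified stand-in for the dist handler: only the fields needed to show the pattern.
type distHandler struct {
	lastUpdateTs int64
	syncTarget   func(collectionID int64)
}

func (dh *distHandler) updateLeaderViews(unserviceable map[int64]bool) {
	collectionsToSync := map[int64]struct{}{} // plays the role of typeutil.NewUniqueSet()
	for collectionID, bad := range unserviceable {
		if bad {
			// keep pulling distribution until every delegator becomes serviceable
			dh.lastUpdateTs = 0
			collectionsToSync[collectionID] = struct{}{}
		}
	}
	// trigger the target observer once per affected collection, after the views are updated
	for collectionID := range collectionsToSync {
		dh.syncTarget(collectionID)
	}
}

func main() {
	dh := &distHandler{
		lastUpdateTs: time.Now().Unix(),
		syncTarget:   func(id int64) { fmt.Println("sync target version for collection", id) },
	}
	dh.updateLeaderViews(map[int64]bool{100: true, 101: false})
	fmt.Println("lastUpdateTs reset:", dh.lastUpdateTs == 0) // true
}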
@@ -374,16 +374,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
		channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader *meta.LeaderView, _ int) bool {
			return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil
		})
		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)

		// to avoid stuck here in dynamic increase replica case, we just check available delegator number
		if int32(len(collectionReadyLeaders)) < replicaNum {
		if int32(len(channelReadyLeaders)) < replicaNum {
			log.RatedInfo(10, "channel not ready",
				zap.Int("readyReplicaNum", len(channelReadyLeaders)),
				zap.String("channelName", channel),
			)
			return false
		}
		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
	}

	var collectionInfo *milvuspb.DescribeCollectionResponse
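For readers unfamiliar with the samber/lo helper used above, lo.Filter returns the elements whose predicate is true; the fix applies the replica check to that per-channel result before merging it into collectionReadyLeaders. A small sketch with a hypothetical leaderView type standing in for *meta.LeaderView:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type leaderView struct {
	NodeID int64
	Ready  bool
}

func main() {
	leaders := []leaderView{{NodeID: 1, Ready: true}, {NodeID: 2, Ready: false}, {NodeID: 3, Ready: true}}

	// Keep only the leaders of this channel that are ready.
	channelReadyLeaders := lo.Filter(leaders, func(l leaderView, _ int) bool {
		return l.Ready // in Milvus this is utils.CheckDelegatorDataReady(...) == nil
	})

	replicaNum := int32(2)
	if int32(len(channelReadyLeaders)) < replicaNum {
		fmt.Println("channel not ready")
		return
	}
	fmt.Println("ready leaders for this channel:", len(channelReadyLeaders))
}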
@@ -147,7 +147,6 @@ func (s *TargetTestSuit) TestQueryCoordRestart() {
	s.initCollection(name, 1, 2, 2, 2000)

	ctx := context.Background()

	info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base:           commonpbutil.NewMsgBase(),
		CollectionName: name,
@@ -156,6 +155,16 @@ func (s *TargetTestSuit) TestQueryCoordRestart() {
	s.True(merr.Ok(info.GetStatus()))
	collectionID := info.GetCollectionID()

	// wait until all shards are ready
	// cause showCollections won't just wait all collection becomes loaded, proxy will use retry to block until all shard are ready
	s.Eventually(func() bool {
		resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
			Base:         commonpbutil.NewMsgBase(),
			CollectionID: collectionID,
		})
		return err == nil && merr.Ok(resp.GetStatus()) && len(resp.Shards) == 2
	}, 60*time.Second, 1*time.Second)

	// trigger old coord stop
	s.Cluster.StopQueryCoord()
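The new test code waits with testify's Eventually, polling GetShardLeaders until both shard leaders are reported before stopping the query coordinator. A minimal sketch of that polling pattern, with a stubbed condition in place of the Milvus integration cluster:

package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// shardLeaderCount is a stand-in for calling QueryCoord.GetShardLeaders and
// counting resp.Shards; in the real test the count grows as delegators become serviceable.
func shardLeaderCount() int {
	return 2
}

func TestWaitForShardLeaders(t *testing.T) {
	// Retry the condition every second for up to a minute, as the test hunk above does.
	assert.Eventually(t, func() bool {
		return shardLeaderCount() == 2
	}, 60*time.Second, 1*time.Second)
}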