fix: Fix "channel not available" error and release-collection blocking (#45428)

1. Ensure replica creation is idempotent.
2. Prevent currentTarget update when replica is missing.
3. Move the wait-for-release logic into the DDL framework's callback,
and add a timeout to prevent it from blocking the DDL callback
indefinitely.

issue: https://github.com/milvus-io/milvus/issues/45301,
https://github.com/milvus-io/milvus/issues/45274,
https://github.com/milvus-io/milvus/issues/45295
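
For illustration, below is a minimal, self-contained Go sketch of the bounded-wait pattern behind item 3. The helper names (waitReleased, ddlCallback, remaining) are hypothetical and not the actual querycoord code: the wait timer resets whenever release makes progress, and the DDL callback logs and swallows a timeout instead of returning an error, so the DDL framework does not retry the callback forever.

// Hypothetical sketch, not the Milvus implementation.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Mirrors the 30s constant introduced by this change.
const waitReleasedTimeout = 30 * time.Second

// waitReleased polls remaining() until it reports zero channels/segments.
// The timer resets whenever the count shrinks, so the wait only fails when
// release makes no progress for waitReleasedTimeout.
func waitReleased(ctx context.Context, remaining func() int) error {
	last := -1
	lastChange := time.Now()
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		cur := remaining()
		if cur == 0 {
			return nil
		}
		if last < 0 || cur < last {
			lastChange = time.Now() // release is progressing
		}
		if time.Since(lastChange) > waitReleasedTimeout {
			return errors.New("wait for release timed out")
		}
		last = cur
		time.Sleep(200 * time.Millisecond)
	}
}

// ddlCallback shows why the wait error is swallowed: propagating it would
// make the DDL framework retry the callback indefinitely.
func ddlCallback(ctx context.Context, remaining func() int) error {
	if err := waitReleased(ctx, remaining); err != nil {
		fmt.Println("failed to wait for release:", err)
		return nil // do not propagate; avoid infinite retry
	}
	return nil
}

func main() {
	n := 3 // pretend three segments are still loaded
	_ = ddlCallback(context.Background(), func() int {
		if n > 0 {
			n-- // release one segment per poll
		}
		return n
	})
	fmt.Println("done")
}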

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-11-12 18:55:37 +08:00 committed by GitHub
parent 28d0755aaa
commit cabc47ce01
8 changed files with 92 additions and 48 deletions

View File

@@ -28,7 +28,7 @@ import (
func (s *Server) alterLoadConfigV2AckCallback(ctx context.Context, result message.BroadcastResultAlterLoadConfigMessageV2) error {
// currently, we only send the put load config message to the control channel
// TODO: after we support query view in 3.0, we should broadcast the put load config message to all vchannels.
job := job.NewLoadCollectionJob(ctx, result, s.dist, s.meta, s.broker, s.targetMgr, s.targetObserver, s.collectionObserver, s.nodeMgr)
job := job.NewLoadCollectionJob(ctx, result, s.dist, s.meta, s.broker, s.targetMgr, s.targetObserver, s.collectionObserver, s.checkerController, s.nodeMgr)
if err := job.Execute(); err != nil {
return err
}

View File

@@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@@ -50,6 +51,7 @@ type LoadCollectionJob struct {
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
checkerController *checkers.CheckerController
nodeMgr *session.NodeManager
}
@@ -62,6 +64,7 @@ func NewLoadCollectionJob(
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
checkerController *checkers.CheckerController,
nodeMgr *session.NodeManager,
) *LoadCollectionJob {
return &LoadCollectionJob{
@@ -74,6 +77,7 @@ func NewLoadCollectionJob(
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
checkerController: checkerController,
nodeMgr: nodeMgr,
}
}
@@ -161,6 +165,12 @@ func (job *LoadCollectionJob) Execute() error {
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Start load collection %d", collection.CollectionID)))
metrics.QueryCoordNumPartitions.WithLabelValues().Add(float64(len(partitions)))
log.Info("put collection and partitions done",
zap.Int64("collectionID", req.GetCollectionId()),
zap.Int64s("partitions", req.GetPartitionIds()),
zap.Int64s("toReleasePartitions", toReleasePartitions),
)
// 5. update next target; no need to roll back if pulling the target fails, the target observer will pull it periodically
if _, err = job.targetObserver.UpdateNextTarget(req.GetCollectionId()); err != nil {
return err
@@ -168,5 +178,20 @@ func (job *LoadCollectionJob) Execute() error {
// 6. register load task into collection observer
job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitions.Collect())
// 7. if any partitions are being released, wait until their release completes
if len(toReleasePartitions) > 0 {
if err = WaitCurrentTargetUpdated(ctx, job.targetObserver, req.GetCollectionId()); err != nil {
log.Warn("failed to wait current target updated", zap.Error(err))
// return nil to avoid infinite retry on DDL callback
return nil
}
if err = WaitCollectionReleased(ctx, job.dist, job.checkerController, req.GetCollectionId(), toReleasePartitions...); err != nil {
log.Warn("failed to wait partition released", zap.Error(err))
// return nil to avoid infinite retry on DDL callback
return nil
}
log.Info("wait for partition released done", zap.Int64s("toReleasePartitions", toReleasePartitions))
}
return nil
}

View File

@@ -105,5 +105,12 @@ func (job *ReleaseCollectionJob) Execute() error {
job.proxyManager.InvalidateShardLeaderCache(job.ctx, &proxypb.InvalidateShardLeaderCacheRequest{
CollectionIDs: []int64{collectionID},
})
if err = WaitCollectionReleased(job.ctx, job.dist, job.checkerController, collectionID); err != nil {
log.Warn("failed to wait collection released", zap.Error(err))
// return nil to avoid infinite retry on DDL callback
return nil
}
log.Info("release collection job done", zap.Int64("collectionID", collectionID))
return nil
}

View File

@@ -20,6 +20,7 @@ import (
"context"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
@@ -30,12 +31,24 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
const waitCollectionReleasedTimeout = 30 * time.Second
// WaitCollectionReleased blocks until all channels and segments of the given
// collection (or of the given partitions) are released;
// an empty partition list means waiting for the whole collection to be released.
func WaitCollectionReleased(dist *meta.DistributionManager, checkerController *checkers.CheckerController, collection int64, partitions ...int64) {
func WaitCollectionReleased(ctx context.Context, dist *meta.DistributionManager, checkerController *checkers.CheckerController, collection int64, partitions ...int64) error {
partitionSet := typeutil.NewUniqueSet(partitions...)
var (
lastChannelCount int
lastSegmentCount int
lastChangeTime = time.Now()
)
for {
if err := ctx.Err(); err != nil {
return errors.Wrapf(err, "context error while waiting for release, collection=%d", collection)
}
var (
channels []*meta.DmChannel
segments []*meta.Segment = dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collection))
@@ -48,28 +61,45 @@ func WaitCollectionReleased(dist *meta.DistributionManager, checkerController *c
channels = dist.ChannelDistManager.GetByCollectionAndFilter(collection)
}
if len(channels)+len(segments) == 0 {
currentChannelCount := len(channels)
currentSegmentCount := len(segments)
if currentChannelCount+currentSegmentCount == 0 {
break
} else {
log.Info("wait for release done", zap.Int64("collection", collection),
zap.Int64s("partitions", partitions),
zap.Int("channel", len(channels)),
zap.Int("segments", len(segments)),
)
}
// If release made progress, reset the last change time
if currentChannelCount < lastChannelCount || currentSegmentCount < lastSegmentCount {
lastChangeTime = time.Now()
}
// If release has made no progress for a while, return an error
if time.Since(lastChangeTime) > waitCollectionReleasedTimeout {
return errors.Errorf("wait collection released timeout, collection=%d, channels=%d, segments=%d",
collection, currentChannelCount, currentSegmentCount)
}
log.Ctx(ctx).Info("waitting for release...",
zap.Int64("collection", collection),
zap.Int64s("partitions", partitions),
zap.Int("channel", currentChannelCount),
zap.Int("segments", currentSegmentCount),
)
lastChannelCount = currentChannelCount
lastSegmentCount = currentSegmentCount
// trigger check more frequently
checkerController.Check()
time.Sleep(200 * time.Millisecond)
}
return nil
}
func WaitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error {
// manually trigger a next-target update
ready, err := targetObserver.UpdateNextTarget(collection)
if err != nil {
log.Warn("failed to update next target for sync partition job", zap.Error(err))
return err
return errors.Wrapf(err, "failed to update next target, collection=%d", collection)
}
// accelerate check
@@ -79,6 +109,8 @@ func WaitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.Tar
case <-ready:
return nil
case <-ctx.Done():
return ctx.Err()
return errors.Wrapf(ctx.Err(), "context error while waiting for current target updated, collection=%d", collection)
case <-time.After(waitCollectionReleasedTimeout):
return errors.Errorf("wait current target updated timeout, collection=%d", collection)
}
}

View File

@@ -178,6 +178,13 @@ func (m *ReplicaManager) SpawnWithReplicaConfig(ctx context.Context, params Spaw
enableChannelExclusiveMode := balancePolicy == ChannelLevelScoreBalancerName
replicas := make([]*Replica, 0)
for _, config := range params.Configs {
if existedReplica, ok := m.replicas[config.GetReplicaId()]; ok {
// if the replica already exists, just update its resource group
mutableReplica := existedReplica.CopyForWrite()
mutableReplica.SetResourceGroup(config.GetResourceGroupName())
replicas = append(replicas, mutableReplica.IntoReplica())
continue
}
replica := NewReplicaWithPriority(&querypb.Replica{
ID: config.GetReplicaId(),
CollectionID: params.CollectionID,
@@ -189,6 +196,11 @@ func (m *ReplicaManager) SpawnWithReplicaConfig(ctx context.Context, params Spaw
replica = mutableReplica.IntoReplica()
}
replicas = append(replicas, replica)
log.Ctx(ctx).Info("spawn replica for collection",
zap.Int64("collectionID", params.CollectionID),
zap.Int64("replicaID", config.GetReplicaId()),
zap.String("resourceGroup", config.GetResourceGroupName()),
)
}
if err := m.put(ctx, replicas...); err != nil {
return nil, errors.Wrap(err, "failed to put replicas")
@@ -539,6 +551,7 @@ func (m *ReplicaManager) RecoverNodesInCollection(ctx context.Context, collectio
mutableReplica.AddRWNode(incomingNode...) // unused -> rw
log.Info(
"new replica recovery found",
zap.Int64("collectionID", collectionID),
zap.Int64("replicaID", assignment.GetReplicaID()),
zap.Int64s("newRONodes", roNodes),
zap.Int64s("roToRWNodes", recoverableNodes),
@@ -705,6 +718,7 @@ func (m *ReplicaManager) RecoverSQNodesInCollection(ctx context.Context, collect
mutableReplica.AddRWSQNode(incomingNode...) // unused -> rw
log.Info(
"new replica recovery streaming query node found",
zap.Int64("collectionID", collectionID),
zap.Int64("replicaID", assignment.GetReplicaID()),
zap.Int64s("newRONodes", roNodes),
zap.Int64s("roToRWNodes", recoverableNodes),

View File

@@ -448,7 +448,8 @@ func (ob *TargetObserver) syncNextTargetToDelegator(ctx context.Context, collect
replica := ob.meta.ReplicaManager.GetByCollectionAndNode(ctx, collectionID, d.Node)
if replica == nil {
log.Warn("replica not found", zap.Int64("nodeID", d.Node), zap.Int64("collectionID", collectionID))
continue
// should not happen; don't update the current target if the replica is not found
return false
}
// init all the meta information
if partitions == nil {

View File

@@ -266,7 +266,6 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
job.WaitCollectionReleased(s.dist, s.checkerController, req.GetCollectionID())
logger.Info("release collection done")
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(tr.ElapseSpan().Milliseconds()))
@@ -354,12 +353,6 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if collectionReleased {
job.WaitCollectionReleased(s.dist, s.checkerController, req.GetCollectionID())
} else {
job.WaitCurrentTargetUpdated(ctx, s.targetObserver, req.GetCollectionID())
job.WaitCollectionReleased(s.dist, s.checkerController, req.GetCollectionID(), req.GetPartitionIDs()...)
}
logger.Info("release partitions done", zap.Bool("collectionReleased", collectionReleased))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.SuccessLabel).Inc()
meta.GlobalFailedLoadCache.Remove(req.GetCollectionID())

View File

@@ -223,34 +223,6 @@ func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.
return nil
}
func filterDupLeaders(ctx context.Context, replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
type leaderID struct {
ReplicaID int64
Shard string
}
newLeaders := make(map[leaderID]*meta.LeaderView)
for _, view := range leaders {
replica := replicaManager.GetByCollectionAndNode(ctx, view.CollectionID, view.ID)
if replica == nil {
continue
}
id := leaderID{replica.GetID(), view.Channel}
if old, ok := newLeaders[id]; ok && old.Version > view.Version {
continue
}
newLeaders[id] = view
}
result := make(map[int64]*meta.LeaderView)
for _, v := range newLeaders {
result[v.ID] = v
}
return result
}
// GetChannelRWAndRONodesFor260 gets the RW and RO nodes of the channel.
func GetChannelRWAndRONodesFor260(replica *meta.Replica, nodeManager *session.NodeManager) ([]int64, []int64) {
rwNodes, roNodes := replica.GetRWSQNodes(), replica.GetROSQNodes()