diff --git a/internal/querycoordv2/ddl_callbacks_alter_load_info.go b/internal/querycoordv2/ddl_callbacks_alter_load_info.go
index 885e2a6f3f..1437d65153 100644
--- a/internal/querycoordv2/ddl_callbacks_alter_load_info.go
+++ b/internal/querycoordv2/ddl_callbacks_alter_load_info.go
@@ -28,7 +28,7 @@ import (
 func (s *Server) alterLoadConfigV2AckCallback(ctx context.Context, result message.BroadcastResultAlterLoadConfigMessageV2) error {
 	// currently, we only sent the put load config message to the control channel
 	// TODO: after we support query view in 3.0, we should broadcast the put load config message to all vchannels.
-	job := job.NewLoadCollectionJob(ctx, result, s.dist, s.meta, s.broker, s.targetMgr, s.targetObserver, s.collectionObserver, s.nodeMgr)
+	job := job.NewLoadCollectionJob(ctx, result, s.dist, s.meta, s.broker, s.targetMgr, s.targetObserver, s.collectionObserver, s.checkerController, s.nodeMgr)
 	if err := job.Execute(); err != nil {
 		return err
 	}
diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go
index 5f1fd29495..7a588542fb 100644
--- a/internal/querycoordv2/job/job_load.go
+++ b/internal/querycoordv2/job/job_load.go
@@ -27,6 +27,7 @@ import (
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/observers"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
@@ -50,6 +51,7 @@ type LoadCollectionJob struct {
 	targetMgr          meta.TargetManagerInterface
 	targetObserver     *observers.TargetObserver
 	collectionObserver *observers.CollectionObserver
+	checkerController  *checkers.CheckerController
 	nodeMgr            *session.NodeManager
 }
 
@@ -62,6 +64,7 @@ func NewLoadCollectionJob(
 	targetMgr meta.TargetManagerInterface,
 	targetObserver *observers.TargetObserver,
 	collectionObserver *observers.CollectionObserver,
+	checkerController *checkers.CheckerController,
 	nodeMgr *session.NodeManager,
 ) *LoadCollectionJob {
 	return &LoadCollectionJob{
@@ -74,6 +77,7 @@ func NewLoadCollectionJob(
 		targetMgr:          targetMgr,
 		targetObserver:     targetObserver,
 		collectionObserver: collectionObserver,
+		checkerController:  checkerController,
 		nodeMgr:            nodeMgr,
 	}
 }
@@ -161,6 +165,12 @@ func (job *LoadCollectionJob) Execute() error {
 	eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Start load collection %d", collection.CollectionID)))
 	metrics.QueryCoordNumPartitions.WithLabelValues().Add(float64(len(partitions)))
 
+	log.Info("put collection and partitions done",
+		zap.Int64("collectionID", req.GetCollectionId()),
+		zap.Int64s("partitions", req.GetPartitionIds()),
+		zap.Int64s("toReleasePartitions", toReleasePartitions),
+	)
+
 	// 5. update next target, no need to rollback if pull target failed, target observer will pull target in periodically
 	if _, err = job.targetObserver.UpdateNextTarget(req.GetCollectionId()); err != nil {
 		return err
@@ -168,5 +178,20 @@ func (job *LoadCollectionJob) Execute() error {
 
 	// 6. register load task into collection observer
 	job.collectionObserver.LoadPartitions(ctx, req.GetCollectionId(), incomingPartitions.Collect())
+
+	// 7. if any partitions were released by this change, wait for the release to complete
+	if len(toReleasePartitions) > 0 {
+		if err = WaitCurrentTargetUpdated(ctx, job.targetObserver, req.GetCollectionId()); err != nil {
+			log.Warn("failed to wait current target updated", zap.Error(err))
+			// return nil to avoid infinite retry on DDL callback
+			return nil
+		}
+		if err = WaitCollectionReleased(ctx, job.dist, job.checkerController, req.GetCollectionId(), toReleasePartitions...); err != nil {
+			log.Warn("failed to wait partition released", zap.Error(err))
+			// return nil to avoid infinite retry on DDL callback
+			return nil
+		}
+		log.Info("wait for partition released done", zap.Int64s("toReleasePartitions", toReleasePartitions))
+	}
 	return nil
 }
diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go
index 2862b0eeee..11c980e29b 100644
--- a/internal/querycoordv2/job/job_release.go
+++ b/internal/querycoordv2/job/job_release.go
@@ -105,5 +105,12 @@ func (job *ReleaseCollectionJob) Execute() error {
 	job.proxyManager.InvalidateShardLeaderCache(job.ctx, &proxypb.InvalidateShardLeaderCacheRequest{
 		CollectionIDs: []int64{collectionID},
 	})
+
+	if err = WaitCollectionReleased(job.ctx, job.dist, job.checkerController, collectionID); err != nil {
+		log.Warn("failed to wait collection released", zap.Error(err))
+		// return nil to avoid infinite retry on DDL callback
+		return nil
+	}
+	log.Info("release collection job done", zap.Int64("collectionID", collectionID))
 	return nil
 }
diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go
index 8744b96c51..936cc1d341 100644
--- a/internal/querycoordv2/job/utils.go
+++ b/internal/querycoordv2/job/utils.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
@@ -30,12 +31,24 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
+const waitCollectionReleasedTimeout = 30 * time.Second
+
 // WaitCollectionReleased blocks until
 // all channels and segments of given collection(partitions) are released,
 // empty partition list means wait for collection released
-func WaitCollectionReleased(dist *meta.DistributionManager, checkerController *checkers.CheckerController, collection int64, partitions ...int64) {
+func WaitCollectionReleased(ctx context.Context, dist *meta.DistributionManager, checkerController *checkers.CheckerController, collection int64, partitions ...int64) error {
 	partitionSet := typeutil.NewUniqueSet(partitions...)
+	var (
+		lastChannelCount int
+		lastSegmentCount int
+		lastChangeTime   = time.Now()
+	)
+
 	for {
+		if err := ctx.Err(); err != nil {
+			return errors.Wrapf(err, "context error while waiting for release, collection=%d", collection)
+		}
+
 		var (
 			channels []*meta.DmChannel
 			segments []*meta.Segment = dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collection))
@@ -48,28 +61,45 @@ func WaitCollectionReleased(dist *meta.DistributionManager, checkerController *c
 			channels = dist.ChannelDistManager.GetByCollectionAndFilter(collection)
 		}
 
-		if len(channels)+len(segments) == 0 {
+		currentChannelCount := len(channels)
+		currentSegmentCount := len(segments)
+		if currentChannelCount+currentSegmentCount == 0 {
 			break
-		} else {
-			log.Info("wait for release done", zap.Int64("collection", collection),
-				zap.Int64s("partitions", partitions),
-				zap.Int("channel", len(channels)),
-				zap.Int("segments", len(segments)),
-			)
 		}
 
+		// If the release made progress, reset the last change time
+		if currentChannelCount < lastChannelCount || currentSegmentCount < lastSegmentCount {
+			lastChangeTime = time.Now()
+		}
+
+		// If no progress has been made for a while, return an error
+		if time.Since(lastChangeTime) > waitCollectionReleasedTimeout {
+			return errors.Errorf("wait collection released timeout, collection=%d, channels=%d, segments=%d",
+				collection, currentChannelCount, currentSegmentCount)
+		}
+
+		log.Ctx(ctx).Info("waiting for release...",
+			zap.Int64("collection", collection),
+			zap.Int64s("partitions", partitions),
+			zap.Int("channel", currentChannelCount),
+			zap.Int("segments", currentSegmentCount),
+		)
+
+		lastChannelCount = currentChannelCount
+		lastSegmentCount = currentSegmentCount
+
 		// trigger check more frequently
 		checkerController.Check()
 		time.Sleep(200 * time.Millisecond)
 	}
+	return nil
 }
 
 func WaitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error {
 	// manual trigger update next target
 	ready, err := targetObserver.UpdateNextTarget(collection)
 	if err != nil {
-		log.Warn("failed to update next target for sync partition job", zap.Error(err))
-		return err
+		return errors.Wrapf(err, "failed to update next target, collection=%d", collection)
 	}
 
 	// accelerate check
@@ -79,6 +109,8 @@ func WaitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.Tar
 	case <-ready:
 		return nil
 	case <-ctx.Done():
-		return ctx.Err()
+		return errors.Wrapf(ctx.Err(), "context error while waiting for current target updated, collection=%d", collection)
+	case <-time.After(waitCollectionReleasedTimeout):
+		return errors.Errorf("wait current target updated timeout, collection=%d", collection)
 	}
 }
diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index f7f502cb92..4f022c1839 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -178,6 +178,13 @@ func (m *ReplicaManager) SpawnWithReplicaConfig(ctx context.Context, params Spaw
 	enableChannelExclusiveMode := balancePolicy == ChannelLevelScoreBalancerName
 	replicas := make([]*Replica, 0)
 	for _, config := range params.Configs {
+		if existedReplica, ok := m.replicas[config.GetReplicaId()]; ok {
+			// if the replica already exists, just update its resource group
+			mutableReplica := existedReplica.CopyForWrite()
+			mutableReplica.SetResourceGroup(config.GetResourceGroupName())
+			replicas = append(replicas, mutableReplica.IntoReplica())
+			continue
+		}
 		replica := NewReplicaWithPriority(&querypb.Replica{
 			ID:            config.GetReplicaId(),
 			CollectionID:  params.CollectionID,
@@ -189,6 +196,11 @@ func (m *ReplicaManager) SpawnWithReplicaConfig(ctx context.Context, params Spaw
 			replica = mutableReplica.IntoReplica()
 		}
 		replicas = append(replicas, replica)
+		log.Ctx(ctx).Info("spawn replica for collection",
+			zap.Int64("collectionID", params.CollectionID),
+			zap.Int64("replicaID", config.GetReplicaId()),
+			zap.String("resourceGroup", config.GetResourceGroupName()),
+		)
 	}
 	if err := m.put(ctx, replicas...); err != nil {
 		return nil, errors.Wrap(err, "failed to put replicas")
@@ -539,6 +551,7 @@ func (m *ReplicaManager) RecoverNodesInCollection(ctx context.Context, collectio
 			mutableReplica.AddRWNode(incomingNode...) // unused -> rw
 			log.Info(
 				"new replica recovery found",
+				zap.Int64("collectionID", collectionID),
 				zap.Int64("replicaID", assignment.GetReplicaID()),
 				zap.Int64s("newRONodes", roNodes),
 				zap.Int64s("roToRWNodes", recoverableNodes),
@@ -705,6 +718,7 @@ func (m *ReplicaManager) RecoverSQNodesInCollection(ctx context.Context, collect
 			mutableReplica.AddRWSQNode(incomingNode...) // unused -> rw
 			log.Info(
 				"new replica recovery streaming query node found",
+				zap.Int64("collectionID", collectionID),
 				zap.Int64("replicaID", assignment.GetReplicaID()),
 				zap.Int64s("newRONodes", roNodes),
 				zap.Int64s("roToRWNodes", recoverableNodes),
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 68c7a008df..87c6317dc0 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -448,7 +448,8 @@ func (ob *TargetObserver) syncNextTargetToDelegator(ctx context.Context, collect
 		replica := ob.meta.ReplicaManager.GetByCollectionAndNode(ctx, collectionID, d.Node)
 		if replica == nil {
 			log.Warn("replica not found", zap.Int64("nodeID", d.Node), zap.Int64("collectionID", collectionID))
-			continue
+			// should not happen, don't update current target if replica not found
+			return false
 		}
 		// init all the meta information
 		if partitions == nil {
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index c83468b095..f475211ce2 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -266,7 +266,6 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
 		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
 		return merr.Status(err), nil
 	}
-	job.WaitCollectionReleased(s.dist, s.checkerController, req.GetCollectionID())
 	logger.Info("release collection done")
 	metrics.QueryCoordReleaseCount.WithLabelValues(metrics.SuccessLabel).Inc()
 	metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(tr.ElapseSpan().Milliseconds()))
@@ -354,12 +353,6 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
 		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
 		return merr.Status(err), nil
 	}
-	if collectionReleased {
-		job.WaitCollectionReleased(s.dist, s.checkerController, req.GetCollectionID())
-	} else {
-		job.WaitCurrentTargetUpdated(ctx, s.targetObserver, req.GetCollectionID())
-		job.WaitCollectionReleased(s.dist, s.checkerController, req.GetCollectionID(), req.GetPartitionIDs()...)
-	}
 	logger.Info("release partitions done", zap.Bool("collectionReleased", collectionReleased))
 	metrics.QueryCoordReleaseCount.WithLabelValues(metrics.SuccessLabel).Inc()
 	meta.GlobalFailedLoadCache.Remove(req.GetCollectionID())
diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go
index 169025c9ef..46e13c700a 100644
--- a/internal/querycoordv2/utils/util.go
+++ b/internal/querycoordv2/utils/util.go
@@ -223,34 +223,6 @@ func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.
 	return nil
 }
 
-func filterDupLeaders(ctx context.Context, replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
-	type leaderID struct {
-		ReplicaID int64
-		Shard     string
-	}
-
-	newLeaders := make(map[leaderID]*meta.LeaderView)
-	for _, view := range leaders {
-		replica := replicaManager.GetByCollectionAndNode(ctx, view.CollectionID, view.ID)
-		if replica == nil {
-			continue
-		}
-
-		id := leaderID{replica.GetID(), view.Channel}
-		if old, ok := newLeaders[id]; ok && old.Version > view.Version {
-			continue
-		}
-
-		newLeaders[id] = view
-	}
-
-	result := make(map[int64]*meta.LeaderView)
-	for _, v := range newLeaders {
-		result[v.ID] = v
-	}
-	return result
-}
-
 // GetChannelRWAndRONodesFor260 gets the RW and RO nodes of the channel.
 func GetChannelRWAndRONodesFor260(replica *meta.Replica, nodeManager *session.NodeManager) ([]int64, []int64) {
 	rwNodes, roNodes := replica.GetRWSQNodes(), replica.GetROSQNodes()
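
Note on the waiting scheme: the sketch below is not part of the patch and uses hypothetical names (waitUntilDrained, countFn, kick, stallTimeout); it only illustrates, under those assumptions, the "no-progress timeout" pattern that the reworked WaitCollectionReleased follows. The wait gives up only when the remaining channel/segment count stops shrinking for the whole timeout window, so a slow but still progressing release is not cut off, while a stalled one surfaces an error that the DDL-callback callers log and swallow (return nil) instead of retrying forever.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntilDrained polls countFn until it reports zero. It fails only when the
// count has not decreased for stallTimeout, so slow-but-progressing work is not
// interrupted. kick is invoked each round to speed up whatever drives the drain
// (the checker, in the QueryCoord case).
func waitUntilDrained(ctx context.Context, countFn func() int, kick func(), stallTimeout time.Duration) error {
	lastCount := -1 // no observation yet, so the first poll counts as progress
	lastProgress := time.Now()
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		current := countFn()
		if current == 0 {
			return nil
		}
		if lastCount == -1 || current < lastCount {
			lastProgress = time.Now() // still shrinking: reset the stall timer
		}
		if time.Since(lastProgress) > stallTimeout {
			return fmt.Errorf("drain stalled with %d items remaining", current)
		}
		lastCount = current
		kick()
		time.Sleep(200 * time.Millisecond)
	}
}

func main() {
	remaining := 5
	countdown := func() int {
		if remaining > 0 {
			remaining--
		}
		return remaining
	}
	err := waitUntilDrained(context.Background(), countdown, func() {}, 30*time.Second)
	fmt.Println("drained:", err) // drained: <nil>
}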