issue: #30150 pr: #30151

This PR fixes three problems:
1. the load request generated by the leader checker doesn't set the load scope;
2. the leader checker uses the wrong node ID when generating a release task, which causes the release task to finish immediately;
3. the release request generated by the leader checker doesn't set the force flag, so the operation to clean the leader view on the delegator fails.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package task
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
|
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
)
|
|
|
|
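// Executor sends the per-step actions of tasks to QueryNodes:
// it loads/releases segments and subscribes/unsubscribes DM channels,
// and caps how many actions are executing concurrently.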
type Executor struct {
    doneCh    chan struct{}
    wg        sync.WaitGroup
    meta      *meta.Meta
    dist      *meta.DistributionManager
    broker    meta.Broker
    targetMgr *meta.TargetManager
    cluster   session.Cluster
    nodeMgr   *session.NodeManager

    executingTasks   *typeutil.ConcurrentSet[string] // task index
    executingTaskNum atomic.Int32
}

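// NewExecutor creates an Executor with the given meta, distribution,
// broker, target-manager, cluster and node-manager dependencies.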
func NewExecutor(meta *meta.Meta,
    dist *meta.DistributionManager,
    broker meta.Broker,
    targetMgr *meta.TargetManager,
    cluster session.Cluster,
    nodeMgr *session.NodeManager,
) *Executor {
    return &Executor{
        doneCh:    make(chan struct{}),
        meta:      meta,
        dist:      dist,
        broker:    broker,
        targetMgr: targetMgr,
        cluster:   cluster,
        nodeMgr:   nodeMgr,

        executingTasks: typeutil.NewConcurrentSet[string](),
    }
}

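// Start starts the executor. It is currently a no-op.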
func (ex *Executor) Start(ctx context.Context) {
}

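// Stop waits for the executor's in-flight work tracked by the wait group to finish.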
func (ex *Executor) Stop() {
    ex.wg.Wait()
}

// Execute executes the given action,
// does nothing and returns false if the action is already committed,
// returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool {
    exist := !ex.executingTasks.Insert(task.Index())
    if exist {
        return false
    }
    if ex.executingTaskNum.Inc() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() {
        ex.executingTasks.Remove(task.Index())
        ex.executingTaskNum.Dec()
        return false
    }

    log := log.With(
        zap.Int64("taskID", task.ID()),
        zap.Int64("collectionID", task.CollectionID()),
        zap.Int64("replicaID", task.ReplicaID()),
        zap.Int("step", step),
        zap.String("source", task.Source().String()),
    )

    go func() {
        log.Info("execute the action of task")
        switch task.Actions()[step].(type) {
        case *SegmentAction:
            ex.executeSegmentAction(task.(*SegmentTask), step)

        case *ChannelAction:
            ex.executeDmChannelAction(task.(*ChannelTask), step)
        }
    }()

    return true
}

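// removeTask marks the action as no longer executing: it logs the task error
// (if any), removes the task from the executing set and decreases the counter.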
func (ex *Executor) removeTask(task Task, step int) {
    if task.Err() != nil {
        log.Info("execute action done, remove it",
            zap.Int64("taskID", task.ID()),
            zap.Int("step", step),
            zap.Error(task.Err()))
    }

    ex.executingTasks.Remove(task.Index())
    ex.executingTaskNum.Dec()
}

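// executeSegmentAction dispatches the segment action at the given step:
// Grow/Update actions load the segment, Reduce actions release it.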
func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
    switch task.Actions()[step].Type() {
    case ActionTypeGrow, ActionTypeUpdate:
        ex.loadSegment(task, step)

    case ActionTypeReduce:
        ex.releaseSegment(task, step)
    }
}

// loadSegment commits the load request to the merger;
// it does not actually execute the request itself.
func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
    action := task.Actions()[step].(*SegmentAction)
    defer action.rpcReturned.Store(true)
    ctx := task.Context()
    log := log.Ctx(ctx).With(
        zap.Int64("taskID", task.ID()),
        zap.Int64("collectionID", task.CollectionID()),
        zap.Int64("replicaID", task.ReplicaID()),
        zap.Int64("segmentID", task.segmentID),
        zap.Int64("node", action.Node()),
        zap.String("source", task.Source().String()),
    )

    var err error
    defer func() {
        if err != nil {
            task.Fail(err)
        }
        ex.removeTask(task, step)
    }()

    schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID())
    if err != nil {
        log.Warn("failed to get schema of collection", zap.Error(err))
        return err
    }
    partitions, err := utils.GetPartitions(ex.meta.CollectionManager, task.CollectionID())
    if err != nil {
        log.Warn("failed to get partitions of collection", zap.Error(err))
        return err
    }

    loadMeta := packLoadMeta(
        ex.meta.GetLoadType(task.CollectionID()),
        "",
        task.CollectionID(),
        partitions...,
    )

    // get channel first, in case of target updated after segment info fetched
    channel := ex.targetMgr.GetDmChannel(task.CollectionID(), task.shard, meta.NextTargetFirst)
    if channel == nil {
        return merr.WrapErrChannelNotAvailable(task.shard)
    }
    resp, err := ex.broker.GetSegmentInfo(ctx, task.SegmentID())
    if err != nil || len(resp.GetInfos()) == 0 {
        log.Warn("failed to get segment info from DataCoord", zap.Error(err))
        return err
    }
    segment := resp.GetInfos()[0]

    indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID())
    if err != nil {
        if !errors.Is(err, merr.ErrIndexNotFound) {
            log.Warn("failed to get index of segment", zap.Error(err))
            return err
        }
        indexes = nil
    }

    loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), indexes)

    // Get collection index info
    indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
    if err != nil {
        log.Warn("fail to get index meta of collection")
        return err
    }

    req := packLoadSegmentRequest(
        task,
        action,
        schema,
        loadMeta,
        loadInfo,
        indexInfo,
    )

    // Get shard leader for the given replica and segment
    leaderID, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel())
    if !ok {
        msg := "no shard leader for the segment to execute loading"
        err = merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found")
        log.Warn(msg, zap.Error(err))
        return err
    }
    log = log.With(zap.Int64("shardLeader", leaderID))

    startTs := time.Now()
    log.Info("load segments...")
    status, err := ex.cluster.LoadSegments(task.Context(), leaderID, req)
    if err != nil {
        log.Warn("failed to load segment", zap.Error(err))
        return err
    }
    if !merr.Ok(status) {
        err = merr.Error(status)
        log.Warn("failed to load segment", zap.Error(err))
        return err
    }

    elapsed := time.Since(startTs)
    log.Info("load segments done", zap.Duration("elapsed", elapsed))

    return nil
}

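// releaseSegment sends the release request for the segment. Streaming-scope
// requests go to the action's node directly; for historical data the request
// is routed to the shard leader (with NeedTransfer set) while the collection
// still exists, so the delegator can keep the serving version consistent.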
func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
    defer ex.removeTask(task, step)
    startTs := time.Now()
    action := task.Actions()[step].(*SegmentAction)
    defer action.rpcReturned.Store(true)

    log := log.With(
        zap.Int64("taskID", task.ID()),
        zap.Int64("collectionID", task.CollectionID()),
        zap.Int64("replicaID", task.ReplicaID()),
        zap.Int64("segmentID", task.segmentID),
        zap.Int64("node", action.Node()),
        zap.String("source", task.Source().String()),
    )

    ctx := task.Context()

    dstNode := action.Node()
    req := packReleaseSegmentRequest(task, action)
    if action.Scope() == querypb.DataScope_Streaming {
        // Any modification to the segment distribution has to set NeedTransfer to true,
        // to protect the version which serves search/query.
        req.NeedTransfer = true
    } else {
        req.Shard = task.shard

        if ex.meta.CollectionManager.Exist(task.CollectionID()) {
            leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
            if !ok {
                log.Warn("no shard leader for the segment to execute releasing", zap.String("shard", req.GetShard()))
                return
            }
            dstNode = leader
            log = log.With(zap.Int64("shardLeader", leader))
            req.NeedTransfer = true
        }
    }

    log.Info("release segment...")
    status, err := ex.cluster.ReleaseSegments(ctx, dstNode, req)
    if err != nil {
        log.Warn("failed to release segment, it may be a false failure", zap.Error(err))
        return
    }
    if status.GetErrorCode() != commonpb.ErrorCode_Success {
        log.Warn("failed to release segment", zap.String("reason", status.GetReason()))
        return
    }
    elapsed := time.Since(startTs)
    log.Info("release segment done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed))
}

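// executeDmChannelAction dispatches the channel action at the given step:
// Grow subscribes the DM channel, Reduce unsubscribes it.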
func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
    switch task.Actions()[step].Type() {
    case ActionTypeGrow:
        ex.subDmChannel(task, step)

    case ActionTypeReduce:
        ex.unsubDmChannel(task, step)
    }
}

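// subDmChannel builds the watch request from the next target and asks the
// action's node to subscribe the DM channel, failing the task on error.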
func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
    defer ex.removeTask(task, step)
    startTs := time.Now()
    action := task.Actions()[step].(*ChannelAction)
    log := log.With(
        zap.Int64("taskID", task.ID()),
        zap.Int64("collectionID", task.CollectionID()),
        zap.Int64("replicaID", task.ReplicaID()),
        zap.String("channel", task.Channel()),
        zap.Int64("node", action.Node()),
        zap.String("source", task.Source().String()),
    )

    var err error
    defer func() {
        if err != nil {
            task.Fail(err)
        }
    }()

    ctx := task.Context()

    schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID())
    if err != nil {
        log.Warn("failed to get schema of collection")
        return err
    }
    partitions, err := utils.GetPartitions(ex.meta.CollectionManager, task.CollectionID())
    if err != nil {
        log.Warn("failed to get partitions of collection")
        return err
    }
    indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
    if err != nil {
        log.Warn("fail to get index meta of collection")
        return err
    }
    metricType, err := getMetricType(indexInfo, schema)
    if err != nil {
        log.Warn("failed to get metric type", zap.Error(err))
        return err
    }
    loadMeta := packLoadMeta(
        ex.meta.GetLoadType(task.CollectionID()),
        metricType,
        task.CollectionID(),
        partitions...,
    )

    dmChannel := ex.targetMgr.GetDmChannel(task.CollectionID(), action.ChannelName(), meta.NextTarget)
    if dmChannel == nil {
        msg := "channel does not exist in next target, skip it"
        log.Warn(msg, zap.String("channelName", action.ChannelName()))
        return merr.WrapErrChannelReduplicate(action.ChannelName())
    }
    req := packSubChannelRequest(task, action, schema, loadMeta, dmChannel, indexInfo)
    err = fillSubChannelRequest(ctx, req, ex.broker)
    if err != nil {
        log.Warn("failed to subscribe channel, failed to fill the request with segments",
            zap.Error(err))
        return err
    }

    ts := dmChannel.GetSeekPosition().GetTimestamp()
    log.Info("subscribe channel...",
        zap.Uint64("checkpoint", ts),
        zap.Duration("sinceCheckpoint", time.Since(tsoutil.PhysicalTime(ts))),
    )
    status, err := ex.cluster.WatchDmChannels(ctx, action.Node(), req)
    if err != nil {
        log.Warn("failed to subscribe channel, it may be a false failure", zap.Error(err))
        return err
    }
    if !merr.Ok(status) {
        err = merr.Error(status)
        log.Warn("failed to subscribe channel", zap.Error(err))
        return err
    }
    elapsed := time.Since(startTs)
    log.Info("subscribe channel done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed))
    return nil
}

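// unsubDmChannel asks the action's node to unsubscribe the DM channel,
// failing the task on error.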
func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
    defer ex.removeTask(task, step)
    startTs := time.Now()
    action := task.Actions()[step].(*ChannelAction)
    log := log.With(
        zap.Int64("taskID", task.ID()),
        zap.Int64("collectionID", task.CollectionID()),
        zap.Int64("replicaID", task.ReplicaID()),
        zap.String("channel", task.Channel()),
        zap.Int64("node", action.Node()),
        zap.String("source", task.Source().String()),
    )

    var err error
    defer func() {
        if err != nil {
            task.Fail(err)
        }
    }()

    ctx := task.Context()

    req := packUnsubDmChannelRequest(task, action)
    log.Info("unsubscribe channel...")
    status, err := ex.cluster.UnsubDmChannel(ctx, action.Node(), req)
    if err != nil {
        log.Warn("failed to unsubscribe channel, it may be a false failure", zap.Error(err))
        return err
    }
    if !merr.Ok(status) {
        err = merr.Error(status)
        log.Warn("failed to unsubscribe channel", zap.Error(err))
        return err
    }

    elapsed := time.Since(startTs)
    log.Info("unsubscribe channel done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed))
    return nil
}