diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 5ed6f09461..4d74053c15 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -170,7 +170,7 @@ queryCoord:
   distPullInterval: 500
   loadTimeoutSeconds: 600
   checkHandoffInterval: 5000
-  taskMergeCap: 8
+  taskMergeCap: 16
   enableActiveStandby: false # Enable active-standby
 
 # Related configuration of queryNode, used to run hybrid search between vector and scalar data.
diff --git a/internal/querycoordv2/job/job.go b/internal/querycoordv2/job/job.go
index 06e87aee75..8e9c145324 100644
--- a/internal/querycoordv2/job/job.go
+++ b/internal/querycoordv2/job/job.go
@@ -207,6 +207,7 @@ func (job *LoadCollectionJob) Execute() error {
 			Status: querypb.LoadStatus_Loading,
 		},
 		CreatedAt: time.Now(),
+		UpdatedAt: time.Now(),
 	})
 	if err != nil {
 		msg := "failed to store collection"
diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go
index dd45f1210a..7cf2ae38c4 100644
--- a/internal/querycoordv2/meta/collection_manager.go
+++ b/internal/querycoordv2/meta/collection_manager.go
@@ -15,6 +15,7 @@ type Collection struct {
 	*querypb.CollectionLoadInfo
 	LoadPercentage int32
 	CreatedAt      time.Time
+	UpdatedAt      time.Time
 }
 
 func (collection *Collection) Clone() *Collection {
@@ -27,6 +28,7 @@ type Partition struct {
 	*querypb.PartitionLoadInfo
 	LoadPercentage int32
 	CreatedAt      time.Time
+	UpdatedAt      time.Time
 }
 
 func (partition *Partition) Clone() *Partition {
@@ -279,6 +281,7 @@ func (m *CollectionManager) putCollection(collection *Collection, withSave bool) error {
 			return err
 		}
 	}
+	collection.UpdatedAt = time.Now()
 	m.collections[collection.CollectionID] = collection
 
 	return nil
@@ -334,6 +337,7 @@ func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool) error {
 		}
 	}
 	for _, partition := range partitions {
+		partition.UpdatedAt = time.Now()
 		m.partitions[partition.GetPartitionID()] = partition
 	}
 	return nil
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index d7555b0623..3384656a22 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -70,7 +70,7 @@ func (ob *CollectionObserver) observeTimeout() {
 	collections := ob.meta.CollectionManager.GetAllCollections()
 	for _, collection := range collections {
 		if collection.GetStatus() != querypb.LoadStatus_Loading ||
-			time.Now().Before(collection.CreatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds)) {
+			time.Now().Before(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds)) {
 			continue
 		}
 
@@ -172,6 +172,9 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Collection) {
 
 	updated := collection.Clone()
 	updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
+	if updated.LoadPercentage <= collection.LoadPercentage {
+		return
+	}
 	if loadedCount >= len(segmentTargets)+len(channelTargets) {
 		updated.Status = querypb.LoadStatus_Loaded
 		ob.meta.CollectionManager.UpdateCollection(updated)
@@ -181,11 +184,9 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Collection) {
 	} else {
 		ob.meta.CollectionManager.UpdateCollectionInMemory(updated)
 	}
-	if updated.LoadPercentage != collection.LoadPercentage {
-		log.Info("collection load status updated",
-			zap.Int32("loadPercentage", updated.LoadPercentage),
-			zap.Int32("collectionStatus", int32(updated.GetStatus())))
-	}
+	log.Info("collection load status updated",
+		zap.Int32("loadPercentage", updated.LoadPercentage),
+		zap.Int32("collectionStatus", int32(updated.GetStatus())))
 }
 
 func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partition) {
@@ -230,18 +231,21 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partition) {
 			zap.Int("load-segment-count", loadedCount-subChannelCount))
 	}
 
-	partition = partition.Clone()
-	partition.LoadPercentage = int32(loadedCount * 100 / targetNum)
+	updated := partition.Clone()
+	updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
+	if updated.LoadPercentage <= partition.LoadPercentage {
+		return
+	}
 	if loadedCount >= len(segmentTargets)+len(channelTargets) {
-		partition.Status = querypb.LoadStatus_Loaded
-		ob.meta.CollectionManager.PutPartition(partition)
+		updated.Status = querypb.LoadStatus_Loaded
+		ob.meta.CollectionManager.UpdatePartition(updated)
 
-		elapsed := time.Since(partition.CreatedAt)
+		elapsed := time.Since(updated.CreatedAt)
 		metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds()))
 	} else {
-		ob.meta.CollectionManager.UpdatePartitionInMemory(partition)
+		ob.meta.CollectionManager.UpdatePartitionInMemory(updated)
 	}
 	log.Info("partition load status updated",
-		zap.Int32("loadPercentage", partition.LoadPercentage),
-		zap.Int32("partitionStatus", int32(partition.GetStatus())))
+		zap.Int32("loadPercentage", updated.LoadPercentage),
+		zap.Int32("partitionStatus", int32(updated.GetStatus())))
 }
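Note: the changes above work together. `CreatedAt` is written once when loading starts, `putCollection`/`putPartition` now refresh `UpdatedAt` on every metadata write, and the timeout observer measures against `UpdatedAt`. The effect is a sliding deadline: a load that keeps making progress keeps renewing its window instead of being killed `loadTimeoutSeconds` after creation. A minimal sketch of that pattern (the `loadEntry` type is hypothetical, not the Milvus API):

```go
package main

import (
	"fmt"
	"time"
)

// loadEntry is a hypothetical stand-in for meta.Collection: CreatedAt is set
// once when loading starts; UpdatedAt is refreshed on every metadata write.
type loadEntry struct {
	CreatedAt time.Time
	UpdatedAt time.Time
	Loading   bool
}

// timedOut mirrors the observer's new check: the deadline slides forward with
// UpdatedAt rather than staying fixed at CreatedAt+timeout.
func timedOut(e loadEntry, timeout time.Duration) bool {
	return e.Loading && time.Now().After(e.UpdatedAt.Add(timeout))
}

func main() {
	started := time.Now().Add(-10 * time.Minute)
	e := loadEntry{CreatedAt: started, UpdatedAt: started, Loading: true}
	fmt.Println(timedOut(e, 5*time.Minute)) // true: no progress for 10 minutes

	e.UpdatedAt = time.Now()                // a putCollection-style write renews the window
	fmt.Println(timedOut(e, 5*time.Minute)) // false
}
```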
diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go
index c5c7d77ef4..3db655f8f1 100644
--- a/internal/querycoordv2/task/executor.go
+++ b/internal/querycoordv2/task/executor.go
@@ -3,7 +3,6 @@ package task
 import (
 	"context"
 	"sync"
-	"time"
 
 	"github.com/milvus-io/milvus/api/commonpb"
 	"github.com/milvus-io/milvus/internal/log"
@@ -14,10 +13,6 @@ import (
 	"go.uber.org/zap"
 )
 
-const (
-	actionTimeout = 120 * time.Second
-)
-
 type actionIndex struct {
 	Task int64
 	Step int
@@ -155,9 +150,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
 	}
 
 	log.Info("load segments...")
-	ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-	status, err := ex.cluster.LoadSegments(ctx, leader, mergeTask.req)
-	cancel()
+	status, err := ex.cluster.LoadSegments(task.Context(), leader, mergeTask.req)
 	if err != nil {
 		log.Warn("failed to load segment, it may be a false failure", zap.Error(err))
 		return
@@ -196,7 +189,7 @@ func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
 
 // loadSegment commits the request to merger,
 // not really executes the request
-func (ex *Executor) loadSegment(task *SegmentTask, step int) {
+func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
 	action := task.Actions()[step].(*SegmentAction)
 	log := log.With(
 		zap.Int64("taskID", task.ID()),
@@ -206,25 +199,26 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
 		zap.Int64("source", task.SourceID()),
 	)
 
-	shouldRemoveAction := true
+	var err error
 	defer func() {
-		if shouldRemoveAction {
+		if err != nil {
+			task.SetErr(err)
+			task.Cancel()
 			ex.removeAction(task, step)
 		}
 	}()
 
-	ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-	defer cancel()
-
+	ctx := task.Context()
 	schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID())
 	if err != nil {
 		log.Warn("failed to get schema of collection", zap.Error(err))
-		return
+		task.SetErr(err)
+		return err
 	}
 	partitions, err := utils.GetPartitions(ex.meta.CollectionManager, ex.broker, task.CollectionID())
 	if err != nil {
 		log.Warn("failed to get partitions of collection", zap.Error(err))
-		return
+		return err
 	}
 	loadMeta := packLoadMeta(
 		ex.meta.GetLoadType(task.CollectionID()),
@@ -234,13 +228,13 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
 	segments, err := ex.broker.GetSegmentInfo(ctx, task.SegmentID())
 	if err != nil || len(segments) == 0 {
 		log.Warn("failed to get segment info from DataCoord", zap.Error(err))
-		return
+		return err
 	}
 	segment := segments[0]
 	indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID())
 	if err != nil {
 		log.Warn("failed to get index of segment", zap.Error(err))
-		return
+		return err
 	}
 
 	loadInfo := utils.PackSegmentLoadInfo(segment, indexes)
@@ -248,9 +242,9 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
 	leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel())
 	if !ok {
 		msg := "no shard leader for the segment to execute loading"
-		task.SetErr(utils.WrapError(msg, ErrTaskStale))
+		err = utils.WrapError(msg, ErrTaskStale)
 		log.Warn(msg, zap.String("shard", segment.GetInsertChannel()))
-		return
+		return err
 	}
 
 	log = log.With(zap.Int64("shardLeader", leader))
@@ -258,7 +252,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
 	loadTask := NewLoadSegmentsTask(task, step, req)
 	ex.merger.Add(loadTask)
 	log.Info("load segment task committed")
-	shouldRemoveAction = false
+	return nil
 }
 
 func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
@@ -275,8 +269,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
 		zap.Int64("source", task.SourceID()),
 	)
 
-	ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-	defer cancel()
+	ctx := task.Context()
 
 	dstNode := action.Node()
 	req := packReleaseSegmentRequest(task, action)
@@ -334,7 +327,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
 	}
 }
 
-func (ex *Executor) subDmChannel(task *ChannelTask, step int) {
+func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
 	defer ex.removeAction(task, step)
 
 	action := task.Actions()[step].(*ChannelAction)
@@ -346,18 +339,25 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) {
 		zap.Int64("source", task.SourceID()),
 	)
 
-	ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-	defer cancel()
+	var err error
+	defer func() {
+		if err != nil {
+			task.SetErr(err)
+			task.Cancel()
+		}
+	}()
+
+	ctx := task.Context()
 
 	schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID())
 	if err != nil {
 		log.Warn("failed to get schema of collection")
-		return
+		return err
 	}
 	partitions, err := utils.GetPartitions(ex.meta.CollectionManager, ex.broker, task.CollectionID())
 	if err != nil {
 		log.Warn("failed to get partitions of collection")
-		return
+		return err
 	}
 	loadMeta := packLoadMeta(
 		ex.meta.GetLoadType(task.CollectionID()),
@@ -371,22 +371,24 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) {
 	if err != nil {
 		log.Warn("failed to subscribe DmChannel, failed to fill the request with segments", zap.Error(err))
-		return
+		return err
 	}
 
 	log.Info("subscribe channel...")
 	status, err := ex.cluster.WatchDmChannels(ctx, action.Node(), req)
 	if err != nil {
 		log.Warn("failed to subscribe DmChannel, it may be a false failure", zap.Error(err))
-		return
+		return err
 	}
 	if status.ErrorCode != commonpb.ErrorCode_Success {
+		err = utils.WrapError("failed to subscribe DmChannel", ErrFailedResponse)
 		log.Warn("failed to subscribe DmChannel", zap.String("reason", status.GetReason()))
-		return
+		return err
 	}
 	log.Info("subscribe DmChannel done")
+	return nil
 }
 
-func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) {
+func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
 	defer ex.removeAction(task, step)
 
 	action := task.Actions()[step].(*ChannelAction)
@@ -398,17 +400,26 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) {
 		zap.Int64("source", task.SourceID()),
 	)
 
-	ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-	defer cancel()
+	var err error
+	defer func() {
+		if err != nil {
+			task.SetErr(err)
+			task.Cancel()
+		}
+	}()
+
+	ctx := task.Context()
 
 	req := packUnsubDmChannelRequest(task, action)
 	status, err := ex.cluster.UnsubDmChannel(ctx, action.Node(), req)
 	if err != nil {
 		log.Warn("failed to unsubscribe DmChannel, it may be a false failure", zap.Error(err))
-		return
+		return err
 	}
 	if status.ErrorCode != commonpb.ErrorCode_Success {
+		err = utils.WrapError("failed to unsubscribe DmChannel", ErrFailedResponse)
 		log.Warn("failed to unsubscribe DmChannel", zap.String("reason", status.GetReason()))
-		return
+		return err
 	}
+	return nil
}
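The executor rewrite above repeats one Go idiom: a single `err` variable shared by every step of the function, plus one deferred closure that marks the task failed and cancels it no matter which step assigned the error. A self-contained sketch of that idiom, with `setErr`/`cancel` as stand-ins for the task methods:

```go
package main

import (
	"errors"
	"fmt"
)

// setErr and cancel are stand-ins for task.SetErr and task.Cancel.
func setErr(err error) { fmt.Println("task error:", err) }
func cancel()          { fmt.Println("task canceled") }

func run() error {
	var err error
	defer func() {
		// Runs after any return below and sees the last value assigned to
		// err, so every failure path gets the same cleanup without
		// duplicating it in each branch.
		if err != nil {
			setErr(err)
			cancel()
		}
	}()

	if err = step("fetch schema"); err != nil {
		return err
	}
	if err = step("watch channel"); err != nil {
		return err
	}
	return nil
}

func step(name string) error {
	if name == "watch channel" {
		return errors.New("RpcFailed: " + name)
	}
	return nil
}

func main() { _ = run() }
```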
diff --git a/internal/querycoordv2/task/merger.go b/internal/querycoordv2/task/merger.go
index fe66886c56..d6714db250 100644
--- a/internal/querycoordv2/task/merger.go
+++ b/internal/querycoordv2/task/merger.go
@@ -12,7 +12,7 @@ import (
 )
 
 // Merger merges tasks with the same mergeID.
-const waitQueueCap = 128
+const waitQueueCap = 256
 
 type Merger[K comparable, R any] struct {
 	stopCh chan struct{}
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 493fb99ddb..1ec2aaea1d 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -37,6 +37,8 @@ var (
 	ErrResourceNotEnough = errors.New("ResourceNotEnough")
 
 	ErrTaskQueueFull = errors.New("TaskQueueFull")
+
+	ErrFailedResponse = errors.New("RpcFailed")
 )
 
 type Type = int32
@@ -478,7 +480,9 @@ func (scheduler *taskScheduler) process(task Task) bool {
 		task.SetStatus(TaskStatusSucceeded)
 	} else if scheduler.checkCanceled(task) {
 		task.SetStatus(TaskStatusCanceled)
-		task.SetErr(ErrTaskCanceled)
+		if task.Err() == nil {
+			task.SetErr(ErrTaskCanceled)
+		}
 	} else if scheduler.checkStale(task) {
 		task.SetStatus(TaskStatusStale)
 		task.SetErr(ErrTaskStale)
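With the executor now recording specific errors (including the new `ErrFailedResponse` sentinel), the scheduler's cancel path above falls back to the generic `ErrTaskCanceled` only when nothing more specific was set. A small sketch of why that guard matters; the `task` type here is illustrative, not the scheduler's actual interface:

```go
package main

import (
	"errors"
	"fmt"
)

var errTaskCanceled = errors.New("TaskCanceled")

// task is an illustrative stand-in for the scheduler's Task.
type task struct{ err error }

func (t *task) Err() error       { return t.err }
func (t *task) SetErr(err error) { t.err = err }

func main() {
	t := &task{}
	t.SetErr(errors.New("RpcFailed: failed to subscribe DmChannel")) // root cause from the executor

	// The scheduler's guarded fallback: don't mask an existing error.
	if t.Err() == nil {
		t.SetErr(errTaskCanceled)
	}
	fmt.Println(t.Err()) // still reports the RPC failure, not the generic cancel
}
```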
done") + return nil } -func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) { +func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error { defer ex.removeAction(task, step) action := task.Actions()[step].(*ChannelAction) @@ -398,17 +400,26 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) { zap.Int64("source", task.SourceID()), ) - ctx, cancel := context.WithTimeout(task.Context(), actionTimeout) - defer cancel() + var err error + defer func() { + if err != nil { + task.SetErr(err) + task.Cancel() + } + }() + + ctx := task.Context() req := packUnsubDmChannelRequest(task, action) status, err := ex.cluster.UnsubDmChannel(ctx, action.Node(), req) if err != nil { log.Warn("failed to unsubscribe DmChannel, it may be a false failure", zap.Error(err)) - return + return err } if status.ErrorCode != commonpb.ErrorCode_Success { + err = utils.WrapError("failed to unsubscribe DmChannel", ErrFailedResponse) log.Warn("failed to unsubscribe DmChannel", zap.String("reason", status.GetReason())) - return + return err } + return nil } diff --git a/internal/querycoordv2/task/merger.go b/internal/querycoordv2/task/merger.go index fe66886c56..d6714db250 100644 --- a/internal/querycoordv2/task/merger.go +++ b/internal/querycoordv2/task/merger.go @@ -12,7 +12,7 @@ import ( ) // Merger merges tasks with the same mergeID. -const waitQueueCap = 128 +const waitQueueCap = 256 type Merger[K comparable, R any] struct { stopCh chan struct{} diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 493fb99ddb..1ec2aaea1d 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -37,6 +37,8 @@ var ( ErrResourceNotEnough = errors.New("ResourceNotEnough") ErrTaskQueueFull = errors.New("TaskQueueFull") + + ErrFailedResponse = errors.New("RpcFailed") ) type Type = int32 @@ -478,7 +480,9 @@ func (scheduler *taskScheduler) process(task Task) bool { task.SetStatus(TaskStatusSucceeded) } else if scheduler.checkCanceled(task) { task.SetStatus(TaskStatusCanceled) - task.SetErr(ErrTaskCanceled) + if task.Err() == nil { + task.SetErr(ErrTaskCanceled) + } } else if scheduler.checkStale(task) { task.SetStatus(TaskStatusStale) task.SetErr(ErrTaskStale) diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index cf05a032f1..a7fe6d3ef6 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -75,8 +75,8 @@ type baseTask struct { step int } -func newBaseTask(ctx context.Context, timeout time.Duration, sourceID, collectionID, replicaID UniqueID, shard string) *baseTask { - ctx, cancel := context.WithTimeout(ctx, timeout) +func newBaseTask(ctx context.Context, sourceID, collectionID, replicaID UniqueID, shard string) *baseTask { + ctx, cancel := context.WithCancel(ctx) return &baseTask{ sourceID: sourceID, @@ -237,7 +237,7 @@ func NewSegmentTask(ctx context.Context, } } - base := newBaseTask(ctx, timeout, sourceID, collectionID, replicaID, shard) + base := newBaseTask(ctx, sourceID, collectionID, replicaID, shard) base.actions = actions return &SegmentTask{ baseTask: base, @@ -287,7 +287,7 @@ func NewChannelTask(ctx context.Context, } } - base := newBaseTask(ctx, timeout, sourceID, collectionID, replicaID, channel) + base := newBaseTask(ctx, sourceID, collectionID, replicaID, channel) base.actions = actions return &ChannelTask{ baseTask: base, diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index 
diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go
index fcc89e2616..d662f86e49 100644
--- a/internal/querycoordv2/task/utils.go
+++ b/internal/querycoordv2/task/utils.go
@@ -4,17 +4,13 @@ import (
 	"context"
 	"time"
 
-	"github.com/golang/protobuf/proto"
 	"github.com/milvus-io/milvus/api/commonpb"
 	"github.com/milvus-io/milvus/api/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
-	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
-	"github.com/milvus-io/milvus/internal/util/funcutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
-	"github.com/samber/lo"
 )
 
 func Wait(ctx context.Context, timeout time.Duration, tasks ...Task) error {
@@ -174,64 +170,3 @@ func getShardLeader(replicaMgr *meta.ReplicaManager, distMgr *meta.DistributionManager, collectionID, nodeID int64, channel string) (int64, bool) {
 	}
 	return distMgr.GetShardLeader(replica, channel)
 }
-
-func getSegmentDeltaPositions(ctx context.Context, targetMgr *meta.TargetManager, broker meta.Broker, collectionID, partitionID int64, channel string) ([]*internalpb.MsgPosition, error) {
-	deltaChannelName, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
-	if err != nil {
-		return nil, err
-	}
-
-	// vchannels, _, err := broker.GetRecoveryInfo(ctx, collectionID, partitionID)
-	// if err != nil {
-	// 	return nil, err
-	// }
-
-	deltaChannels := make([]*datapb.VchannelInfo, 0)
-	for _, info := range targetMgr.GetDmChannelsByCollection(collectionID) {
-		deltaChannelInfo, err := generatDeltaChannelInfo(info.VchannelInfo)
-		if err != nil {
-			return nil, err
-		}
-		if deltaChannelInfo.ChannelName == deltaChannelName {
-			deltaChannels = append(deltaChannels, deltaChannelInfo)
-		}
-	}
-	deltaChannels = mergeWatchDeltaChannelInfo(deltaChannels)
-
-	return lo.Map(deltaChannels, func(channel *datapb.VchannelInfo, _ int) *internalpb.MsgPosition {
-		return channel.GetSeekPosition()
-	}), nil
-}
-
-func generatDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) {
-	deltaChannelName, err := funcutil.ConvertChannelName(info.ChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
-	if err != nil {
-		return nil, err
-	}
-	deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
-	deltaChannel.ChannelName = deltaChannelName
-	deltaChannel.UnflushedSegmentIds = nil
-	deltaChannel.FlushedSegmentIds = nil
-	deltaChannel.DroppedSegmentIds = nil
-	return deltaChannel, nil
-}
-
-func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.VchannelInfo {
-	minPositions := make(map[string]int)
-	for index, info := range infos {
-		_, ok := minPositions[info.ChannelName]
-		if !ok {
-			minPositions[info.ChannelName] = index
-		}
-		minTimeStampIndex := minPositions[info.ChannelName]
-		if info.SeekPosition.GetTimestamp() < infos[minTimeStampIndex].SeekPosition.GetTimestamp() {
-			minPositions[info.ChannelName] = index
-		}
-	}
-	var result []*datapb.VchannelInfo
-	for _, index := range minPositions {
-		result = append(result, infos[index])
-	}
-
-	return result
-}
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index 98eb2068ef..a8c5a210c0 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -683,7 +683,7 @@ func (p *queryCoordConfig) initTaskRetryInterval() {
 }
 
 func (p *queryCoordConfig) initTaskMergeCap() {
-	p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 8)
+	p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 16)
 }
 
 func (p *queryCoordConfig) initAutoHandoff() {
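For context on the two capacity bumps (`taskMergeCap` 8→16, `waitQueueCap` 128→256): the merger packs tasks that share a merge key into one batched RPC, with `taskMergeCap` bounding the batch size. A sketch of cap-bounded batching under those assumptions; the function and key below are illustrative, not the Milvus implementation:

```go
package main

import "fmt"

// batchByKey groups requests that share a key (think: the same shard leader)
// into batches of at most maxBatch entries, the role taskMergeCap plays.
func batchByKey(reqs []string, key func(string) string, maxBatch int) map[string][][]string {
	out := make(map[string][][]string)
	for _, r := range reqs {
		k := key(r)
		batches := out[k]
		if n := len(batches); n == 0 || len(batches[n-1]) >= maxBatch {
			batches = append(batches, nil) // start a new batch once the cap is hit
		}
		batches[len(batches)-1] = append(batches[len(batches)-1], r)
		out[k] = batches
	}
	return out
}

func main() {
	reqs := []string{"seg1", "seg2", "seg3", "seg4", "seg5"}
	sameLeader := func(string) string { return "leader-1" }
	// With a cap of 2, five requests become three RPC-sized batches.
	fmt.Println(batchByKey(reqs, sameLeader, 2))
}
```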