diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index b3e64ae21f..897bb0237e 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -112,7 +112,8 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool leaderSegmentDist := distMgr.LeaderViewManager.GetSealedSegmentDist(action.SegmentID()) nodeSegmentDist := distMgr.SegmentDistManager.GetSegmentDist(action.SegmentID()) return lo.Contains(leaderSegmentDist, action.Node()) && - lo.Contains(nodeSegmentDist, action.Node()) + lo.Contains(nodeSegmentDist, action.Node()) && + action.rpcReturned.Load() } else if action.Type() == ActionTypeReduce { // FIXME: Now shard leader's segment view is a map of segment ID to node ID, // loading segment replaces the node ID with the new one, diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index b5e2b508a3..9dc0337966 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -47,9 +47,6 @@ type Executor struct { cluster session.Cluster nodeMgr *session.NodeManager - // Merge load segment requests - merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest] - executingTasks *typeutil.ConcurrentSet[string] // task index executingTaskNum atomic.Int32 } @@ -69,19 +66,15 @@ func NewExecutor(meta *meta.Meta, targetMgr: targetMgr, cluster: cluster, nodeMgr: nodeMgr, - merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](), executingTasks: typeutil.NewConcurrentSet[string](), } } func (ex *Executor) Start(ctx context.Context) { - ex.merger.Start(ctx) - ex.scheduleRequests() } func (ex *Executor) Stop() { - ex.merger.Stop() ex.wg.Wait() } @@ -121,82 +114,6 @@ func (ex *Executor) Execute(task Task, step int) bool { return true } -func (ex *Executor) scheduleRequests() { - ex.wg.Add(1) - go func() { - defer ex.wg.Done() - for mergeTask := range ex.merger.Chan() { - task := mergeTask.(*LoadSegmentsTask) - log.Info("get merge task, process it", - zap.Int64("collectionID", task.req.GetCollectionID()), - zap.Int64("replicaID", task.req.GetReplicaID()), - zap.String("shard", task.req.GetInfos()[0].GetInsertChannel()), - zap.Int64("nodeID", task.req.GetDstNodeID()), - zap.Int("taskNum", len(task.tasks)), - ) - go ex.processMergeTask(mergeTask.(*LoadSegmentsTask)) - } - }() -} - -func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { - startTs := time.Now() - task := mergeTask.tasks[0] - action := task.Actions()[mergeTask.steps[0]] - - var err error - defer func() { - if err != nil { - for i := range mergeTask.tasks { - mergeTask.tasks[i].Fail(err) - } - } - for i := range mergeTask.tasks { - ex.removeTask(mergeTask.tasks[i], mergeTask.steps[i]) - } - }() - - taskIDs := make([]int64, 0, len(mergeTask.tasks)) - segments := make([]int64, 0, len(mergeTask.tasks)) - for _, task := range mergeTask.tasks { - taskIDs = append(taskIDs, task.ID()) - segments = append(segments, task.SegmentID()) - } - log := log.With( - zap.Int64s("taskIDs", taskIDs), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("shard", task.Shard()), - zap.Int64s("segmentIDs", segments), - zap.Int64("nodeID", action.Node()), - zap.String("source", task.Source().String()), - ) - - // Get shard leader for the given replica and segment - channel := mergeTask.req.GetInfos()[0].GetInsertChannel() - leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), channel) - if !ok { - err = merr.WrapErrChannelNotFound(channel, "shard delegator not found") - log.Warn("no shard leader for the segment to execute loading", zap.Error(task.Err())) - return - } - - log.Info("load segments...") - status, err := ex.cluster.LoadSegments(task.Context(), leader, mergeTask.req) - if err != nil { - log.Warn("failed to load segment", zap.Error(err)) - return - } - if !merr.Ok(status) { - err = merr.Error(status) - log.Warn("failed to load segment", zap.Error(err)) - return - } - - elapsed := time.Since(startTs) - log.Info("load segments done", zap.Duration("elapsed", elapsed)) -} - func (ex *Executor) removeTask(task Task, step int) { if task.Err() != nil { log.Info("execute action done, remove it", @@ -238,8 +155,8 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { defer func() { if err != nil { task.Fail(err) - ex.removeTask(task, step) } + ex.removeTask(task, step) }() schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID()) @@ -276,16 +193,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { loadInfo := utils.PackSegmentLoadInfo(resp, indexes) - // Get shard leader for the given replica and segment - leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel()) - if !ok { - msg := "no shard leader for the segment to execute loading" - err = merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found") - log.Warn(msg, zap.Error(err)) - return err - } - log = log.With(zap.Int64("shardLeader", leader)) - // Get collection index info indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) if err != nil { @@ -293,10 +200,41 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { return err } - req := packLoadSegmentRequest(task, action, schema, loadMeta, loadInfo, indexInfo) - loadTask := NewLoadSegmentsTask(task, step, req) - ex.merger.Add(loadTask) - log.Info("load segment task committed") + req := packLoadSegmentRequest( + task, + action, + schema, + loadMeta, + loadInfo, + indexInfo, + ) + + // Get shard leader for the given replica and segment + leaderID, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel()) + if !ok { + msg := "no shard leader for the segment to execute loading" + err = merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found") + log.Warn(msg, zap.Error(err)) + return err + } + log = log.With(zap.Int64("shardLeader", leaderID)) + + startTs := time.Now() + log.Info("load segments...") + status, err := ex.cluster.LoadSegments(task.Context(), leaderID, req) + if err != nil { + log.Warn("failed to load segment", zap.Error(err)) + return err + } + if !merr.Ok(status) { + err = merr.Error(status) + log.Warn("failed to load segment", zap.Error(err)) + return err + } + + elapsed := time.Since(startTs) + log.Info("load segments done", zap.Duration("elapsed", elapsed)) + return nil } diff --git a/internal/querycoordv2/task/merge_task.go b/internal/querycoordv2/task/merge_task.go deleted file mode 100644 index e02115e870..0000000000 --- a/internal/querycoordv2/task/merge_task.go +++ /dev/null @@ -1,83 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/proto/querypb" -) - -type MergeableTask[K comparable, R any] interface { - ID() K - Merge(other MergeableTask[K, R]) -} - -var _ MergeableTask[segmentIndex, *querypb.LoadSegmentsRequest] = (*LoadSegmentsTask)(nil) - -type segmentIndex struct { - NodeID int64 - CollectionID int64 - Shard string -} - -type LoadSegmentsTask struct { - tasks []*SegmentTask - steps []int - req *querypb.LoadSegmentsRequest -} - -func NewLoadSegmentsTask(task *SegmentTask, step int, req *querypb.LoadSegmentsRequest) *LoadSegmentsTask { - return &LoadSegmentsTask{ - tasks: []*SegmentTask{task}, - steps: []int{step}, - req: req, - } -} - -func (task *LoadSegmentsTask) ID() segmentIndex { - return segmentIndex{ - NodeID: task.req.GetDstNodeID(), - CollectionID: task.req.GetCollectionID(), - Shard: task.req.GetInfos()[0].GetInsertChannel(), - } -} - -func (task *LoadSegmentsTask) Merge(other MergeableTask[segmentIndex, *querypb.LoadSegmentsRequest]) { - otherTask := other.(*LoadSegmentsTask) - task.tasks = append(task.tasks, otherTask.tasks...) - task.steps = append(task.steps, otherTask.steps...) - task.req.Infos = append(task.req.Infos, otherTask.req.GetInfos()...) - positions := make(map[string]*msgpb.MsgPosition) - for _, position := range task.req.DeltaPositions { - positions[position.GetChannelName()] = position - } - for _, position := range otherTask.req.GetDeltaPositions() { - merged, ok := positions[position.GetChannelName()] - if !ok || merged.GetTimestamp() > position.GetTimestamp() { - merged = position - } - positions[position.GetChannelName()] = merged - } - task.req.DeltaPositions = make([]*msgpb.MsgPosition, 0, len(positions)) - for _, position := range positions { - task.req.DeltaPositions = append(task.req.DeltaPositions, position) - } -} - -func (task *LoadSegmentsTask) Result() *querypb.LoadSegmentsRequest { - return task.req -} diff --git a/internal/querycoordv2/task/merger.go b/internal/querycoordv2/task/merger.go deleted file mode 100644 index eb4158db8f..0000000000 --- a/internal/querycoordv2/task/merger.go +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "context" - "sync" - "time" - - "go.uber.org/zap" - - . "github.com/milvus-io/milvus/internal/querycoordv2/params" - "github.com/milvus-io/milvus/pkg/log" -) - -// Merger merges tasks with the same mergeID. -const waitQueueCap = 256 - -type Merger[K comparable, R any] struct { - stopCh chan struct{} - wg sync.WaitGroup - queues map[K][]MergeableTask[K, R] // TaskID -> Queue - waitQueue chan MergeableTask[K, R] - outCh chan MergeableTask[K, R] - - stopOnce sync.Once -} - -func NewMerger[K comparable, R any]() *Merger[K, R] { - return &Merger[K, R]{ - stopCh: make(chan struct{}), - queues: make(map[K][]MergeableTask[K, R]), - waitQueue: make(chan MergeableTask[K, R], waitQueueCap), - outCh: make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap.GetAsInt()), - } -} - -func (merger *Merger[K, R]) Start(ctx context.Context) { - merger.schedule(ctx) -} - -func (merger *Merger[K, R]) Stop() { - merger.stopOnce.Do(func() { - close(merger.stopCh) - merger.wg.Wait() - }) -} - -func (merger *Merger[K, R]) Chan() <-chan MergeableTask[K, R] { - return merger.outCh -} - -func (merger *Merger[K, R]) schedule(ctx context.Context) { - merger.wg.Add(1) - go func() { - defer merger.wg.Done() - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - close(merger.outCh) - log.Info("Merger stopped due to context canceled") - return - - case <-merger.stopCh: - close(merger.outCh) - log.Info("Merger stopped") - return - - case <-ticker.C: - merger.drain() - for id := range merger.queues { - merger.triggerExecution(id) - } - } - } - }() -} - -func (merger *Merger[K, R]) Add(task MergeableTask[K, R]) { - merger.waitQueue <- task -} - -func (merger *Merger[K, R]) drain() { - for { - select { - case task := <-merger.waitQueue: - queue, ok := merger.queues[task.ID()] - if !ok { - queue = []MergeableTask[K, R]{} - } - queue = append(queue, task) - merger.queues[task.ID()] = queue - default: - return - } - } -} - -func (merger *Merger[K, R]) triggerExecution(id K) { - tasks := merger.queues[id] - delete(merger.queues, id) - - var task MergeableTask[K, R] - merged := 0 - for i := 0; i < len(tasks); i++ { - if merged == 0 { - task = tasks[i] - } else { - task.Merge(tasks[i]) - } - merged++ - if merged >= Params.QueryCoordCfg.TaskMergeCap.GetAsInt() { - merger.outCh <- task - merged = 0 - } - } - - if merged != 0 { - merger.outCh <- task - } - - log.Info("merge tasks done, trigger execution", zap.Any("mergeID", task.ID())) -} diff --git a/internal/querycoordv2/task/merger_test.go b/internal/querycoordv2/task/merger_test.go deleted file mode 100644 index 9f37664902..0000000000 --- a/internal/querycoordv2/task/merger_test.go +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/proto/querypb" - . "github.com/milvus-io/milvus/internal/querycoordv2/params" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type MergerSuite struct { - suite.Suite - // Data - collectionID int64 - replicaID int64 - nodeID int64 - requests map[int64]*querypb.LoadSegmentsRequest - - merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest] -} - -func (suite *MergerSuite) SetupSuite() { - paramtable.Init() - paramtable.Get().Save(Params.QueryCoordCfg.TaskMergeCap.Key, "3") - suite.collectionID = 1000 - suite.replicaID = 100 - suite.nodeID = 1 - suite.requests = map[int64]*querypb.LoadSegmentsRequest{ - 1: { - DstNodeID: suite.nodeID, - CollectionID: suite.collectionID, - Infos: []*querypb.SegmentLoadInfo{ - { - SegmentID: 1, - InsertChannel: "dmc0", - }, - }, - DeltaPositions: []*msgpb.MsgPosition{ - { - ChannelName: "dmc0", - Timestamp: 2, - }, - { - ChannelName: "dmc1", - Timestamp: 3, - }, - }, - }, - 2: { - DstNodeID: suite.nodeID, - CollectionID: suite.collectionID, - Infos: []*querypb.SegmentLoadInfo{ - { - SegmentID: 2, - InsertChannel: "dmc0", - }, - }, - DeltaPositions: []*msgpb.MsgPosition{ - { - ChannelName: "dmc0", - Timestamp: 3, - }, - { - ChannelName: "dmc1", - Timestamp: 2, - }, - }, - }, - 3: { - DstNodeID: suite.nodeID, - CollectionID: suite.collectionID, - Infos: []*querypb.SegmentLoadInfo{ - { - SegmentID: 3, - InsertChannel: "dmc0", - }, - }, - DeltaPositions: []*msgpb.MsgPosition{ - { - ChannelName: "dmc0", - Timestamp: 1, - }, - { - ChannelName: "dmc1", - Timestamp: 1, - }, - }, - }, - } -} - -func (suite *MergerSuite) SetupTest() { - suite.merger = NewMerger[segmentIndex, *querypb.LoadSegmentsRequest]() -} - -func (suite *MergerSuite) TestMerge() { - const ( - requestNum = 5 - timeout = 5 * time.Second - ) - ctx := context.Background() - - for segmentID := int64(1); segmentID <= 3; segmentID++ { - task, err := NewSegmentTask(ctx, timeout, WrapIDSource(0), suite.collectionID, suite.replicaID, - NewSegmentAction(suite.nodeID, ActionTypeGrow, "", segmentID)) - suite.NoError(err) - suite.merger.Add(NewLoadSegmentsTask(task, 0, suite.requests[segmentID])) - } - - suite.merger.Start(ctx) - defer suite.merger.Stop() - taskI := <-suite.merger.Chan() - task := taskI.(*LoadSegmentsTask) - suite.Len(task.tasks, 3) - suite.Len(task.steps, 3) - suite.EqualValues(1, task.Result().DeltaPositions[0].Timestamp) - suite.EqualValues(1, task.Result().DeltaPositions[1].Timestamp) - suite.merger.Stop() - _, ok := <-suite.merger.Chan() - suite.Equal(ok, false) -} - -func TestMerger(t *testing.T) { - suite.Run(t, new(MergerSuite)) -} diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 67acb2961b..1feeafdeb2 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1298,7 +1298,7 @@ func (suite *TaskSuite) TestNoExecutor() { CollectionID: suite.collection, ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-test", } - suite.nodeMgr.Add(session.NewNodeInfo(targetNode, "localhost")) + suite.meta.ReplicaManager.Put( utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3, -1})) @@ -1333,24 +1333,6 @@ func (suite *TaskSuite) TestNoExecutor() { // Process tasks suite.dispatchAndWait(targetNode) - suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) - - // Process tasks done - // Dist contains channels - view := &meta.LeaderView{ - ID: targetNode, - CollectionID: suite.collection, - Segments: map[int64]*querypb.SegmentDist{}, - } - for _, segment := range suite.loadSegments { - view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} - } - distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment { - return meta.SegmentFromInfo(info) - }) - suite.dist.LeaderViewManager.Update(targetNode, view) - suite.dist.SegmentDistManager.Update(targetNode, distSegments...) - suite.dispatchAndWait(targetNode) suite.AssertTaskNum(0, 0, 0, 0) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 51cad5fead..b36c1e5d48 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1189,8 +1189,10 @@ type queryCoordConfig struct { // Deprecated: Since 2.2.0 RetryNum ParamItem `refreshable:"true"` // Deprecated: Since 2.2.0 - RetryInterval ParamItem `refreshable:"true"` - TaskMergeCap ParamItem `refreshable:"false"` + RetryInterval ParamItem `refreshable:"true"` + // Deprecated: Since 2.3.4 + TaskMergeCap ParamItem `refreshable:"false"` + TaskExecutionCap ParamItem `refreshable:"true"` // ---- Handoff --- @@ -1258,7 +1260,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.TaskMergeCap = ParamItem{ Key: "queryCoord.taskMergeCap", Version: "2.2.0", - DefaultValue: "16", + DefaultValue: "1", Export: true, } p.TaskMergeCap.Init(base.mgr)