From 01f8faacaedc161043a7607b7cdac33995181c71 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 21 Feb 2025 16:06:12 +0800
Subject: [PATCH] fix: [hotfix] Add sub task pool for multi-stage tasks
 (#40080)

Cherry-pick from master
pr: #40079
Related to #40078

Add a subTaskPool to execute sub-tasks, avoiding the logic deadlock
described in the issue.

---------

Signed-off-by: Congqi Xia
Signed-off-by: Cai Zhang
Signed-off-by: bigsheeper
Co-authored-by: Cai Zhang
Co-authored-by: bigsheeper
---
 .../core/src/monitor/prometheus_client.cpp    |  5 ++
 internal/core/src/monitor/prometheus_client.h |  1 +
 .../src/segcore/ChunkedSegmentSealedImpl.cpp  | 16 +++++-
 .../core/src/segcore/SegmentSealedImpl.cpp    | 16 +++++-
 internal/proxy/task.go                        |  5 ++
 internal/proxy/task_query.go                  |  4 ++
 internal/proxy/task_scheduler.go              | 54 +++++++++++++++++--
 internal/querycoordv2/task/scheduler.go       |  2 +-
 .../scheduler/user_task_polling_policy.go     |  2 +-
 pkg/metrics/metrics.go                        |  1 +
 pkg/metrics/proxy_metrics.go                  | 19 +++++++
 pkg/util/merr/errors.go                       |  2 +-
 pkg/util/merr/errors_test.go                  |  2 +-
 pkg/util/merr/utils.go                        |  4 +-
 14 files changed, 122 insertions(+), 11 deletions(-)

diff --git a/internal/core/src/monitor/prometheus_client.cpp b/internal/core/src/monitor/prometheus_client.cpp
index 6ca081ed8b..8ca4cb183d 100644
--- a/internal/core/src/monitor/prometheus_client.cpp
+++ b/internal/core/src/monitor/prometheus_client.cpp
@@ -181,6 +181,8 @@ std::map<std::string, std::string> iterativeFilterLatencyLabels{
     {"type", "iterative_filter_latency"}};
 std::map<std::string, std::string> scalarProportionLabels{
     {"type", "scalar_proportion"}};
+std::map<std::string, std::string> getVectorLatencyLabels{
+    {"type", "get_vector_latency"}};
 DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency,
                                    "[cpp]latency(us) of search on segment")
 DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar,
@@ -200,6 +202,9 @@ DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(
     internal_core_search_latency,
     scalarProportionLabels,
     ratioBuckets)
+DEFINE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency,
+                            internal_core_search_latency,
+                            getVectorLatencyLabels)

 // mmap metrics
 std::map<std::string, std::string> mmapAllocatedSpaceAnonLabel = {
diff --git a/internal/core/src/monitor/prometheus_client.h b/internal/core/src/monitor/prometheus_client.h
index ed8e21cef5..3884dd74e0 100644
--- a/internal/core/src/monitor/prometheus_client.h
+++ b/internal/core/src/monitor/prometheus_client.h
@@ -138,5 +138,6 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_vector);
 DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_groupby);
 DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_iterative_filter);
 DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_scalar_proportion);
+DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency);

 } // namespace milvus::monitor
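Note on the metric wiring above and the two segcore hunks below: bulk_subscript now brackets the get_vector() call with wall-clock timestamps and reports the elapsed time, in milliseconds, to the new internal_core_get_vector_latency histogram. The same observe-elapsed-time pattern, sketched in Go with the prometheus client library (metric name, help text, and buckets here are illustrative, not taken from this patch):

    package metricsketch

    import (
        "time"

        "github.com/prometheus/client_golang/prometheus"
    )

    // getVectorLatency is a stand-in for the C++ internal_core_get_vector_latency
    // histogram; the buckets and help text are assumptions for this sketch.
    var getVectorLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "get_vector_latency",
        Help:    "latency (ms) of fetching raw vectors",
        Buckets: prometheus.ExponentialBuckets(1, 2, 12),
    })

    func init() {
        prometheus.MustRegister(getVectorLatency)
    }

    // observeGetVector times an arbitrary fetch function and records the cost
    // in milliseconds, mirroring the microseconds/1000 conversion in the patch.
    func observeGetVector(fetch func()) {
        start := time.Now()
        fetch()
        getVectorLatency.Observe(float64(time.Since(start).Microseconds()) / 1000.0)
    }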
diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
index 71d2bc84ef..5eb5b452cf 100644
--- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
@@ -1759,7 +1759,21 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
             }
             return get_raw_data(field_id, field_meta, seg_offsets, count);
         }
-        return get_vector(field_id, seg_offsets, count);
+
+        std::chrono::high_resolution_clock::time_point get_vector_start =
+            std::chrono::high_resolution_clock::now();
+
+        auto vector = get_vector(field_id, seg_offsets, count);
+
+        std::chrono::high_resolution_clock::time_point get_vector_end =
+            std::chrono::high_resolution_clock::now();
+        double get_vector_cost = std::chrono::duration<double, std::micro>(
+                                     get_vector_end - get_vector_start)
+                                     .count();
+        monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
+                                                          1000);
+
+        return vector;
     }

     Assert(get_bit(field_data_ready_bitset_, field_id));
diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp
index a0ab793f38..8184298d96 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -1582,7 +1582,21 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
         }
         return get_raw_data(field_id, field_meta, seg_offsets, count);
     }
-    return get_vector(field_id, seg_offsets, count);
+
+    std::chrono::high_resolution_clock::time_point get_vector_start =
+        std::chrono::high_resolution_clock::now();
+
+    auto vector = get_vector(field_id, seg_offsets, count);
+
+    std::chrono::high_resolution_clock::time_point get_vector_end =
+        std::chrono::high_resolution_clock::now();
+    double get_vector_cost = std::chrono::duration<double, std::micro>(
+                                 get_vector_end - get_vector_start)
+                                 .count();
+    monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
+                                                      1000);
+
+    return vector;
 }

 Assert(get_bit(field_data_ready_bitset_, field_id));
diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index 8ffabd575c..ec5ab10347 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -140,6 +140,7 @@ type task interface {
     CanSkipAllocTimestamp() bool
     SetOnEnqueueTime()
     GetDurationInQueue() time.Duration
+    IsSubTask() bool
 }

 type baseTask struct {
@@ -158,6 +159,10 @@ func (bt *baseTask) GetDurationInQueue() time.Duration {
     return time.Since(bt.onEnqueueTime)
 }

+func (bt *baseTask) IsSubTask() bool {
+    return false
+}
+
 type dmlTask interface {
     task
     setChannels() error
diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go
index 1729e56408..76625d5c2b 100644
--- a/internal/proxy/task_query.go
+++ b/internal/proxy/task_query.go
@@ -577,6 +577,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
     return nil
 }

+func (t *queryTask) IsSubTask() bool {
+    return t.reQuery
+}
+
 func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
     needOverrideMvcc := false
     mvccTs := t.MvccTimestamp
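The task.go and task_query.go hunks above introduce the IsSubTask() hook: every task reports false by default, while a queryTask reports true when it carries the reQuery flag, i.e. when it is a query issued internally by another in-flight task rather than directly by a client. A minimal sketch of the relationship this flag encodes (the types and the enqueue helper are illustrative, not the proxy's real ones):

    package proxysketch

    // task mirrors the slice of the proxy task interface this patch touches.
    type task interface {
        Execute() error
        IsSubTask() bool
    }

    // requeryTask is the second stage of a multi-stage request: it fetches
    // additional data after the first stage has produced its result.
    type requeryTask struct{}

    func (t *requeryTask) Execute() error  { return nil }
    func (t *requeryTask) IsSubTask() bool { return true }

    // searchTask is the user-facing first stage. It hands its sub-task back
    // to the scheduler and then waits for it to finish.
    type searchTask struct {
        enqueueAndWait func(task) error // scheduler entry point (assumed)
    }

    func (t *searchTask) IsSubTask() bool { return false }

    func (t *searchTask) Execute() error {
        // While this call blocks, the parent keeps occupying a worker slot;
        // the sub-task must therefore never depend on that same slot.
        return t.enqueueAndWait(&requeryTask{})
    }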
diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go
index da1a2ededb..0f4e4ae1ed 100644
--- a/internal/proxy/task_scheduler.go
+++ b/internal/proxy/task_scheduler.go
@@ -89,7 +89,7 @@ func (queue *baseTaskQueue) addUnissuedTask(t task) error {
     defer queue.utLock.Unlock()

     if queue.utFull() {
-        return merr.WrapErrServiceRequestLimitExceeded(int32(queue.getMaxTaskNum()))
+        return merr.WrapErrTooManyRequests(int32(queue.getMaxTaskNum()))
     }
     queue.unissuedTasks.PushBack(t)
     queue.utBufChan <- 1
@@ -228,6 +228,18 @@ type ddTaskQueue struct {
     lock sync.Mutex
 }

+func (queue *ddTaskQueue) updateMetrics() {
+    queue.utLock.RLock()
+    unissuedTasksNum := queue.unissuedTasks.Len()
+    queue.utLock.RUnlock()
+    queue.atLock.RLock()
+    activateTaskNum := len(queue.activeTasks)
+    queue.atLock.RUnlock()
+
+    metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "ddl", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum))
+    metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "ddl", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum))
+}
+
 type pChanStatInfo struct {
     pChanStatistics
     tsSet map[Timestamp]struct{}
@@ -241,6 +253,18 @@ type dmTaskQueue struct {
     pChanStatisticsInfos map[pChan]*pChanStatInfo
 }

+func (queue *dmTaskQueue) updateMetrics() {
+    queue.utLock.RLock()
+    unissuedTasksNum := queue.unissuedTasks.Len()
+    queue.utLock.RUnlock()
+    queue.atLock.RLock()
+    activateTaskNum := len(queue.activeTasks)
+    queue.atLock.RUnlock()
+
+    metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dml", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum))
+    metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dml", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum))
+}
+
 func (queue *dmTaskQueue) Enqueue(t task) error {
     // This statsLock has two functions:
     // 1) Protect member pChanStatisticsInfos
@@ -361,6 +385,18 @@ type dqTaskQueue struct {
     *baseTaskQueue
 }

+func (queue *dqTaskQueue) updateMetrics() {
+    queue.utLock.RLock()
+    unissuedTasksNum := queue.unissuedTasks.Len()
+    queue.utLock.RUnlock()
+    queue.atLock.RLock()
+    activateTaskNum := len(queue.activeTasks)
+    queue.atLock.RUnlock()
+
+    metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dql", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum))
+    metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dql", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum))
+}
+
 func (queue *ddTaskQueue) Enqueue(t task) error {
     queue.lock.Lock()
     defer queue.lock.Unlock()
@@ -507,6 +543,7 @@ func (sched *taskScheduler) definitionLoop() {
                 return struct{}{}, nil
             })
         }
+        sched.ddQueue.updateMetrics()
         }
     }
 }
@@ -528,6 +565,7 @@ func (sched *taskScheduler) controlLoop() {
                 return struct{}{}, nil
             })
         }
+        sched.dcQueue.updateMetrics()
         }
     }
 }
@@ -547,6 +585,7 @@ func (sched *taskScheduler) manipulationLoop() {
                 return struct{}{}, nil
             })
         }
+        sched.dmQueue.updateMetrics()
         }
     }
 }
@@ -554,7 +593,10 @@ func (sched *taskScheduler) queryLoop() {
     defer sched.wg.Done()

-    pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt(), conc.WithExpiryDuration(time.Minute))
+    poolSize := paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt()
+    pool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute))
+    subTaskPool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute))
+
     for {
         select {
         case <-sched.ctx.Done():
             return
         case <-sched.dqQueue.utChan():
             if !sched.dqQueue.utEmpty() {
                 t := sched.scheduleDqTask()
-                pool.Submit(func() (struct{}, error) {
+                p := pool
+                // if task is sub task spawned by another, use sub task pool in case of deadlock
+                if t.IsSubTask() {
+                    p = subTaskPool
+                }
+                p.Submit(func() (struct{}, error) {
                     sched.processTask(t, sched.dqQueue)
                     return struct{}{}, nil
                 })
             } else {
                 log.Ctx(context.TODO()).Debug("query queue is empty ...")
             }
+            sched.dqQueue.updateMetrics()
         }
     }
 }
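The queryLoop change above is the core of the fix: a requery sub-task now runs on its own subTaskPool, so a parent task that blocks waiting for its sub-task can no longer occupy the last worker that the sub-task itself needs. A stripped-down illustration of that failure mode and the two-pool remedy (the semaphore-style pool below is a stand-in for the real conc.Pool, not its API):

    package main

    import "fmt"

    // runOn acquires one worker slot from the given pool, runs the job, and
    // releases the slot. A pool is modeled as a buffered channel used as a
    // counting semaphore; names are illustrative, not the proxy's real code.
    func runOn(pool chan struct{}, job func()) {
        pool <- struct{}{}
        defer func() { <-pool }()
        job()
    }

    func main() {
        queryPool := make(chan struct{}, 1)   // shared pool, fully occupied by one parent task
        subTaskPool := make(chan struct{}, 1) // dedicated pool for spawned sub-tasks

        done := make(chan struct{})
        parentDone := make(chan struct{})

        go runOn(queryPool, func() {
            // The parent submits its sub-task and blocks until it finishes.
            // Submitting it to queryPool instead would deadlock: the only
            // slot there is held by the parent that is waiting.
            go runOn(subTaskPool, func() { close(done) })
            <-done
            close(parentDone)
        })

        <-parentDone
        fmt.Println("parent and sub-task finished; no deadlock")
    }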
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index ef3d110740..c5d3b41b98 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -964,7 +964,7 @@ func (scheduler *taskScheduler) remove(task Task) {
         log = log.With(zap.Int64("segmentID", task.SegmentID()))
     if task.Status() == TaskStatusFailed &&
         task.Err() != nil &&
-        !errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceRequestLimitExceeded) {
+        !errors.IsAny(task.Err(), merr.ErrChannelNotFound, merr.ErrServiceTooManyRequests) {
         scheduler.recordSegmentTaskError(task)
     }

diff --git a/internal/util/searchutil/scheduler/user_task_polling_policy.go b/internal/util/searchutil/scheduler/user_task_polling_policy.go
index e09d6c0c58..3a9f86f5e5 100644
--- a/internal/util/searchutil/scheduler/user_task_polling_policy.go
+++ b/internal/util/searchutil/scheduler/user_task_polling_policy.go
@@ -47,7 +47,7 @@ func (p *userTaskPollingPolicy) Push(task Task) (int, error) {
     if taskGroupLen > 0 {
         limit := pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.GetAsInt()
         if limit > 0 && taskGroupLen >= limit {
-            return 0, merr.WrapErrServiceRequestLimitExceeded(
+            return 0, merr.WrapErrTooManyRequests(
                 int32(limit),
                 fmt.Sprintf("limit by %s", pt.QueryNodeCfg.SchedulePolicyMaxPendingTaskPerUser.Key),
             )
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 7c7296c881..f4bd41171a 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -124,6 +124,7 @@ const (
     pathLabelName = "path"
     cgoNameLabelName = `cgo_name`
     cgoTypeLabelName = `cgo_type`
+    queueTypeLabelName = `queue_type`

     // entities label
     LoadedLabel = "loaded"
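metrics.go above adds the queue_type label; the proxy_metrics.go hunk below defines the ProxyQueueTaskNum gauge that the new updateMetrics() helpers publish after every scheduling round. A condensed sketch of that reporting pattern with the prometheus Go client (label values and help text here are illustrative; the real code uses the metrics package's own label constants):

    package metricsketch

    import (
        "strconv"

        "github.com/prometheus/client_golang/prometheus"
    )

    // queueTaskNum is a stand-in for metrics.ProxyQueueTaskNum: a gauge keyed
    // by node, queue type, and task state.
    var queueTaskNum = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Namespace: "milvus",
        Subsystem: "proxy",
        Name:      "queue_task_num",
        Help:      "number of tasks per proxy task queue and state",
    }, []string{"node_id", "queue_type", "task_state"})

    // reportQueueDepth is what each queue's updateMetrics() boils down to:
    // read the two counters under their locks, then publish both gauges.
    func reportQueueDepth(nodeID int64, queueType string, unissued, active int) {
        node := strconv.FormatInt(nodeID, 10)
        queueTaskNum.WithLabelValues(node, queueType, "unissued").Set(float64(unissued))
        queueTaskNum.WithLabelValues(node, queueType, "in_progress").Set(float64(active))
    }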
diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go
index 53aac27a0b..b221973680 100644
--- a/pkg/metrics/proxy_metrics.go
+++ b/pkg/metrics/proxy_metrics.go
@@ -427,6 +427,24 @@ var (
             Help:      "the number of non-zeros in each sparse search task",
             Buckets:   buckets,
         }, []string{nodeIDLabelName, collectionName})
+
+    ProxyParseExpressionLatency = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: milvusNamespace,
+            Subsystem: typeutil.ProxyRole,
+            Name:      "parse_expr_latency",
+            Help:      "the latency of parse expression",
+            Buckets:   buckets,
+        }, []string{nodeIDLabelName, functionLabelName, statusLabelName})
+
+    // ProxyQueueTaskNum records task number of queue in Proxy.
+    ProxyQueueTaskNum = prometheus.NewGaugeVec(
+        prometheus.GaugeOpts{
+            Namespace: milvusNamespace,
+            Subsystem: typeutil.ProxyRole,
+            Name:      "queue_task_num",
+            Help:      "",
+        }, []string{nodeIDLabelName, queueTypeLabelName, taskStateLabel})
 )

 // RegisterProxy registers Proxy metrics
@@ -490,6 +508,7 @@ func RegisterProxy(registry *prometheus.Registry) {
     registry.MustRegister(ProxyRecallSearchCount)

     registry.MustRegister(ProxySearchSparseNumNonZeros)
+    registry.MustRegister(ProxyQueueTaskNum)

     RegisterStreamingServiceClient(registry)
 }
diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go
index 94cd57677d..5c6152f0db 100644
--- a/pkg/util/merr/errors.go
+++ b/pkg/util/merr/errors.go
@@ -51,7 +51,7 @@ var (
     ErrServiceNotReady             = newMilvusError("service not ready", 1, true) // This indicates the service is still in init
     ErrServiceUnavailable          = newMilvusError("service unavailable", 2, true)
     ErrServiceMemoryLimitExceeded  = newMilvusError("memory limit exceeded", 3, false)
-    ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true)
+    ErrServiceTooManyRequests      = newMilvusError("too many concurrent requests, queue is full", 4, true)
     ErrServiceInternal             = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus
     ErrServiceCrossClusterRouting  = newMilvusError("cross cluster routing", 6, false)
     ErrServiceDiskLimitExceeded    = newMilvusError("disk limit exceeded", 7, false)
diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go
index 26521c40af..fea03f7ffc 100644
--- a/pkg/util/merr/errors_test.go
+++ b/pkg/util/merr/errors_test.go
@@ -74,7 +74,7 @@ func (s *ErrSuite) TestWrap() {
     s.ErrorIs(WrapErrServiceNotReady("test", 0, "test init..."), ErrServiceNotReady)
     s.ErrorIs(WrapErrServiceUnavailable("test", "test init"), ErrServiceUnavailable)
     s.ErrorIs(WrapErrServiceMemoryLimitExceeded(110, 100, "MLE"), ErrServiceMemoryLimitExceeded)
-    s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded)
+    s.ErrorIs(WrapErrTooManyRequests(100, "too many requests"), ErrServiceTooManyRequests)
     s.ErrorIs(WrapErrServiceInternal("never throw out"), ErrServiceInternal)
     s.ErrorIs(WrapErrServiceCrossClusterRouting("ins-0", "ins-1"), ErrServiceCrossClusterRouting)
     s.ErrorIs(WrapErrServiceDiskLimitExceeded(110, 100, "DLE"), ErrServiceDiskLimitExceeded)
diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go
index 2c189553eb..628e54162c 100644
--- a/pkg/util/merr/utils.go
+++ b/pkg/util/merr/utils.go
@@ -367,8 +367,8 @@ func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) er
     return err
 }

-func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error {
-    err := wrapFields(ErrServiceRequestLimitExceeded,
+func WrapErrTooManyRequests(limit int32, msg ...string) error {
+    err := wrapFields(ErrServiceTooManyRequests,
         value("limit", limit),
     )
     if len(msg) > 0 {