From f8ff97fe298f49bca565b91ef208b575be98b8a7 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 25 Apr 2023 19:04:34 +0800 Subject: [PATCH] Fix travel timestamp set unique leads to disable task merge (#23605) Signed-off-by: yah01 --- internal/proxy/task_search.go | 2 +- internal/querynodev2/services_test.go | 8 +-- internal/querynodev2/tasks/scheduler.go | 85 +++++++++++++++++-------- internal/querynodev2/tasks/task.go | 6 +- 4 files changed, 68 insertions(+), 33 deletions(-) diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index afa07f3cff..bc103116cf 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -332,7 +332,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error { travelTimestamp := t.request.TravelTimestamp if travelTimestamp == 0 { - travelTimestamp = t.BeginTs() + travelTimestamp = typeutil.MaxTimestamp } err = validateTravelTimestamp(travelTimestamp, t.BeginTs()) if err != nil { diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 0c90670577..64998ea465 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -429,7 +429,7 @@ func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema suite.collectionID, suite.partitionIDs[i%partNum], suite.validSegmentIDs[i], - 100, + 1000, schema, suite.node.vectorStorage, ) @@ -441,7 +441,7 @@ func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema suite.partitionIDs[i%partNum], suite.validSegmentIDs[i], vecFieldIDs[0], - 100, + 1000, segments.IndexFaissIVFFlat, segments.L2, suite.node.vectorStorage, @@ -904,11 +904,11 @@ func (suite *ServiceSuite) TestSearch_Concurrent() { // data schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) - concurrency := 8 + concurrency := 16 futures := make([]*conc.Future[*internalpb.SearchResults], 0, concurrency) for i := 0; i < concurrency; i++ { future := conc.Go(func() (*internalpb.SearchResults, error) { - creq, err := suite.genCSearchRequest(1, IndexFaissIDMap, schema) + creq, err := suite.genCSearchRequest(30, IndexFaissIDMap, schema) req := &querypb.SearchRequest{ Req: creq, FromShardLeader: false, diff --git a/internal/querynodev2/tasks/scheduler.go b/internal/querynodev2/tasks/scheduler.go index b18a039ae2..404d5aea8c 100644 --- a/internal/querynodev2/tasks/scheduler.go +++ b/internal/querynodev2/tasks/scheduler.go @@ -35,7 +35,7 @@ func NewScheduler() *Scheduler { searchProcessNum: atomic.NewInt32(0), searchWaitQueue: make(chan *SearchTask, maxWaitTaskNum), mergingSearchTasks: make([]*SearchTask, 0), - mergedSearchTasks: make(chan *SearchTask, maxReadConcurrency), + mergedSearchTasks: make(chan *SearchTask), // queryProcessQueue: make(chan), pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)), @@ -64,39 +64,72 @@ func (s *Scheduler) Schedule(ctx context.Context) { go s.processAll(ctx) for { - select { - case <-ctx.Done(): - return + if len(s.mergingSearchTasks) > 0 { // wait for an idle worker or a new task + task := s.mergingSearchTasks[0] + select { + case <-ctx.Done(): + return + case task = <-s.searchWaitQueue: + s.schedule(task) + + case s.mergedSearchTasks <- task: + s.mergingSearchTasks = s.mergingSearchTasks[1:] + } + } else { // wait for a new task if no task + select { + case <-ctx.Done(): + return + + case task := <-s.searchWaitQueue: + s.schedule(task) + } + } + } +} + +func (s *Scheduler) schedule(task Task) { + // add this task + if err := task.Canceled(); err != nil { + task.Done(err) + return + } + s.mergeTasks(task) + metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() + + mergeLimit := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt() + mergeCount := 1 + + // try to merge the coming tasks +outer: + for mergeCount < mergeLimit { + select { case t := <-s.searchWaitQueue: if err := t.Canceled(); err != nil { t.Done(err) continue } - - mergeCount := 0 - mergeLimit := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt() - outer: - for i := 0; i < mergeLimit; i++ { - s.mergeTasks(t) - mergeCount++ - metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() - - select { - case t = <-s.searchWaitQueue: - // Continue the loop to merge task - default: - break outer - } - } - - for i := range s.mergingSearchTasks { - s.mergedSearchTasks <- s.mergingSearchTasks[i] - } - s.mergingSearchTasks = s.mergingSearchTasks[:0] - metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(mergeCount)) + s.mergeTasks(t) + mergeCount++ + metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() + default: + break outer } } + + // submit existing tasks to the pool + processedCount := 0 +processOuter: + for i := range s.mergingSearchTasks { + select { + case s.mergedSearchTasks <- s.mergingSearchTasks[i]: + processedCount++ + default: + break processOuter + } + } + s.mergingSearchTasks = s.mergingSearchTasks[processedCount:] + metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(processedCount)) } func (s *Scheduler) processAll(ctx context.Context) { diff --git a/internal/querynodev2/tasks/task.go b/internal/querynodev2/tasks/task.go index 1849f83f7c..ed74d85bbe 100644 --- a/internal/querynodev2/tasks/task.go +++ b/internal/querynodev2/tasks/task.go @@ -207,7 +207,6 @@ func (t *SearchTask) Merge(other *SearchTask) bool { t.originTopks = append(t.originTopks, other.originTopks...) t.originNqs = append(t.originNqs, other.originNqs...) t.others = append(t.others, other) - t.others = append(t.others, other.others...) return true } @@ -218,7 +217,10 @@ func (t *SearchTask) Done(err error) { metrics.QueryNodeSearchGroupNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.originNqs[0])) metrics.QueryNodeSearchGroupTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.originTopks[0])) } - t.notifier <- err + select { + case t.notifier <- err: + default: + } for _, other := range t.others { other.Done(err) }