From ca88e37c98a03bb9837abaaec1fd9cee73a207a0 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Thu, 7 Jul 2022 10:00:21 +0800 Subject: [PATCH] fix receive messages before waitToFinish (#18101) Signed-off-by: Enwei Jiao --- internal/querycoord/condition.go | 24 +++------------------- internal/querycoord/impl_test.go | 2 +- internal/querycoord/task.go | 2 +- internal/querycoord/task_scheduler_test.go | 10 ++++----- 4 files changed, 10 insertions(+), 28 deletions(-) diff --git a/internal/querycoord/condition.go b/internal/querycoord/condition.go index 1a3a48beca..4fbc247696 100644 --- a/internal/querycoord/condition.go +++ b/internal/querycoord/condition.go @@ -16,44 +16,26 @@ package querycoord -import ( - "context" - "errors" -) - type condition interface { waitToFinish() error notify(err error) - Ctx() context.Context } type taskCondition struct { done chan error - ctx context.Context } func (tc *taskCondition) waitToFinish() error { - for { - select { - case <-tc.ctx.Done(): - return errors.New("timeout") - case err := <-tc.done: - return err - } - } + err := <-tc.done + return err } func (tc *taskCondition) notify(err error) { tc.done <- err } -func (tc *taskCondition) Ctx() context.Context { - return tc.ctx -} - -func newTaskCondition(ctx context.Context) *taskCondition { +func newTaskCondition() *taskCondition { return &taskCondition{ done: make(chan error, 1), - ctx: ctx, } } diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go index daf559445a..704eb6d29d 100644 --- a/internal/querycoord/impl_test.go +++ b/internal/querycoord/impl_test.go @@ -595,7 +595,7 @@ func TestLoadBalanceTask(t *testing.T) { loadBalanceTask := &loadBalanceTask{ baseTask: &baseTask{ ctx: baseCtx, - condition: newTaskCondition(baseCtx), + condition: newTaskCondition(), triggerCondition: querypb.TriggerCondition_NodeDown, }, LoadBalanceRequest: loadBalanceSegment, diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index d6c091b90f..7eb62d3b86 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -131,7 +131,7 @@ type baseTask struct { func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask { childCtx, cancel := context.WithCancel(ctx) - condition := newTaskCondition(childCtx) + condition := newTaskCondition() baseTask := &baseTask{ ctx: childCtx, diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index e16c0974a1..d305192a2d 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -92,7 +92,7 @@ func (tt *testTask) execute(ctx context.Context) error { loadTask := &loadSegmentTask{ baseTask: &baseTask{ ctx: tt.ctx, - condition: newTaskCondition(tt.ctx), + condition: newTaskCondition(), triggerCondition: tt.triggerCondition, }, LoadSegmentsRequest: req, @@ -107,7 +107,7 @@ func (tt *testTask) execute(ctx context.Context) error { childTask := &loadSegmentTask{ baseTask: &baseTask{ ctx: tt.ctx, - condition: newTaskCondition(tt.ctx), + condition: newTaskCondition(), triggerCondition: tt.triggerCondition, }, LoadSegmentsRequest: &querypb.LoadSegmentsRequest{ @@ -126,7 +126,7 @@ func (tt *testTask) execute(ctx context.Context) error { childTask := &watchDmChannelTask{ baseTask: &baseTask{ ctx: tt.ctx, - condition: newTaskCondition(tt.ctx), + condition: newTaskCondition(), triggerCondition: tt.triggerCondition, }, WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{ @@ -165,7 +165,7 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) { testTask := &testTask{ baseTask: baseTask{ ctx: baseCtx, - condition: newTaskCondition(baseCtx), + condition: newTaskCondition(), triggerCondition: querypb.TriggerCondition_GrpcRequest, }, baseMsg: &commonpb.MsgBase{ @@ -498,7 +498,7 @@ func Test_saveInternalTaskToEtcd(t *testing.T) { testTask := &testTask{ baseTask: baseTask{ ctx: ctx, - condition: newTaskCondition(ctx), + condition: newTaskCondition(), triggerCondition: querypb.TriggerCondition_GrpcRequest, taskID: 100, },