fix receive messages before waitToFinish (#18101)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
This commit is contained in:
Enwei Jiao 2022-07-07 10:00:21 +08:00 committed by GitHub
parent d4a1e94f32
commit ca88e37c98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 28 deletions

View File

@ -16,44 +16,26 @@
package querycoord package querycoord
import (
"context"
"errors"
)
type condition interface { type condition interface {
waitToFinish() error waitToFinish() error
notify(err error) notify(err error)
Ctx() context.Context
} }
type taskCondition struct { type taskCondition struct {
done chan error done chan error
ctx context.Context
} }
func (tc *taskCondition) waitToFinish() error { func (tc *taskCondition) waitToFinish() error {
for { err := <-tc.done
select { return err
case <-tc.ctx.Done():
return errors.New("timeout")
case err := <-tc.done:
return err
}
}
} }
func (tc *taskCondition) notify(err error) { func (tc *taskCondition) notify(err error) {
tc.done <- err tc.done <- err
} }
func (tc *taskCondition) Ctx() context.Context { func newTaskCondition() *taskCondition {
return tc.ctx
}
func newTaskCondition(ctx context.Context) *taskCondition {
return &taskCondition{ return &taskCondition{
done: make(chan error, 1), done: make(chan error, 1),
ctx: ctx,
} }
} }

View File

@ -595,7 +595,7 @@ func TestLoadBalanceTask(t *testing.T) {
loadBalanceTask := &loadBalanceTask{ loadBalanceTask := &loadBalanceTask{
baseTask: &baseTask{ baseTask: &baseTask{
ctx: baseCtx, ctx: baseCtx,
condition: newTaskCondition(baseCtx), condition: newTaskCondition(),
triggerCondition: querypb.TriggerCondition_NodeDown, triggerCondition: querypb.TriggerCondition_NodeDown,
}, },
LoadBalanceRequest: loadBalanceSegment, LoadBalanceRequest: loadBalanceSegment,

View File

@ -131,7 +131,7 @@ type baseTask struct {
func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask { func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask {
childCtx, cancel := context.WithCancel(ctx) childCtx, cancel := context.WithCancel(ctx)
condition := newTaskCondition(childCtx) condition := newTaskCondition()
baseTask := &baseTask{ baseTask := &baseTask{
ctx: childCtx, ctx: childCtx,

View File

@ -92,7 +92,7 @@ func (tt *testTask) execute(ctx context.Context) error {
loadTask := &loadSegmentTask{ loadTask := &loadSegmentTask{
baseTask: &baseTask{ baseTask: &baseTask{
ctx: tt.ctx, ctx: tt.ctx,
condition: newTaskCondition(tt.ctx), condition: newTaskCondition(),
triggerCondition: tt.triggerCondition, triggerCondition: tt.triggerCondition,
}, },
LoadSegmentsRequest: req, LoadSegmentsRequest: req,
@ -107,7 +107,7 @@ func (tt *testTask) execute(ctx context.Context) error {
childTask := &loadSegmentTask{ childTask := &loadSegmentTask{
baseTask: &baseTask{ baseTask: &baseTask{
ctx: tt.ctx, ctx: tt.ctx,
condition: newTaskCondition(tt.ctx), condition: newTaskCondition(),
triggerCondition: tt.triggerCondition, triggerCondition: tt.triggerCondition,
}, },
LoadSegmentsRequest: &querypb.LoadSegmentsRequest{ LoadSegmentsRequest: &querypb.LoadSegmentsRequest{
@ -126,7 +126,7 @@ func (tt *testTask) execute(ctx context.Context) error {
childTask := &watchDmChannelTask{ childTask := &watchDmChannelTask{
baseTask: &baseTask{ baseTask: &baseTask{
ctx: tt.ctx, ctx: tt.ctx,
condition: newTaskCondition(tt.ctx), condition: newTaskCondition(),
triggerCondition: tt.triggerCondition, triggerCondition: tt.triggerCondition,
}, },
WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{ WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{
@ -165,7 +165,7 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) {
testTask := &testTask{ testTask := &testTask{
baseTask: baseTask{ baseTask: baseTask{
ctx: baseCtx, ctx: baseCtx,
condition: newTaskCondition(baseCtx), condition: newTaskCondition(),
triggerCondition: querypb.TriggerCondition_GrpcRequest, triggerCondition: querypb.TriggerCondition_GrpcRequest,
}, },
baseMsg: &commonpb.MsgBase{ baseMsg: &commonpb.MsgBase{
@ -498,7 +498,7 @@ func Test_saveInternalTaskToEtcd(t *testing.T) {
testTask := &testTask{ testTask := &testTask{
baseTask: baseTask{ baseTask: baseTask{
ctx: ctx, ctx: ctx,
condition: newTaskCondition(ctx), condition: newTaskCondition(),
triggerCondition: querypb.TriggerCondition_GrpcRequest, triggerCondition: querypb.TriggerCondition_GrpcRequest,
taskID: 100, taskID: 100,
}, },