diff --git a/internal/querycoord/condition.go b/internal/querycoord/condition.go index b15df235a4..a282705ccf 100644 --- a/internal/querycoord/condition.go +++ b/internal/querycoord/condition.go @@ -16,18 +16,18 @@ import ( "errors" ) -type Condition interface { +type condition interface { waitToFinish() error notify(err error) Ctx() context.Context } -type TaskCondition struct { +type taskCondition struct { done chan error ctx context.Context } -func (tc *TaskCondition) waitToFinish() error { +func (tc *taskCondition) waitToFinish() error { for { select { case <-tc.ctx.Done(): @@ -38,16 +38,16 @@ func (tc *TaskCondition) waitToFinish() error { } } -func (tc *TaskCondition) notify(err error) { +func (tc *taskCondition) notify(err error) { tc.done <- err } -func (tc *TaskCondition) Ctx() context.Context { +func (tc *taskCondition) Ctx() context.Context { return tc.ctx } -func NewTaskCondition(ctx context.Context) *TaskCondition { - return &TaskCondition{ +func newTaskCondition(ctx context.Context) *taskCondition { + return &taskCondition{ done: make(chan error, 1), ctx: ctx, } diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go index aac4d69d06..2798b52da1 100644 --- a/internal/querycoord/impl_test.go +++ b/internal/querycoord/impl_test.go @@ -455,7 +455,7 @@ func TestLoadBalanceTask(t *testing.T) { loadBalanceTask := &loadBalanceTask{ baseTask: &baseTask{ ctx: baseCtx, - Condition: NewTaskCondition(baseCtx), + condition: newTaskCondition(baseCtx), triggerCondition: querypb.TriggerCondition_nodeDown, }, LoadBalanceRequest: loadBalanceSegment, diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index d3ebef92d5..81be05fc0b 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -87,7 +87,7 @@ type task interface { } type baseTask struct { - Condition + condition ctx context.Context cancel context.CancelFunc result *commonpb.Status @@ -106,12 +106,12 @@ type baseTask struct { func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask { childCtx, cancel := context.WithCancel(ctx) - condition := NewTaskCondition(childCtx) + condition := newTaskCondition(childCtx) baseTask := &baseTask{ ctx: childCtx, cancel: cancel, - Condition: condition, + condition: condition, state: taskUndo, retryCount: MaxRetryNum, triggerCondition: triggerType, diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index 9fea3435dd..ef3f23953e 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -62,7 +62,7 @@ func (tt *testTask) execute(ctx context.Context) error { childTask := &loadSegmentTask{ baseTask: &baseTask{ ctx: tt.ctx, - Condition: NewTaskCondition(tt.ctx), + condition: newTaskCondition(tt.ctx), triggerCondition: tt.triggerCondition, }, LoadSegmentsRequest: &querypb.LoadSegmentsRequest{ @@ -80,7 +80,7 @@ func (tt *testTask) execute(ctx context.Context) error { childTask := &watchDmChannelTask{ baseTask: &baseTask{ ctx: tt.ctx, - Condition: NewTaskCondition(tt.ctx), + condition: newTaskCondition(tt.ctx), triggerCondition: tt.triggerCondition, }, WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{ @@ -98,7 +98,7 @@ func (tt *testTask) execute(ctx context.Context) error { childTask := &watchQueryChannelTask{ baseTask: &baseTask{ ctx: tt.ctx, - Condition: NewTaskCondition(tt.ctx), + condition: newTaskCondition(tt.ctx), triggerCondition: tt.triggerCondition, }, AddQueryChannelRequest: &querypb.AddQueryChannelRequest{ @@ -136,7 +136,7 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) { testTask := &testTask{ baseTask: baseTask{ ctx: baseCtx, - Condition: NewTaskCondition(baseCtx), + condition: newTaskCondition(baseCtx), triggerCondition: querypb.TriggerCondition_grpcRequest, }, baseMsg: &commonpb.MsgBase{