xige-16 50739e6c6a
Add more metrics (#21975)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
2023-02-09 16:06:33 +08:00

104 lines
1.8 KiB
Go

package rootcoord
import (
"context"
"time"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// task is the contract for a unit of work handled inside the rootcoord
// package. It bundles a context, a timestamp and an ID with three lifecycle
// hooks (OnEnqueue, Prepare, Execute) and a completion handshake
// (NotifyDone / WaitToFinish). *baseTask in this file implements every method.
type task interface {
// GetCtx returns the context currently attached to the task.
GetCtx() context.Context
// SetCtx replaces the context attached to the task.
SetCtx(context.Context)
// SetTs stores the timestamp assigned to the task.
SetTs(ts Timestamp)
// GetTs returns the timestamp previously stored with SetTs.
GetTs() Timestamp
// SetID stores the identifier assigned to the task.
SetID(id UniqueID)
// GetID returns the identifier previously stored with SetID.
GetID() UniqueID
// Prepare is the pre-execution hook.
// NOTE(review): presumably invoked before Execute by whatever drives the
// task; the driver is not visible in this file.
Prepare(ctx context.Context) error
// Execute performs the task's actual work.
Execute(ctx context.Context) error
// WaitToFinish blocks until the task's result has been reported and
// returns that result.
WaitToFinish() error
// NotifyDone reports the final result (nil on success) of the task to
// whoever is blocked in WaitToFinish.
NotifyDone(err error)
// OnEnqueue is the hook run when the task is placed into a queue.
OnEnqueue() error
}
// baseTask holds the state shared by task implementations and provides
// default implementations of every method in the task interface. Use
// newBaseTask to construct one: the zero value has a nil done channel and a
// nil time recorder, which the methods below dereference without checking.
type baseTask struct {
// ctx is the context returned by GetCtx and replaced by SetCtx.
ctx context.Context
// core is the owning *Core; it is stored here but not used in this file.
core *Core
// done carries the final result from NotifyDone to WaitToFinish.
// newBaseTask creates it with a buffer of one.
done chan error
// ts is the timestamp set via SetTs.
ts Timestamp
// id is the identifier set via SetID.
id UniqueID
// tr records timing marks as the task moves between steps (see SetStep).
tr *timerecord.TimeRecorder
// step is the most recent step passed to SetStep.
step typeutil.TaskStep
// queueDur is the duration returned by tr.Record when the task enters
// TaskStepPreExecute; it is reset to 0 on TaskStepEnqueue.
// NOTE(review): presumably the time spent waiting in the queue — confirm
// against timerecord.TimeRecorder.Record semantics.
queueDur time.Duration
}
// newBaseTask returns a baseTask bound to core and carrying ctx.
//
// The result channel is created with a buffer of one, and a fresh time
// recorder labelled "ddl request" is attached. The value is returned by
// value, so callers typically embed or store it before taking its address.
func newBaseTask(ctx context.Context, core *Core) baseTask {
	var base baseTask
	base.core = core
	base.done = make(chan error, 1)
	base.tr = timerecord.NewTimeRecorder("ddl request")
	// Go through the setter so context assignment stays in one place.
	base.SetCtx(ctx)
	return base
}
// SetStep records step as the task's current step and updates the timing
// bookkeeping for the two steps that matter to queue-latency tracking:
//
//   - TaskStepEnqueue: queueDur is reset to 0 and an "enqueue done" mark is
//     recorded on the time recorder.
//   - TaskStepPreExecute: a "start to process" mark is recorded and the
//     duration returned by the recorder is stored in queueDur.
//
// Any other step only updates b.step. b.tr must be non-nil for the two
// steps above (newBaseTask guarantees this).
func (b *baseTask) SetStep(step typeutil.TaskStep) {
	b.step = step

	if step == typeutil.TaskStepEnqueue {
		b.queueDur = 0
		b.tr.Record("enqueue done")
		return
	}

	if step == typeutil.TaskStepPreExecute {
		// NOTE(review): Record presumably returns the time elapsed since the
		// previous mark, i.e. the time spent queued — confirm in timerecord.
		b.queueDur = b.tr.Record("start to process")
	}
}
// SetCtx replaces the context stored on the task with ctx.
func (b *baseTask) SetCtx(ctx context.Context) {
	b.ctx = ctx
}
// GetCtx returns the context stored on the task (as set by newBaseTask or
// the most recent SetCtx call).
func (b *baseTask) GetCtx() context.Context {
	return b.ctx
}
// SetTs stores ts as the task's timestamp.
func (b *baseTask) SetTs(ts Timestamp) {
	b.ts = ts
}
// GetTs returns the task's timestamp; it is the zero Timestamp until SetTs
// has been called.
func (b *baseTask) GetTs() Timestamp {
	return b.ts
}
// SetID stores id as the task's identifier.
func (b *baseTask) SetID(id UniqueID) {
	b.id = id
}
// GetID returns the task's identifier; it is the zero UniqueID until SetID
// has been called.
func (b *baseTask) GetID() UniqueID {
	return b.id
}
// Prepare is the default pre-execution hook. It moves the task to
// TaskStepPreExecute (which, via SetStep, captures queueDur) and always
// succeeds. The context argument is accepted to satisfy the task interface
// and is not used.
func (b *baseTask) Prepare(_ context.Context) error {
	b.SetStep(typeutil.TaskStepPreExecute)
	return nil
}
// Execute is the default execution hook. It only moves the task to
// TaskStepExecute and always succeeds. The context argument is accepted to
// satisfy the task interface and is not used.
func (b *baseTask) Execute(_ context.Context) error {
	b.SetStep(typeutil.TaskStepExecute)
	return nil
}
// WaitToFinish blocks until a result is delivered through NotifyDone and
// returns it (nil on success). It does not watch the task's context, so it
// will not return early on cancellation.
func (b *baseTask) WaitToFinish() error {
	result := <-b.done
	return result
}
// NotifyDone publishes err as the task's final result for WaitToFinish to
// pick up. With the one-slot buffer allocated by newBaseTask, the first call
// does not block even when nobody is waiting yet; a further call made before
// the buffered result is consumed would block.
func (b *baseTask) NotifyDone(err error) {
	b.done <- err
}
// OnEnqueue is the default enqueue hook. It moves the task to
// TaskStepEnqueue (which, via SetStep, resets queueDur and records an
// "enqueue done" timing mark) and always succeeds.
func (b *baseTask) OnEnqueue() error {
	b.SetStep(typeutil.TaskStepEnqueue)
	return nil
}