diff --git a/internal/rootcoord/scheduler.go b/internal/rootcoord/scheduler.go index 1f5e3d0e47..9ce202a6eb 100644 --- a/internal/rootcoord/scheduler.go +++ b/internal/rootcoord/scheduler.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/lock" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type IScheduler interface { @@ -46,6 +47,7 @@ type scheduler struct { tsoAllocator tso.Allocator taskChan chan task + taskHeap typeutil.Heap[task] lock sync.Mutex @@ -56,16 +58,22 @@ type scheduler struct { lockMapping map[LockLevel]*lock.KeyLock[string] } +func GetTaskHeapOrder(t task) Timestamp { + return t.GetTs() +} + func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler { ctx1, cancel := context.WithCancel(ctx) // TODO n := 1024 * 10 + taskArr := make([]task, 0) s := &scheduler{ ctx: ctx1, cancel: cancel, idAllocator: idAllocator, tsoAllocator: tsoAllocator, taskChan: make(chan task, n), + taskHeap: typeutil.NewObjectArrayBasedMinimumHeap[task, Timestamp](taskArr, GetTaskHeapOrder), minDdlTs: *atomic.NewUint64(0), clusterLock: lock.NewKeyLock[string](), databaseLock: lock.NewKeyLock[string](), @@ -93,7 +101,7 @@ func (s *scheduler) Stop() { } func (s *scheduler) execute(task task) { - defer s.setMinDdlTs(task.GetTs()) // we should update ts, whatever task succeeds or not. + defer s.setMinDdlTs() // we should update ts, whatever task succeeds or not. 
task.SetInQueueDuration() if err := task.Prepare(task.GetCtx()); err != nil { task.NotifyDone(err) @@ -153,6 +161,7 @@ func (s *scheduler) setTs(task task) error { return err } task.SetTs(ts) + s.taskHeap.Push(task) return nil } @@ -186,8 +195,14 @@ func (s *scheduler) GetMinDdlTs() Timestamp { return s.minDdlTs.Load() } -func (s *scheduler) setMinDdlTs(ts Timestamp) { - s.minDdlTs.Store(ts) +func (s *scheduler) setMinDdlTs() { + s.lock.Lock() + defer s.lock.Unlock() + + for s.taskHeap.Len() > 0 && s.taskHeap.Peek().IsFinished() { + t := s.taskHeap.Pop() + s.minDdlTs.Store(t.GetTs()) + } } func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error { @@ -195,9 +210,12 @@ func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error { if err := s.setID(task); err != nil { return err } + s.lock.Lock() if err := s.setTs(task); err != nil { + s.lock.Unlock() return err } + s.lock.Unlock() s.execute(task) return nil } diff --git a/internal/rootcoord/scheduler_test.go b/internal/rootcoord/scheduler_test.go index 99c0806baf..a5f782d0dc 100644 --- a/internal/rootcoord/scheduler_test.go +++ b/internal/rootcoord/scheduler_test.go @@ -82,6 +82,29 @@ func newMockNormalTask() *mockNormalTask { return task } +type mockLockerKeyTask struct { + baseTask + lockerKey string + rw bool +} + +func (m *mockLockerKeyTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(m.lockerKey, m.rw), + ) +} + +func newMockLockerKeyTask(lockerKey string, rw bool) *mockLockerKeyTask { + task := &mockLockerKeyTask{ + baseTask: newBaseTask(context.Background(), nil), + lockerKey: lockerKey, + rw: rw, + } + task.SetCtx(context.Background()) + return task +} + func Test_scheduler_Start_Stop(t *testing.T) { idAlloc := newMockIDAllocator() tsoAlloc := newMockTsoAllocator() @@ -247,6 +270,87 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) { assert.Zero(t, s.GetMinDdlTs()) s.Stop() }) + + t.Run("concurrent 
task schedule", func(t *testing.T) { + idAlloc := newMockIDAllocator() + tsoAlloc := newMockTsoAllocator() + tso := atomic.NewUint64(100) + idAlloc.AllocOneF = func() (UniqueID, error) { + return 100, nil + } + tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) { + got := tso.Inc() + return got, nil + } + ctx := context.Background() + s := newScheduler(ctx, idAlloc, tsoAlloc) + paramtable.Init() + paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1") + s.Start() + + for i := 0; i < 100; i++ { + if s.GetMinDdlTs() > Timestamp(100) { + break + } + assert.True(t, i < 100) + time.Sleep(time.Millisecond) + } + + w := &sync.WaitGroup{} + w.Add(5) + // locker key rw true + lockerKey := "hello" + go func() { + defer w.Done() + n := 200 + for i := 0; i < n; i++ { + task := newMockLockerKeyTask(lockerKey, true) + err := s.AddTask(task) + assert.NoError(t, err) + } + }() + + // locker key rw false + go func() { + defer w.Done() + n := 200 + for i := 0; i < n; i++ { + task := newMockLockerKeyTask(lockerKey, false) + err := s.AddTask(task) + assert.NoError(t, err) + } + }() + + go func() { + defer w.Done() + n := 200 + for i := 0; i < n; i++ { + task := newMockLockerKeyTask(lockerKey, false) + err := s.AddTask(task) + assert.NoError(t, err) + } + }() + + go func() { + defer w.Done() + n := 200 + for i := 0; i < n; i++ { + task := newMockNormalTask() + err := s.AddTask(task) + assert.NoError(t, err) + } + }() + + lastMin := s.GetMinDdlTs() + go func() { + defer w.Done() + current := s.GetMinDdlTs() + assert.True(t, current >= lastMin) + lastMin = current + time.Sleep(time.Millisecond * 100) + }() + w.Wait() + }) } type WithLockKeyTask struct { diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index 4b0927b3bf..580a99e671 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -20,6 +20,7 @@ import ( "context" "time" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" @@ -52,16 +53,18 @@ type 
task interface { Execute(ctx context.Context) error WaitToFinish() error NotifyDone(err error) + IsFinished() bool SetInQueueDuration() GetLockerKey() LockerKey } type baseTask struct { - ctx context.Context - core *Core - done chan error - ts Timestamp - id UniqueID + ctx context.Context + core *Core + done chan error + isFinished *atomic.Bool + ts Timestamp + id UniqueID tr *timerecord.TimeRecorder queueDur time.Duration @@ -69,9 +72,10 @@ type baseTask struct { func newBaseTask(ctx context.Context, core *Core) baseTask { b := baseTask{ - core: core, - done: make(chan error, 1), - tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"), + core: core, + done: make(chan error, 1), + tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"), + isFinished: atomic.NewBool(false), } b.SetCtx(ctx) return b @@ -115,12 +119,17 @@ func (b *baseTask) WaitToFinish() error { func (b *baseTask) NotifyDone(err error) { b.done <- err + b.isFinished.Store(true) } func (b *baseTask) SetInQueueDuration() { b.queueDur = b.tr.ElapseSpan() } +func (b *baseTask) IsFinished() bool { + return b.isFinished.Load() +} + func (b *baseTask) GetLockerKey() LockerKey { return nil } diff --git a/pkg/util/typeutil/heap.go b/pkg/util/typeutil/heap.go new file mode 100644 index 0000000000..94a9ab0840 --- /dev/null +++ b/pkg/util/typeutil/heap.go @@ -0,0 +1,197 @@ +package typeutil + +import ( + "container/heap" + + "golang.org/x/exp/constraints" +) + +var _ HeapInterface = (*heapArray[int])(nil) + +// HeapInterface is the interface that a heap must implement. +type HeapInterface interface { + heap.Interface + Peek() interface{} +} + +// Heap is a heap of E. +// Use `container/heap` directly if you want to change any element. +type Heap[E any] interface { + // Len returns the size of the heap. + Len() int + + // Push pushes an element onto the heap. + Push(x E) + + // Pop returns the element at the top of the heap. + // Panics if the heap is empty. 
+ Pop() E + + // Peek returns the element at the top of the heap. + // Panics if the heap is empty. + Peek() E +} + +// heapArray is a heap backed by an array. +type heapArray[E constraints.Ordered] []E + +// Len returns the length of the heap. +func (h heapArray[E]) Len() int { + return len(h) +} + +// Less returns true if the element at index i is less than the element at index j. +func (h heapArray[E]) Less(i, j int) bool { + return h[i] < h[j] +} + +// Swap swaps the elements at indexes i and j. +func (h heapArray[E]) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +// Push appends x as the last element of the underlying slice. +func (h *heapArray[E]) Push(x interface{}) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(E)) +} + +// Pop removes and returns the last element of the underlying slice. +func (h *heapArray[E]) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// Peek returns the element at the top of the heap. +func (h *heapArray[E]) Peek() interface{} { + return (*h)[0] +} + +type objectHeapArray[O any, E constraints.Ordered] struct { + objects []O + getOrderFunc func(O) E +} + +func (h *objectHeapArray[O, E]) Len() int { + return len(h.objects) +} + +func (h *objectHeapArray[O, E]) Less(i, j int) bool { + return h.getOrderFunc(h.objects[i]) < h.getOrderFunc(h.objects[j]) +} + +func (h *objectHeapArray[O, E]) Swap(i, j int) { + h.objects[i], h.objects[j] = h.objects[j], h.objects[i] +} + +func (h *objectHeapArray[O, E]) Push(x interface{}) { + h.objects = append(h.objects, x.(O)) +} + +func (h *objectHeapArray[O, E]) Pop() interface{} { + old := h.objects + n := len(old) + x := old[n-1] + h.objects = old[0 : n-1] + return x +} + +func (h *objectHeapArray[O, E]) Peek() interface{} { + return h.objects[0] +} + +// reverseOrderedInterface is a heap base interface that reverses the order of the elements. 
+type reverseOrderedInterface[E constraints.Ordered] struct { + HeapInterface +} + +// Less returns true if the element at index j is less than the element at index i. +func (r reverseOrderedInterface[E]) Less(i, j int) bool { + return r.HeapInterface.Less(j, i) +} + +// NewHeap returns a new heap from an underlying representation. +func NewHeap[E any](inner HeapInterface) Heap[E] { + return &heapImpl[E, HeapInterface]{ + inner: inner, + } +} + +// NewArrayBasedMaximumHeap returns a new maximum heap. +func NewArrayBasedMaximumHeap[E constraints.Ordered](initial []E) Heap[E] { + ha := heapArray[E](initial) + reverse := reverseOrderedInterface[E]{ + HeapInterface: &ha, + } + heap.Init(reverse) + return &heapImpl[E, reverseOrderedInterface[E]]{ + inner: reverse, + } +} + +// NewArrayBasedMinimumHeap returns a new minimum heap. +func NewArrayBasedMinimumHeap[E constraints.Ordered](initial []E) Heap[E] { + ha := heapArray[E](initial) + heap.Init(&ha) + return &heapImpl[E, *heapArray[E]]{ + inner: &ha, + } +} + +func NewObjectArrayBasedMaximumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] { + if initial == nil { + initial = make([]O, 0) + } + ha := &objectHeapArray[O, E]{ + objects: initial, + getOrderFunc: getOrderFunc, + } + reverse := reverseOrderedInterface[E]{ + HeapInterface: ha, + } + heap.Init(reverse) + return &heapImpl[O, reverseOrderedInterface[E]]{ + inner: reverse, + } +} + +func NewObjectArrayBasedMinimumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] { + if initial == nil { + initial = make([]O, 0) + } + ha := &objectHeapArray[O, E]{ + objects: initial, + getOrderFunc: getOrderFunc, + } + heap.Init(ha) + return &heapImpl[O, *objectHeapArray[O, E]]{ + inner: ha, + } +} + +// heapImpl is a heap of E whose ordering is defined by its inner HeapInterface. +type heapImpl[E any, H HeapInterface] struct { + inner H +} + +// Len returns the length of the heap. 
+func (h *heapImpl[E, H]) Len() int { + return h.inner.Len() +} + +// Push pushes an element onto the heap. +func (h *heapImpl[E, H]) Push(x E) { + heap.Push(h.inner, x) +} + +// Pop pops an element from the heap. +func (h *heapImpl[E, H]) Pop() E { + return heap.Pop(h.inner).(E) +} + +// Peek returns the element at the top of the heap. +func (h *heapImpl[E, H]) Peek() E { + return h.inner.Peek().(E) +} diff --git a/pkg/util/typeutil/heap_test.go b/pkg/util/typeutil/heap_test.go new file mode 100644 index 0000000000..c4d6774102 --- /dev/null +++ b/pkg/util/typeutil/heap_test.go @@ -0,0 +1,91 @@ +package typeutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMinimumHeap(t *testing.T) { + h := []int{4, 5, 2} + heap := NewArrayBasedMinimumHeap(h) + assert.Equal(t, 2, heap.Peek()) + assert.Equal(t, 3, heap.Len()) + heap.Push(3) + assert.Equal(t, 2, heap.Peek()) + assert.Equal(t, 4, heap.Len()) + heap.Push(1) + assert.Equal(t, 1, heap.Peek()) + assert.Equal(t, 5, heap.Len()) + for i := 1; i <= 5; i++ { + assert.Equal(t, i, heap.Peek()) + assert.Equal(t, i, heap.Pop()) + } +} + +func TestMaximumHeap(t *testing.T) { + h := []int{4, 1, 2} + heap := NewArrayBasedMaximumHeap(h) + assert.Equal(t, 4, heap.Peek()) + assert.Equal(t, 3, heap.Len()) + heap.Push(3) + assert.Equal(t, 4, heap.Peek()) + assert.Equal(t, 4, heap.Len()) + heap.Push(5) + assert.Equal(t, 5, heap.Peek()) + assert.Equal(t, 5, heap.Len()) + for i := 5; i >= 1; i-- { + assert.Equal(t, i, heap.Peek()) + assert.Equal(t, i, heap.Pop()) + } +} + +type FooHeapObject struct { + value int +} + +func GetFooHeapObjectOrderFunc(obj *FooHeapObject) int { + return obj.value +} + +func TestMinimumObjectHeap(t *testing.T) { + h := []*FooHeapObject{ + {value: 4}, + {value: 5}, + {value: 2}, + } + heap := NewObjectArrayBasedMinimumHeap(h, GetFooHeapObjectOrderFunc) + assert.Equal(t, 2, heap.Peek().value) + assert.Equal(t, 3, heap.Len()) + heap.Push(&FooHeapObject{value: 3}) + assert.Equal(t, 
2, heap.Peek().value) + assert.Equal(t, 4, heap.Len()) + heap.Push(&FooHeapObject{value: 1}) + assert.Equal(t, 1, heap.Peek().value) + assert.Equal(t, 5, heap.Len()) + for i := 1; i <= 5; i++ { + assert.Equal(t, i, heap.Peek().value) + assert.Equal(t, i, heap.Pop().value) + } +} + +func TestMaximumObjectHeap(t *testing.T) { + h := []*FooHeapObject{ + {value: 4}, + {value: 1}, + {value: 2}, + } + heap := NewObjectArrayBasedMaximumHeap(h, GetFooHeapObjectOrderFunc) + assert.Equal(t, 4, heap.Peek().value) + assert.Equal(t, 3, heap.Len()) + heap.Push(&FooHeapObject{value: 3}) + assert.Equal(t, 4, heap.Peek().value) + assert.Equal(t, 4, heap.Len()) + heap.Push(&FooHeapObject{value: 5}) + assert.Equal(t, 5, heap.Peek().value) + assert.Equal(t, 5, heap.Len()) + for i := 5; i >= 1; i-- { + assert.Equal(t, i, heap.Peek().value) + assert.Equal(t, i, heap.Pop().value) + } +}