fix: [2.4] use the object heap to keep the min ddl ts order (#39193)

- issue: #39002
- pr: #39118

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2025-01-14 14:23:05 +08:00 committed by GitHub
parent 8da3ec8a7e
commit 0f4e1d1bf8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 430 additions and 11 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type IScheduler interface {
@ -46,6 +47,7 @@ type scheduler struct {
tsoAllocator tso.Allocator
taskChan chan task
taskHeap typeutil.Heap[task]
lock sync.Mutex
@ -56,16 +58,22 @@ type scheduler struct {
lockMapping map[LockLevel]*lock.KeyLock[string]
}
func GetTaskHeapOrder(t task) Timestamp {
return t.GetTs()
}
func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
ctx1, cancel := context.WithCancel(ctx)
// TODO
n := 1024 * 10
taskArr := make([]task, 0)
s := &scheduler{
ctx: ctx1,
cancel: cancel,
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
taskChan: make(chan task, n),
taskHeap: typeutil.NewObjectArrayBasedMinimumHeap[task, Timestamp](taskArr, GetTaskHeapOrder),
minDdlTs: *atomic.NewUint64(0),
clusterLock: lock.NewKeyLock[string](),
databaseLock: lock.NewKeyLock[string](),
@ -93,7 +101,7 @@ func (s *scheduler) Stop() {
}
func (s *scheduler) execute(task task) {
defer s.setMinDdlTs(task.GetTs()) // we should update ts, whatever task succeeds or not.
defer s.setMinDdlTs() // we should update ts, whatever task succeeds or not.
task.SetInQueueDuration()
if err := task.Prepare(task.GetCtx()); err != nil {
task.NotifyDone(err)
@ -153,6 +161,7 @@ func (s *scheduler) setTs(task task) error {
return err
}
task.SetTs(ts)
s.taskHeap.Push(task)
return nil
}
@ -186,8 +195,14 @@ func (s *scheduler) GetMinDdlTs() Timestamp {
return s.minDdlTs.Load()
}
func (s *scheduler) setMinDdlTs(ts Timestamp) {
s.minDdlTs.Store(ts)
func (s *scheduler) setMinDdlTs() {
s.lock.Lock()
defer s.lock.Unlock()
for s.taskHeap.Len() > 0 && s.taskHeap.Peek().IsFinished() {
t := s.taskHeap.Pop()
s.minDdlTs.Store(t.GetTs())
}
}
func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
@ -195,9 +210,12 @@ func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
if err := s.setID(task); err != nil {
return err
}
s.lock.Lock()
if err := s.setTs(task); err != nil {
s.lock.Unlock()
return err
}
s.lock.Unlock()
s.execute(task)
return nil
}

View File

@ -82,6 +82,29 @@ func newMockNormalTask() *mockNormalTask {
return task
}
type mockLockerKeyTask struct {
baseTask
lockerKey string
rw bool
}
func (m *mockLockerKeyTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(m.lockerKey, m.rw),
)
}
func newMockLockerKeyTask(lockerKey string, rw bool) *mockLockerKeyTask {
task := &mockLockerKeyTask{
baseTask: newBaseTask(context.Background(), nil),
lockerKey: lockerKey,
rw: rw,
}
task.SetCtx(context.Background())
return task
}
func Test_scheduler_Start_Stop(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
@ -247,6 +270,87 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) {
assert.Zero(t, s.GetMinDdlTs())
s.Stop()
})
t.Run("concurrent task schedule", func(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
tso := atomic.NewUint64(100)
idAlloc.AllocOneF = func() (UniqueID, error) {
return 100, nil
}
tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) {
got := tso.Inc()
return got, nil
}
ctx := context.Background()
s := newScheduler(ctx, idAlloc, tsoAlloc)
paramtable.Init()
paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1")
s.Start()
for i := 0; i < 100; i++ {
if s.GetMinDdlTs() > Timestamp(100) {
break
}
assert.True(t, i < 100)
time.Sleep(time.Millisecond)
}
w := &sync.WaitGroup{}
w.Add(5)
// locker key rw true
lockerKey := "hello"
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockLockerKeyTask(lockerKey, true)
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
// locker key rw false
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockLockerKeyTask(lockerKey, false)
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockLockerKeyTask(lockerKey, false)
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockNormalTask()
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
lastMin := s.GetMinDdlTs()
go func() {
defer w.Done()
current := s.GetMinDdlTs()
assert.True(t, current >= lastMin)
lastMin = current
time.Sleep(time.Millisecond * 100)
}()
w.Wait()
})
}
type WithLockKeyTask struct {

View File

@ -20,6 +20,7 @@ import (
"context"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
@ -52,16 +53,18 @@ type task interface {
Execute(ctx context.Context) error
WaitToFinish() error
NotifyDone(err error)
IsFinished() bool
SetInQueueDuration()
GetLockerKey() LockerKey
}
type baseTask struct {
ctx context.Context
core *Core
done chan error
ts Timestamp
id UniqueID
ctx context.Context
core *Core
done chan error
isFinished *atomic.Bool
ts Timestamp
id UniqueID
tr *timerecord.TimeRecorder
queueDur time.Duration
@ -69,9 +72,10 @@ type baseTask struct {
func newBaseTask(ctx context.Context, core *Core) baseTask {
b := baseTask{
core: core,
done: make(chan error, 1),
tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"),
core: core,
done: make(chan error, 1),
tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"),
isFinished: atomic.NewBool(false),
}
b.SetCtx(ctx)
return b
@ -115,12 +119,17 @@ func (b *baseTask) WaitToFinish() error {
func (b *baseTask) NotifyDone(err error) {
b.done <- err
b.isFinished.Store(true)
}
func (b *baseTask) SetInQueueDuration() {
b.queueDur = b.tr.ElapseSpan()
}
func (b *baseTask) IsFinished() bool {
return b.isFinished.Load()
}
func (b *baseTask) GetLockerKey() LockerKey {
return nil
}

197
pkg/util/typeutil/heap.go Normal file
View File

@ -0,0 +1,197 @@
package typeutil
import (
"container/heap"
"golang.org/x/exp/constraints"
)
var _ HeapInterface = (*heapArray[int])(nil)
// HeapInterface is the interface that a heap must implement.
type HeapInterface interface {
heap.Interface
Peek() interface{}
}
// Heap is a heap of E.
// Use `golang.org/x/exp/constraints` directly if you want to change any element.
type Heap[E any] interface {
// Len returns the size of the heap.
Len() int
// Push pushes an element onto the heap.
Push(x E)
// Pop returns the element at the top of the heap.
// Panics if the heap is empty.
Pop() E
// Peek returns the element at the top of the heap.
// Panics if the heap is empty.
Peek() E
}
// heapArray is a heap backed by an array.
type heapArray[E constraints.Ordered] []E
// Len returns the length of the heap.
func (h heapArray[E]) Len() int {
return len(h)
}
// Less returns true if the element at index i is less than the element at index j.
func (h heapArray[E]) Less(i, j int) bool {
return h[i] < h[j]
}
// Swap swaps the elements at indexes i and j.
func (h heapArray[E]) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Push pushes the last one at len.
func (h *heapArray[E]) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(E))
}
// Pop pop the last one at len.
func (h *heapArray[E]) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// Peek returns the element at the top of the heap.
func (h *heapArray[E]) Peek() interface{} {
return (*h)[0]
}
type objectHeapArray[O any, E constraints.Ordered] struct {
objects []O
getOrderFunc func(O) E
}
func (h *objectHeapArray[O, E]) Len() int {
return len(h.objects)
}
func (h *objectHeapArray[O, E]) Less(i, j int) bool {
return h.getOrderFunc(h.objects[i]) < h.getOrderFunc(h.objects[j])
}
func (h *objectHeapArray[O, E]) Swap(i, j int) {
h.objects[i], h.objects[j] = h.objects[j], h.objects[i]
}
func (h *objectHeapArray[O, E]) Push(x interface{}) {
h.objects = append(h.objects, x.(O))
}
func (h *objectHeapArray[O, E]) Pop() interface{} {
old := h.objects
n := len(old)
x := old[n-1]
h.objects = old[0 : n-1]
return x
}
func (h *objectHeapArray[O, E]) Peek() interface{} {
return h.objects[0]
}
// reverseOrderedInterface is a heap base interface that reverses the order of the elements.
type reverseOrderedInterface[E constraints.Ordered] struct {
HeapInterface
}
// Less returns true if the element at index j is less than the element at index i.
func (r reverseOrderedInterface[E]) Less(i, j int) bool {
return r.HeapInterface.Less(j, i)
}
// NewHeap returns a new heap from a underlying representation.
func NewHeap[E any](inner HeapInterface) Heap[E] {
return &heapImpl[E, HeapInterface]{
inner: inner,
}
}
// NewArrayBasedMaximumHeap returns a new maximum heap.
func NewArrayBasedMaximumHeap[E constraints.Ordered](initial []E) Heap[E] {
ha := heapArray[E](initial)
reverse := reverseOrderedInterface[E]{
HeapInterface: &ha,
}
heap.Init(reverse)
return &heapImpl[E, reverseOrderedInterface[E]]{
inner: reverse,
}
}
// NewArrayBasedMinimumHeap returns a new minimum heap.
func NewArrayBasedMinimumHeap[E constraints.Ordered](initial []E) Heap[E] {
ha := heapArray[E](initial)
heap.Init(&ha)
return &heapImpl[E, *heapArray[E]]{
inner: &ha,
}
}
func NewObjectArrayBasedMaximumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] {
if initial == nil {
initial = make([]O, 0)
}
ha := &objectHeapArray[O, E]{
objects: initial,
getOrderFunc: getOrderFunc,
}
reverse := reverseOrderedInterface[E]{
HeapInterface: ha,
}
heap.Init(reverse)
return &heapImpl[O, reverseOrderedInterface[E]]{
inner: reverse,
}
}
func NewObjectArrayBasedMinimumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] {
if initial == nil {
initial = make([]O, 0)
}
ha := &objectHeapArray[O, E]{
objects: initial,
getOrderFunc: getOrderFunc,
}
heap.Init(ha)
return &heapImpl[O, *objectHeapArray[O, E]]{
inner: ha,
}
}
// heapImpl is a min-heap of E.
type heapImpl[E any, H HeapInterface] struct {
inner H
}
// Len returns the length of the heap.
func (h *heapImpl[E, H]) Len() int {
return h.inner.Len()
}
// Push pushes an element onto the heap.
func (h *heapImpl[E, H]) Push(x E) {
heap.Push(h.inner, x)
}
// Pop pops an element from the heap.
func (h *heapImpl[E, H]) Pop() E {
return heap.Pop(h.inner).(E)
}
// Peek returns the element at the top of the heap.
func (h *heapImpl[E, H]) Peek() E {
return h.inner.Peek().(E)
}

View File

@ -0,0 +1,91 @@
package typeutil
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMinimumHeap(t *testing.T) {
h := []int{4, 5, 2}
heap := NewArrayBasedMinimumHeap(h)
assert.Equal(t, 2, heap.Peek())
assert.Equal(t, 3, heap.Len())
heap.Push(3)
assert.Equal(t, 2, heap.Peek())
assert.Equal(t, 4, heap.Len())
heap.Push(1)
assert.Equal(t, 1, heap.Peek())
assert.Equal(t, 5, heap.Len())
for i := 1; i <= 5; i++ {
assert.Equal(t, i, heap.Peek())
assert.Equal(t, i, heap.Pop())
}
}
func TestMaximumHeap(t *testing.T) {
h := []int{4, 1, 2}
heap := NewArrayBasedMaximumHeap(h)
assert.Equal(t, 4, heap.Peek())
assert.Equal(t, 3, heap.Len())
heap.Push(3)
assert.Equal(t, 4, heap.Peek())
assert.Equal(t, 4, heap.Len())
heap.Push(5)
assert.Equal(t, 5, heap.Peek())
assert.Equal(t, 5, heap.Len())
for i := 5; i >= 1; i-- {
assert.Equal(t, i, heap.Peek())
assert.Equal(t, i, heap.Pop())
}
}
type FooHeapObject struct {
value int
}
func GetFooHeapObjectOrderFunc(obj *FooHeapObject) int {
return obj.value
}
func TestMinimumObjectHeap(t *testing.T) {
h := []*FooHeapObject{
{value: 4},
{value: 5},
{value: 2},
}
heap := NewObjectArrayBasedMinimumHeap(h, GetFooHeapObjectOrderFunc)
assert.Equal(t, 2, heap.Peek().value)
assert.Equal(t, 3, heap.Len())
heap.Push(&FooHeapObject{value: 3})
assert.Equal(t, 2, heap.Peek().value)
assert.Equal(t, 4, heap.Len())
heap.Push(&FooHeapObject{value: 1})
assert.Equal(t, 1, heap.Peek().value)
assert.Equal(t, 5, heap.Len())
for i := 1; i <= 5; i++ {
assert.Equal(t, i, heap.Peek().value)
assert.Equal(t, i, heap.Pop().value)
}
}
func TestMaximumObjectHeap(t *testing.T) {
h := []*FooHeapObject{
{value: 4},
{value: 1},
{value: 2},
}
heap := NewObjectArrayBasedMaximumHeap(h, GetFooHeapObjectOrderFunc)
assert.Equal(t, 4, heap.Peek().value)
assert.Equal(t, 3, heap.Len())
heap.Push(&FooHeapObject{value: 3})
assert.Equal(t, 4, heap.Peek().value)
assert.Equal(t, 4, heap.Len())
heap.Push(&FooHeapObject{value: 5})
assert.Equal(t, 5, heap.Peek().value)
assert.Equal(t, 5, heap.Len())
for i := 5; i >= 1; i-- {
assert.Equal(t, i, heap.Peek().value)
assert.Equal(t, i, heap.Pop().value)
}
}