mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
feat: adding grouped virtual resource allocator (#33669)
See #33559 --------- Signed-off-by: Ted Xu <ted.xu@zilliz.com>
This commit is contained in:
parent
03a5f7e6c0
commit
958ecd500b
@ -25,6 +25,8 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
)
|
||||
|
||||
var zero = &Resource{0, 0, 0}
|
||||
|
||||
type Resource struct {
|
||||
Memory int64 // Memory occupation in bytes
|
||||
CPU int64 // CPU in cycles per second
|
||||
@ -63,13 +65,22 @@ func (r Resource) Le(limit *Resource) bool {
|
||||
type Allocator[T comparable] interface {
|
||||
// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
|
||||
// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
|
||||
// Allocate on identical id is not allowed, in which case it returns (false, nil). Use #Reallocate instead.
|
||||
Allocate(id T, r *Resource) (allocated bool, short *Resource)
|
||||
// Reallocate re-allocates the resource on given id with delta resource. Delta can be negative, in which case the resource is released.
|
||||
// If delta is negative and the allocated resource is less than the delta, returns (false, nil).
|
||||
Reallocate(id T, delta *Resource) (allocated bool, short *Resource)
|
||||
// Release releases the resource
|
||||
Release(id T)
|
||||
Release(id T) *Resource
|
||||
// Used returns the used resource
|
||||
Used() Resource
|
||||
// Wait waits for new release. Releases could be initiated by #Release or #Reallocate.
|
||||
Wait()
|
||||
// Inspect returns the allocated resources
|
||||
Inspect() map[T]*Resource
|
||||
|
||||
// notify notifies the waiters.
|
||||
notify()
|
||||
}
|
||||
|
||||
type FixedSizeAllocator[T comparable] struct {
|
||||
@ -78,17 +89,23 @@ type FixedSizeAllocator[T comparable] struct {
|
||||
lock sync.RWMutex
|
||||
used Resource
|
||||
allocs map[T]*Resource
|
||||
cond sync.Cond
|
||||
}
|
||||
|
||||
func (a *FixedSizeAllocator[T]) Allocate(id T, r *Resource) (allocated bool, short *Resource) {
|
||||
if r.Le(zero) {
|
||||
return false, nil
|
||||
}
|
||||
a.lock.Lock()
|
||||
defer a.lock.Unlock()
|
||||
if a.used.Add(r).Le(a.limit) {
|
||||
|
||||
_, ok := a.allocs[id]
|
||||
if ok {
|
||||
// Re-allocate on identical id is not allowed
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if a.used.Add(r).Le(a.limit) {
|
||||
a.allocs[id] = r
|
||||
return true, nil
|
||||
}
|
||||
@ -97,15 +114,47 @@ func (a *FixedSizeAllocator[T]) Allocate(id T, r *Resource) (allocated bool, sho
|
||||
return false, short
|
||||
}
|
||||
|
||||
func (a *FixedSizeAllocator[T]) Release(id T) {
|
||||
func (a *FixedSizeAllocator[T]) Reallocate(id T, delta *Resource) (allocated bool, short *Resource) {
|
||||
a.lock.Lock()
|
||||
r, ok := a.allocs[id]
|
||||
a.lock.Unlock()
|
||||
|
||||
if !ok {
|
||||
return a.Allocate(id, delta)
|
||||
}
|
||||
|
||||
a.lock.Lock()
|
||||
defer a.lock.Unlock()
|
||||
r.Add(delta)
|
||||
if !zero.Le(r) {
|
||||
r.Sub(delta)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if a.used.Add(delta).Le(a.limit) {
|
||||
if !zero.Le(delta) {
|
||||
// If delta is negative, notify waiters
|
||||
a.notify()
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
short = a.used.Diff(a.limit)
|
||||
r.Sub(delta)
|
||||
a.used.Sub(delta)
|
||||
return false, short
|
||||
}
|
||||
|
||||
func (a *FixedSizeAllocator[T]) Release(id T) *Resource {
|
||||
a.lock.Lock()
|
||||
defer a.lock.Unlock()
|
||||
r, ok := a.allocs[id]
|
||||
if !ok {
|
||||
return
|
||||
return zero
|
||||
}
|
||||
delete(a.allocs, id)
|
||||
a.used.Sub(r)
|
||||
a.notify()
|
||||
return r
|
||||
}
|
||||
|
||||
func (a *FixedSizeAllocator[T]) Used() Resource {
|
||||
@ -120,14 +169,26 @@ func (a *FixedSizeAllocator[T]) Inspect() map[T]*Resource {
|
||||
return maps.Clone(a.allocs)
|
||||
}
|
||||
|
||||
func (a *FixedSizeAllocator[T]) Wait() {
|
||||
a.cond.L.Lock()
|
||||
a.cond.Wait()
|
||||
a.cond.L.Unlock()
|
||||
}
|
||||
|
||||
func (a *FixedSizeAllocator[T]) notify() {
|
||||
a.cond.Broadcast()
|
||||
}
|
||||
|
||||
func NewFixedSizeAllocator[T comparable](limit *Resource) *FixedSizeAllocator[T] {
|
||||
return &FixedSizeAllocator[T]{
|
||||
limit: limit,
|
||||
allocs: make(map[T]*Resource),
|
||||
cond: sync.Cond{L: &sync.Mutex{}},
|
||||
}
|
||||
}
|
||||
|
||||
// PhysicalAwareFixedSizeAllocator allocates resources with additional consideration of physical resource usage.
|
||||
// Note: wait on PhysicalAwareFixedSizeAllocator may only be notified if there is virtual resource released.
|
||||
type PhysicalAwareFixedSizeAllocator[T comparable] struct {
|
||||
FixedSizeAllocator[T]
|
||||
|
||||
@ -155,6 +216,23 @@ func (a *PhysicalAwareFixedSizeAllocator[T]) Allocate(id T, r *Resource) (alloca
|
||||
return false, expected.Diff(a.hwLimit)
|
||||
}
|
||||
|
||||
func (a *PhysicalAwareFixedSizeAllocator[T]) Reallocate(id T, delta *Resource) (allocated bool, short *Resource) {
|
||||
memoryUsage := int64(hardware.GetUsedMemoryCount())
|
||||
diskUsage := int64(0)
|
||||
if usageStats, err := disk.Usage(a.dir); err != nil {
|
||||
diskUsage = int64(usageStats.Used)
|
||||
}
|
||||
|
||||
expected := &Resource{
|
||||
Memory: a.Used().Memory + delta.Memory + memoryUsage,
|
||||
Disk: a.Used().Disk + delta.Disk + diskUsage,
|
||||
}
|
||||
if expected.Le(a.hwLimit) {
|
||||
return a.FixedSizeAllocator.Reallocate(id, delta)
|
||||
}
|
||||
return false, expected.Diff(a.hwLimit)
|
||||
}
|
||||
|
||||
func NewPhysicalAwareFixedSizeAllocator[T comparable](limit *Resource, hwMemoryLimit, hwDiskLimit int64, dir string) *PhysicalAwareFixedSizeAllocator[T] {
|
||||
return &PhysicalAwareFixedSizeAllocator[T]{
|
||||
FixedSizeAllocator: FixedSizeAllocator[T]{
|
||||
|
||||
@ -22,13 +22,21 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
)
|
||||
|
||||
func inspect[T comparable](a Allocator[T]) {
|
||||
m := a.Inspect()
|
||||
log.Info("Allocation", zap.Any("allocations", m), zap.Any("used", a.Used()))
|
||||
}
|
||||
|
||||
func TestFixedSizeAllocator(t *testing.T) {
|
||||
a := NewFixedSizeAllocator[string](&Resource{100, 100, 100})
|
||||
|
||||
// Allocate
|
||||
allocated, _ := a.Allocate("a1", &Resource{10, 10, 10})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, _ = a.Allocate("a2", &Resource{90, 90, 90})
|
||||
@ -36,13 +44,36 @@ func TestFixedSizeAllocator(t *testing.T) {
|
||||
allocated, short := a.Allocate("a3", &Resource{10, 0, 0})
|
||||
assert.Equal(t, false, allocated)
|
||||
assert.Equal(t, &Resource{10, 0, 0}, short)
|
||||
allocated, _ = a.Allocate("a0", &Resource{-10, 0, 0})
|
||||
assert.Equal(t, false, allocated)
|
||||
inspect[string](a)
|
||||
|
||||
// Release
|
||||
a.Release("a2")
|
||||
allocated, _ = a.Allocate("a3", &Resource{10, 0, 0})
|
||||
assert.Equal(t, true, allocated)
|
||||
|
||||
// Inspect
|
||||
m := a.Inspect()
|
||||
assert.Equal(t, 2, len(m))
|
||||
|
||||
// Allocate on identical id is not allowed
|
||||
allocated, _ = a.Allocate("a1", &Resource{10, 0, 0})
|
||||
assert.Equal(t, false, allocated)
|
||||
|
||||
// Reallocate
|
||||
allocated, _ = a.Reallocate("a1", &Resource{10, 0, 0})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, _ = a.Reallocate("a1", &Resource{-10, 0, 0})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, _ = a.Reallocate("a1", &Resource{-20, 0, 0})
|
||||
assert.Equal(t, false, allocated)
|
||||
allocated, _ = a.Reallocate("a1", &Resource{80, 0, 0})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, _ = a.Reallocate("a1", &Resource{10, 0, 0})
|
||||
assert.Equal(t, false, allocated)
|
||||
allocated, _ = a.Reallocate("a4", &Resource{0, 10, 0})
|
||||
assert.Equal(t, true, allocated)
|
||||
}
|
||||
|
||||
func TestFixedSizeAllocatorRace(t *testing.T) {
|
||||
@ -61,6 +92,28 @@ func TestFixedSizeAllocatorRace(t *testing.T) {
|
||||
assert.Equal(t, 100, len(m))
|
||||
}
|
||||
|
||||
func TestWait(t *testing.T) {
|
||||
a := NewFixedSizeAllocator[string](&Resource{100, 100, 100})
|
||||
allocated, _ := a.Allocate("a1", &Resource{100, 100, 100})
|
||||
assert.True(t, allocated)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func(index int) {
|
||||
allocated, _ := a.Reallocate("a1", &Resource{-1, -1, -1})
|
||||
assert.Equal(t, true, allocated)
|
||||
}(i)
|
||||
}
|
||||
|
||||
allocated, _ = a.Allocate("a2", &Resource{100, 100, 100})
|
||||
i := 1
|
||||
for !allocated {
|
||||
a.Wait()
|
||||
allocated, _ = a.Allocate("a2", &Resource{100, 100, 100})
|
||||
i++
|
||||
}
|
||||
assert.True(t, allocated)
|
||||
assert.True(t, i < 100 && i > 1)
|
||||
}
|
||||
|
||||
func TestPhysicalAwareFixedSizeAllocator(t *testing.T) {
|
||||
hwMemoryLimit := int64(float32(hardware.GetMemoryCount()) * 0.9)
|
||||
hwDiskLimit := int64(1<<63 - 1)
|
||||
@ -73,4 +126,10 @@ func TestPhysicalAwareFixedSizeAllocator(t *testing.T) {
|
||||
allocated, short := a.Allocate("a3", &Resource{10, 0, 0})
|
||||
assert.Equal(t, false, allocated)
|
||||
assert.Equal(t, &Resource{10, 0, 0}, short)
|
||||
|
||||
// Reallocate
|
||||
allocated, _ = a.Reallocate("a1", &Resource{0, -10, 0})
|
||||
assert.True(t, allocated)
|
||||
allocated, _ = a.Reallocate("a1", &Resource{10, 0, 0})
|
||||
assert.False(t, allocated)
|
||||
}
|
||||
|
||||
147
pkg/util/vralloc/sharedalloc.go
Normal file
147
pkg/util/vralloc/sharedalloc.go
Normal file
@ -0,0 +1,147 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package vralloc
|
||||
|
||||
type SharedAllocator struct {
|
||||
Allocator[string]
|
||||
parent *GroupedAllocator
|
||||
name string
|
||||
}
|
||||
|
||||
// GroupedAllocator is a shared allocator that can be grouped with other shared allocators. The sum of used resources of all
|
||||
// children should not exceed the limit.
|
||||
type GroupedAllocator struct {
|
||||
SharedAllocator
|
||||
name string
|
||||
children map[string]Allocator[string]
|
||||
}
|
||||
|
||||
// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
|
||||
// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
|
||||
func (sa *SharedAllocator) Allocate(id string, r *Resource) (allocated bool, short *Resource) {
|
||||
allocated, short = sa.Allocator.Allocate(id, r)
|
||||
if !allocated {
|
||||
return
|
||||
}
|
||||
if sa.parent != nil {
|
||||
allocated, short = sa.parent.Reallocate(sa.name, r) // Ask for allocation on self name.
|
||||
if !allocated {
|
||||
sa.Allocator.Release(id)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Reallocate re-allocates the resource on given id with delta resource. Delta can be negative, in which case the resource is released.
|
||||
// If delta is negative and the allocated resource is less than the delta, returns (false, nil).
|
||||
func (sa *SharedAllocator) Reallocate(id string, delta *Resource) (allocated bool, short *Resource) {
|
||||
allocated, short = sa.Allocator.Reallocate(id, delta)
|
||||
if !allocated {
|
||||
return
|
||||
}
|
||||
if sa.parent != nil {
|
||||
allocated, short = sa.parent.Reallocate(sa.name, delta)
|
||||
if !allocated {
|
||||
sa.Allocator.Reallocate(id, zero.Diff(delta))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Release releases the resource
|
||||
func (sa *SharedAllocator) Release(id string) *Resource {
|
||||
r := sa.Allocator.Release(id)
|
||||
if sa.parent != nil {
|
||||
sa.parent.Reallocate(sa.name, zero.Diff(r))
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
|
||||
// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
|
||||
// Allocate on identical id is not allowed, in which case it returns (false, nil). Use #Reallocate instead.
|
||||
func (ga *GroupedAllocator) Allocate(id string, r *Resource) (allocated bool, short *Resource) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Release releases the resource
|
||||
func (ga *GroupedAllocator) Release(id string) *Resource {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ga *GroupedAllocator) Reallocate(id string, delta *Resource) (allocated bool, short *Resource) {
|
||||
allocated, short = ga.SharedAllocator.Reallocate(id, delta)
|
||||
if allocated {
|
||||
// Propagate to parent.
|
||||
if ga.parent != nil {
|
||||
allocated, short = ga.parent.Reallocate(ga.name, delta)
|
||||
if !allocated {
|
||||
ga.SharedAllocator.Reallocate(id, zero.Diff(delta))
|
||||
return
|
||||
}
|
||||
}
|
||||
// Notify siblings of id.
|
||||
for name := range ga.children {
|
||||
if name != id {
|
||||
ga.children[name].notify()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (ga *GroupedAllocator) GetAllocator(name string) Allocator[string] {
|
||||
return ga.children[name]
|
||||
}
|
||||
|
||||
type GroupedAllocatorBuilder struct {
|
||||
ga GroupedAllocator
|
||||
}
|
||||
|
||||
func NewGroupedAllocatorBuilder(name string, limit *Resource) *GroupedAllocatorBuilder {
|
||||
return &GroupedAllocatorBuilder{
|
||||
ga: GroupedAllocator{
|
||||
SharedAllocator: SharedAllocator{
|
||||
Allocator: NewFixedSizeAllocator[string](limit),
|
||||
name: name,
|
||||
},
|
||||
name: name,
|
||||
children: make(map[string]Allocator[string]),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *GroupedAllocatorBuilder) AddChild(name string, limit *Resource) *GroupedAllocatorBuilder {
|
||||
b.ga.children[name] = &SharedAllocator{
|
||||
Allocator: NewFixedSizeAllocator[string](limit),
|
||||
parent: &b.ga,
|
||||
name: name,
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *GroupedAllocatorBuilder) AddChildGroup(allocator *GroupedAllocator) *GroupedAllocatorBuilder {
|
||||
allocator.parent = &b.ga
|
||||
b.ga.children[allocator.name] = allocator
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *GroupedAllocatorBuilder) Build() *GroupedAllocator {
|
||||
return &b.ga
|
||||
}
|
||||
100
pkg/util/vralloc/sharedalloc_test.go
Normal file
100
pkg/util/vralloc/sharedalloc_test.go
Normal file
@ -0,0 +1,100 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package vralloc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGroupedAllocator(t *testing.T) {
|
||||
t.Run("test allocator", func(t *testing.T) {
|
||||
a := NewGroupedAllocatorBuilder("a", &Resource{100, 100, 100}).
|
||||
AddChild("c1", &Resource{10, 10, 10}).
|
||||
AddChild("c2", &Resource{10, 10, 10}).
|
||||
AddChild("c3", &Resource{90, 90, 90}).
|
||||
Build()
|
||||
|
||||
c1 := a.GetAllocator("c1")
|
||||
c2 := a.GetAllocator("c2")
|
||||
c3 := a.GetAllocator("c3")
|
||||
|
||||
// Allocate
|
||||
allocated, _ := c1.Allocate("x11", &Resource{10, 10, 10})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, short := c1.Allocate("x12", &Resource{90, 90, 90})
|
||||
assert.Equal(t, false, allocated)
|
||||
assert.Equal(t, &Resource{90, 90, 90}, short)
|
||||
allocated, _ = c2.Allocate("x21", &Resource{10, 10, 10})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, short = c3.Allocate("x31", &Resource{90, 90, 90})
|
||||
assert.Equal(t, false, allocated)
|
||||
assert.Equal(t, &Resource{10, 10, 10}, short)
|
||||
inspect[string](a)
|
||||
|
||||
// Release
|
||||
c1.Release("x11")
|
||||
allocated, _ = c3.Allocate("x31", &Resource{90, 90, 90})
|
||||
assert.Equal(t, true, allocated)
|
||||
|
||||
// Inspect
|
||||
m := a.Inspect()
|
||||
assert.Equal(t, 3, len(m))
|
||||
})
|
||||
|
||||
t.Run("test 3 level", func(t *testing.T) {
|
||||
// a
|
||||
// c1 c2
|
||||
// c3 c4
|
||||
// Leaf nodes: c1, c3, c4
|
||||
|
||||
root := NewGroupedAllocatorBuilder("a", &Resource{100, 100, 100}).
|
||||
AddChild("c1", &Resource{100, 100, 100}).
|
||||
AddChildGroup(NewGroupedAllocatorBuilder("c2", &Resource{100, 100, 100}).AddChild("c3", &Resource{100, 100, 100}).AddChild("c4", &Resource{100, 100, 100}).Build()).
|
||||
Build()
|
||||
|
||||
c1 := root.GetAllocator("c1")
|
||||
c2 := root.GetAllocator("c2").(*GroupedAllocator)
|
||||
c3 := c2.GetAllocator("c3")
|
||||
// c4 := c2.GetAllocator("c4")
|
||||
|
||||
// Allocate
|
||||
allocated, _ := c1.Allocate("x11", &Resource{100, 100, 100})
|
||||
assert.Equal(t, true, allocated)
|
||||
allocated, _ = c2.Allocate("x12", &Resource{90, 90, 90})
|
||||
assert.Equal(t, false, allocated) // allocation on grouped allocator is not allowed
|
||||
allocated, _ = c3.Allocate("x21", &Resource{10, 10, 10})
|
||||
assert.Equal(t, false, allocated) // not enough resource
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
allocated, _ = c3.Allocate("x21", &Resource{10, 10, 10})
|
||||
if !allocated {
|
||||
c3.Wait()
|
||||
allocated, _ = c3.Allocate("x21", &Resource{10, 10, 10})
|
||||
assert.Equal(t, true, allocated)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
c1.Release("x11")
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user