diff --git a/pkg/util/cache/cache.go b/pkg/util/cache/cache.go
index 28613b64d4..90d2fefe52 100644
--- a/pkg/util/cache/cache.go
+++ b/pkg/util/cache/cache.go
@@ -4,15 +4,18 @@ import (
 	"container/list"
 	"fmt"
 	"sync"
+	"time"
 
-	"github.com/cockroachdb/errors"
 	"go.uber.org/atomic"
 	"golang.org/x/sync/singleflight"
+
+	"github.com/milvus-io/milvus/pkg/util/merr"
 )
 
 var (
-	ErrNoSuchItem     = errors.New("no such item")
-	ErrNotEnoughSpace = errors.New("not enough space")
+	ErrNoSuchItem     = merr.WrapErrServiceInternal("no such item")
+	ErrNotEnoughSpace = merr.WrapErrServiceInternal("not enough space")
+	ErrTimeOut        = merr.WrapErrServiceInternal("time out")
 )
 
 type cacheItem[K comparable, V any] struct {
@@ -21,21 +24,6 @@ type cacheItem[K comparable, V any] struct {
 	pinCount atomic.Int32
 }
 
-func newCacheItem[K comparable, V any](key K, value V) *cacheItem[K, V] {
-	return &cacheItem[K, V]{
-		key:   key,
-		value: value,
-	}
-}
-
-func (item *cacheItem[K, V]) Unpin() {
-	item.pinCount.Dec()
-}
-
-func (i *cacheItem[K, V]) Value() V {
-	return i.value
-}
-
 type (
 	Loader[K comparable, V any]    func(key K) (V, bool)
 	Finalizer[K comparable, V any] func(key K, value V) error
@@ -48,10 +36,16 @@ type (
 type Scavenger[K comparable] interface {
 	// Collect records entry additions, if there is room, return true, or else return false and a collector.
 	// The collector is a function which can be invoked repetedly, each invocation will test if there is enough
-	// room provided that all entries in the collector is evicted.
+	// room provided that all entries in the collector are evicted. Typically, the collector returns false
+	// several times before it returns true.
 	Collect(key K) (bool, func(K) bool)
 	// Throw records entry removals.
 	Throw(key K)
+	// Spare returns a collector function based on the given key.
+	// The collector is a function which can be invoked repeatedly; each invocation tests whether there is enough
+	// room for all the pending entries once the given entry is evicted. Typically, the collector returns true
+	// several times before it returns false.
+	Spare(key K) func(K) bool
 }
 
 type LazyScavenger[K comparable] struct {
@@ -84,8 +78,30 @@ func (s *LazyScavenger[K]) Throw(key K) {
 	s.size -= s.weight(key)
 }
 
+func (s *LazyScavenger[K]) Spare(key K) func(K) bool {
+	w := s.weight(key)
+	available := s.capacity - s.size + w
+	return func(k K) bool {
+		available -= s.weight(k)
+		return available >= 0
+	}
+}
+
 type Cache[K comparable, V any] interface {
 	Do(key K, doer func(V) error) error
+	DoWait(key K, timeout time.Duration, doer func(V) error) error
+}
+
+type Waiter[K comparable] struct {
+	key K
+	c   *sync.Cond
+}
+
+func newWaiter[K comparable](key K) Waiter[K] {
+	return Waiter[K]{
+		key: key,
+		c:   sync.NewCond(&sync.Mutex{}),
+	}
 }
 
 // lruCache extends the ccache library to provide pinning and unpinning of items.
@@ -96,6 +112,8 @@ type lruCache[K comparable, V any] struct {
 	accessList         *list.List
 	loaderSingleFlight singleflight.Group
 
+	waitQueue *list.List
+
 	loader    Loader[K, V]
 	finalizer Finalizer[K, V]
 	scavenger Scavenger[K]
@@ -157,6 +175,7 @@ func newLRUCache[K comparable, V any](
 	return &lruCache[K, V]{
 		items:              make(map[K]*list.Element),
 		accessList:         list.New(),
+		waitQueue:          list.New(),
 		loaderSingleFlight: singleflight.Group{},
 		loader:             loader,
 		finalizer:          finalizer,
@@ -170,17 +189,88 @@ func (c *lruCache[K, V]) Do(key K, doer func(V) error) error {
 	if err != nil {
 		return err
 	}
-	defer item.Unpin()
-	return doer(item.Value())
+	defer c.Unpin(key)
+	return doer(item.value)
 }
 
-func (c *lruCache[K, V]) peek(key K) *cacheItem[K, V] {
+func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error) error {
+	timedWait := func(cond *sync.Cond, timeout time.Duration) bool {
+		c := make(chan struct{})
+		go func() {
+			cond.L.Lock()
+			defer cond.L.Unlock()
+			defer close(c)
+			cond.Wait()
+		}()
+		select {
+		case <-c:
+			return false // completed normally
+		case <-time.After(timeout):
+			return true // timed out
+		}
+	}
+
+	var ele *list.Element
+	start := time.Now()
+	for {
+		item, err := c.getAndPin(key)
+		if err == nil {
+			if ele != nil {
+				c.rwlock.Lock()
+				c.waitQueue.Remove(ele)
+				c.rwlock.Unlock()
+			}
+			defer c.Unpin(key)
+			return doer(item.value)
+		} else if err != ErrNotEnoughSpace {
+			return err
+		}
+		if ele == nil {
+			// If there is not enough space, enqueue the key
+			c.rwlock.Lock()
+			waiter := newWaiter(key)
+			ele = c.waitQueue.PushBack(&waiter)
+			c.rwlock.Unlock()
+		}
+		// Wait until enough space may be spared for the key
+		timeLeft := time.Until(start.Add(timeout))
+		if timeLeft <= 0 || timedWait(ele.Value.(*Waiter[K]).c, timeLeft) {
+			return ErrTimeOut
+		}
+	}
+}
+
+func (c *lruCache[K, V]) Unpin(key K) {
+	c.rwlock.Lock()
+	defer c.rwlock.Unlock()
+	e, ok := c.items[key]
+	if !ok {
+		return
+	}
+	item := e.Value.(*cacheItem[K, V])
+	item.pinCount.Dec()
+	if c.waitQueue.Len() > 0 {
+		// Notify waiters
+		collector := c.scavenger.Spare(key)
+		for e := c.waitQueue.Front(); e != nil; e = e.Next() {
+			w := e.Value.(*Waiter[K])
+			if ok := collector(w.key); ok {
+				w.c.Broadcast()
+			} else {
+				break
+			}
+		}
+	}
+}
+
+func (c *lruCache[K, V]) peekAndPin(key K) *cacheItem[K, V] {
 	c.rwlock.Lock()
 	defer c.rwlock.Unlock()
 	e, ok := c.items[key]
 	if ok {
 		item := e.Value.(*cacheItem[K, V])
 		c.accessList.MoveToFront(e)
+		item.pinCount.Inc()
 		return item
 	}
 	return nil
@@ -188,8 +278,7 @@ func (c *lruCache[K, V]) peek(key K) *cacheItem[K, V] {
 
 // GetAndPin gets and pins the given key if it exists
 func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
-	if item := c.peek(key); item != nil {
-		item.pinCount.Inc()
+	if item := c.peekAndPin(key); item != nil {
 		return item, nil
 	}
 
@@ -202,8 +291,7 @@
 
 	strKey := fmt.Sprint(key)
 	item, err, _ := c.loaderSingleFlight.Do(strKey, func() (interface{}, error) {
-		if item := c.peek(key); item != nil {
-			item.pinCount.Inc()
+		if item := c.peekAndPin(key); item != nil {
 			return item, nil
 		}
 
@@ -222,6 +310,7 @@
 		if err == nil {
 			return item.(*cacheItem[K, V]), nil
 		}
+		return nil, err
 	}
 
 	return nil, ErrNoSuchItem
@@ -261,7 +350,7 @@ func (c *lruCache[K, V]) setAndPin(key K, value V) (*cacheItem[K, V], error) {
 	c.rwlock.Lock()
 	defer c.rwlock.Unlock()
 
-	item := newCacheItem[K, V](key, value)
+	item := &cacheItem[K, V]{key: key, value: value}
 	item.pinCount.Inc()
 
 	// tryScavenge is done again since the load call is lock free.
diff --git a/pkg/util/cache/cache_test.go b/pkg/util/cache/cache_test.go
index e97dd87a4b..882f87d796 100644
--- a/pkg/util/cache/cache_test.go
+++ b/pkg/util/cache/cache_test.go
@@ -2,11 +2,12 @@ package cache
 
 import (
 	"sync"
-	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
+	"go.uber.org/atomic"
 )
 
 func TestLRUCache(t *testing.T) {
@@ -175,4 +176,75 @@ func TestLRUCacheConcurrency(t *testing.T) {
 		wg.Done()
 		assert.Equal(t, ErrNotEnoughSpace, err)
 	})
+
+	t.Run("test time out", func(t *testing.T) {
+		cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
+			return key, true
+		}).WithCapacity(1).WithFinalizer(func(key, value int) error {
+			return nil
+		}).Build()
+
+		var wg sync.WaitGroup  // Let key 1000 be blocked
+		var wg1 sync.WaitGroup // Make sure goroutine is started
+		wg.Add(1)
+		wg1.Add(1)
+		go cache.Do(1000, func(v int) error {
+			wg1.Done()
+			wg.Wait()
+			return nil
+		})
+		wg1.Wait()
+		err := cache.DoWait(1001, time.Nanosecond, func(v int) error {
+			return nil
+		})
+		wg.Done()
+		assert.Equal(t, ErrTimeOut, err)
+	})
+
+	t.Run("test wait", func(t *testing.T) {
+		cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
+			return key, true
+		}).WithCapacity(1).WithFinalizer(func(key, value int) error {
+			return nil
+		}).Build()
+
+		var wg1 sync.WaitGroup // Make sure goroutine is started
+
+		wg1.Add(1)
+		go cache.Do(1000, func(v int) error {
+			wg1.Done()
+			time.Sleep(time.Second)
+			return nil
+		})
+		wg1.Wait()
+		err := cache.DoWait(1001, time.Second*2, func(v int) error {
+			return nil
+		})
+		assert.NoError(t, err)
+	})
+
+	t.Run("test wait race condition", func(t *testing.T) {
+		numEvict := new(atomic.Int32)
+		cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
+			return key, true
+		}).WithCapacity(5).WithFinalizer(func(key, value int) error {
+			numEvict.Add(1)
+			return nil
+		}).Build()
+
+		var wg sync.WaitGroup
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				for j := 0; j < 100; j++ {
+					err := cache.DoWait(j, 2*time.Second, func(v int) error {
+						return nil
+					})
+					assert.NoError(t, err)
+				}
+			}(i)
+		}
+		wg.Wait()
+	})
 }
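
Usage note (not part of the diff): the tests above exercise the new DoWait entry point from inside the package. The sketch below shows the same call pattern from an importing package. It is only a sketch, assuming the builder API exactly as used in the tests (NewCacheBuilder, WithLoader, WithCapacity, WithFinalizer, Build) and the import path github.com/milvus-io/milvus/pkg/util/cache; the key name and the 200 ms timeout are illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/cache"
)

func main() {
	// Build a cache whose loader fabricates a value on miss, mirroring the tests.
	c := cache.NewCacheBuilder[string, string]().
		WithLoader(func(key string) (string, bool) {
			return "value-of-" + key, true
		}).
		WithCapacity(1).
		WithFinalizer(func(key, value string) error {
			return nil
		}).
		Build()

	// Do fails fast with ErrNotEnoughSpace when every slot is pinned.
	// DoWait instead parks the caller on the wait queue and retries until an
	// Unpin spares enough room or the timeout elapses (ErrTimeOut).
	err := c.DoWait("segment-1", 200*time.Millisecond, func(v string) error {
		fmt.Println("pinned value:", v) // the item stays pinned while the closure runs
		return nil
	})
	if err != nil {
		fmt.Println("DoWait failed:", err)
	}
}
```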
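On the design: Unpin wakes waiters in FIFO order and uses Scavenger.Spare to stop at the first waiter that would still not fit. The snippet below is a standalone illustration of that collector arithmetic, not code from the package; it mirrors LazyScavenger.Spare with plain ints and a hypothetical fixed weight.

```go
package main

import "fmt"

// spare mirrors LazyScavenger.Spare with plain values: capacity, the space
// currently in use, and the weight of the just-unpinned entry that could be
// evicted. The returned collector answers, for each waiting key in turn,
// whether every key offered so far would fit after that eviction.
func spare(capacity, used, unpinnedWeight int, weight func(string) int) func(string) bool {
	available := capacity - used + unpinnedWeight
	return func(key string) bool {
		available -= weight(key)
		return available >= 0
	}
}

func main() {
	weight := func(string) int { return 2 } // hypothetical: every entry weighs 2

	// Capacity 10 with 10 in use; unpinning an entry of weight 2 spares 2 units,
	// so the first waiter still fits and the second does not. In Unpin this is
	// where the broadcast loop over the wait queue stops.
	collector := spare(10, 10, 2, weight)
	fmt.Println(collector("waiter-a")) // true
	fmt.Println(collector("waiter-b")) // false
}
```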