From c13c96e321a31638a10fdf73e31ff1b848342d2a Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Thu, 21 Mar 2024 14:33:06 +0800 Subject: [PATCH] feat: adding cache scavenger (#31264) See #31262 --------- Signed-off-by: Ted Xu --- internal/querynodev2/segments/manager.go | 55 ++-- internal/querynodev2/segments/retrieve.go | 33 ++- internal/querynodev2/segments/search.go | 42 +-- pkg/util/cache/cache.go | 334 +++++++++++++++------- pkg/util/cache/cache_test.go | 190 ++++++++++-- 5 files changed, 458 insertions(+), 196 deletions(-) diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 103e6ec140..e290d17db0 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -171,38 +171,37 @@ func NewManager() *Manager { Collection: NewCollectionManager(), Segment: segMgr, } - manager.DiskCache = cache.NewLRUCache[int64, Segment]( - int32(cacheMaxItemNum), - func(key int64) (Segment, bool) { - log.Debug("cache missed segment", zap.Int64("segmentID", key)) - segMgr.mu.RLock() - defer segMgr.mu.RUnlock() - segment, ok := segMgr.sealedSegments[key] - if !ok { - // the segment has been released, just ignore it - return nil, false - } + manager.DiskCache = cache.NewCacheBuilder[int64, Segment]().WithCapacity(cacheMaxItemNum).WithLoader(func(key int64) (Segment, bool) { + log.Debug("cache missed segment", zap.Int64("segmentID", key)) + segMgr.mu.RLock() + defer segMgr.mu.RUnlock() - info := segment.LoadInfo() - _, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) { - collection := manager.Collection.Get(segment.Collection()) - if collection == nil { - return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields") - } - err := loadSealedSegmentFields(context.Background(), collection, segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped)) - return nil, err - }) - if err != nil { - log.Warn("cache sealed segment failed", zap.Error(err)) - return nil, false + segment, ok := segMgr.sealedSegments[key] + if !ok { + // the segment has been released, just ignore it + return nil, false + } + + info := segment.LoadInfo() + _, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) { + collection := manager.Collection.Get(segment.Collection()) + if collection == nil { + return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields") } - return segment, true - }, - func(key int64, segment Segment) { - log.Debug("evict segment from cache", zap.Int64("segmentID", key)) - segment.Release(WithReleaseScope(ReleaseScopeData)) + err := loadSealedSegmentFields(context.Background(), collection, segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped)) + return nil, err }) + if err != nil { + log.Warn("cache sealed segment failed", zap.Error(err)) + return nil, false + } + return segment, true + }).WithFinalizer(func(key int64, segment Segment) error { + log.Debug("evict segment from cache", zap.Int64("segmentID", key)) + segment.Release(WithReleaseScope(ReleaseScopeData)) + return nil + }).Build() return manager } diff --git a/internal/querynodev2/segments/retrieve.go b/internal/querynodev2/segments/retrieve.go index 5f5572fef1..aea684fcfa 100644 --- a/internal/querynodev2/segments/retrieve.go +++ b/internal/querynodev2/segments/retrieve.go @@ -49,29 +49,32 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s label = metrics.GrowingSegmentLabel } + retriever := func(s Segment) error { + tr := timerecord.NewTimeRecorder("retrieveOnSegments") + result, err := s.Retrieve(ctx, plan) + resultCh <- result + if err != nil { + return err + } + metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + metrics.QueryLabel, label).Observe(float64(tr.ElapseSpan().Milliseconds())) + return nil + } + for i, segment := range segments { wg.Add(1) go func(seg Segment, i int) { defer wg.Done() - tr := timerecord.NewTimeRecorder("retrieveOnSegments") - if seg.LoadStatus() == LoadStatusMeta { - item, ok := mgr.DiskCache.GetAndPin(seg.ID()) - if !ok { - errs[i] = merr.WrapErrSegmentNotLoaded(seg.ID()) - return - } - defer item.Unpin() - } - result, err := seg.Retrieve(ctx, plan) + var err error + if seg.LoadStatus() == LoadStatusMeta { + err = mgr.DiskCache.Do(seg.ID(), retriever) + } else { + err = retriever(seg) + } if err != nil { errs[i] = err - return } - errs[i] = nil - resultCh <- result - metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), - metrics.QueryLabel, label).Observe(float64(tr.ElapseSpan().Milliseconds())) }(segment, i) } wg.Wait() diff --git a/internal/querynodev2/segments/search.go b/internal/querynodev2/segments/search.go index a33c3e0ec9..1b6d20c389 100644 --- a/internal/querynodev2/segments/search.go +++ b/internal/querynodev2/segments/search.go @@ -26,7 +26,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -50,6 +49,23 @@ func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segTy searchLabel = metrics.GrowingSegmentLabel } + searcher := func(s Segment) error { + // record search time + tr := timerecord.NewTimeRecorder("searchOnSegments") + searchResult, err := s.Search(ctx, searchReq) + resultCh <- searchResult + if err != nil { + return err + } + // update metrics + elapsed := tr.ElapseSpan().Milliseconds() + metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + metrics.SearchLabel, searchLabel).Observe(float64(elapsed)) + metrics.QueryNodeSegmentSearchLatencyPerVector.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + metrics.SearchLabel, searchLabel).Observe(float64(elapsed) / float64(searchReq.getNumOfQuery())) + return nil + } + // calling segment search in goroutines for i, segment := range segments { wg.Add(1) @@ -60,25 +76,15 @@ func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segTy segmentsWithoutIndex = append(segmentsWithoutIndex, seg.ID()) mu.Unlock() } - // record search time - tr := timerecord.NewTimeRecorder("searchOnSegments") + var err error if seg.LoadStatus() == LoadStatusMeta { - item, ok := mgr.DiskCache.GetAndPin(seg.ID()) - if !ok { - errs[i] = merr.WrapErrSegmentNotLoaded(seg.ID()) - return - } - defer item.Unpin() + err = mgr.DiskCache.Do(seg.ID(), searcher) + } else { + err = searcher(seg) + } + if err != nil { + errs[i] = err } - searchResult, err := seg.Search(ctx, searchReq) - errs[i] = err - resultCh <- searchResult - // update metrics - elapsed := tr.ElapseSpan().Milliseconds() - metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), - metrics.SearchLabel, searchLabel).Observe(float64(elapsed)) - metrics.QueryNodeSegmentSearchLatencyPerVector.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), - metrics.SearchLabel, searchLabel).Observe(float64(elapsed) / float64(searchReq.getNumOfQuery())) }(segment, i) } wg.Wait() diff --git a/pkg/util/cache/cache.go b/pkg/util/cache/cache.go index 7a45470474..28613b64d4 100644 --- a/pkg/util/cache/cache.go +++ b/pkg/util/cache/cache.go @@ -2,9 +2,17 @@ package cache import ( "container/list" + "fmt" "sync" + "github.com/cockroachdb/errors" "go.uber.org/atomic" + "golang.org/x/sync/singleflight" +) + +var ( + ErrNoSuchItem = errors.New("no such item") + ErrNotEnoughSpace = errors.New("not enough space") ) type cacheItem[K comparable, V any] struct { @@ -29,152 +37,258 @@ func (i *cacheItem[K, V]) Value() V { } type ( - OnCacheMiss[K comparable, V any] func(key K) (V, bool) - OnEvict[K comparable, V any] func(key K, value V) + Loader[K comparable, V any] func(key K) (V, bool) + Finalizer[K comparable, V any] func(key K, value V) error ) +// Scavenger records occupation of cache and decide whether to evict if necessary. +// +// The scavenger makes decision based on keys only, and it is called before value loading, +// because value loading could be very expensive. +type Scavenger[K comparable] interface { + // Collect records entry additions, if there is room, return true, or else return false and a collector. + // The collector is a function which can be invoked repetedly, each invocation will test if there is enough + // room provided that all entries in the collector is evicted. + Collect(key K) (bool, func(K) bool) + // Throw records entry removals. + Throw(key K) +} + +type LazyScavenger[K comparable] struct { + capacity int64 + size int64 + weight func(K) int64 +} + +func NewLazyScavenger[K comparable](weight func(K) int64, capacity int64) *LazyScavenger[K] { + return &LazyScavenger[K]{ + capacity: capacity, + weight: weight, + } +} + +func (s *LazyScavenger[K]) Collect(key K) (bool, func(K) bool) { + w := s.weight(key) + if s.size+w > s.capacity { + needCollect := s.size + w - s.capacity + return false, func(key K) bool { + needCollect -= s.weight(key) + return needCollect <= 0 + } + } + s.size += w + return true, nil +} + +func (s *LazyScavenger[K]) Throw(key K) { + s.size -= s.weight(key) +} + type Cache[K comparable, V any] interface { - GetAndPin(key K) (*cacheItem[K, V], bool) - Contain(key K) bool - Set(key K, value V) - Remove(key K) + Do(key K, doer func(V) error) error } // lruCache extends the ccache library to provide pinning and unpinning of items. type lruCache[K comparable, V any] struct { rwlock sync.RWMutex // the value is *cacheItem[V] - items map[K]*list.Element - accessList *list.List + items map[K]*list.Element + accessList *list.List + loaderSingleFlight singleflight.Group - size int32 - cap int32 - - onCacheMiss OnCacheMiss[K, V] - onEvict OnEvict[K, V] + loader Loader[K, V] + finalizer Finalizer[K, V] + scavenger Scavenger[K] } -func NewLRUCache[K comparable, V any]( - cap int32, - onCacheMiss OnCacheMiss[K, V], - onEvict OnEvict[K, V], +type CacheBuilder[K comparable, V any] struct { + loader Loader[K, V] + finalizer Finalizer[K, V] + scavenger Scavenger[K] +} + +func NewCacheBuilder[K comparable, V any]() *CacheBuilder[K, V] { + return &CacheBuilder[K, V]{ + loader: nil, + finalizer: nil, + scavenger: NewLazyScavenger( + func(key K) int64 { + return 1 + }, + 64, + ), + } +} + +func (b *CacheBuilder[K, V]) WithLoader(loader Loader[K, V]) *CacheBuilder[K, V] { + b.loader = loader + return b +} + +func (b *CacheBuilder[K, V]) WithFinalizer(finalizer Finalizer[K, V]) *CacheBuilder[K, V] { + b.finalizer = finalizer + return b +} + +func (b *CacheBuilder[K, V]) WithLazyScavenger(weight func(K) int64, capacity int64) *CacheBuilder[K, V] { + b.scavenger = NewLazyScavenger(weight, capacity) + return b +} + +func (b *CacheBuilder[K, V]) WithCapacity(capacity int64) *CacheBuilder[K, V] { + b.scavenger = NewLazyScavenger( + func(key K) int64 { + return 1 + }, + capacity, + ) + return b +} + +func (b *CacheBuilder[K, V]) Build() Cache[K, V] { + return newLRUCache(b.loader, b.finalizer, b.scavenger) +} + +func newLRUCache[K comparable, V any]( + loader Loader[K, V], + finalizer Finalizer[K, V], + scavenger Scavenger[K], ) Cache[K, V] { return &lruCache[K, V]{ - items: make(map[K]*list.Element), - accessList: list.New(), - cap: cap, - onCacheMiss: onCacheMiss, - onEvict: onEvict, + items: make(map[K]*list.Element), + accessList: list.New(), + loaderSingleFlight: singleflight.Group{}, + loader: loader, + finalizer: finalizer, + scavenger: scavenger, } } -// GetAndPin gets and pins the given key if it exists, -// NOTE: remember to unpin this key or it will never be evicted -func (c *lruCache[K, V]) GetAndPin(key K) (*cacheItem[K, V], bool) { +// Do picks up an item from cache and executes doer. The entry of interest is garented in the cache when doer is executing. +func (c *lruCache[K, V]) Do(key K, doer func(V) error) error { + item, err := c.getAndPin(key) + if err != nil { + return err + } + defer item.Unpin() + return doer(item.Value()) +} + +func (c *lruCache[K, V]) peek(key K) *cacheItem[K, V] { c.rwlock.Lock() - - iter, ok := c.items[key] + defer c.rwlock.Unlock() + e, ok := c.items[key] if ok { - item := iter.Value.(*cacheItem[K, V]) - c.accessList.Remove(iter) - c.accessList.PushFront(item) - item.pinCount.Inc() + item := e.Value.(*cacheItem[K, V]) + c.accessList.MoveToFront(e) + return item + } + return nil +} - c.rwlock.Unlock() - return item, true +// GetAndPin gets and pins the given key if it exists +func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) { + if item := c.peek(key); item != nil { + item.pinCount.Inc() + return item, nil } - c.rwlock.Unlock() - if c.onCacheMiss != nil { - value, ok := c.onCacheMiss(key) - if ok { - c.rwlock.Lock() - defer c.rwlock.Unlock() - item := c.setAndPin(key, value) - return item, true + if c.loader != nil { + // Try scavenge if there is room. If not, fail fast. + // Note that the test is not accurate since we are not locking `loader` here. + if _, ok := c.tryScavenge(key); !ok { + return nil, ErrNotEnoughSpace + } + + strKey := fmt.Sprint(key) + item, err, _ := c.loaderSingleFlight.Do(strKey, func() (interface{}, error) { + if item := c.peek(key); item != nil { + item.pinCount.Inc() + return item, nil + } + + value, ok := c.loader(key) + if !ok { + return nil, ErrNoSuchItem + } + + item, err := c.setAndPin(key, value) + if err != nil { + return nil, err + } + return item, nil + }) + + if err == nil { + return item.(*cacheItem[K, V]), nil } } - return nil, false + return nil, ErrNoSuchItem } -func (c *lruCache[K, V]) Contain(key K) bool { - c.rwlock.RLock() - defer c.rwlock.RUnlock() - - _, ok := c.items[key] - return ok -} - -func (c *lruCache[K, V]) Set(key K, value V) { - item := newCacheItem[K, V](key, value) - +func (c *lruCache[K, V]) tryScavenge(key K) ([]K, bool) { c.rwlock.Lock() defer c.rwlock.Unlock() + return c.lockfreeTryScavenge(key) +} - if c.size >= c.cap { - c.evict(c.size - c.cap + 1) +func (c *lruCache[K, V]) lockfreeTryScavenge(key K) ([]K, bool) { + ok, collector := c.scavenger.Collect(key) + toEvict := make([]K, 0) + if !ok { + done := false + for p := c.accessList.Back(); p != nil && !done; p = p.Prev() { + evictItem := p.Value.(*cacheItem[K, V]) + if evictItem.pinCount.Load() > 0 { + continue + } + toEvict = append(toEvict, evictItem.key) + done = collector(evictItem.key) + } + if !done { + return nil, false + } + } else { + // If no collection needed, give back the space. + c.scavenger.Throw(key) } - - c.add(item) + return toEvict, true } // for cache miss -func (c *lruCache[K, V]) setAndPin(key K, value V) *cacheItem[K, V] { - item := newCacheItem[K, V](key, value) - item.pinCount.Inc() - - if c.size >= c.cap { - c.evict(c.size - c.cap + 1) - } - - c.add(item) - return item -} - -func (c *lruCache[K, V]) add(item *cacheItem[K, V]) { - iter := c.accessList.PushFront(item) - c.items[item.key] = iter - c.size++ -} - -func (c *lruCache[K, V]) evict(n int32) { - if c.size < n { - n = c.size - } - for ; n > 0; n-- { - for { - oldest := c.accessList.Back() - item := oldest.Value.(*cacheItem[K, V]) - if item.pinCount.Load() > 0 { - c.accessList.MoveToFront(oldest) - } else { - break - } - } - - // evict - validOldest := c.accessList.Back() - item := validOldest.Value.(*cacheItem[K, V]) - c.accessList.Remove(validOldest) - delete(c.items, item.key) - c.size-- - - // TODO(yah01): this could be optimized as it doesn't need to acquire the lock - if c.onEvict != nil { - c.onEvict(item.key, item.value) - } - } -} - -func (c *lruCache[K, V]) Remove(key K) { +func (c *lruCache[K, V]) setAndPin(key K, value V) (*cacheItem[K, V], error) { c.rwlock.Lock() defer c.rwlock.Unlock() - iter, ok := c.items[key] - if ok { - c.accessList.Remove(iter) - delete(c.items, key) - c.size-- + item := newCacheItem[K, V](key, value) + item.pinCount.Inc() + + // tryScavenge is done again since the load call is lock free. + toEvict, ok := c.lockfreeTryScavenge(key) + + if !ok { + if c.finalizer != nil { + c.finalizer(key, value) + } + return nil, ErrNotEnoughSpace } + + for _, ek := range toEvict { + e := c.items[ek] + delete(c.items, ek) + c.accessList.Remove(e) + c.scavenger.Throw(ek) + + if c.finalizer != nil { + item := e.Value.(*cacheItem[K, V]) + c.finalizer(ek, item.value) + } + } + + c.scavenger.Collect(key) + e := c.accessList.PushFront(item) + c.items[item.key] = e + + return item, nil } diff --git a/pkg/util/cache/cache_test.go b/pkg/util/cache/cache_test.go index 08421234f3..e97dd87a4b 100644 --- a/pkg/util/cache/cache_test.go +++ b/pkg/util/cache/cache_test.go @@ -1,38 +1,178 @@ package cache import ( + "sync" + "sync/atomic" "testing" - "github.com/stretchr/testify/suite" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" ) -type CacheSuite struct { - suite.Suite -} - -func (s *CacheSuite) TestLRUCache() { - size := 10 - cache := NewLRUCache[int, int](int32(size), func(key int) (int, bool) { +func TestLRUCache(t *testing.T) { + cacheBuilder := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) { return key, true - }, nil) + }) - for i := 1; i <= size; i++ { - item, ok := cache.GetAndPin(i) - s.True(ok) - s.Equal(i, item.Value()) - item.Unpin() - } + t.Run("test loader", func(t *testing.T) { + size := 10 + cache := cacheBuilder.WithCapacity(int64(size)).Build() - s.False(cache.Contain(size + 1)) - for i := 1; i <= size; i++ { - item, ok := cache.GetAndPin(size + i) - s.True(ok) - s.Equal(size+i, item.Value()) - s.False(cache.Contain(i)) - item.Unpin() - } + for i := 0; i < size; i++ { + err := cache.Do(i, func(v int) error { + assert.Equal(t, i, v) + return nil + }) + assert.NoError(t, err) + } + }) + + t.Run("test finalizer", func(t *testing.T) { + size := 10 + finalizeSeq := make([]int, 0) + cache := cacheBuilder.WithCapacity(int64(size)).WithFinalizer(func(key, value int) error { + finalizeSeq = append(finalizeSeq, key) + return nil + }).Build() + + for i := 0; i < size*2; i++ { + err := cache.Do(i, func(v int) error { + assert.Equal(t, i, v) + return nil + }) + assert.NoError(t, err) + } + assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, finalizeSeq) + + // Hit the cache again, there should be no swap-out + for i := size; i < size*2; i++ { + err := cache.Do(i, func(v int) error { + assert.Equal(t, i, v) + return nil + }) + assert.NoError(t, err) + } + assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, finalizeSeq) + }) + + t.Run("test scavenger", func(t *testing.T) { + finalizeSeq := make([]int, 0) + sumCapacity := 20 // inserting 1 to 19, capacity is set to sum of 20, expecting (19) at last. + cache := cacheBuilder.WithLazyScavenger(func(key int) int64 { + return int64(key) + }, int64(sumCapacity)).WithFinalizer(func(key, value int) error { + finalizeSeq = append(finalizeSeq, key) + return nil + }).Build() + + for i := 0; i < 20; i++ { + err := cache.Do(i, func(v int) error { + assert.Equal(t, i, v) + return nil + }) + assert.NoError(t, err) + } + assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, finalizeSeq) + }) + + t.Run("test do negative", func(t *testing.T) { + cache := cacheBuilder.Build() + theErr := errors.New("error") + err := cache.Do(-1, func(v int) error { + return theErr + }) + assert.Equal(t, theErr, err) + }) + + t.Run("test scavenge negative", func(t *testing.T) { + finalizeSeq := make([]int, 0) + sumCapacity := 20 // inserting 1 to 19, capacity is set to sum of 20, expecting (19) at last. + cache := cacheBuilder.WithLazyScavenger(func(key int) int64 { + return int64(key) + }, int64(sumCapacity)).WithFinalizer(func(key, value int) error { + finalizeSeq = append(finalizeSeq, key) + return nil + }).Build() + + for i := 0; i < 20; i++ { + err := cache.Do(i, func(v int) error { + assert.Equal(t, i, v) + return nil + }) + assert.NoError(t, err) + } + assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, finalizeSeq) + err := cache.Do(100, func(v int) error { + return nil + }) + assert.Equal(t, ErrNotEnoughSpace, err) + }) + + t.Run("test load negative", func(t *testing.T) { + cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) { + if key < 0 { + return 0, false + } + return key, true + }).Build() + err := cache.Do(0, func(v int) error { + return nil + }) + assert.NoError(t, err) + err = cache.Do(-1, func(v int) error { + return nil + }) + assert.Equal(t, ErrNoSuchItem, err) + }) } -func TestCacheSuite(t *testing.T) { - suite.Run(t, new(CacheSuite)) +func TestLRUCacheConcurrency(t *testing.T) { + t.Run("test race condition", func(t *testing.T) { + numEvict := new(atomic.Int32) + cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) { + return key, true + }).WithCapacity(10).WithFinalizer(func(key, value int) error { + numEvict.Add(1) + return nil + }).Build() + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for j := 0; j < 100; j++ { + err := cache.Do(j, func(v int) error { + return nil + }) + assert.NoError(t, err) + } + }(i) + } + wg.Wait() + }) + + t.Run("test not enough space", func(t *testing.T) { + cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) { + return key, true + }).WithCapacity(1).WithFinalizer(func(key, value int) error { + return nil + }).Build() + + var wg sync.WaitGroup // Let key 1000 be blocked + var wg1 sync.WaitGroup // Make sure goroutine is started + wg.Add(1) + wg1.Add(1) + go cache.Do(1000, func(v int) error { + wg1.Done() + wg.Wait() + return nil + }) + wg1.Wait() + err := cache.Do(1001, func(v int) error { + return nil + }) + wg.Done() + assert.Equal(t, ErrNotEnoughSpace, err) + }) }