feat: adding cache scavenger (#31264)

See #31262

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
This commit is contained in:
Ted Xu 2024-03-21 14:33:06 +08:00 committed by GitHub
parent 9f9ef8ac32
commit c13c96e321
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 458 additions and 196 deletions

View File

@ -171,38 +171,37 @@ func NewManager() *Manager {
Collection: NewCollectionManager(),
Segment: segMgr,
}
manager.DiskCache = cache.NewLRUCache[int64, Segment](
int32(cacheMaxItemNum),
func(key int64) (Segment, bool) {
log.Debug("cache missed segment", zap.Int64("segmentID", key))
segMgr.mu.RLock()
defer segMgr.mu.RUnlock()
segment, ok := segMgr.sealedSegments[key]
if !ok {
// the segment has been released, just ignore it
return nil, false
}
manager.DiskCache = cache.NewCacheBuilder[int64, Segment]().WithCapacity(cacheMaxItemNum).WithLoader(func(key int64) (Segment, bool) {
log.Debug("cache missed segment", zap.Int64("segmentID", key))
segMgr.mu.RLock()
defer segMgr.mu.RUnlock()
info := segment.LoadInfo()
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) {
collection := manager.Collection.Get(segment.Collection())
if collection == nil {
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
}
err := loadSealedSegmentFields(context.Background(), collection, segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped))
return nil, err
})
if err != nil {
log.Warn("cache sealed segment failed", zap.Error(err))
return nil, false
segment, ok := segMgr.sealedSegments[key]
if !ok {
// the segment has been released, just ignore it
return nil, false
}
info := segment.LoadInfo()
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) {
collection := manager.Collection.Get(segment.Collection())
if collection == nil {
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
}
return segment, true
},
func(key int64, segment Segment) {
log.Debug("evict segment from cache", zap.Int64("segmentID", key))
segment.Release(WithReleaseScope(ReleaseScopeData))
err := loadSealedSegmentFields(context.Background(), collection, segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped))
return nil, err
})
if err != nil {
log.Warn("cache sealed segment failed", zap.Error(err))
return nil, false
}
return segment, true
}).WithFinalizer(func(key int64, segment Segment) error {
log.Debug("evict segment from cache", zap.Int64("segmentID", key))
segment.Release(WithReleaseScope(ReleaseScopeData))
return nil
}).Build()
return manager
}

View File

@ -49,29 +49,32 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
label = metrics.GrowingSegmentLabel
}
retriever := func(s Segment) error {
tr := timerecord.NewTimeRecorder("retrieveOnSegments")
result, err := s.Retrieve(ctx, plan)
resultCh <- result
if err != nil {
return err
}
metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.QueryLabel, label).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
for i, segment := range segments {
wg.Add(1)
go func(seg Segment, i int) {
defer wg.Done()
tr := timerecord.NewTimeRecorder("retrieveOnSegments")
if seg.LoadStatus() == LoadStatusMeta {
item, ok := mgr.DiskCache.GetAndPin(seg.ID())
if !ok {
errs[i] = merr.WrapErrSegmentNotLoaded(seg.ID())
return
}
defer item.Unpin()
}
result, err := seg.Retrieve(ctx, plan)
var err error
if seg.LoadStatus() == LoadStatusMeta {
err = mgr.DiskCache.Do(seg.ID(), retriever)
} else {
err = retriever(seg)
}
if err != nil {
errs[i] = err
return
}
errs[i] = nil
resultCh <- result
metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.QueryLabel, label).Observe(float64(tr.ElapseSpan().Milliseconds()))
}(segment, i)
}
wg.Wait()

View File

@ -26,7 +26,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -50,6 +49,23 @@ func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segTy
searchLabel = metrics.GrowingSegmentLabel
}
searcher := func(s Segment) error {
// record search time
tr := timerecord.NewTimeRecorder("searchOnSegments")
searchResult, err := s.Search(ctx, searchReq)
resultCh <- searchResult
if err != nil {
return err
}
// update metrics
elapsed := tr.ElapseSpan().Milliseconds()
metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel, searchLabel).Observe(float64(elapsed))
metrics.QueryNodeSegmentSearchLatencyPerVector.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel, searchLabel).Observe(float64(elapsed) / float64(searchReq.getNumOfQuery()))
return nil
}
// calling segment search in goroutines
for i, segment := range segments {
wg.Add(1)
@ -60,25 +76,15 @@ func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segTy
segmentsWithoutIndex = append(segmentsWithoutIndex, seg.ID())
mu.Unlock()
}
// record search time
tr := timerecord.NewTimeRecorder("searchOnSegments")
var err error
if seg.LoadStatus() == LoadStatusMeta {
item, ok := mgr.DiskCache.GetAndPin(seg.ID())
if !ok {
errs[i] = merr.WrapErrSegmentNotLoaded(seg.ID())
return
}
defer item.Unpin()
err = mgr.DiskCache.Do(seg.ID(), searcher)
} else {
err = searcher(seg)
}
if err != nil {
errs[i] = err
}
searchResult, err := seg.Search(ctx, searchReq)
errs[i] = err
resultCh <- searchResult
// update metrics
elapsed := tr.ElapseSpan().Milliseconds()
metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel, searchLabel).Observe(float64(elapsed))
metrics.QueryNodeSegmentSearchLatencyPerVector.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel, searchLabel).Observe(float64(elapsed) / float64(searchReq.getNumOfQuery()))
}(segment, i)
}
wg.Wait()

View File

@ -2,9 +2,17 @@ package cache
import (
"container/list"
"fmt"
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"golang.org/x/sync/singleflight"
)
var (
ErrNoSuchItem = errors.New("no such item")
ErrNotEnoughSpace = errors.New("not enough space")
)
type cacheItem[K comparable, V any] struct {
@ -29,152 +37,258 @@ func (i *cacheItem[K, V]) Value() V {
}
type (
OnCacheMiss[K comparable, V any] func(key K) (V, bool)
OnEvict[K comparable, V any] func(key K, value V)
Loader[K comparable, V any] func(key K) (V, bool)
Finalizer[K comparable, V any] func(key K, value V) error
)
// Scavenger records the occupation of a cache and decides whether entries must be evicted
// before a new one can be admitted.
//
// The scavenger makes its decisions based on keys only, and it is consulted before value loading,
// because value loading could be very expensive.
type Scavenger[K comparable] interface {
// Collect records the addition of an entry. If there is room for key it returns true;
// otherwise it returns false together with a collector.
// The collector is a function that can be invoked repeatedly, once per eviction candidate; each
// invocation reports whether there would be enough room if all entries passed to the collector
// so far were evicted.
Collect(key K) (bool, func(K) bool)
// Throw records the removal of an entry.
Throw(key K)
}
// LazyScavenger is a Scavenger that tracks occupation as the sum of per-key weights
// and admits a key only while that sum stays within a fixed capacity.
//
// It performs no locking of its own; callers must serialize access.
type LazyScavenger[K comparable] struct {
	capacity int64           // maximum total weight that may be admitted
	size     int64           // total weight currently admitted
	weight   func(K) int64   // weight function applied to every key
}

// NewLazyScavenger returns an empty LazyScavenger that measures keys with weight
// and admits at most capacity units of total weight.
func NewLazyScavenger[K comparable](weight func(K) int64, capacity int64) *LazyScavenger[K] {
	scavenger := new(LazyScavenger[K])
	scavenger.weight = weight
	scavenger.capacity = capacity
	return scavenger
}

// Collect tries to admit key.
//
// When the key fits, its weight is added to the recorded size and (true, nil) is returned.
// When it does not fit, the recorded size is left untouched and (false, collector) is
// returned. Each call to the collector subtracts the weight of one eviction candidate from
// the outstanding shortfall and reports whether the shortfall has been covered.
func (s *LazyScavenger[K]) Collect(key K) (bool, func(K) bool) {
	demand := s.weight(key)
	shortfall := s.size + demand - s.capacity
	if shortfall <= 0 {
		s.size += demand
		return true, nil
	}
	collector := func(candidate K) bool {
		shortfall -= s.weight(candidate)
		return shortfall <= 0
	}
	return false, collector
}

// Throw records the removal of key by subtracting its weight from the recorded size.
func (s *LazyScavenger[K]) Throw(key K) {
	released := s.weight(key)
	s.size -= released
}
type Cache[K comparable, V any] interface {
GetAndPin(key K) (*cacheItem[K, V], bool)
Contain(key K) bool
Set(key K, value V)
Remove(key K)
Do(key K, doer func(V) error) error
}
// lruCache extends the ccache library to provide pinning and unpinning of items.
type lruCache[K comparable, V any] struct {
rwlock sync.RWMutex
// the value is *cacheItem[V]
items map[K]*list.Element
accessList *list.List
items map[K]*list.Element
accessList *list.List
loaderSingleFlight singleflight.Group
size int32
cap int32
onCacheMiss OnCacheMiss[K, V]
onEvict OnEvict[K, V]
loader Loader[K, V]
finalizer Finalizer[K, V]
scavenger Scavenger[K]
}
func NewLRUCache[K comparable, V any](
cap int32,
onCacheMiss OnCacheMiss[K, V],
onEvict OnEvict[K, V],
// CacheBuilder assembles a Cache from an optional loader, an optional finalizer and a
// scavenger configuration. A builder may be reused: every Build call yields a cache with
// its own, freshly created scavenger.
type CacheBuilder[K comparable, V any] struct {
	loader    Loader[K, V]
	finalizer Finalizer[K, V]
	// newScavenger creates the scavenger handed to a cache at Build time. A factory is
	// stored instead of an instance because scavengers carry mutable occupancy state;
	// sharing one instance between caches built from the same builder would make them
	// account for each other's entries.
	newScavenger func() Scavenger[K]
}

// NewCacheBuilder returns a builder with no loader, no finalizer, and a default
// scavenger that gives every key a weight of 1 with a capacity of 64 entries.
func NewCacheBuilder[K comparable, V any]() *CacheBuilder[K, V] {
	const defaultCapacity int64 = 64
	return (&CacheBuilder[K, V]{}).WithCapacity(defaultCapacity)
}

// WithLoader sets the function used to load a value on a cache miss and returns b.
func (b *CacheBuilder[K, V]) WithLoader(loader Loader[K, V]) *CacheBuilder[K, V] {
	b.loader = loader
	return b
}

// WithFinalizer sets the function invoked on values that leave the cache and returns b.
func (b *CacheBuilder[K, V]) WithFinalizer(finalizer Finalizer[K, V]) *CacheBuilder[K, V] {
	b.finalizer = finalizer
	return b
}

// WithLazyScavenger configures built caches to use a LazyScavenger with the given
// per-key weight function and total capacity, replacing any earlier scavenger
// configuration. It returns b.
func (b *CacheBuilder[K, V]) WithLazyScavenger(weight func(K) int64, capacity int64) *CacheBuilder[K, V] {
	b.newScavenger = func() Scavenger[K] {
		return NewLazyScavenger(weight, capacity)
	}
	return b
}

// WithCapacity configures built caches to hold at most capacity entries by using a
// LazyScavenger in which every key weighs 1. It returns b.
func (b *CacheBuilder[K, V]) WithCapacity(capacity int64) *CacheBuilder[K, V] {
	unitWeight := func(K) int64 { return 1 }
	return b.WithLazyScavenger(unitWeight, capacity)
}

// Build creates a new cache from the current configuration. Each call creates a new
// scavenger, so caches built from the same builder keep independent occupancy accounting.
func (b *CacheBuilder[K, V]) Build() Cache[K, V] {
	return newLRUCache(b.loader, b.finalizer, b.newScavenger())
}
func newLRUCache[K comparable, V any](
loader Loader[K, V],
finalizer Finalizer[K, V],
scavenger Scavenger[K],
) Cache[K, V] {
return &lruCache[K, V]{
items: make(map[K]*list.Element),
accessList: list.New(),
cap: cap,
onCacheMiss: onCacheMiss,
onEvict: onEvict,
items: make(map[K]*list.Element),
accessList: list.New(),
loaderSingleFlight: singleflight.Group{},
loader: loader,
finalizer: finalizer,
scavenger: scavenger,
}
}
// GetAndPin gets and pins the given key if it exists,
// NOTE: remember to unpin this key or it will never be evicted
func (c *lruCache[K, V]) GetAndPin(key K) (*cacheItem[K, V], bool) {
// Do obtains the entry for key via getAndPin and executes doer on its value.
// The entry is pinned for the whole duration of doer, so it is guaranteed to stay in
// the cache while doer is executing; the pin is released when Do returns.
//
// If the entry cannot be obtained, the error from getAndPin is returned and doer is
// not called. Otherwise the result of doer is returned.
func (c *lruCache[K, V]) Do(key K, doer func(V) error) error {
	pinned, pinErr := c.getAndPin(key)
	if pinErr != nil {
		return pinErr
	}
	// Deferred so the pin is dropped on every exit path out of doer.
	defer pinned.Unpin()

	value := pinned.Value()
	return doer(value)
}
func (c *lruCache[K, V]) peek(key K) *cacheItem[K, V] {
c.rwlock.Lock()
iter, ok := c.items[key]
defer c.rwlock.Unlock()
e, ok := c.items[key]
if ok {
item := iter.Value.(*cacheItem[K, V])
c.accessList.Remove(iter)
c.accessList.PushFront(item)
item.pinCount.Inc()
item := e.Value.(*cacheItem[K, V])
c.accessList.MoveToFront(e)
return item
}
return nil
}
c.rwlock.Unlock()
return item, true
// GetAndPin gets and pins the given key if it exists
func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
if item := c.peek(key); item != nil {
item.pinCount.Inc()
return item, nil
}
c.rwlock.Unlock()
if c.onCacheMiss != nil {
value, ok := c.onCacheMiss(key)
if ok {
c.rwlock.Lock()
defer c.rwlock.Unlock()
item := c.setAndPin(key, value)
return item, true
if c.loader != nil {
// Try scavenge if there is room. If not, fail fast.
// Note that the test is not accurate since we are not locking `loader` here.
if _, ok := c.tryScavenge(key); !ok {
return nil, ErrNotEnoughSpace
}
strKey := fmt.Sprint(key)
item, err, _ := c.loaderSingleFlight.Do(strKey, func() (interface{}, error) {
if item := c.peek(key); item != nil {
item.pinCount.Inc()
return item, nil
}
value, ok := c.loader(key)
if !ok {
return nil, ErrNoSuchItem
}
item, err := c.setAndPin(key, value)
if err != nil {
return nil, err
}
return item, nil
})
if err == nil {
return item.(*cacheItem[K, V]), nil
}
}
return nil, false
return nil, ErrNoSuchItem
}
func (c *lruCache[K, V]) Contain(key K) bool {
c.rwlock.RLock()
defer c.rwlock.RUnlock()
_, ok := c.items[key]
return ok
}
func (c *lruCache[K, V]) Set(key K, value V) {
item := newCacheItem[K, V](key, value)
// tryScavenge runs lockfreeTryScavenge for key while holding the cache's write lock
// and passes its result through unchanged.
func (c *lruCache[K, V]) tryScavenge(key K) ([]K, bool) {
	c.rwlock.Lock()
	defer c.rwlock.Unlock()

	victims, ok := c.lockfreeTryScavenge(key)
	return victims, ok
}
if c.size >= c.cap {
c.evict(c.size - c.cap + 1)
func (c *lruCache[K, V]) lockfreeTryScavenge(key K) ([]K, bool) {
ok, collector := c.scavenger.Collect(key)
toEvict := make([]K, 0)
if !ok {
done := false
for p := c.accessList.Back(); p != nil && !done; p = p.Prev() {
evictItem := p.Value.(*cacheItem[K, V])
if evictItem.pinCount.Load() > 0 {
continue
}
toEvict = append(toEvict, evictItem.key)
done = collector(evictItem.key)
}
if !done {
return nil, false
}
} else {
// If no collection needed, give back the space.
c.scavenger.Throw(key)
}
c.add(item)
return toEvict, true
}
// for cache miss
func (c *lruCache[K, V]) setAndPin(key K, value V) *cacheItem[K, V] {
item := newCacheItem[K, V](key, value)
item.pinCount.Inc()
if c.size >= c.cap {
c.evict(c.size - c.cap + 1)
}
c.add(item)
return item
}
func (c *lruCache[K, V]) add(item *cacheItem[K, V]) {
iter := c.accessList.PushFront(item)
c.items[item.key] = iter
c.size++
}
func (c *lruCache[K, V]) evict(n int32) {
if c.size < n {
n = c.size
}
for ; n > 0; n-- {
for {
oldest := c.accessList.Back()
item := oldest.Value.(*cacheItem[K, V])
if item.pinCount.Load() > 0 {
c.accessList.MoveToFront(oldest)
} else {
break
}
}
// evict
validOldest := c.accessList.Back()
item := validOldest.Value.(*cacheItem[K, V])
c.accessList.Remove(validOldest)
delete(c.items, item.key)
c.size--
// TODO(yah01): this could be optimized as it doesn't need to acquire the lock
if c.onEvict != nil {
c.onEvict(item.key, item.value)
}
}
}
func (c *lruCache[K, V]) Remove(key K) {
func (c *lruCache[K, V]) setAndPin(key K, value V) (*cacheItem[K, V], error) {
c.rwlock.Lock()
defer c.rwlock.Unlock()
iter, ok := c.items[key]
if ok {
c.accessList.Remove(iter)
delete(c.items, key)
c.size--
item := newCacheItem[K, V](key, value)
item.pinCount.Inc()
// tryScavenge is done again since the load call is lock free.
toEvict, ok := c.lockfreeTryScavenge(key)
if !ok {
if c.finalizer != nil {
c.finalizer(key, value)
}
return nil, ErrNotEnoughSpace
}
for _, ek := range toEvict {
e := c.items[ek]
delete(c.items, ek)
c.accessList.Remove(e)
c.scavenger.Throw(ek)
if c.finalizer != nil {
item := e.Value.(*cacheItem[K, V])
c.finalizer(ek, item.value)
}
}
c.scavenger.Collect(key)
e := c.accessList.PushFront(item)
c.items[item.key] = e
return item, nil
}

View File

@ -1,38 +1,178 @@
package cache
import (
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/suite"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
)
type CacheSuite struct {
suite.Suite
}
func (s *CacheSuite) TestLRUCache() {
size := 10
cache := NewLRUCache[int, int](int32(size), func(key int) (int, bool) {
func TestLRUCache(t *testing.T) {
cacheBuilder := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
return key, true
}, nil)
})
for i := 1; i <= size; i++ {
item, ok := cache.GetAndPin(i)
s.True(ok)
s.Equal(i, item.Value())
item.Unpin()
}
t.Run("test loader", func(t *testing.T) {
size := 10
cache := cacheBuilder.WithCapacity(int64(size)).Build()
s.False(cache.Contain(size + 1))
for i := 1; i <= size; i++ {
item, ok := cache.GetAndPin(size + i)
s.True(ok)
s.Equal(size+i, item.Value())
s.False(cache.Contain(i))
item.Unpin()
}
for i := 0; i < size; i++ {
err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.NoError(t, err)
}
})
t.Run("test finalizer", func(t *testing.T) {
size := 10
finalizeSeq := make([]int, 0)
cache := cacheBuilder.WithCapacity(int64(size)).WithFinalizer(func(key, value int) error {
finalizeSeq = append(finalizeSeq, key)
return nil
}).Build()
for i := 0; i < size*2; i++ {
err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, finalizeSeq)
// Hit the cache again, there should be no swap-out
for i := size; i < size*2; i++ {
err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, finalizeSeq)
})
t.Run("test scavenger", func(t *testing.T) {
finalizeSeq := make([]int, 0)
sumCapacity := 20 // inserting 1 to 19, capacity is set to sum of 20, expecting (19) at last.
cache := cacheBuilder.WithLazyScavenger(func(key int) int64 {
return int64(key)
}, int64(sumCapacity)).WithFinalizer(func(key, value int) error {
finalizeSeq = append(finalizeSeq, key)
return nil
}).Build()
for i := 0; i < 20; i++ {
err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, finalizeSeq)
})
t.Run("test do negative", func(t *testing.T) {
cache := cacheBuilder.Build()
theErr := errors.New("error")
err := cache.Do(-1, func(v int) error {
return theErr
})
assert.Equal(t, theErr, err)
})
t.Run("test scavenge negative", func(t *testing.T) {
finalizeSeq := make([]int, 0)
sumCapacity := 20 // inserting 1 to 19, capacity is set to sum of 20, expecting (19) at last.
cache := cacheBuilder.WithLazyScavenger(func(key int) int64 {
return int64(key)
}, int64(sumCapacity)).WithFinalizer(func(key, value int) error {
finalizeSeq = append(finalizeSeq, key)
return nil
}).Build()
for i := 0; i < 20; i++ {
err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, finalizeSeq)
err := cache.Do(100, func(v int) error {
return nil
})
assert.Equal(t, ErrNotEnoughSpace, err)
})
t.Run("test load negative", func(t *testing.T) {
cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
if key < 0 {
return 0, false
}
return key, true
}).Build()
err := cache.Do(0, func(v int) error {
return nil
})
assert.NoError(t, err)
err = cache.Do(-1, func(v int) error {
return nil
})
assert.Equal(t, ErrNoSuchItem, err)
})
}
func TestCacheSuite(t *testing.T) {
suite.Run(t, new(CacheSuite))
// TestLRUCacheConcurrency exercises Cache.Do from multiple goroutines.
func TestLRUCacheConcurrency(t *testing.T) {
// Ten goroutines each call Do for keys 0..99 against a cache of capacity 10, so entries
// are continuously loaded and evicted while other goroutines are using the cache.
// Every Do call is expected to succeed.
t.Run("test race condition", func(t *testing.T) {
// numEvict counts finalizer invocations.
// NOTE(review): the counter is incremented but never asserted in this test.
numEvict := new(atomic.Int32)
cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
return key, true
}).WithCapacity(10).WithFinalizer(func(key, value int) error {
numEvict.Add(1)
return nil
}).Build()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 100; j++ {
err := cache.Do(j, func(v int) error {
return nil
})
assert.NoError(t, err)
}
}(i)
}
wg.Wait()
})
// With capacity 1, a Do call that is still executing its doer keeps the only slot in use,
// so a concurrent Do for a different key is expected to fail with ErrNotEnoughSpace.
t.Run("test not enough space", func(t *testing.T) {
cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
return key, true
}).WithCapacity(1).WithFinalizer(func(key, value int) error {
return nil
}).Build()
var wg sync.WaitGroup // Let key 1000 be blocked
var wg1 sync.WaitGroup // Make sure goroutine is started
wg.Add(1)
wg1.Add(1)
// The doer signals wg1 once it is running, then blocks on wg until the main goroutine
// has issued the competing Do call below.
// NOTE(review): this goroutine is never joined; the subtest may return before it finishes.
go cache.Do(1000, func(v int) error {
wg1.Done()
wg.Wait()
return nil
})
wg1.Wait()
err := cache.Do(1001, func(v int) error {
return nil
})
// Unblock the doer for key 1000 only after the competing call has returned.
wg.Done()
assert.Equal(t, ErrNotEnoughSpace, err)
})
}