From a3e4b74fe13a4e3a118dc951db8086fa640b78ae Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:06:01 +0800 Subject: [PATCH] Fix LRU Cache Concurrency (#23041) Signed-off-by: xiaofan-luan --- internal/storage/vector_chunk_manager.go | 36 +++++++++++++----------- internal/util/cache/lru_cache.go | 4 +-- internal/util/cache/lru_cache_test.go | 5 +++- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index d8d5e24956..be6e4561f0 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -21,6 +21,7 @@ import ( "errors" "io" "sync" + "sync/atomic" "time" "go.uber.org/zap" @@ -41,17 +42,17 @@ var ( type VectorChunkManager struct { cacheStorage ChunkManager vectorStorage ChunkManager + evictLock sync.RWMutex cache *cache.LRU[string, *mmap.ReaderAt] insertCodec *InsertCodec - cacheEnable bool - cacheLimit int64 - cacheSize int64 - cacheSizeMutex sync.Mutex - fixSize bool // Prevent cache capactiy from changing too frequently - singleflight singleflight.Group + + cacheEnable bool + cacheLimit int64 + cacheSize int64 + fixSize bool // Prevent cache capactiy from changing too frequently } var _ ChunkManager = (*VectorChunkManager)(nil) @@ -59,13 +60,14 @@ var _ ChunkManager = (*VectorChunkManager)(nil) // NewVectorChunkManager create a new vector manager object. 
func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vectorStorage ChunkManager, schema *etcdpb.CollectionMeta, cacheLimit int64, cacheEnable bool) (*VectorChunkManager, error) { insertCodec := NewInsertCodec(schema) + evictLock := sync.RWMutex{} vcm := &VectorChunkManager{ cacheStorage: cacheStorage, vectorStorage: vectorStorage, - - insertCodec: insertCodec, - cacheEnable: cacheEnable, - cacheLimit: cacheLimit, + evictLock: evictLock, + insertCodec: insertCodec, + cacheEnable: cacheEnable, + cacheLimit: cacheLimit, } if cacheEnable { if cacheLimit <= 0 { @@ -73,6 +75,8 @@ func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vecto } c, err := cache.NewLRU(defaultLocalCacheSize, func(k string, v *mmap.ReaderAt) { size := v.Len() + evictLock.Lock() + defer evictLock.Unlock() err := v.Close() if err != nil { log.Error("Unmmap file failed", zap.Any("file", k)) @@ -81,9 +85,7 @@ func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vecto if err != nil { log.Error("cache storage remove file failed", zap.Any("file", k)) } - vcm.cacheSizeMutex.Lock() - vcm.cacheSize -= int64(size) - vcm.cacheSizeMutex.Unlock() + atomic.AddInt64(&vcm.cacheSize, -int64(size)) }) if err != nil { return nil, err @@ -171,9 +173,7 @@ func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath strin if err != nil { return nil, err } - vcm.cacheSizeMutex.Lock() - vcm.cacheSize += size - vcm.cacheSizeMutex.Unlock() + atomic.AddInt64(&vcm.cacheSize, size) if !vcm.fixSize { if vcm.cacheSize < vcm.cacheLimit { if vcm.cache.Len() == vcm.cache.Capacity() { @@ -199,6 +199,8 @@ func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath strin // Read reads the pure vector data. If cached, it reads from local. 
func (vcm *VectorChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) { + vcm.evictLock.RLock() + defer vcm.evictLock.RUnlock() if vcm.cacheEnable { if r, ok := vcm.cache.Get(filePath); ok { p := make([]byte, r.Len()) @@ -262,6 +264,8 @@ func (vcm *VectorChunkManager) Reader(ctx context.Context, filePath string) (Fil // ReadAt reads specific position data of vector. If cached, it reads from local. func (vcm *VectorChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) { + vcm.evictLock.RLock() + defer vcm.evictLock.RUnlock() if vcm.cacheEnable { if r, ok := vcm.cache.Get(filePath); ok { p := make([]byte, length) diff --git a/internal/util/cache/lru_cache.go b/internal/util/cache/lru_cache.go index 263bab3eca..73aff3a734 100644 --- a/internal/util/cache/lru_cache.go +++ b/internal/util/cache/lru_cache.go @@ -112,8 +112,8 @@ func (c *LRU[K, V]) Add(key K, value V) { defer c.m.Unlock() if c.closed() { - // evict since cache closed - // evict asynchronously since the cache is closed + go c.onEvicted(key, value) return } diff --git a/internal/util/cache/lru_cache_test.go b/internal/util/cache/lru_cache_test.go index b66a49480a..b66ae63355 100644 --- a/internal/util/cache/lru_cache_test.go +++ b/internal/util/cache/lru_cache_test.go @@ -409,7 +409,10 @@ func TestLRU_closed(t *testing.T) { c.Close() c.Add("testKey", "testValue") - assert.Equal(t, int32(1), evicted) + + for atomic.LoadInt32(&evicted) != 1 { + time.Sleep(10 * time.Millisecond) + } _, ok := c.Get("testKey") assert.False(t, ok)