diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e54b06b0f9..48886c6e79 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -143,6 +143,9 @@ queryNode: segcore: chunkRows: 32768 # The number of vectors in a chunk. + chunkManager: + localFileCacheLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024 + indexCoord: address: localhost diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 4316fc682f..5c4c45ea13 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -691,10 +691,13 @@ func genVectorChunkManager(ctx context.Context) (*storage.VectorChunkManager, er } schema := genSimpleInsertDataSchema() - vcm := storage.NewVectorChunkManager(lcm, rcm, &etcdpb.CollectionMeta{ + vcm, err := storage.NewVectorChunkManager(lcm, rcm, &etcdpb.CollectionMeta{ ID: defaultCollectionID, Schema: schema, - }, false) + }, Params.QueryNodeCfg.LocalFileCacheLimit, false) + if err != nil { + return nil, err + } return vcm, nil } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index fe82147978..c8beec8554 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -91,6 +91,7 @@ type queryCollection struct { remoteChunkManager storage.ChunkManager vectorChunkManager *storage.VectorChunkManager localCacheEnabled bool + localCacheSize int64 globalSegmentManager *globalSealedSegmentManager } @@ -142,6 +143,7 @@ func newQueryCollection(releaseCtx context.Context, localChunkManager: localChunkManager, remoteChunkManager: remoteChunkManager, localCacheEnabled: localCacheEnabled, + localCacheSize: Params.QueryNodeCfg.LocalFileCacheLimit, globalSegmentManager: newGlobalSealedSegmentManager(collectionID), } @@ -172,10 +174,7 @@ func (q *queryCollection) close() { // } q.globalSegmentManager.close() if q.vectorChunkManager != nil { - err := q.vectorChunkManager.Close() - if err != nil { - log.Warn("close vector chunk manager error occurs", 
zap.Error(err)) - } + q.vectorChunkManager.Close() } } @@ -1300,11 +1299,14 @@ func (q *queryCollection) retrieve(msg queryMsg) error { if q.remoteChunkManager == nil { return fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil, msgID = %d", retrieveMsg.ID()) } - q.vectorChunkManager = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager, + q.vectorChunkManager, err = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager, &etcdpb.CollectionMeta{ ID: collection.id, Schema: collection.schema, - }, q.localCacheEnabled) + }, q.localCacheSize, q.localCacheEnabled) + if err != nil { + return err + } } // historical retrieve diff --git a/internal/storage/local_chunk_manager.go b/internal/storage/local_chunk_manager.go index 0cfe2fac13..dd6240068f 100644 --- a/internal/storage/local_chunk_manager.go +++ b/internal/storage/local_chunk_manager.go @@ -132,7 +132,7 @@ func (lcm *LocalChunkManager) MultiRead(filePaths []string) ([][]byte, error) { return results, el } -func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { +func (lcm *LocalChunkManager) ListWithPrefix(prefix string) ([]string, error) { var filePaths []string absPrefix := path.Join(lcm.localPath, prefix) dir := filepath.Dir(absPrefix) @@ -142,6 +142,14 @@ func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, } return nil }) + if err != nil { + return nil, err + } + return filePaths, nil +} + +func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { + filePaths, err := lcm.ListWithPrefix(prefix) if err != nil { return nil, nil, err } @@ -167,7 +175,7 @@ func (lcm *LocalChunkManager) ReadAt(filePath string, off int64, length int64) ( return res, nil } -func (lcm *LocalChunkManager) Mmap(filePath string) (io.ReaderAt, error) { +func (lcm *LocalChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) { absPath := path.Join(lcm.localPath, filePath) return 
mmap.Open(path.Clean(absPath)) } @@ -209,15 +217,7 @@ func (lcm *LocalChunkManager) MultiRemove(filePaths []string) error { } func (lcm *LocalChunkManager) RemoveWithPrefix(prefix string) error { - var filePaths []string - absPrefix := path.Join(lcm.localPath, prefix) - dir := filepath.Dir(absPrefix) - err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error { - if strings.HasPrefix(filePath, absPrefix) && !f.IsDir() { - filePaths = append(filePaths, strings.TrimPrefix(filePath, lcm.localPath)) - } - return nil - }) + filePaths, err := lcm.ListWithPrefix(prefix) if err != nil { return err } diff --git a/internal/storage/lru_cache.go b/internal/storage/lru_cache.go deleted file mode 100644 index e840b38b5d..0000000000 --- a/internal/storage/lru_cache.go +++ /dev/null @@ -1,153 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package storage - -import ( - "container/list" - "errors" -) - -type LRU struct { - evictList *list.List - items map[interface{}]*list.Element - size int - onEvicted func(k, v interface{}) -} - -type entry struct { - key interface{} - value interface{} -} - -func NewLRU(size int, onEvicted func(k, v interface{})) (*LRU, error) { - if size <= 0 { - return nil, errors.New("cache size must be positive") - } - c := &LRU{ - size: size, - evictList: list.New(), - items: make(map[interface{}]*list.Element), - onEvicted: onEvicted, - } - return c, nil -} - -func (c *LRU) Add(key, value interface{}) { - if e, ok := c.items[key]; ok { - c.evictList.MoveToFront(e) - e.Value.(*entry).value = value - return - } - e := &entry{key: key, value: value} - listE := c.evictList.PushFront(e) - c.items[key] = listE - - if c.evictList.Len() > c.size { - c.RemoveOldest() - } -} - -func (c *LRU) Get(key interface{}) (value interface{}, ok bool) { - if e, ok := c.items[key]; ok { - c.evictList.MoveToFront(e) - kv := e.Value.(*entry) - return kv.value, true - } - return nil, false -} - -func (c *LRU) Remove(key interface{}) { - if e, ok := c.items[key]; ok { - c.evictList.Remove(e) - kv := e.Value.(*entry) - delete(c.items, kv.key) - if c.onEvicted != nil { - c.onEvicted(kv.key, kv.value) - } - } -} - -func (c *LRU) Contains(key interface{}) bool { - _, ok := c.items[key] - return ok -} - -// Peek get value but not update the recently-used list. 
-func (c *LRU) Peek(key interface{}) (value interface{}, ok bool) { - if e, ok := c.items[key]; ok { - kv := e.Value.(*entry) - return kv.value, true - } - return nil, false -} - -func (c *LRU) Keys() []interface{} { - keys := make([]interface{}, len(c.items)) - i := 0 - for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() { - keys[i] = ent.Value.(*entry).key - i++ - } - return keys -} - -func (c *LRU) Len() int { - return c.evictList.Len() -} - -func (c *LRU) Purge() { - for k, v := range c.items { - if c.onEvicted != nil { - c.onEvicted(k, v.Value.(*entry).value) - } - delete(c.items, k) - } - c.evictList.Init() -} - -func (c *LRU) Resize(size int) int { - c.size = size - if size >= c.evictList.Len() { - return 0 - } - diff := c.evictList.Len() - c.size - for i := 0; i < diff; i++ { - c.RemoveOldest() - } - return diff -} - -func (c *LRU) GetOldest() (interface{}, interface{}, bool) { - ent := c.evictList.Back() - if ent != nil { - kv := ent.Value.(*entry) - return kv.key, kv.value, true - } - return nil, nil, false -} - -func (c *LRU) RemoveOldest() { - e := c.evictList.Back() - if e != nil { - c.evictList.Remove(e) - kv := e.Value.(*entry) - delete(c.items, kv.key) - if c.onEvicted != nil { - c.onEvicted(kv.key, kv.value) - } - } -} diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 716452c256..bd11cf60b5 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -27,6 +27,7 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" + "golang.org/x/exp/mmap" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/errorutil" @@ -168,13 +169,9 @@ func (mcm *MinioChunkManager) MultiRead(keys []string) ([][]byte, error) { } func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { - objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, 
minio.ListObjectsOptions{Prefix: prefix}) - - var objectsKeys []string - var objectsValues [][]byte - - for object := range objects { - objectsKeys = append(objectsKeys, object.Key) + objectsKeys, err := mcm.ListWithPrefix(prefix) + if err != nil { + return nil, nil, err } objectsValues, err := mcm.MultiRead(objectsKeys) if err != nil { @@ -185,8 +182,8 @@ func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, return objectsKeys, objectsValues, nil } -func (mcm *MinioChunkManager) Mmap(filePath string) (io.ReaderAt, error) { - panic("this method has not been implemented") +func (mcm *MinioChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) { + return nil, errors.New("this method has not been implemented") } // ReadAt reads specific position data of minio storage if exists. @@ -249,3 +246,14 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error { } return nil } + +func (mcm *MinioChunkManager) ListWithPrefix(prefix string) ([]string, error) { + objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix}) + + var objectsKeys []string + + for object := range objects { + objectsKeys = append(objectsKeys, object.Key) + } + return objectsKeys, nil +} diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index e81a7a455f..cf9efca1b8 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -381,25 +381,48 @@ func TestMinIOCM(t *testing.T) { }) t.Run("test GetPath", func(t *testing.T) { - testGetSizeRoot := "get_path" + testGetPathRoot := path.Join(testMinIOKVRoot, "get_path") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - testCM := NewLocalChunkManager(RootPath(localPath)) - defer testCM.RemoveWithPrefix(testGetSizeRoot) + testCM, err := newMinIOChunkManager(ctx, testBucket) + require.NoError(t, err) + defer testCM.RemoveWithPrefix(testGetPathRoot) - 
key := path.Join(testGetSizeRoot, "TestMinIOKV_GetPath_key") - value := []byte("TestMinIOKV_GetPath_value") + key := path.Join(testGetPathRoot, "TestMinIOKV_GetSize_key") + value := []byte("TestMinIOKV_GetSize_value") - err := testCM.Write(key, value) + err = testCM.Write(key, value) assert.NoError(t, err) p, err := testCM.GetPath(key) assert.NoError(t, err) - assert.Equal(t, p, path.Join(localPath, key)) + assert.Equal(t, p, key) - key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2") + key2 := path.Join(testGetPathRoot, "TestMemoryKV_GetSize_key2") p, err = testCM.GetPath(key2) assert.Error(t, err) assert.Equal(t, p, "") }) + t.Run("test Mmap", func(t *testing.T) { + testMmapRoot := path.Join(testMinIOKVRoot, "mmap") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testCM, err := newMinIOChunkManager(ctx, testBucket) + require.NoError(t, err) + defer testCM.RemoveWithPrefix(testMmapRoot) + + key := path.Join(testMmapRoot, "TestMinIOKV_GetSize_key") + value := []byte("TestMinIOKV_GetSize_value") + + err = testCM.Write(key, value) + assert.NoError(t, err) + + r, err := testCM.Mmap(key) + assert.Error(t, err) + assert.Nil(t, r) + + }) } diff --git a/internal/storage/types.go b/internal/storage/types.go index 28305d9963..57e4437609 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -11,6 +11,10 @@ package storage +import ( + "golang.org/x/exp/mmap" +) + // ChunkManager is to manager chunks. // Include Read, Write, Remove chunks. type ChunkManager interface { @@ -28,8 +32,10 @@ type ChunkManager interface { Read(filePath string) ([]byte, error) // MultiRead reads @filePath and returns content. MultiRead(filePaths []string) ([][]byte, error) + ListWithPrefix(prefix string) ([]string, error) // ReadWithPrefix reads files with same @prefix and returns contents. 
ReadWithPrefix(prefix string) ([]string, [][]byte, error) + Mmap(filePath string) (*mmap.ReaderAt, error) // ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read. // if all bytes are read, @err is io.EOF. // return other error if read failed. diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index 6409170c89..2c59d140f8 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -19,52 +19,86 @@ package storage import ( "errors" "io" + "sync" "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/util/cache" + "go.uber.org/zap" + "golang.org/x/exp/mmap" +) + +var ( + defaultLocalCacheSize = 64 ) // VectorChunkManager is responsible for read and write vector data. type VectorChunkManager struct { - localChunkManager ChunkManager - remoteChunkManager ChunkManager + cacheStorage ChunkManager + vectorStorage ChunkManager + cache *cache.LRU - schema *etcdpb.CollectionMeta + insertCodec *InsertCodec - localCacheEnable bool + cacheEnable bool + cacheLimit int64 + cacheSize int64 + cacheSizeMutex sync.Mutex + fixSize bool // Prevent cache capacity from changing too frequently } var _ ChunkManager = (*VectorChunkManager)(nil) // NewVectorChunkManager create a new vector manager object. 
-func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta, localCacheEnable bool) *VectorChunkManager { - return &VectorChunkManager{ - localChunkManager: localChunkManager, - remoteChunkManager: remoteChunkManager, +func NewVectorChunkManager(cacheStorage ChunkManager, vectorStorage ChunkManager, schema *etcdpb.CollectionMeta, cacheLimit int64, cacheEnable bool) (*VectorChunkManager, error) { + insertCodec := NewInsertCodec(schema) + vcm := &VectorChunkManager{ + cacheStorage: cacheStorage, + vectorStorage: vectorStorage, - schema: schema, - localCacheEnable: localCacheEnable, + insertCodec: insertCodec, + cacheEnable: cacheEnable, + cacheLimit: cacheLimit, } + if cacheEnable { + if cacheLimit <= 0 { + return nil, errors.New("cache limit must be positive if cacheEnable") + } + c, err := cache.NewLRU(defaultLocalCacheSize, func(k cache.Key, v cache.Value) { + r := v.(*mmap.ReaderAt) + size := r.Len() + err := r.Close() + if err != nil { + log.Error("Unmmap file failed", zap.Any("file", k)) + } + err = cacheStorage.Remove(k.(string)) + if err != nil { + log.Error("cache storage remove file failed", zap.Any("file", k)) + } + vcm.cacheSizeMutex.Lock() + vcm.cacheSize -= int64(size) + vcm.cacheSizeMutex.Unlock() + }) + if err != nil { + return nil, err + } + vcm.cache = c + } + + return vcm, nil } // For vector data, we will download vector file from storage. And we will // deserialize the file for it has binlog style. At last we store pure vector // data to local storage as cache. 
-func (vcm *VectorChunkManager) downloadFile(filePath string) ([]byte, error) { - if vcm.localChunkManager.Exist(filePath) { - return vcm.localChunkManager.Read(filePath) - } - content, err := vcm.remoteChunkManager.Read(filePath) - if err != nil { - return nil, err - } - insertCodec := NewInsertCodec(vcm.schema) +func (vcm *VectorChunkManager) deserializeVectorFile(filePath string, content []byte) ([]byte, error) { blob := &Blob{ Key: filePath, Value: content, } - _, _, data, err := insertCodec.Deserialize([]*Blob{blob}) + _, _, data, err := vcm.insertCodec.Deserialize([]*Blob{blob}) if err != nil { return nil, err } @@ -83,94 +117,151 @@ func (vcm *VectorChunkManager) downloadFile(filePath string) ([]byte, error) { // GetPath returns the path of vector data. If cached, return local path. // If not cached return remote path. func (vcm *VectorChunkManager) GetPath(filePath string) (string, error) { - if vcm.localChunkManager.Exist(filePath) && vcm.localCacheEnable { - return vcm.localChunkManager.GetPath(filePath) - } - return vcm.remoteChunkManager.GetPath(filePath) + return vcm.vectorStorage.GetPath(filePath) } func (vcm *VectorChunkManager) GetSize(filePath string) (int64, error) { - if vcm.localChunkManager.Exist(filePath) && vcm.localCacheEnable { - return vcm.localChunkManager.GetSize(filePath) - } - return vcm.remoteChunkManager.GetSize(filePath) + return vcm.vectorStorage.GetSize(filePath) } // Write writes the vector data to local cache if cache enabled. func (vcm *VectorChunkManager) Write(filePath string, content []byte) error { - if !vcm.localCacheEnable { - return errors.New("cannot write local file for local cache is not allowed") - } - return vcm.localChunkManager.Write(filePath, content) + return vcm.vectorStorage.Write(filePath, content) } // MultiWrite writes the vector data to local cache if cache enabled. 
func (vcm *VectorChunkManager) MultiWrite(contents map[string][]byte) error { - if !vcm.localCacheEnable { - return errors.New("cannot write local file for local cache is not allowed") - } - return vcm.localChunkManager.MultiWrite(contents) + return vcm.vectorStorage.MultiWrite(contents) } // Exist checks whether vector data is saved to local cache. func (vcm *VectorChunkManager) Exist(filePath string) bool { - return vcm.localChunkManager.Exist(filePath) + return vcm.vectorStorage.Exist(filePath) +} + +func (vcm *VectorChunkManager) readWithCache(filePath string) ([]byte, error) { + contents, err := vcm.vectorStorage.Read(filePath) + if err != nil { + return nil, err + } + results, err := vcm.deserializeVectorFile(filePath, contents) + if err != nil { + return nil, err + } + err = vcm.cacheStorage.Write(filePath, results) + if err != nil { + return nil, err + } + r, err := vcm.cacheStorage.Mmap(filePath) + if err != nil { + return nil, err + } + size, err := vcm.cacheStorage.GetSize(filePath) + if err != nil { + return nil, err + } + vcm.cacheSizeMutex.Lock() + vcm.cacheSize += size + vcm.cacheSizeMutex.Unlock() + if !vcm.fixSize { + if vcm.cacheSize < vcm.cacheLimit { + if vcm.cache.Len() == vcm.cache.Capacity() { + newSize := float32(vcm.cache.Capacity()) * 1.25 + vcm.cache.Resize(int(newSize)) + } + } else { + // +1 is for add current value + vcm.cache.Resize(vcm.cache.Len() + 1) + vcm.fixSize = true + } + } + vcm.cache.Add(filePath, r) + return results, nil } // Read reads the pure vector data. If cached, it reads from local. 
func (vcm *VectorChunkManager) Read(filePath string) ([]byte, error) { - if vcm.localCacheEnable { - if vcm.localChunkManager.Exist(filePath) { - return vcm.localChunkManager.Read(filePath) + if vcm.cacheEnable { + if r, ok := vcm.cache.Get(filePath); ok { + at := r.(*mmap.ReaderAt) + p := make([]byte, at.Len()) + _, err := at.ReadAt(p, 0) + if err != nil { + return p, err + } + return p, nil } - contents, err := vcm.downloadFile(filePath) - if err != nil { - return nil, err - } - err = vcm.localChunkManager.Write(filePath, contents) - if err != nil { - return nil, err - } - return vcm.localChunkManager.Read(filePath) + return vcm.readWithCache(filePath) } - return vcm.downloadFile(filePath) + contents, err := vcm.vectorStorage.Read(filePath) + if err != nil { + return nil, err + } + return vcm.deserializeVectorFile(filePath, contents) } // MultiRead reads the pure vector data. If cached, it reads from local. func (vcm *VectorChunkManager) MultiRead(filePaths []string) ([][]byte, error) { - var results [][]byte - for _, filePath := range filePaths { + results := make([][]byte, len(filePaths)) + for i, filePath := range filePaths { content, err := vcm.Read(filePath) if err != nil { return nil, err } - results = append(results, content) + results[i] = content } return results, nil } func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { - panic("has not implemented yet") + filePaths, err := vcm.ListWithPrefix(prefix) + if err != nil { + return nil, nil, err + } + results, err := vcm.MultiRead(filePaths) + if err != nil { + return nil, nil, err + } + return filePaths, results, nil +} + +func (vcm *VectorChunkManager) ListWithPrefix(prefix string) ([]string, error) { + return vcm.vectorStorage.ListWithPrefix(prefix) +} + +func (vcm *VectorChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) { + if vcm.cacheEnable && vcm.cache != nil { + if r, ok := vcm.cache.Get(filePath); ok { + return r.(*mmap.ReaderAt), nil + } + } + return 
nil, errors.New("the file mmap has not been cached") } // ReadAt reads specific position data of vector. If cached, it reads from local. func (vcm *VectorChunkManager) ReadAt(filePath string, off int64, length int64) ([]byte, error) { - if vcm.localCacheEnable { - if vcm.localChunkManager.Exist(filePath) { - return vcm.localChunkManager.ReadAt(filePath, off, length) + if vcm.cacheEnable { + if r, ok := vcm.cache.Get(filePath); ok { + at := r.(*mmap.ReaderAt) + p := make([]byte, length) + _, err := at.ReadAt(p, off) + if err != nil { + return nil, err + } + return p, nil } - results, err := vcm.downloadFile(filePath) + results, err := vcm.readWithCache(filePath) if err != nil { return nil, err } - err = vcm.localChunkManager.Write(filePath, results) - if err != nil { - return nil, err - } - return vcm.localChunkManager.ReadAt(filePath, off, length) + return results[off : off+length], nil } - results, err := vcm.downloadFile(filePath) + contents, err := vcm.vectorStorage.Read(filePath) + if err != nil { + return nil, err + } + results, err := vcm.deserializeVectorFile(filePath, contents) if err != nil { return nil, err } @@ -184,46 +275,51 @@ func (vcm *VectorChunkManager) ReadAt(filePath string, off int64, length int64) if n < len(p) { return nil, io.EOF } - return p, nil } func (vcm *VectorChunkManager) Remove(filePath string) error { - err := vcm.localChunkManager.Remove(filePath) + err := vcm.vectorStorage.Remove(filePath) if err != nil { return err } - err = vcm.remoteChunkManager.Remove(filePath) - if err != nil { - return err + if vcm.cacheEnable { + vcm.cache.Remove(filePath) } return nil } func (vcm *VectorChunkManager) MultiRemove(filePaths []string) error { - err := vcm.localChunkManager.MultiRemove(filePaths) + err := vcm.vectorStorage.MultiRemove(filePaths) if err != nil { return err } - err = vcm.remoteChunkManager.MultiRemove(filePaths) - if err != nil { - return err + if vcm.cacheEnable { + for _, p := range filePaths { + vcm.cache.Remove(p) + } } 
return nil } func (vcm *VectorChunkManager) RemoveWithPrefix(prefix string) error { - err := vcm.localChunkManager.RemoveWithPrefix(prefix) + err := vcm.vectorStorage.RemoveWithPrefix(prefix) if err != nil { return err } - err = vcm.remoteChunkManager.RemoveWithPrefix(prefix) - if err != nil { - return err + if vcm.cacheEnable { + filePaths, err := vcm.ListWithPrefix(prefix) + if err != nil { + return err + } + for _, p := range filePaths { + vcm.cache.Remove(p) + } } return nil } -func (vcm *VectorChunkManager) Close() error { - // TODO:Replace the cache with the local chunk manager and clear the cache when closed - return vcm.localChunkManager.RemoveWithPrefix("") +func (vcm *VectorChunkManager) Close() { + if vcm.cache != nil && vcm.cacheEnable { + vcm.cache.Close() + } } diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index db4387ccf9..df3e00ce8d 100644 --- a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -20,15 +20,13 @@ import ( "context" "errors" "os" - "path" "testing" - "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/assert" ) func initMeta() *etcdpb.CollectionMeta { @@ -131,7 +129,10 @@ func buildVectorChunkManager(localPath string, localCacheEnable bool) (*VectorCh lcm := NewLocalChunkManager(RootPath(localPath)) meta := initMeta() - vcm := NewVectorChunkManager(lcm, rcm, meta, localCacheEnable) + vcm, err := NewVectorChunkManager(lcm, rcm, meta, 16, localCacheEnable) + if err != nil { + return nil, cancel, err + } return vcm, cancel, nil } @@ -149,86 +150,110 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func TestVectorChunkManager_GetPath(t *testing.T) { - vcm, cancel, err := 
buildVectorChunkManager(localPath, true) +func TestNewVectorChunkManager(t *testing.T) { + ctx := context.Background() + bucketName := "vector-chunk-manager" + + rcm, err := newMinIOChunkManager(ctx, bucketName) + assert.Nil(t, err) + assert.NotNil(t, rcm) + lcm := NewLocalChunkManager(RootPath(localPath)) + + meta := initMeta() + vcm, err := NewVectorChunkManager(lcm, rcm, meta, 16, true) + assert.Nil(t, err) assert.NotNil(t, vcm) - assert.NoError(t, err) - key := "1" - err = vcm.Write(key, []byte{1}) - assert.Nil(t, err) - pathGet, err := vcm.GetPath(key) - assert.Nil(t, err) - pathJoin := path.Join(localPath, key) - assert.Equal(t, pathGet, pathJoin) + vcm, err = NewVectorChunkManager(lcm, rcm, meta, -1, true) + assert.NotNil(t, err) + assert.Nil(t, vcm) +} - vcm.localCacheEnable = false - err = vcm.remoteChunkManager.Write(key, []byte{1}) - assert.Nil(t, err) - pathGet, err = vcm.GetPath(key) - assert.Nil(t, err) - assert.Equal(t, pathGet, key) +func TestVectorChunkManager_GetPath(t *testing.T) { + localCaches := []bool{true, false} + for _, localCache := range localCaches { + vcm, cancel, err := buildVectorChunkManager(localPath, localCache) + assert.NoError(t, err) + assert.NotNil(t, vcm) - err = vcm.RemoveWithPrefix(localPath) - assert.NoError(t, err) - cancel() + key := "1" + err = vcm.Write(key, []byte{1}) + assert.Nil(t, err) + pathGet, err := vcm.GetPath(key) + assert.Nil(t, err) + assert.Equal(t, pathGet, key) + + err = vcm.cacheStorage.Write(key, []byte{1}) + assert.Nil(t, err) + pathGet, err = vcm.GetPath(key) + assert.Nil(t, err) + assert.Equal(t, pathGet, key) + + err = vcm.RemoveWithPrefix(localPath) + assert.NoError(t, err) + cancel() + vcm.Close() + } } func TestVectorChunkManager_GetSize(t *testing.T) { - vcm, cancel, err := buildVectorChunkManager(localPath, true) - assert.NotNil(t, vcm) - assert.NoError(t, err) + localCaches := []bool{true, false} + for _, localCache := range localCaches { + vcm, cancel, err := 
buildVectorChunkManager(localPath, localCache) + assert.NoError(t, err) + assert.NotNil(t, vcm) - key := "1" - err = vcm.Write(key, []byte{1}) - assert.Nil(t, err) - sizeGet, err := vcm.GetSize(key) - assert.Nil(t, err) - assert.EqualValues(t, sizeGet, 1) + key := "1" + err = vcm.Write(key, []byte{1}) + assert.Nil(t, err) + sizeGet, err := vcm.GetSize(key) + assert.Nil(t, err) + assert.EqualValues(t, sizeGet, 1) - vcm.localCacheEnable = false - err = vcm.remoteChunkManager.Write(key, []byte{1}) - assert.Nil(t, err) - sizeGet, err = vcm.GetSize(key) - assert.Nil(t, err) - assert.EqualValues(t, sizeGet, 1) + err = vcm.cacheStorage.Write(key, []byte{1}) + assert.Nil(t, err) + sizeGet, err = vcm.GetSize(key) + assert.Nil(t, err) + assert.EqualValues(t, sizeGet, 1) - err = vcm.RemoveWithPrefix(localPath) - assert.NoError(t, err) - cancel() + err = vcm.RemoveWithPrefix(localPath) + assert.NoError(t, err) + cancel() + vcm.Close() + } } func TestVectorChunkManager_Write(t *testing.T) { - vcm, cancel, err := buildVectorChunkManager(localPath, false) - assert.NoError(t, err) - assert.NotNil(t, vcm) + localCaches := []bool{true, false} + for _, localCache := range localCaches { + vcm, cancel, err := buildVectorChunkManager(localPath, localCache) + assert.NoError(t, err) + assert.NotNil(t, vcm) - key := "1" - err = vcm.Write(key, []byte{1}) - assert.Error(t, err) + key := "1" + err = vcm.Write(key, []byte{1}) + assert.Nil(t, err) - vcm.localCacheEnable = true - err = vcm.Write(key, []byte{1}) - assert.Nil(t, err) + exist := vcm.Exist(key) + assert.True(t, exist) - exist := vcm.Exist(key) - assert.True(t, exist) + contents := map[string][]byte{ + "key_1": {111}, + "key_2": {222}, + } + err = vcm.MultiWrite(contents) + assert.NoError(t, err) - contents := map[string][]byte{ - "key_1": {111}, - "key_2": {222}, + exist = vcm.Exist("key_1") + assert.True(t, exist) + exist = vcm.Exist("key_2") + assert.True(t, exist) + + err = vcm.RemoveWithPrefix(localPath) + assert.NoError(t, err) 
+ cancel() + vcm.Close() } - err = vcm.MultiWrite(contents) - assert.NoError(t, err) - - exist = vcm.Exist("key_1") - assert.True(t, exist) - exist = vcm.Exist("key_2") - assert.True(t, exist) - - err = vcm.RemoveWithPrefix(localPath) - assert.NoError(t, err) - cancel() } func TestVectorChunkManager_Remove(t *testing.T) { @@ -239,7 +264,7 @@ func TestVectorChunkManager_Remove(t *testing.T) { assert.NotNil(t, vcm) key := "1" - err = vcm.remoteChunkManager.Write(key, []byte{1}) + err = vcm.cacheStorage.Write(key, []byte{1}) assert.Nil(t, err) err = vcm.Remove(key) @@ -252,7 +277,7 @@ func TestVectorChunkManager_Remove(t *testing.T) { "key_1": {111}, "key_2": {222}, } - err = vcm.remoteChunkManager.MultiWrite(contents) + err = vcm.cacheStorage.MultiWrite(contents) assert.NoError(t, err) err = vcm.MultiRemove([]string{"key_1", "key_2"}) @@ -266,6 +291,7 @@ func TestVectorChunkManager_Remove(t *testing.T) { err = vcm.RemoveWithPrefix(localPath) assert.NoError(t, err) cancel() + vcm.Close() } } @@ -296,15 +322,8 @@ func (m *mockFailedChunkManager) MultiRemove(key []string) error { func TestVectorChunkManager_Remove_Fail(t *testing.T) { vcm := &VectorChunkManager{ - localChunkManager: &mockFailedChunkManager{fail: true}, - } - assert.Error(t, vcm.Remove("test")) - assert.Error(t, vcm.MultiRemove([]string{"test"})) - assert.Error(t, vcm.RemoveWithPrefix("test")) - - vcm = &VectorChunkManager{ - localChunkManager: &mockFailedChunkManager{fail: false}, - remoteChunkManager: &mockFailedChunkManager{fail: true}, + vectorStorage: &mockFailedChunkManager{fail: true}, + cacheStorage: &mockFailedChunkManager{fail: true}, } assert.Error(t, vcm.Remove("test")) assert.Error(t, vcm.MultiRemove([]string{"test"})) @@ -322,17 +341,11 @@ func TestVectorChunkManager_Read(t *testing.T) { assert.Error(t, err) assert.Nil(t, content) - vcm.localCacheEnable = true - - content, err = vcm.Read("9999") - assert.Error(t, err) - assert.Nil(t, content) - meta := initMeta() binlogs := 
initBinlogFile(meta) assert.NotNil(t, binlogs) for _, binlog := range binlogs { - err := vcm.remoteChunkManager.Write(binlog.Key, binlog.Value) + err := vcm.vectorStorage.Write(binlog.Key, binlog.Value) assert.Nil(t, err) } @@ -360,11 +373,21 @@ func TestVectorChunkManager_Read(t *testing.T) { } assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult) + keys, contents, err := vcm.ReadWithPrefix("10") + assert.Nil(t, err) + assert.Equal(t, "101", keys[0]) + assert.Equal(t, []byte{3, 4}, contents[0]) + + assert.Nil(t, err) + assert.Equal(t, "108", keys[1]) + assert.Equal(t, []byte{0, 255}, contents[1]) + floatResult = make([]float32, 0) for i := 0; i < len(content)/4; i++ { - singleData := typeutil.BytesToFloat32(contents[1][i*4 : i*4+4]) + singleData := typeutil.BytesToFloat32(contents[2][i*4 : i*4+4]) floatResult = append(floatResult, singleData) } + assert.Equal(t, "109", keys[2]) assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666}, floatResult) content, err = vcm.ReadAt("109", 8*4, 8*4) @@ -381,11 +404,23 @@ func TestVectorChunkManager_Read(t *testing.T) { assert.Error(t, err) assert.Nil(t, content) - vcm.localCacheEnable = false content, err = vcm.ReadAt("109", 8*4, 8*4) assert.Nil(t, err) assert.Equal(t, 32, len(content)) + if localCache { + r, err := vcm.Mmap("109") + assert.Nil(t, err) + p := make([]byte, 32) + n, err := r.ReadAt(p, 32) + assert.Nil(t, err) + assert.Equal(t, n, 32) + + r, err = vcm.Mmap("not exist") + assert.Error(t, err) + assert.Nil(t, nil) + } + content, err = vcm.ReadAt("109", 9999, 8*4) assert.Error(t, err) assert.Nil(t, content) @@ -396,6 +431,8 @@ func TestVectorChunkManager_Read(t *testing.T) { err = vcm.RemoveWithPrefix(localPath) assert.NoError(t, err) + cancel() + vcm.Close() } } diff --git a/internal/util/cache/lru_cache.go b/internal/util/cache/lru_cache.go new file mode 100644 index 0000000000..9070c5ab93 --- /dev/null +++ 
b/internal/util/cache/lru_cache.go @@ -0,0 +1,243 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "container/list" + "context" + "errors" + "fmt" + "sync" +) + +type LRU struct { + ctx context.Context + cancel context.CancelFunc + evictList *list.List + items map[interface{}]*list.Element + capacity int + onEvicted func(k Key, v Value) + m sync.RWMutex + evictedCh chan *entry + stats *Stats +} + +type Stats struct { + hitCount float32 + evictedCount float32 + readCount float32 + writeCount float32 +} + +func (s *Stats) String() string { + var hitRatio float32 + var evictedRatio float32 + if s.readCount != 0 { + hitRatio = s.hitCount / s.readCount + evictedRatio = s.evictedCount / s.writeCount + } + + return fmt.Sprintf("lru cache hit ratio = %f, evictedRatio = %f", hitRatio, evictedRatio) +} + +type Key interface { +} +type Value interface { +} + +type entry struct { + key Key + value Value +} + +func NewLRU(capacity int, onEvicted func(k Key, v Value)) (*LRU, error) { + if capacity <= 0 { + return nil, errors.New("cache size must be positive") + } + ctx, cancel := context.WithCancel(context.Background()) + c := &LRU{ + ctx: ctx, + cancel: cancel, + capacity: capacity, + evictList: list.New(), + 
items:     make(map[interface{}]*list.Element),
+		onEvicted: onEvicted,
+		evictedCh: make(chan *entry, 16),
+		stats:     &Stats{},
+	}
+	go c.evictedWorker()
+	return c, nil
+}
+
+func (c *LRU) evictedWorker() {
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case e, ok := <-c.evictedCh:
+			if ok {
+				if c.onEvicted != nil {
+					c.onEvicted(e.key, e.value)
+				}
+			}
+		}
+	}
+}
+
+func (c *LRU) Add(key Key, value Value) {
+	c.m.Lock()
+	defer c.m.Unlock()
+	c.stats.writeCount++
+	if e, ok := c.items[key]; ok {
+		c.evictList.MoveToFront(e)
+		e.Value.(*entry).value = value
+		return
+	}
+	e := &entry{key: key, value: value}
+	listE := c.evictList.PushFront(e)
+	c.items[key] = listE
+
+	if c.evictList.Len() > c.capacity {
+		c.stats.evictedCount++
+		oldestE := c.evictList.Back()
+		if oldestE != nil {
+			c.evictList.Remove(oldestE)
+			kv := oldestE.Value.(*entry)
+			delete(c.items, kv.key)
+			if c.onEvicted != nil {
+				c.evictedCh <- kv
+			}
+		}
+	}
+}
+
+func (c *LRU) Get(key Key) (value Value, ok bool) {
+	c.m.Lock()
+	defer c.m.Unlock()
+	c.stats.readCount++
+	if e, ok := c.items[key]; ok {
+		c.stats.hitCount++
+		c.evictList.MoveToFront(e)
+		kv := e.Value.(*entry)
+		return kv.value, true
+	}
+	return nil, false
+}
+
+func (c *LRU) Remove(key Key) {
+	c.m.Lock()
+	defer c.m.Unlock()
+	if e, ok := c.items[key]; ok {
+		c.evictList.Remove(e)
+		kv := e.Value.(*entry)
+		delete(c.items, kv.key)
+		if c.onEvicted != nil {
+			c.evictedCh <- kv
+		}
+	}
+}
+
+func (c *LRU) Contains(key Key) bool {
+	c.m.RLock()
+	defer c.m.RUnlock()
+	_, ok := c.items[key]
+	return ok
+}
+
+func (c *LRU) Keys() []Key {
+	c.m.RLock()
+	defer c.m.RUnlock()
+	keys := make([]Key, len(c.items))
+	i := 0
+	for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() {
+		keys[i] = ent.Value.(*entry).key
+		i++
+	}
+	return keys
+}
+
+func (c *LRU) Len() int {
+	c.m.RLock()
+	defer c.m.RUnlock()
+	return c.evictList.Len()
+}
+
+func (c *LRU) Capacity() int {
+	return c.capacity
+}
+
+func (c *LRU) Purge() {
+	c.m.Lock()
defer c.m.Unlock() + for k, v := range c.items { + if c.onEvicted != nil { + c.evictedCh <- v.Value.(*entry) + } + delete(c.items, k) + } + c.evictList.Init() +} + +func (c *LRU) Resize(capacity int) int { + c.m.Lock() + defer c.m.Unlock() + c.capacity = capacity + if capacity >= c.evictList.Len() { + return 0 + } + diff := c.evictList.Len() - c.capacity + for i := 0; i < diff; i++ { + oldestE := c.evictList.Back() + if oldestE != nil { + c.evictList.Remove(oldestE) + kv := oldestE.Value.(*entry) + delete(c.items, kv.key) + if c.onEvicted != nil { + c.evictedCh <- kv + } + } + } + return diff +} + +func (c *LRU) GetOldest() (Key, Value, bool) { + c.m.RLock() + defer c.m.RUnlock() + ent := c.evictList.Back() + if ent != nil { + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +func (c *LRU) Close() { + c.Purge() + c.cancel() + remain := len(c.evictedCh) + for i := 0; i < remain; i++ { + e, ok := <-c.evictedCh + if ok { + c.onEvicted(e.key, e.value) + } + } + close(c.evictedCh) +} + +func (c *LRU) Stats() *Stats { + return c.stats +} diff --git a/internal/storage/lru_cache_test.go b/internal/util/cache/lru_cache_test.go similarity index 75% rename from internal/storage/lru_cache_test.go rename to internal/util/cache/lru_cache_test.go index bfc7d4dc46..63a13e6324 100644 --- a/internal/storage/lru_cache_test.go +++ b/internal/util/cache/lru_cache_test.go @@ -14,10 +14,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package storage +package cache import ( + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -37,8 +39,8 @@ func TestNewLRU(t *testing.T) { } func TestLRU_Add(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) testKey1 := "test_key_1" @@ -77,12 +79,14 @@ func TestLRU_Add(t *testing.T) { assert.False(t, ok) assert.Nil(t, v) - assert.EqualValues(t, evicted, 1) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 1 + }, 1*time.Second, 100*time.Millisecond) } func TestLRU_Contains(t *testing.T) { - evicted := 0 - c, err := NewLRU(1, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(1, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) testKey1 := "test_key_1" @@ -101,12 +105,14 @@ func TestLRU_Contains(t *testing.T) { ok = c.Contains(testKey1) assert.False(t, ok) - assert.EqualValues(t, evicted, 1) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 1 + }, 1*time.Second, 100*time.Millisecond) } func TestLRU_Get(t *testing.T) { - evicted := 0 - c, err := NewLRU(1, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(1, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) testKey1 := "test_key_1" @@ -128,11 +134,14 @@ func TestLRU_Get(t *testing.T) { assert.False(t, ok) assert.Nil(t, v) - assert.EqualValues(t, evicted, 1) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 1 + }, 1*time.Second, 100*time.Millisecond) } + func TestLRU_GetOldest(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) testKey1 := "test_key_1" @@ -174,11 +183,14 @@ func 
TestLRU_GetOldest(t *testing.T) { assert.EqualValues(t, testKey1, k) assert.EqualValues(t, testValue1, v) - assert.EqualValues(t, evicted, 1) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 1 + }, 1*time.Second, 100*time.Millisecond) } + func TestLRU_Keys(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) testKey1 := "test_key_1" @@ -204,8 +216,11 @@ func TestLRU_Keys(t *testing.T) { keys = c.Keys() assert.ElementsMatch(t, []string{testKey3, testKey1}, keys) - assert.EqualValues(t, evicted, 1) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 1 + }, 1*time.Second, 100*time.Millisecond) } + func TestLRU_Len(t *testing.T) { c, err := NewLRU(2, nil) assert.Nil(t, err) @@ -225,43 +240,31 @@ func TestLRU_Len(t *testing.T) { c.Add(testKey3, testValue3) assert.EqualValues(t, c.Len(), 2) } -func TestLRU_Peek(t *testing.T) { - c, err := NewLRU(2, nil) + +func TestLRU_Capacity(t *testing.T) { + c, err := NewLRU(5, nil) assert.Nil(t, err) + assert.EqualValues(t, c.Len(), 0) testKey1 := "test_key_1" testValue1 := "test_value_1" testKey2 := "test_key_2" testValue2 := "test_value_2" - testKeyNotExist := "not_exist" + testKey3 := "test_key_3" + testValue3 := "test_value_3" c.Add(testKey1, testValue1) - v, ok := c.Peek(testKey1) - assert.True(t, ok) - assert.EqualValues(t, testValue1, v) - + assert.EqualValues(t, c.Capacity(), 5) c.Add(testKey2, testValue2) - k, v, ok := c.GetOldest() - assert.True(t, ok) - assert.EqualValues(t, testKey1, k) - assert.EqualValues(t, testValue1, v) + assert.EqualValues(t, c.Capacity(), 5) - v, ok = c.Peek(testKey1) - assert.True(t, ok) - assert.EqualValues(t, testValue1, v) - - k, v, ok = c.GetOldest() - assert.True(t, ok) - assert.EqualValues(t, testKey1, k) - assert.EqualValues(t, testValue1, v) - - v, ok = c.Peek(testKeyNotExist) - 
assert.False(t, ok) - assert.Nil(t, v) + c.Add(testKey3, testValue3) + assert.EqualValues(t, c.Capacity(), 5) } + func TestLRU_Purge(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) assert.EqualValues(t, c.Len(), 0) @@ -281,11 +284,14 @@ func TestLRU_Purge(t *testing.T) { c.Purge() assert.EqualValues(t, c.Len(), 0) - assert.EqualValues(t, evicted, 3) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 3 + }, 1*time.Second, 100*time.Millisecond) } + func TestLRU_Remove(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) assert.EqualValues(t, c.Len(), 0) @@ -302,11 +308,14 @@ func TestLRU_Remove(t *testing.T) { c.Remove(testKey2) assert.EqualValues(t, c.Len(), 0) - assert.EqualValues(t, evicted, 2) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 2 + }, 1*time.Second, 100*time.Millisecond) } + func TestLRU_RemoveOldest(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) assert.EqualValues(t, c.Len(), 0) @@ -339,12 +348,15 @@ func TestLRU_RemoveOldest(t *testing.T) { assert.Nil(t, v) assert.EqualValues(t, c.Len(), 0) - assert.EqualValues(t, evicted, 2) + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&evicted) == 2 + }, 1*time.Second, 100*time.Millisecond) } + func TestLRU_Resize(t *testing.T) { - evicted := 0 - c, err := NewLRU(2, func(interface{}, interface{}) { evicted++ }) + evicted := int32(0) + c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) }) assert.Nil(t, err) assert.EqualValues(t, c.Len(), 0) @@ 
-376,10 +388,14 @@ func TestLRU_Resize(t *testing.T) {
 	assert.EqualValues(t, v, testValue2)
 	assert.EqualValues(t, c.Len(), 1)
-	assert.EqualValues(t, evicted, 1)
+	assert.Eventually(t, func() bool {
+		return atomic.LoadInt32(&evicted) == 1
+	}, 1*time.Second, 100*time.Millisecond)
 
 	c.Resize(3)
 	assert.EqualValues(t, c.Len(), 1)
-	assert.EqualValues(t, evicted, 1)
+	assert.Eventually(t, func() bool {
+		return atomic.LoadInt32(&evicted) == 1
+	}, 1*time.Second, 100*time.Millisecond)
 }
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index 3c24b59fb9..e4af1fa40e 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -597,6 +597,9 @@ type queryNodeConfig struct {
 
 	// memory limit
 	OverloadedMemoryThresholdPercentage float64
+
+	// cache limit
+	LocalFileCacheLimit int64
 }
 
 func (p *queryNodeConfig) init(base *BaseTable) {
@@ -617,6 +620,8 @@ func (p *queryNodeConfig) init(base *BaseTable) {
 
 	p.initSegcoreChunkRows()
 	p.initOverloadedMemoryThresholdPercentage()
+
+	p.initLocalFileCacheLimit()
 }
 
 // InitAlias initializes an alias for the QueryNode role.
@@ -691,6 +696,15 @@ func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() {
 	p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100
 }
 
+func (p *queryNodeConfig) initLocalFileCacheLimit() {
+	localFileCacheLimitStr := p.Base.LoadWithDefault("queryNode.chunkManager.localFileCacheLimit", "2147483648")
+	localFileCacheLimit, err := strconv.ParseInt(localFileCacheLimitStr, 10, 64)
+	if err != nil {
+		panic(err)
+	}
+	p.LocalFileCacheLimit = localFileCacheLimit
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // --- datacoord ---
 type dataCoordConfig struct {