From 256d4e209fb7e1a3f3891feeb8521d3112b48108 Mon Sep 17 00:00:00 2001 From: jaime Date: Sun, 8 Sep 2024 17:49:06 +0800 Subject: [PATCH] fix: memory leak in proxy meta cache (#36076) issue: #36074 pr: #36075 Signed-off-by: jaime --- internal/proxy/meta_cache.go | 136 ++++++---------------------- internal/proxy/meta_cache_test.go | 146 ------------------------------ internal/proxy/mock_cache.go | 64 ------------- 3 files changed, 26 insertions(+), 320 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 3ca9a2f732..6f6e7e61fb 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -59,8 +59,6 @@ type Cache interface { GetCollectionName(ctx context.Context, database string, collectionID int64) (string, error) // GetCollectionInfo get collection's information by name or collection id, such as schema, and etc. GetCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionBasicInfo, error) - // GetCollectionNamesByID get collection name and database name by collection id - GetCollectionNamesByID(ctx context.Context, collectionID []UniqueID) ([]string, []string, error) // GetPartitionID get partition's identifier of specific collection. GetPartitionID(ctx context.Context, database, collectionName string, partitionName string) (typeutil.UniqueID, error) // GetPartitions get all partitions' id of specific collection. @@ -343,19 +341,18 @@ type MetaCache struct { rootCoord types.RootCoordClient queryCoord types.QueryCoordClient - dbInfo map[string]*databaseInfo // database -> db_info - collInfo map[string]map[string]*collectionInfo // database -> collectionName -> collection_info - collLeader map[string]map[string]*shardLeaders // database -> collectionName -> collection_leaders - dbCollectionInfo map[string]map[typeutil.UniqueID]string // database -> collectionID -> collectionName - credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load - privilegeInfos map[string]struct{} // privileges cache - userToRoles map[string]map[string]struct{} // user to role cache - mu sync.RWMutex - credMut sync.RWMutex - leaderMut sync.RWMutex - shardMgr shardClientMgr - sfGlobal conc.Singleflight[*collectionInfo] - sfDB conc.Singleflight[*databaseInfo] + dbInfo map[string]*databaseInfo // database -> db_info + collInfo map[string]map[string]*collectionInfo // database -> collectionName -> collection_info + collLeader map[string]map[string]*shardLeaders // database -> collectionName -> collection_leaders + credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load + privilegeInfos map[string]struct{} // privileges cache + userToRoles map[string]map[string]struct{} // user to role cache + mu sync.RWMutex + credMut sync.RWMutex + leaderMut sync.RWMutex + shardMgr shardClientMgr + sfGlobal conc.Singleflight[*collectionInfo] + sfDB conc.Singleflight[*databaseInfo] IDStart int64 IDCount int64 @@ -388,16 +385,15 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo // NewMetaCache creates a MetaCache with provided RootCoord and QueryNode func NewMetaCache(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient, shardMgr shardClientMgr) (*MetaCache, error) { return &MetaCache{ - rootCoord: rootCoord, - queryCoord: queryCoord, - dbInfo: map[string]*databaseInfo{}, - collInfo: map[string]map[string]*collectionInfo{}, - collLeader: map[string]map[string]*shardLeaders{}, - dbCollectionInfo: map[string]map[typeutil.UniqueID]string{}, - credMap: map[string]*internalpb.CredentialInfo{}, - shardMgr: shardMgr, - privilegeInfos: map[string]struct{}{}, - userToRoles: map[string]map[string]struct{}{}, + rootCoord: rootCoord, + queryCoord: queryCoord, + dbInfo: map[string]*databaseInfo{}, + collInfo: map[string]map[string]*collectionInfo{}, + collLeader: map[string]map[string]*shardLeaders{}, + credMap: map[string]*internalpb.CredentialInfo{}, + shardMgr: shardMgr, + privilegeInfos: map[string]struct{}{}, + userToRoles: map[string]map[string]struct{}{}, }, nil } @@ -588,90 +584,6 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, coll return collInfo.getBasicInfo(), nil } -func (m *MetaCache) GetCollectionNamesByID(ctx context.Context, collectionIDs []UniqueID) ([]string, []string, error) { - hasUpdate := false - - dbNames := make([]string, 0) - collectionNames := make([]string, 0) - for _, collectionID := range collectionIDs { - dbName, collectionName := m.innerGetCollectionByID(collectionID) - if dbName != "" { - dbNames = append(dbNames, dbName) - collectionNames = append(collectionNames, collectionName) - continue - } - if hasUpdate { - return nil, nil, errors.New("collection not found after meta cache has been updated") - } - hasUpdate = true - err := m.updateDBInfo(ctx) - if err != nil { - return nil, nil, err - } - dbName, collectionName = m.innerGetCollectionByID(collectionID) - if dbName == "" { - return nil, nil, errors.New("collection not found") - } - dbNames = append(dbNames, dbName) - collectionNames = append(collectionNames, collectionName) - } - - return dbNames, collectionNames, nil -} - -func (m *MetaCache) innerGetCollectionByID(collectionID int64) (string, string) { - m.mu.RLock() - defer m.mu.RUnlock() - - for database, db := range m.dbCollectionInfo { - name, ok := db[collectionID] - if ok { - return database, name - } - } - return "", "" -} - -func (m *MetaCache) updateDBInfo(ctx context.Context) error { - databaseResp, err := m.rootCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{ - Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)), - }) - - if err := merr.CheckRPCCall(databaseResp, err); err != nil { - log.Warn("failed to ListDatabases", zap.Error(err)) - return err - } - - dbInfo := make(map[string]map[int64]string) - for _, dbName := range databaseResp.DbNames { - resp, err := m.rootCoord.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), - ), - DbName: dbName, - }) - - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to ShowCollections", - zap.String("dbName", dbName), - zap.Error(err)) - return err - } - - collections := make(map[int64]string) - for i, collection := range resp.CollectionNames { - collections[resp.CollectionIds[i]] = collection - } - dbInfo[dbName] = collections - } - - m.mu.Lock() - defer m.mu.Unlock() - m.dbCollectionInfo = dbInfo - - return nil -} - // GetCollectionInfo returns the collection information related to provided collection name // If the information is not found, proxy will try to fetch information for other source (RootCoord for now) // TODO: may cause data race of this implementation, should be refactored in future. @@ -1260,9 +1172,13 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) { func (m *MetaCache) RemoveDatabase(ctx context.Context, database string) { m.mu.Lock() - defer m.mu.Unlock() delete(m.collInfo, database) delete(m.dbInfo, database) + m.mu.Unlock() + + m.leaderMut.Lock() + delete(m.collLeader, database) + m.leaderMut.Unlock() } func (m *MetaCache) HasDatabase(ctx context.Context, database string) bool { diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index 8e4267b234..a1611aacc1 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -943,152 +943,6 @@ func TestMetaCache_AllocID(t *testing.T) { }) } -func TestGlobalMetaCache_UpdateDBInfo(t *testing.T) { - rootCoord := mocks.NewMockRootCoordClient(t) - queryCoord := mocks.NewMockQueryCoordClient(t) - shardMgr := newShardClientMgr() - ctx := context.Background() - - cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr) - assert.NoError(t, err) - - t.Run("fail to list db", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Code: 500, - }, - }, nil).Once() - err := cache.updateDBInfo(ctx) - assert.Error(t, err) - }) - - t.Run("fail to list collection", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - DbNames: []string{"db1"}, - }, nil).Once() - rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Code: 500, - }, - }, nil).Once() - err := cache.updateDBInfo(ctx) - assert.Error(t, err) - }) - - t.Run("success", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - DbNames: []string{"db1"}, - }, nil).Once() - rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - CollectionNames: []string{"collection1"}, - CollectionIds: []int64{1}, - }, nil).Once() - err := cache.updateDBInfo(ctx) - assert.NoError(t, err) - assert.Len(t, cache.dbCollectionInfo, 1) - assert.Len(t, cache.dbCollectionInfo["db1"], 1) - assert.Equal(t, "collection1", cache.dbCollectionInfo["db1"][1]) - }) -} - -func TestGlobalMetaCache_GetCollectionNamesByID(t *testing.T) { - rootCoord := mocks.NewMockRootCoordClient(t) - queryCoord := mocks.NewMockQueryCoordClient(t) - shardMgr := newShardClientMgr() - ctx := context.Background() - - t.Run("fail to update db info", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Code: 500, - }, - }, nil).Once() - - cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr) - assert.NoError(t, err) - - _, _, err = cache.GetCollectionNamesByID(ctx, []int64{1}) - assert.Error(t, err) - }) - - t.Run("not found collection", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - DbNames: []string{"db1"}, - }, nil).Once() - rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - CollectionNames: []string{"collection1"}, - CollectionIds: []int64{1}, - }, nil).Once() - - cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr) - assert.NoError(t, err) - _, _, err = cache.GetCollectionNamesByID(ctx, []int64{2}) - assert.Error(t, err) - }) - - t.Run("not found collection 2", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - DbNames: []string{"db1"}, - }, nil).Once() - rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - CollectionNames: []string{"collection1"}, - CollectionIds: []int64{1}, - }, nil).Once() - - cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr) - assert.NoError(t, err) - _, _, err = cache.GetCollectionNamesByID(ctx, []int64{1, 2}) - assert.Error(t, err) - }) - - t.Run("success", func(t *testing.T) { - rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - DbNames: []string{"db1"}, - }, nil).Once() - rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - CollectionNames: []string{"collection1", "collection2"}, - CollectionIds: []int64{1, 2}, - }, nil).Once() - - cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr) - assert.NoError(t, err) - dbNames, collectionNames, err := cache.GetCollectionNamesByID(ctx, []int64{1, 2}) - assert.NoError(t, err) - assert.Equal(t, []string{"collection1", "collection2"}, collectionNames) - assert.Equal(t, []string{"db1", "db1"}, dbNames) - }) -} - func TestMetaCache_InvalidateShardLeaderCache(t *testing.T) { paramtable.Init() paramtable.Get().Save(Params.ProxyCfg.ShardLeaderCacheInterval.Key, "1") diff --git a/internal/proxy/mock_cache.go b/internal/proxy/mock_cache.go index fdc06eb1fb..f1ef527f8e 100644 --- a/internal/proxy/mock_cache.go +++ b/internal/proxy/mock_cache.go @@ -275,70 +275,6 @@ func (_c *MockCache_GetCollectionName_Call) RunAndReturn(run func(context.Contex return _c } -// GetCollectionNamesByID provides a mock function with given fields: ctx, collectionID -func (_m *MockCache) GetCollectionNamesByID(ctx context.Context, collectionID []int64) ([]string, []string, error) { - ret := _m.Called(ctx, collectionID) - - var r0 []string - var r1 []string - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, []int64) ([]string, []string, error)); ok { - return rf(ctx, collectionID) - } - if rf, ok := ret.Get(0).(func(context.Context, []int64) []string); ok { - r0 = rf(ctx, collectionID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, []int64) []string); ok { - r1 = rf(ctx, collectionID) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).([]string) - } - } - - if rf, ok := ret.Get(2).(func(context.Context, []int64) error); ok { - r2 = rf(ctx, collectionID) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// MockCache_GetCollectionNamesByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionNamesByID' -type MockCache_GetCollectionNamesByID_Call struct { - *mock.Call -} - -// GetCollectionNamesByID is a helper method to define mock.On call -// - ctx context.Context -// - collectionID []int64 -func (_e *MockCache_Expecter) GetCollectionNamesByID(ctx interface{}, collectionID interface{}) *MockCache_GetCollectionNamesByID_Call { - return &MockCache_GetCollectionNamesByID_Call{Call: _e.mock.On("GetCollectionNamesByID", ctx, collectionID)} -} - -func (_c *MockCache_GetCollectionNamesByID_Call) Run(run func(ctx context.Context, collectionID []int64)) *MockCache_GetCollectionNamesByID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]int64)) - }) - return _c -} - -func (_c *MockCache_GetCollectionNamesByID_Call) Return(_a0 []string, _a1 []string, _a2 error) *MockCache_GetCollectionNamesByID_Call { - _c.Call.Return(_a0, _a1, _a2) - return _c -} - -func (_c *MockCache_GetCollectionNamesByID_Call) RunAndReturn(run func(context.Context, []int64) ([]string, []string, error)) *MockCache_GetCollectionNamesByID_Call { - _c.Call.Return(run) - return _c -} - // GetCollectionSchema provides a mock function with given fields: ctx, database, collectionName func (_m *MockCache) GetCollectionSchema(ctx context.Context, database string, collectionName string) (*schemaInfo, error) { ret := _m.Called(ctx, database, collectionName)