From cf1490403b2e5f0061157ea4556ea6039b56a678 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Wed, 14 Dec 2022 19:41:23 +0800 Subject: [PATCH] Fixbug: proxy meta_cache partially update (#21232) Signed-off-by: zhenshan.cao Signed-off-by: zhenshan.cao --- internal/proxy/meta_cache.go | 10 ++- internal/proxy/meta_cache_test.go | 118 ++++++++++++++++++++++++------ 2 files changed, 103 insertions(+), 25 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 8100c7c63a..c00a3da166 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -88,6 +88,10 @@ type collectionInfo struct { isLoaded bool } +func (info *collectionInfo) isCollectionCached() bool { + return info != nil && info.collID != UniqueID(0) && info.schema != nil +} + // shardLeaders wraps shard leader mapping for iteration. type shardLeaders struct { idx *atomic.Int64 @@ -187,7 +191,7 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) m.mu.RLock() collInfo, ok := m.collInfo[collectionName] - if !ok { + if !ok || !collInfo.isCollectionCached() { metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() @@ -216,7 +220,7 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string collInfo, ok := m.collInfo[collectionName] m.mu.RUnlock() - if !ok { + if !ok || !collInfo.isCollectionCached() { tr := timerecord.NewTimeRecorder("UpdateCache") metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc() coll, err := m.describeCollection(ctx, collectionName) @@ -272,7 +276,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName stri m.mu.RLock() collInfo, ok := m.collInfo[collectionName] - if !ok { + if !ok || !collInfo.isCollectionCached() { metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index ff31378bbf..b8b83d4908 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -20,7 +20,10 @@ import ( "context" "errors" "fmt" + "sync" + "sync/atomic" "testing" + "time" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -42,15 +45,33 @@ import ( type MockRootCoordClientInterface struct { types.RootCoord Error bool - AccessCount int + AccessCount int32 listPolicy func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) } +func (m *MockRootCoordClientInterface) IncAccessCount() { + atomic.AddInt32(&m.AccessCount, 1) +} + +func (m *MockRootCoordClientInterface) GetAccessCount() int { + ret := atomic.LoadInt32(&m.AccessCount) + return int(ret) +} + type MockQueryCoordClientInterface struct { types.QueryCoord Error bool - AccessCount int + AccessCount int32 +} + +func (m *MockQueryCoordClientInterface) IncAccessCount() { + atomic.AddInt32(&m.AccessCount, 1) +} + +func (m *MockQueryCoordClientInterface) GetAccessCount() int { + ret := atomic.LoadInt32(&m.AccessCount) + return int(ret) } func (m *MockRootCoordClientInterface) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { @@ -105,7 +126,7 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i if m.Error { return nil, errors.New("mocked error") } - m.AccessCount++ + m.IncAccessCount() if in.CollectionName == "collection1" { return &milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ @@ -154,7 +175,7 @@ func (m *MockRootCoordClientInterface) GetCredential(ctx context.Context, req *r if m.Error { return nil, errors.New("mocked error") } - m.AccessCount++ + m.IncAccessCount() if req.Username == "mockUser" { encryptedPassword, _ := crypto.PasswordEncrypt("mockPass") return &rootcoordpb.GetCredentialResponse{ @@ -198,7 +219,7 @@ func (m *MockQueryCoordClientInterface) ShowCollections(ctx context.Context, req if m.Error { return nil, errors.New("mocked error") } - m.AccessCount++ + m.IncAccessCount() rsp := &querypb.ShowCollectionsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -221,22 +242,22 @@ func TestMetaCache_GetCollection(t *testing.T) { id, err := globalMetaCache.GetCollectionID(ctx, "collection1") assert.Nil(t, err) assert.Equal(t, id, typeutil.UniqueID(1)) - assert.Equal(t, rootCoord.AccessCount, 1) + assert.Equal(t, rootCoord.GetAccessCount(), 1) // should'nt be accessed to remote root coord. schema, err := globalMetaCache.GetCollectionSchema(ctx, "collection1") - assert.Equal(t, rootCoord.AccessCount, 1) + assert.Equal(t, rootCoord.GetAccessCount(), 1) assert.Nil(t, err) assert.Equal(t, schema, &schemapb.CollectionSchema{ AutoID: true, Fields: []*schemapb.FieldSchema{}, }) id, err = globalMetaCache.GetCollectionID(ctx, "collection2") - assert.Equal(t, rootCoord.AccessCount, 2) + assert.Equal(t, rootCoord.GetAccessCount(), 2) assert.Nil(t, err) assert.Equal(t, id, typeutil.UniqueID(2)) schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection2") - assert.Equal(t, rootCoord.AccessCount, 2) + assert.Equal(t, rootCoord.GetAccessCount(), 2) assert.Nil(t, err) assert.Equal(t, schema, &schemapb.CollectionSchema{ AutoID: true, @@ -245,11 +266,11 @@ func TestMetaCache_GetCollection(t *testing.T) { // test to get from cache, this should trigger root request id, err = globalMetaCache.GetCollectionID(ctx, "collection1") - assert.Equal(t, rootCoord.AccessCount, 2) + assert.Equal(t, rootCoord.GetAccessCount(), 2) assert.Nil(t, err) assert.Equal(t, id, typeutil.UniqueID(1)) schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection1") - assert.Equal(t, rootCoord.AccessCount, 2) + assert.Equal(t, rootCoord.GetAccessCount(), 2) assert.Nil(t, err) assert.Equal(t, schema, &schemapb.CollectionSchema{ AutoID: true, @@ -327,6 +348,59 @@ func TestMetaCache_GetPartitionID(t *testing.T) { assert.Equal(t, id, typeutil.UniqueID(4)) } +func TestMetaCache_ConcurrentTest1(t *testing.T) { + ctx := context.Background() + rootCoord := &MockRootCoordClientInterface{} + queryCoord := &MockQueryCoordClientInterface{} + mgr := newShardClientMgr() + err := InitMetaCache(ctx, rootCoord, queryCoord, mgr) + assert.Nil(t, err) + + var wg sync.WaitGroup + cnt := 100 + getCollectionCacheFunc := func(wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < cnt; i++ { + //GetCollectionSchema will never fail + schema, err := globalMetaCache.GetCollectionSchema(ctx, "collection1") + assert.Nil(t, err) + assert.Equal(t, schema, &schemapb.CollectionSchema{ + AutoID: true, + Fields: []*schemapb.FieldSchema{}, + }) + time.Sleep(10 * time.Millisecond) + } + } + + getPartitionCacheFunc := func(wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < cnt; i++ { + //GetPartitions may fail + globalMetaCache.GetPartitions(ctx, "collection1") + time.Sleep(10 * time.Millisecond) + } + } + + invalidCacheFunc := func(wg *sync.WaitGroup) { + defer wg.Done() + for i := 0; i < cnt; i++ { + //periodically invalid collection cache + globalMetaCache.RemoveCollection(ctx, "collection1") + time.Sleep(10 * time.Millisecond) + } + } + + wg.Add(1) + go getCollectionCacheFunc(&wg) + + wg.Add(1) + go invalidCacheFunc(&wg) + + wg.Add(1) + go getPartitionCacheFunc(&wg) + wg.Wait() +} + func TestMetaCache_GetPartitionError(t *testing.T) { ctx := context.Background() rootCoord := &MockRootCoordClientInterface{} @@ -544,25 +618,25 @@ func TestMetaCache_LoadCache(t *testing.T) { assert.NoError(t, err) assert.True(t, info.isLoaded) // no collectionInfo of collection1, should access RootCoord - assert.Equal(t, rootCoord.AccessCount, 1) + assert.Equal(t, rootCoord.GetAccessCount(), 1) // not loaded, should access QueryCoord - assert.Equal(t, queryCoord.AccessCount, 1) + assert.Equal(t, queryCoord.GetAccessCount(), 1) info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") assert.NoError(t, err) assert.True(t, info.isLoaded) // shouldn't access QueryCoord or RootCoord again - assert.Equal(t, rootCoord.AccessCount, 1) - assert.Equal(t, queryCoord.AccessCount, 1) + assert.Equal(t, rootCoord.GetAccessCount(), 1) + assert.Equal(t, queryCoord.GetAccessCount(), 1) // test collection2 not fully loaded info, err = globalMetaCache.GetCollectionInfo(ctx, "collection2") assert.NoError(t, err) assert.False(t, info.isLoaded) // no collectionInfo of collection2, should access RootCoord - assert.Equal(t, rootCoord.AccessCount, 2) + assert.Equal(t, rootCoord.GetAccessCount(), 2) // not loaded, should access QueryCoord - assert.Equal(t, queryCoord.AccessCount, 2) + assert.Equal(t, queryCoord.GetAccessCount(), 2) }) t.Run("test RemoveCollectionLoadCache", func(t *testing.T) { @@ -571,7 +645,7 @@ func TestMetaCache_LoadCache(t *testing.T) { assert.NoError(t, err) assert.True(t, info.isLoaded) // should access QueryCoord - assert.Equal(t, queryCoord.AccessCount, 3) + assert.Equal(t, queryCoord.GetAccessCount(), 3) }) } @@ -587,13 +661,13 @@ func TestMetaCache_RemoveCollection(t *testing.T) { assert.NoError(t, err) assert.True(t, info.isLoaded) // no collectionInfo of collection1, should access RootCoord - assert.Equal(t, rootCoord.AccessCount, 1) + assert.Equal(t, rootCoord.GetAccessCount(), 1) info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1") assert.NoError(t, err) assert.True(t, info.isLoaded) // shouldn't access RootCoord again - assert.Equal(t, rootCoord.AccessCount, 1) + assert.Equal(t, rootCoord.GetAccessCount(), 1) globalMetaCache.RemoveCollection(ctx, "collection1") // no collectionInfo of collection2, should access RootCoord @@ -601,7 +675,7 @@ func TestMetaCache_RemoveCollection(t *testing.T) { assert.NoError(t, err) assert.True(t, info.isLoaded) // shouldn't access RootCoord again - assert.Equal(t, rootCoord.AccessCount, 2) + assert.Equal(t, rootCoord.GetAccessCount(), 2) globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1)) // no collectionInfo of collection2, should access RootCoord @@ -609,5 +683,5 @@ func TestMetaCache_RemoveCollection(t *testing.T) { assert.NoError(t, err) assert.True(t, info.isLoaded) // shouldn't access RootCoord again - assert.Equal(t, rootCoord.AccessCount, 3) + assert.Equal(t, rootCoord.GetAccessCount(), 3) }