From ef31fe23b81df46a2cddb8d41251f45cf657b032 Mon Sep 17 00:00:00 2001 From: SimFG Date: Mon, 7 Aug 2023 14:23:08 +0800 Subject: [PATCH] Refine the rbac cache update process (#26150) (#26151) Signed-off-by: SimFG --- internal/proxy/meta_cache.go | 15 +++++++---- internal/proxy/meta_cache_test.go | 43 +++++++++++++++++++++++++++++++ internal/rootcoord/root_coord.go | 26 ++++++++++--------- 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 7e8c6dd0d0..bf579accd9 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -915,12 +915,14 @@ func (m *MetaCache) GetUserRole(user string) []string { } func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error { - m.mu.Lock() - defer m.mu.Unlock() - - if op.OpKey == "" { - return errors.New("empty op key") + if op.OpType != typeutil.CacheRefresh { + m.mu.Lock() + defer m.mu.Unlock() + if op.OpKey == "" { + return errors.New("empty op key") + } } + switch op.OpType { case typeutil.CacheGrantPrivilege: m.privilegeInfos[op.OpKey] = struct{}{} @@ -955,6 +957,9 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error { log.Error("fail to init meta cache", zap.Error(err)) return err } + + m.mu.Lock() + defer m.mu.Unlock() m.userToRoles = make(map[string]map[string]struct{}) m.privilegeInfos = make(map[string]struct{}) m.unsafeInitPolicyInfo(resp.PolicyInfos, resp.UserRoles) diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index d32dc3e689..99e757ee8c 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -669,6 +669,49 @@ func TestMetaCache_PolicyInfo(t *testing.T) { err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: 100, OpKey: "policyX"}) assert.NotNil(t, err) }) + + t.Run("Delete user or drop role", func(t *testing.T) { + client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { + return &internalpb.ListPolicyResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + PolicyInfos: []string{"policy1", "policy2", "policy3"}, + UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")}, + }, nil + } + err := InitMetaCache(context.Background(), client, qc, mgr) + assert.NoError(t, err) + + err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDeleteUser, OpKey: "foo"}) + assert.NoError(t, err) + + roles := globalMetaCache.GetUserRole("foo") + assert.Len(t, roles, 0) + + roles = globalMetaCache.GetUserRole("foo2") + assert.Len(t, roles, 2) + + err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheDropRole, OpKey: "role2"}) + assert.NoError(t, err) + roles = globalMetaCache.GetUserRole("foo2") + assert.Len(t, roles, 1) + assert.Equal(t, "role3", roles[0]) + + client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { + return &internalpb.ListPolicyResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + PolicyInfos: []string{"policy1", "policy2", "policy3"}, + UserRoles: []string{funcutil.EncodeUserRoleCache("foo", "role1"), funcutil.EncodeUserRoleCache("foo", "role2"), funcutil.EncodeUserRoleCache("foo2", "role2"), funcutil.EncodeUserRoleCache("foo2", "role3")}, + }, nil + } + err = globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{OpType: typeutil.CacheRefresh}) + assert.NoError(t, err) + roles = globalMetaCache.GetUserRole("foo") + assert.Len(t, roles, 2) + }) } func TestMetaCache_LoadCache(t *testing.T) { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index d34ce45297..57dfd51936 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -663,24 +663,26 @@ func (c *Core) startInternal() error { c.scheduler.Start() c.stepExecutor.Start() + go func() { + // refresh rbac cache + if err := retry.Do(c.ctx, func() error { + if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{ + OpType: int32(typeutil.CacheRefresh), + }); err != nil { + log.Warn("fail to refresh policy info cache", zap.Error(err)) + return err + } + return nil + }, retry.Attempts(100), retry.Sleep(time.Second)); err != nil { + log.Panic("fail to refresh policy info cache", zap.Error(err)) + } + }() Params.RootCoordCfg.CreatedTime = time.Now() Params.RootCoordCfg.UpdatedTime = time.Now() c.startServerLoop() c.UpdateStateCode(commonpb.StateCode_Healthy) - // refresh rbac cache - if err := retry.Do(c.ctx, func() error { - if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{ - OpType: int32(typeutil.CacheRefresh), - }); err != nil { - log.Warn("fail to refresh policy info cache", zap.Error(err)) - return err - } - return nil - }, retry.Attempts(100), retry.Sleep(time.Second)); err != nil { - log.Panic("fail to refresh policy info cache", zap.Error(err)) - } logutil.Logger(c.ctx).Info("rootcoord startup successfully") return nil