diff --git a/internal/proto/proxy.proto b/internal/proto/proxy.proto index 9c443995e0..89ef6b243b 100644 --- a/internal/proto/proxy.proto +++ b/internal/proto/proxy.proto @@ -32,6 +32,7 @@ message InvalidateCollMetaCacheRequest { string db_name = 2; string collection_name = 3; int64 collectionID = 4; + string partition_name = 5; } message InvalidateCredCacheRequest { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a4d3a87051..7c12abe812 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -107,22 +107,49 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p zap.String("db", request.DbName), zap.String("collectionName", request.CollectionName), zap.Int64("collectionID", request.CollectionID), + zap.String("msgType", request.GetBase().GetMsgType().String()), + zap.String("partitionName", request.GetPartitionName()), ) log.Info("received request to invalidate collection meta cache") collectionName := request.CollectionName collectionID := request.CollectionID - var aliasName []string + if globalMetaCache != nil { - if collectionName != "" { - globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached - } - if request.CollectionID != UniqueID(0) { - aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + switch request.GetBase().GetMsgType() { + case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias: + if collectionName != "" { + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached + } + if request.CollectionID != UniqueID(0) { + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + } + log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName)) + case commonpb.MsgType_DropPartition: + if globalMetaCache != nil { + if collectionName != "" && request.GetPartitionName() != "" { + globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName()) + } else { + log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty", + zap.String("collectionName", collectionName), + zap.String("partitionName", request.GetPartitionName())) + return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil + } + } + default: + log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String())) + + if collectionName != "" { + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached + } + if request.CollectionID != UniqueID(0) { + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + } } } + if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection { // no need to handle error, since this Proxy may not create dml stream for the collection. node.chMgr.removeDMLStream(request.GetCollectionID()) diff --git a/internal/proxy/mock_cache.go b/internal/proxy/mock_cache.go index e254f97c04..39dc306cdb 100644 --- a/internal/proxy/mock_cache.go +++ b/internal/proxy/mock_cache.go @@ -823,6 +823,41 @@ func (_c *MockCache_RefreshPolicyInfo_Call) RunAndReturn(run func(typeutil.Cache return _c } +// RemoveAlias provides a mock function with given fields: ctx, database, alias +func (_m *MockCache) RemoveAlias(ctx context.Context, database string, alias string) { + _m.Called(ctx, database, alias) +} + +// MockCache_RemoveAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveAlias' +type MockCache_RemoveAlias_Call struct { + *mock.Call +} + +// RemoveAlias is a helper method to define mock.On call +// - ctx context.Context +// - database string +// - alias string +func (_e *MockCache_Expecter) RemoveAlias(ctx interface{}, database interface{}, alias interface{}) *MockCache_RemoveAlias_Call { + return &MockCache_RemoveAlias_Call{Call: _e.mock.On("RemoveAlias", ctx, database, alias)} +} + +func (_c *MockCache_RemoveAlias_Call) Run(run func(ctx context.Context, database string, alias string)) *MockCache_RemoveAlias_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *MockCache_RemoveAlias_Call) Return() *MockCache_RemoveAlias_Call { + _c.Call.Return() + return _c +} + +func (_c *MockCache_RemoveAlias_Call) RunAndReturn(run func(context.Context, string, string)) *MockCache_RemoveAlias_Call { + _c.Call.Return(run) + return _c +} + // RemoveCollection provides a mock function with given fields: ctx, database, collectionName func (_m *MockCache) RemoveCollection(ctx context.Context, database string, collectionName string) { _m.Called(ctx, database, collectionName) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 438a0663ff..783aaed1fe 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -745,7 +745,7 @@ func TestProxy(t *testing.T) { _, _ = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ - MsgType: 0, + MsgType: commonpb.MsgType_CreateAlias, MsgID: 0, Timestamp: 0, SourceID: 0, @@ -795,13 +795,13 @@ func TestProxy(t *testing.T) { _, _ = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ - MsgType: 0, + MsgType: commonpb.MsgType_AlterAlias, MsgID: 0, Timestamp: 0, SourceID: 0, }, DbName: dbName, - CollectionName: collectionName, + CollectionName: "alias", }) nonExistingCollName := "coll_name_random_zarathustra" @@ -829,14 +829,17 @@ func TestProxy(t *testing.T) { _, _ = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ - MsgType: 0, + MsgType: commonpb.MsgType_DropAlias, MsgID: 0, Timestamp: 0, SourceID: 0, }, DbName: dbName, - CollectionName: collectionName, + CollectionName: "alias", }) + + _, err = globalMetaCache.GetCollectionID(ctx, dbName, "alias") + assert.Error(t, err) }) wg.Add(1) @@ -1993,15 +1996,6 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) assert.Equal(t, "", resp.Reason) - - // release collection cache - resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ - Base: nil, - CollectionName: collectionName, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) - assert.Equal(t, "", resp.Reason) }) wg.Add(1) @@ -2297,13 +2291,19 @@ func TestProxy(t *testing.T) { // invalidate meta cache resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ - Base: nil, + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropPartition, + }, DbName: dbName, CollectionName: collectionName, + PartitionName: partitionName, }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + _, err = globalMetaCache.GetPartitionID(ctx, dbName, collectionName, partitionName) + assert.Error(t, err) + // drop non-exist partition -> fail resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ @@ -2314,6 +2314,17 @@ func TestProxy(t *testing.T) { }) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // not specify partition name + resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropPartition, + }, + DbName: dbName, + CollectionName: collectionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) wg.Add(1) @@ -2406,20 +2417,17 @@ func TestProxy(t *testing.T) { // invalidate meta cache resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ - Base: nil, + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropCollection, + }, DbName: dbName, CollectionName: collectionName, }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) - // release collection load cache - resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ - Base: nil, - CollectionName: collectionName, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + _, err = globalMetaCache.GetCollectionID(ctx, dbName, collectionName) + assert.Error(t, err) }) wg.Add(1) diff --git a/internal/rootcoord/alter_alias_task.go b/internal/rootcoord/alter_alias_task.go index 7722929698..61abe8437c 100644 --- a/internal/rootcoord/alter_alias_task.go +++ b/internal/rootcoord/alter_alias_task.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/util/proxyutil" ) type alterAliasTask struct { @@ -36,7 +37,7 @@ func (t *alterAliasTask) Prepare(ctx context.Context) error { } func (t *alterAliasTask) Execute(ctx context.Context) error { - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, t.GetTs()); err != nil { + if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil { return err } // alter alias is atomic enough. diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 50333329c7..22f2e9b58f 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -67,14 +67,6 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { ts: ts, }) - redoTask.AddSyncStep(&expireCacheStep{ - baseStep: baseStep{core: a.core}, - dbName: a.Req.GetDbName(), - collectionNames: []string{oldColl.Name}, - collectionID: oldColl.CollectionID, - ts: ts, - }) - a.Req.CollectionID = oldColl.CollectionID redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{ baseStep: baseStep{core: a.core}, diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index 20c31cc4f6..583c7e0ce0 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -127,7 +127,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { return errors.New("err") } - core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker)) + core := newTestCore(withMeta(meta), withBroker(broker)) task := &alterCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ @@ -161,7 +161,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { return nil } - core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker)) + core := newTestCore(withMeta(meta), withBroker(broker)) task := &alterCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ diff --git a/internal/rootcoord/create_alias_task.go b/internal/rootcoord/create_alias_task.go index 7cd8334bd7..0f3327a022 100644 --- a/internal/rootcoord/create_alias_task.go +++ b/internal/rootcoord/create_alias_task.go @@ -36,9 +36,6 @@ func (t *createAliasTask) Prepare(ctx context.Context) error { } func (t *createAliasTask) Execute(ctx context.Context) error { - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias(), t.Req.GetCollectionName()}, InvalidCollectionID, t.GetTs()); err != nil { - return err - } // create alias is atomic enough. return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs()) } diff --git a/internal/rootcoord/create_alias_task_test.go b/internal/rootcoord/create_alias_task_test.go index 77d8a16f74..5158eaca27 100644 --- a/internal/rootcoord/create_alias_task_test.go +++ b/internal/rootcoord/create_alias_task_test.go @@ -41,19 +41,6 @@ func Test_createAliasTask_Prepare(t *testing.T) { } func Test_createAliasTask_Execute(t *testing.T) { - t.Run("failed to expire cache", func(t *testing.T) { - core := newTestCore(withInvalidProxyManager()) - task := &createAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.CreateAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}, - Alias: "test", - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - t.Run("failed to create alias", func(t *testing.T) { core := newTestCore(withInvalidMeta(), withValidProxyManager()) task := &createAliasTask{ diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index aca073c8e7..720a488375 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ms "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -478,6 +479,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { collectionNames: []string{t.Req.GetCollectionName()}, collectionID: InvalidCollectionID, ts: ts, + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)}, }, &nullStep{}) undoTask.AddStep(&nullStep{}, &removeDmlChannelsStep{ baseStep: baseStep{core: t.core}, diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index b2587f9d87..4f108beaa8 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -81,6 +81,7 @@ func (t *createPartitionTask) Execute(ctx context.Context) error { dbName: t.Req.GetDbName(), collectionNames: []string{t.collMeta.Name}, collectionID: t.collMeta.CollectionID, + partitionName: t.Req.GetPartitionName(), ts: t.GetTs(), }, &nullStep{}) diff --git a/internal/rootcoord/drop_alias_task.go b/internal/rootcoord/drop_alias_task.go index 3539bd1484..28caceafc9 100644 --- a/internal/rootcoord/drop_alias_task.go +++ b/internal/rootcoord/drop_alias_task.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/util/proxyutil" ) type dropAliasTask struct { @@ -37,7 +38,7 @@ func (t *dropAliasTask) Prepare(ctx context.Context) error { func (t *dropAliasTask) Execute(ctx context.Context) error { // drop alias is atomic enough. - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, t.GetTs()); err != nil { + if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil { return err } return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs()) diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index 457aa47a47..ce458e5cff 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -80,7 +80,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { collectionNames: append(aliases, collMeta.Name), collectionID: collMeta.CollectionID, ts: ts, - opts: []proxyutil.ExpireCacheOpt{proxyutil.ExpireCacheWithDropFlag()}, + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)}, }) redoTask.AddSyncStep(&changeCollectionStateStep{ baseStep: baseStep{core: t.core}, diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index 1306b1ef2a..17079a21c6 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -73,7 +74,9 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { dbName: t.Req.GetDbName(), collectionNames: []string{t.collMeta.Name}, collectionID: t.collMeta.CollectionID, + partitionName: t.Req.GetPartitionName(), ts: t.GetTs(), + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropPartition)}, }) redoTask.AddSyncStep(&changePartitionStateStep{ baseStep: baseStep{core: t.core}, diff --git a/internal/rootcoord/expire_cache.go b/internal/rootcoord/expire_cache.go index 67934f30e7..ba296d206b 100644 --- a/internal/rootcoord/expire_cache.go +++ b/internal/rootcoord/expire_cache.go @@ -26,20 +26,7 @@ import ( ) // ExpireMetaCache will call invalidate collection meta cache -func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []string, collectionID UniqueID, ts typeutil.Timestamp, opts ...proxyutil.ExpireCacheOpt) error { - // if collectionID is specified, invalidate all the collection meta cache with the specified collectionID and return - if collectionID != InvalidCollectionID { - req := proxypb.InvalidateCollMetaCacheRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithTimeStamp(ts), - commonpbutil.WithSourceID(c.session.ServerID), - ), - DbName: dbName, - CollectionID: collectionID, - } - return c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req, opts...) - } - +func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []string, collectionID UniqueID, partitionName string, ts typeutil.Timestamp, opts ...proxyutil.ExpireCacheOpt) error { // if only collNames are specified, invalidate the collection meta cache with the specified collectionName for _, collName := range collNames { req := proxypb.InvalidateCollMetaCacheRequest{ @@ -49,6 +36,8 @@ func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []s ), DbName: dbName, CollectionName: collName, + CollectionID: collectionID, + PartitionName: partitionName, } err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req, opts...) if err != nil { diff --git a/internal/rootcoord/expire_cache_test.go b/internal/rootcoord/expire_cache_test.go index 8245444d1a..12a50099b3 100644 --- a/internal/rootcoord/expire_cache_test.go +++ b/internal/rootcoord/expire_cache_test.go @@ -30,8 +30,8 @@ func Test_expireCacheConfig_apply(t *testing.T) { c := proxyutil.DefaultExpireCacheConfig() req := &proxypb.InvalidateCollMetaCacheRequest{} c.Apply(req) - assert.Nil(t, req.GetBase()) - opt := proxyutil.ExpireCacheWithDropFlag() + assert.Equal(t, commonpb.MsgType_Undefined, req.GetBase().GetMsgType()) + opt := proxyutil.SetMsgType(commonpb.MsgType_DropCollection) opt(&c) c.Apply(req) assert.Equal(t, commonpb.MsgType_DropCollection, req.GetBase().GetMsgType()) diff --git a/internal/rootcoord/rename_collection_task.go b/internal/rootcoord/rename_collection_task.go index 6e08575959..50bdc61713 100644 --- a/internal/rootcoord/rename_collection_task.go +++ b/internal/rootcoord/rename_collection_task.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/util/proxyutil" ) type renameCollectionTask struct { @@ -36,7 +37,7 @@ func (t *renameCollectionTask) Prepare(ctx context.Context) error { } func (t *renameCollectionTask) Execute(ctx context.Context) error { - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, t.GetTs()); err != nil { + if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_RenameCollection)); err != nil { return err } return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs()) diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index dd194a0d73..ca5e10bc05 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -170,12 +170,13 @@ type expireCacheStep struct { dbName string collectionNames []string collectionID UniqueID + partitionName string ts Timestamp opts []proxyutil.ExpireCacheOpt } func (s *expireCacheStep) Execute(ctx context.Context) ([]nestedStep, error) { - err := s.core.ExpireMetaCache(ctx, s.dbName, s.collectionNames, s.collectionID, s.ts, s.opts...) + err := s.core.ExpireMetaCache(ctx, s.dbName, s.collectionNames, s.collectionID, s.partitionName, s.ts, s.opts...) return nil, err } diff --git a/internal/util/proxyutil/proxy_client_manager.go b/internal/util/proxyutil/proxy_client_manager.go index bcdce057c2..d06d2bc742 100644 --- a/internal/util/proxyutil/proxy_client_manager.go +++ b/internal/util/proxyutil/proxy_client_manager.go @@ -40,28 +40,25 @@ import ( ) type ExpireCacheConfig struct { - withDropFlag bool + msgType commonpb.MsgType } func (c ExpireCacheConfig) Apply(req *proxypb.InvalidateCollMetaCacheRequest) { - if !c.withDropFlag { - return - } if req.GetBase() == nil { req.Base = commonpbutil.NewMsgBase() } - req.Base.MsgType = commonpb.MsgType_DropCollection + req.Base.MsgType = c.msgType } func DefaultExpireCacheConfig() ExpireCacheConfig { - return ExpireCacheConfig{withDropFlag: false} + return ExpireCacheConfig{} } type ExpireCacheOpt func(c *ExpireCacheConfig) -func ExpireCacheWithDropFlag() ExpireCacheOpt { +func SetMsgType(msgType commonpb.MsgType) ExpireCacheOpt { return func(c *ExpireCacheConfig) { - c.withDropFlag = true + c.msgType = msgType } }