diff --git a/go.mod b/go.mod index 2d269939bc..7fd7c5abf0 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index afc6f6f6fb..4ce7ad6a3f 100644 --- a/go.sum +++ b/go.sum @@ -630,12 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241120015424-93892e628c69 h1:Qt0Bv2Fum3EX3OlkuQYHJINBzeU4oEuHy2lXSfB/gZw= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241120015424-93892e628c69/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129024423-3911e6ebd8a6 h1:TrGZtojfj84Rdd1XAaGULCWZqO3rJMiGS8vxFXHT7G4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129024423-3911e6ebd8a6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056 h1:o2uJgfwTOg8bu/E9n6TvmFT2XPrPm1v0XFhc6XXcFoE= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index dae27e347c..f3d797e165 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -129,21 +129,21 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p if globalMetaCache != nil { switch msgType { case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias: - if collectionName != "" { - globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached - globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName) - } if request.CollectionID != UniqueID(0) { - aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), msgType == commonpb.MsgType_DropCollection) for _, name := range aliasName { globalMetaCache.DeprecateShardCache(request.GetDbName(), name) } } + if collectionName != "" { + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached + globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName) + } log.Info("complete to invalidate collection meta cache with collection name", zap.String("type", request.GetBase().GetMsgType().String())) case commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection: // All the request from query use collectionID if request.CollectionID != UniqueID(0) { - aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, 0, false) for _, name := range aliasName { globalMetaCache.DeprecateShardCache(request.GetDbName(), name) } @@ -154,31 +154,27 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p log.Warn("invalidate collection meta cache failed. partitionName is empty") return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil } - // no need to deprecate shard cache because shard won't change when create or drop partition - globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // drop all the alias as well if request.CollectionID != UniqueID(0) { - aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) - for _, name := range aliasName { - globalMetaCache.DeprecateShardCache(request.GetDbName(), name) - } + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false) } + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) case commonpb.MsgType_DropDatabase: globalMetaCache.RemoveDatabase(ctx, request.GetDbName()) default: log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String())) + if request.CollectionID != UniqueID(0) { + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false) + for _, name := range aliasName { + globalMetaCache.DeprecateShardCache(request.GetDbName(), name) + } + } if collectionName != "" { globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName) } - if request.CollectionID != UniqueID(0) { - aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) - for _, name := range aliasName { - globalMetaCache.DeprecateShardCache(request.GetDbName(), name) - } - } } } diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 57543a2abd..90b5d0b796 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -75,7 +75,7 @@ type Cache interface { InvalidateShardLeaderCache(collections []int64) ListShardLocation() map[int64]nodeInfo RemoveCollection(ctx context.Context, database, collectionName string) - RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string + RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string // GetCredentialInfo operate credential cache GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) @@ -340,6 +340,8 @@ type MetaCache struct { IDCount int64 IDIndex int64 IDLock sync.RWMutex + + collectionCacheVersion map[UniqueID]uint64 // collectionID -> cacheVersion } // globalMetaCache is singleton instance of Cache @@ -368,15 +370,16 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo // NewMetaCache creates a MetaCache with provided RootCoord and QueryNode func NewMetaCache(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient, shardMgr shardClientMgr) (*MetaCache, error) { return &MetaCache{ - rootCoord: rootCoord, - queryCoord: queryCoord, - dbInfo: map[string]*databaseInfo{}, - collInfo: map[string]map[string]*collectionInfo{}, - collLeader: map[string]map[string]*shardLeaders{}, - credMap: map[string]*internalpb.CredentialInfo{}, - shardMgr: shardMgr, - privilegeInfos: map[string]struct{}{}, - userToRoles: map[string]map[string]struct{}{}, + rootCoord: rootCoord, + queryCoord: queryCoord, + dbInfo: map[string]*databaseInfo{}, + collInfo: map[string]map[string]*collectionInfo{}, + collLeader: map[string]map[string]*shardLeaders{}, + credMap: map[string]*internalpb.CredentialInfo{}, + shardMgr: shardMgr, + privilegeInfos: map[string]struct{}{}, + userToRoles: map[string]map[string]struct{}{}, + collectionCacheVersion: make(map[UniqueID]uint64), }, nil } @@ -445,19 +448,36 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, if database == "" { log.Warn("database is empty, use default database name", zap.String("collectionName", collectionName), zap.Stack("stack")) } - m.mu.Lock() - defer m.mu.Unlock() - _, dbOk := m.collInfo[database] - if !dbOk { - m.collInfo[database] = make(map[string]*collectionInfo) - } - isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...) if err != nil { return nil, err } schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields) + + m.mu.Lock() + defer m.mu.Unlock() + curVersion := m.collectionCacheVersion[collection.GetCollectionID()] + // Compatibility logic: if the rootcoord version is lower(requestTime = 0), update the cache directly. + if collection.GetRequestTime() < curVersion && collection.GetRequestTime() != 0 { + log.Debug("describe collection timestamp less than version, don't update cache", + zap.String("collectionName", collectionName), + zap.Uint64("version", collection.GetRequestTime()), zap.Uint64("cache version", curVersion)) + return &collectionInfo{ + collID: collection.CollectionID, + schema: schemaInfo, + partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField), + createdTimestamp: collection.CreatedTimestamp, + createdUtcTimestamp: collection.CreatedUtcTimestamp, + consistencyLevel: collection.ConsistencyLevel, + partitionKeyIsolation: isolation, + }, nil + } + _, dbOk := m.collInfo[database] + if !dbOk { + m.collInfo[database] = make(map[string]*collectionInfo) + } + m.collInfo[database][collectionName] = &collectionInfo{ collID: collection.CollectionID, schema: schemaInfo, @@ -470,9 +490,14 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName), zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID), - zap.Strings("partition", partitions.PartitionNames), + zap.Strings("partition", partitions.PartitionNames), zap.Uint64("currentVersion", curVersion), + zap.Uint64("version", collection.GetRequestTime()), ) - return m.collInfo[database][collectionName], nil + + m.collectionCacheVersion[collection.GetCollectionID()] = collection.GetRequestTime() + collInfo := m.collInfo[database][collectionName] + + return collInfo, nil } func buildSfKeyByName(database, collectionName string) string { @@ -822,19 +847,30 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionNa log.Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName)) } -func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string { +func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string { m.mu.Lock() defer m.mu.Unlock() + + curVersion := m.collectionCacheVersion[collectionID] var collNames []string for database, db := range m.collInfo { for k, v := range db { if v.collID == collectionID { - delete(m.collInfo[database], k) - collNames = append(collNames, k) + if version == 0 || curVersion <= version { + delete(m.collInfo[database], k) + collNames = append(collNames, k) + } } } } - log.Debug("remove collection by id", zap.Int64("id", collectionID), zap.Strings("collection", collNames)) + if removeVersion { + delete(m.collectionCacheVersion, collectionID) + } else if version != 0 { + m.collectionCacheVersion[collectionID] = version + } + log.Debug("remove collection by id", zap.Int64("id", collectionID), + zap.Strings("collection", collNames), zap.Uint64("currentVersion", curVersion), + zap.Uint64("version", version), zap.Bool("removeVersion", removeVersion)) return collNames } diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index cfc3cb291c..4592786f1c 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -140,7 +140,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i AutoID: true, Name: "collection1", }, - DbName: dbName, + DbName: dbName, + RequestTime: 100, }, nil } if in.CollectionName == "collection2" || in.CollectionID == 2 { @@ -151,7 +152,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i AutoID: true, Name: "collection2", }, - DbName: dbName, + DbName: dbName, + RequestTime: 100, }, nil } if in.CollectionName == "errorCollection" { @@ -161,7 +163,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i Schema: &schemapb.CollectionSchema{ AutoID: true, }, - DbName: dbName, + DbName: dbName, + RequestTime: 100, }, nil } @@ -791,14 +794,14 @@ func TestMetaCache_RemoveCollection(t *testing.T) { // shouldn't access RootCoord again assert.Equal(t, rootCoord.GetAccessCount(), 2) - globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1)) + globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1), 100, false) // no collectionInfo of collection2, should access RootCoord _, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1) assert.NoError(t, err) // shouldn't access RootCoord again assert.Equal(t, rootCoord.GetAccessCount(), 3) - globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1)) + globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1), 100, false) // no collectionInfo of collection2, should access RootCoord _, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1) assert.NoError(t, err) @@ -1259,3 +1262,60 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) { }) } } + +func TestMetaCache_Parallel(t *testing.T) { + ctx := context.Background() + rootCoord := mocks.NewMockRootCoordClient(t) + queryCoord := mocks.NewMockQueryCoordClient(t) + queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe() + rootCoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{ + Status: merr.Success(), + }, nil).Maybe() + mgr := newShardClientMgr() + cache, err := NewMetaCache(rootCoord, queryCoord, mgr) + assert.NoError(t, err) + + cacheVersion := uint64(100) + // clean cache + cache.RemoveCollectionsByID(ctx, 111, cacheVersion+2, false) + + // update cache, but version is smaller + rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, option ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) { + return &milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Name: "collection1", + }, + CollectionID: 111, + DbName: dbName, + RequestTime: cacheVersion, + }, nil + }).Once() + + collInfo, err := cache.update(ctx, dbName, "collection1", 111) + assert.NoError(t, err) + assert.Equal(t, "collection1", collInfo.schema.Name) + assert.Equal(t, int64(111), collInfo.collID) + _, ok := cache.collInfo[dbName]["collection1"] + assert.False(t, ok) + + rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, option ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) { + cacheVersion++ + return &milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Name: "collection1", + }, + CollectionID: 111, + DbName: dbName, + RequestTime: cacheVersion + 5, + }, nil + }).Once() + + collInfo, err = cache.update(ctx, dbName, "collection1", 111) + assert.NoError(t, err) + assert.Equal(t, "collection1", collInfo.schema.Name) + assert.Equal(t, int64(111), collInfo.collID) + _, ok = cache.collInfo[dbName]["collection1"] + assert.True(t, ok) +} diff --git a/internal/proxy/mock_cache.go b/internal/proxy/mock_cache.go index 72a17fa514..cbd4039d1f 100644 --- a/internal/proxy/mock_cache.go +++ b/internal/proxy/mock_cache.go @@ -1109,17 +1109,17 @@ func (_c *MockCache_RemoveCollection_Call) RunAndReturn(run func(context.Context return _c } -// RemoveCollectionsByID provides a mock function with given fields: ctx, collectionID -func (_m *MockCache) RemoveCollectionsByID(ctx context.Context, collectionID int64) []string { - ret := _m.Called(ctx, collectionID) +// RemoveCollectionsByID provides a mock function with given fields: ctx, collectionID, version, removeVersion +func (_m *MockCache) RemoveCollectionsByID(ctx context.Context, collectionID int64, version uint64, removeVersion bool) []string { + ret := _m.Called(ctx, collectionID, version, removeVersion) if len(ret) == 0 { panic("no return value specified for RemoveCollectionsByID") } var r0 []string - if rf, ok := ret.Get(0).(func(context.Context, int64) []string); ok { - r0 = rf(ctx, collectionID) + if rf, ok := ret.Get(0).(func(context.Context, int64, uint64, bool) []string); ok { + r0 = rf(ctx, collectionID, version, removeVersion) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]string) @@ -1137,13 +1137,15 @@ type MockCache_RemoveCollectionsByID_Call struct { // RemoveCollectionsByID is a helper method to define mock.On call // - ctx context.Context // - collectionID int64 -func (_e *MockCache_Expecter) RemoveCollectionsByID(ctx interface{}, collectionID interface{}) *MockCache_RemoveCollectionsByID_Call { - return &MockCache_RemoveCollectionsByID_Call{Call: _e.mock.On("RemoveCollectionsByID", ctx, collectionID)} +// - version uint64 +// - removeVersion bool +func (_e *MockCache_Expecter) RemoveCollectionsByID(ctx interface{}, collectionID interface{}, version interface{}, removeVersion interface{}) *MockCache_RemoveCollectionsByID_Call { + return &MockCache_RemoveCollectionsByID_Call{Call: _e.mock.On("RemoveCollectionsByID", ctx, collectionID, version, removeVersion)} } -func (_c *MockCache_RemoveCollectionsByID_Call) Run(run func(ctx context.Context, collectionID int64)) *MockCache_RemoveCollectionsByID_Call { +func (_c *MockCache_RemoveCollectionsByID_Call) Run(run func(ctx context.Context, collectionID int64, version uint64, removeVersion bool)) *MockCache_RemoveCollectionsByID_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) + run(args[0].(context.Context), args[1].(int64), args[2].(uint64), args[3].(bool)) }) return _c } @@ -1153,7 +1155,7 @@ func (_c *MockCache_RemoveCollectionsByID_Call) Return(_a0 []string) *MockCache_ return _c } -func (_c *MockCache_RemoveCollectionsByID_Call) RunAndReturn(run func(context.Context, int64) []string) *MockCache_RemoveCollectionsByID_Call { +func (_c *MockCache_RemoveCollectionsByID_Call) RunAndReturn(run func(context.Context, int64, uint64, bool) []string) *MockCache_RemoveCollectionsByID_Call { _c.Call.Return(run) return _c } diff --git a/internal/rootcoord/alter_alias_task.go b/internal/rootcoord/alter_alias_task.go index ac2f79acbf..080e18605f 100644 --- a/internal/rootcoord/alter_alias_task.go +++ b/internal/rootcoord/alter_alias_task.go @@ -37,7 +37,8 @@ func (t *alterAliasTask) Prepare(ctx context.Context) error { } func (t *alterAliasTask) Execute(ctx context.Context) error { - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil { + collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil { return err } // alter alias is atomic enough. diff --git a/internal/rootcoord/alter_alias_task_test.go b/internal/rootcoord/alter_alias_task_test.go index 8eefe721c7..1ca9450235 100644 --- a/internal/rootcoord/alter_alias_task_test.go +++ b/internal/rootcoord/alter_alias_task_test.go @@ -18,12 +18,15 @@ package rootcoord import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" ) func Test_alterAliasTask_Prepare(t *testing.T) { @@ -42,7 +45,9 @@ func Test_alterAliasTask_Prepare(t *testing.T) { func Test_alterAliasTask_Execute(t *testing.T) { t.Run("failed to expire cache", func(t *testing.T) { - core := newTestCore(withInvalidProxyManager()) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) + core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta)) task := &alterAliasTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterAliasRequest{ @@ -55,7 +60,11 @@ func Test_alterAliasTask_Execute(t *testing.T) { }) t.Run("failed to alter alias", func(t *testing.T) { - core := newTestCore(withValidProxyManager(), withInvalidMeta()) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) + mockMeta.EXPECT().AlterAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(fmt.Errorf("failed to alter alias")) + core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) task := &alterAliasTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterAliasRequest{ diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index f0887f2cf8..dc9c326224 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -89,7 +89,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: a.core}, dbName: a.Req.GetDbName(), collectionNames: append(aliases, a.Req.GetCollectionName()), - collectionID: InvalidCollectionID, + collectionID: oldColl.CollectionID, opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)}, }) diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 00e7235267..60a46ec9ba 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -611,7 +611,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: t.core}, dbName: t.Req.GetDbName(), collectionNames: []string{t.Req.GetCollectionName()}, - collectionID: InvalidCollectionID, + collectionID: collID, ts: ts, opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)}, }, &nullStep{}) diff --git a/internal/rootcoord/describe_collection_task.go b/internal/rootcoord/describe_collection_task.go index 3ca5dc65c2..b911d64ba0 100644 --- a/internal/rootcoord/describe_collection_task.go +++ b/internal/rootcoord/describe_collection_task.go @@ -51,6 +51,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) { return err } t.Rsp = convertModelToDesc(coll, aliases, db.Name) + t.Rsp.RequestTime = t.ts return nil } diff --git a/internal/rootcoord/drop_alias_task.go b/internal/rootcoord/drop_alias_task.go index f29026edd2..18d390b6b2 100644 --- a/internal/rootcoord/drop_alias_task.go +++ b/internal/rootcoord/drop_alias_task.go @@ -37,8 +37,9 @@ func (t *dropAliasTask) Prepare(ctx context.Context) error { } func (t *dropAliasTask) Execute(ctx context.Context) error { + collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetAlias()) // drop alias is atomic enough. - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil { + if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil { return err } return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs()) diff --git a/internal/rootcoord/drop_alias_task_test.go b/internal/rootcoord/drop_alias_task_test.go index 199a583107..d97823c952 100644 --- a/internal/rootcoord/drop_alias_task_test.go +++ b/internal/rootcoord/drop_alias_task_test.go @@ -18,6 +18,7 @@ package rootcoord import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -49,8 +50,10 @@ func Test_dropAliasTask_Prepare(t *testing.T) { func Test_dropAliasTask_Execute(t *testing.T) { t.Run("failed to expire cache", func(t *testing.T) { - core := newTestCore(withInvalidProxyManager()) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) alias := funcutil.GenRandomStr() + core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta)) task := &dropAliasTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropAliasRequest{ @@ -63,7 +66,11 @@ func Test_dropAliasTask_Execute(t *testing.T) { }) t.Run("failed to drop alias", func(t *testing.T) { - core := newTestCore(withValidProxyManager(), withInvalidMeta()) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) + mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(fmt.Errorf("failed to alter alias")) + core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) alias := funcutil.GenRandomStr() task := &dropAliasTask{ baseTask: newBaseTask(context.Background(), core), @@ -77,15 +84,12 @@ func Test_dropAliasTask_Execute(t *testing.T) { }) t.Run("normal case", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("DropAlias", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(nil) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) + mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil) - core := newTestCore(withValidProxyManager(), withMeta(meta)) + core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) alias := funcutil.GenRandomStr() task := &dropAliasTask{ baseTask: newBaseTask(context.Background(), core), diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 8de5068868..14d6e5a135 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -56,6 +56,10 @@ type IMetaTable interface { AddCollection(ctx context.Context, coll *model.Collection) error ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error + // GetCollectionID retrieves the corresponding collectionID based on the collectionName. + // If the collection does not exist, it will return InvalidCollectionID. + // Please use the function with caution. + GetCollectionID(ctx context.Context, dbName string, collectionName string) UniqueID GetCollectionByName(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) GetCollectionByID(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error) GetCollectionByIDWithMaxTs(ctx context.Context, collectionID UniqueID) (*model.Collection, error) @@ -610,6 +614,36 @@ func (mt *MetaTable) GetCollectionByName(ctx context.Context, dbName string, col return mt.getCollectionByNameInternal(ctx, dbName, collectionName, ts) } +// GetCollectionID retrieves the corresponding collectionID based on the collectionName. +// If the collection does not exist, it will return InvalidCollectionID. +// Please use the function with caution. +func (mt *MetaTable) GetCollectionID(ctx context.Context, dbName string, collectionName string) UniqueID { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + + // backward compatibility for rolling upgrade + if dbName == "" { + log.Warn("db name is empty", zap.String("collectionName", collectionName)) + dbName = util.DefaultDBName + } + + _, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp) + if err != nil { + return InvalidCollectionID + } + + collectionID, ok := mt.aliases.get(dbName, collectionName) + if ok { + return collectionID + } + + collectionID, ok = mt.names.get(dbName, collectionName) + if ok { + return collectionID + } + return InvalidCollectionID +} + func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) { // backward compatibility for rolling upgrade if dbName == "" { diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index 9805b71910..3574ec369a 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -1245,6 +1245,54 @@ func (_c *IMetaTable_GetCollectionByName_Call) RunAndReturn(run func(context.Con return _c } +// GetCollectionID provides a mock function with given fields: ctx, dbName, collectionName +func (_m *IMetaTable) GetCollectionID(ctx context.Context, dbName string, collectionName string) int64 { + ret := _m.Called(ctx, dbName, collectionName) + + if len(ret) == 0 { + panic("no return value specified for GetCollectionID") + } + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, string, string) int64); ok { + r0 = rf(ctx, dbName, collectionName) + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// IMetaTable_GetCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionID' +type IMetaTable_GetCollectionID_Call struct { + *mock.Call +} + +// GetCollectionID is a helper method to define mock.On call +// - ctx context.Context +// - dbName string +// - collectionName string +func (_e *IMetaTable_Expecter) GetCollectionID(ctx interface{}, dbName interface{}, collectionName interface{}) *IMetaTable_GetCollectionID_Call { + return &IMetaTable_GetCollectionID_Call{Call: _e.mock.On("GetCollectionID", ctx, dbName, collectionName)} +} + +func (_c *IMetaTable_GetCollectionID_Call) Run(run func(ctx context.Context, dbName string, collectionName string)) *IMetaTable_GetCollectionID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *IMetaTable_GetCollectionID_Call) Return(_a0 int64) *IMetaTable_GetCollectionID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_GetCollectionID_Call) RunAndReturn(run func(context.Context, string, string) int64) *IMetaTable_GetCollectionID_Call { + _c.Call.Return(run) + return _c +} + // GetCollectionVirtualChannels provides a mock function with given fields: ctx, colID func (_m *IMetaTable) GetCollectionVirtualChannels(ctx context.Context, colID int64) []string { ret := _m.Called(ctx, colID) diff --git a/internal/rootcoord/rename_collection_task.go b/internal/rootcoord/rename_collection_task.go index fc915302cc..879c56628b 100644 --- a/internal/rootcoord/rename_collection_task.go +++ b/internal/rootcoord/rename_collection_task.go @@ -37,7 +37,8 @@ func (t *renameCollectionTask) Prepare(ctx context.Context) error { } func (t *renameCollectionTask) Execute(ctx context.Context) error { - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_RenameCollection)); err != nil { + collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetOldName()) + if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_RenameCollection)); err != nil { return err } return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs()) diff --git a/internal/rootcoord/rename_collection_task_test.go b/internal/rootcoord/rename_collection_task_test.go index dd4be07ab9..3b12042463 100644 --- a/internal/rootcoord/rename_collection_task_test.go +++ b/internal/rootcoord/rename_collection_task_test.go @@ -18,13 +18,15 @@ package rootcoord import ( "context" + "fmt" "testing" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" ) func Test_renameCollectionTask_Prepare(t *testing.T) { @@ -55,7 +57,9 @@ func Test_renameCollectionTask_Prepare(t *testing.T) { func Test_renameCollectionTask_Execute(t *testing.T) { t.Run("failed to expire cache", func(t *testing.T) { - core := newTestCore(withInvalidProxyManager()) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) + core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta)) task := &renameCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.RenameCollectionRequest{ @@ -69,12 +73,11 @@ func Test_renameCollectionTask_Execute(t *testing.T) { }) t.Run("failed to rename collection", func(t *testing.T) { - meta := newMockMetaTable() - meta.RenameCollectionFunc = func(ctx context.Context, oldName string, newName string, ts Timestamp) error { - return errors.New("fail") - } - - core := newTestCore(withValidProxyManager(), withMeta(meta)) + mockMeta := mockrootcoord.NewIMetaTable(t) + mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) + mockMeta.EXPECT().RenameCollection(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(fmt.Errorf("failed to alter alias")) + core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) task := &renameCollectionTask{ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.RenameCollectionRequest{ diff --git a/internal/util/hookutil/default.go b/internal/util/hookutil/default.go index 7bbd467bb6..546ed80c43 100644 --- a/internal/util/hookutil/default.go +++ b/internal/util/hookutil/default.go @@ -59,3 +59,7 @@ var _ hook.Extension = (*DefaultExtension)(nil) func (d DefaultExtension) Report(info any) int { return 0 } + +func (d DefaultExtension) ReportRefused(ctx context.Context, req interface{}, resp interface{}, err error, fullMethod string) error { + return nil +} diff --git a/tests/integration/minicluster_v2.go b/tests/integration/minicluster_v2.go index 9b0f1d4452..ea14966949 100644 --- a/tests/integration/minicluster_v2.go +++ b/tests/integration/minicluster_v2.go @@ -607,6 +607,10 @@ func (r *ReportChanExtension) Report(info any) int { return 1 } +func (r *ReportChanExtension) ReportRefused(ctx context.Context, req interface{}, resp interface{}, err error, fullMethod string) error { + return nil +} + func (r *ReportChanExtension) GetReportChan() <-chan any { return r.reportChan }