diff --git a/internal/core/src/segcore/token_stream_c.cpp b/internal/core/src/segcore/token_stream_c.cpp
index e1db43e541..0cf3794b06 100644
--- a/internal/core/src/segcore/token_stream_c.cpp
+++ b/internal/core/src/segcore/token_stream_c.cpp
@@ -10,35 +10,26 @@
 // or implied. See the License for the specific language governing permissions and limitations under the License
 
 #include "segcore/token_stream_c.h"
 #include "token-stream.h"
-#include "monitor/scope_metric.h"
 
 void
 free_token_stream(CTokenStream token_stream) {
-    SCOPE_CGO_CALL_METRIC();
-
     delete static_cast(token_stream);
 }
 
 bool
 token_stream_advance(CTokenStream token_stream) {
-    SCOPE_CGO_CALL_METRIC();
-
     return static_cast(token_stream)->advance();
 }
 
 // Note: returned token must be freed by the caller using `free_token`.
 const char*
 token_stream_get_token(CTokenStream token_stream) {
-    SCOPE_CGO_CALL_METRIC();
-
     return static_cast(token_stream)
         ->get_token_no_copy();
 }
 
 CToken
 token_stream_get_detailed_token(CTokenStream token_stream) {
-    SCOPE_CGO_CALL_METRIC();
-
     auto token = static_cast(token_stream)
                      ->get_detailed_token();
     return CToken{token.token,
@@ -50,7 +41,5 @@ token_stream_get_detailed_token(CTokenStream token_stream) {
 
 void
 free_token(void* token) {
-    SCOPE_CGO_CALL_METRIC();
-
     free_rust_string(static_cast(token));
 }
diff --git a/internal/core/src/segcore/tokenizer_c.cpp b/internal/core/src/segcore/tokenizer_c.cpp
index 8403328785..31a7721e69 100644
--- a/internal/core/src/segcore/tokenizer_c.cpp
+++ b/internal/core/src/segcore/tokenizer_c.cpp
@@ -13,7 +13,6 @@
 #include 
 #include "common/FieldMeta.h"
 #include "common/protobuf_utils.h"
-#include "monitor/scope_metric.h"
 #include "pb/schema.pb.h"
 #include "common/EasyAssert.h"
 #include "tokenizer.h"
@@ -22,8 +21,6 @@ using Map = std::map;
 
 CStatus
 create_tokenizer(const char* params, CTokenizer* tokenizer) {
-    SCOPE_CGO_CALL_METRIC();
-
     try {
         auto impl = std::make_unique(params);
         *tokenizer = impl.release();
@@ -35,8 +32,6 @@ create_tokenizer(const char* params, CTokenizer* tokenizer) {
 
 CStatus
 clone_tokenizer(CTokenizer* tokenizer, CTokenizer* rst) {
-    SCOPE_CGO_CALL_METRIC();
-
     try {
         auto impl = reinterpret_cast(*tokenizer);
         *rst = impl->Clone().release();
@@ -48,24 +43,18 @@ clone_tokenizer(CTokenizer* tokenizer, CTokenizer* rst) {
 
 void
 free_tokenizer(CTokenizer tokenizer) {
-    SCOPE_CGO_CALL_METRIC();
-
     auto impl = reinterpret_cast(tokenizer);
     delete impl;
 }
 
 CTokenStream
 create_token_stream(CTokenizer tokenizer, const char* text, uint32_t text_len) {
-    SCOPE_CGO_CALL_METRIC();
-
     auto impl = reinterpret_cast(tokenizer);
     return impl->CreateTokenStream(std::string(text, text_len)).release();
 }
 
 CStatus
 validate_tokenizer(const char* params) {
-    SCOPE_CGO_CALL_METRIC();
-
     try {
         auto impl = std::make_unique(params);
         return milvus::SuccessCStatus();
@@ -76,8 +65,6 @@ validate_tokenizer(const char* params) {
 
 CStatus
 validate_text_schema(const uint8_t* field_schema, uint64_t length) {
-    SCOPE_CGO_CALL_METRIC();
-
     try {
         auto schema = std::make_unique();
         AssertInfo(schema->ParseFromArray(field_schema, length),
diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go
index ad6548fe3e..ec40c5f418 100644
--- a/internal/proxy/impl.go
+++ b/internal/proxy/impl.go
@@ -134,16 +134,17 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
 	if globalMetaCache != nil {
 		switch msgType {
 		case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_CreateAlias:
+			// remove collection by name first, otherwise the drop collection remove version will be failed.
+			if collectionName != "" {
+				globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName, request.GetBase().GetTimestamp()) // no need to return error, though collection may be not cached
+				node.shardMgr.DeprecateShardCache(request.GetDbName(), collectionName)
+			}
 			if request.CollectionID != UniqueID(0) {
 				aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), msgType == commonpb.MsgType_DropCollection)
 				for _, name := range aliasName {
 					node.shardMgr.DeprecateShardCache(request.GetDbName(), name)
 				}
 			}
-			if collectionName != "" {
-				globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
-				node.shardMgr.DeprecateShardCache(request.GetDbName(), collectionName)
-			}
 			log.Info("complete to invalidate collection meta cache with collection name", zap.String("type", request.GetBase().GetMsgType().String()))
 		case commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
 			// All the request from query use collectionID
@@ -163,7 +164,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
 			if request.CollectionID != UniqueID(0) {
 				aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false)
 			}
-			globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
+			globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName, request.GetBase().GetTimestamp())
 			log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String()))
 		case commonpb.MsgType_DropDatabase:
 			node.shardMgr.RemoveDatabase(request.GetDbName())
@@ -178,7 +179,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
 				}
 			}
 			if collectionName != "" {
-				globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
+				globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName, request.GetBase().GetTimestamp())
 			}
 			log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String()))
 		default:
@@ -191,7 +192,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
 	}
 
 	if collectionName != "" {
-		globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
+		globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName, request.GetBase().GetTimestamp()) // no need to return error, though collection may be not cached
 		node.shardMgr.DeprecateShardCache(request.GetDbName(), collectionName)
 	}
 }
diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go
index 07bd2f65a3..b382e4f6f2 100644
--- a/internal/proxy/meta_cache.go
+++ b/internal/proxy/meta_cache.go
@@ -70,7 +70,7 @@ type Cache interface {
 	// DeprecateShardCache(database, collectionName string)
 	// InvalidateShardLeaderCache(collections []int64)
 	// ListShardLocation() map[int64]nodeInfo
-	RemoveCollection(ctx context.Context, database, collectionName string)
+	RemoveCollection(ctx context.Context, database, collectionName string, version uint64)
 	RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string
 
 	// GetCredentialInfo operate credential cache
@@ -800,23 +800,33 @@ func parsePartitionsInfo(infos []*partitionInfo, hasPartitionKey bool) *partitio
 	return result
 }
 
-func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionName string) {
+func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionName string, version uint64) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	_, dbOk := m.collInfo[database]
-	if dbOk {
-		delete(m.collInfo[database], collectionName)
+
+	if db, dbOk := m.collInfo[database]; dbOk {
+		if coll, ok := db[collectionName]; ok {
+			m.removeCollectionByID(ctx, coll.collID, version, false)
+		}
 	}
 	if database == "" {
-		delete(m.collInfo[defaultDB], collectionName)
+		if db, dbOk := m.collInfo[defaultDB]; dbOk {
+			if coll, ok := db[collectionName]; ok {
+				m.removeCollectionByID(ctx, coll.collID, version, false)
+			}
+		}
 	}
-	log.Ctx(ctx).Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName), zap.Bool("dbok", dbOk))
+	log.Ctx(ctx).Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName))
 }
 
 func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+	return m.removeCollectionByID(ctx, collectionID, version, removeVersion)
+}
+
+func (m *MetaCache) removeCollectionByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string {
 	curVersion := m.collectionCacheVersion[collectionID]
 	var collNames []string
 	for database, db := range m.collInfo {
diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go
index 30f6dfa50d..c3691a3a75 100644
--- a/internal/proxy/meta_cache_test.go
+++ b/internal/proxy/meta_cache_test.go
@@ -1304,7 +1304,7 @@ func TestMetaCache_ConcurrentTest1(t *testing.T) {
 		defer wg.Done()
 		for i := 0; i < cnt; i++ {
 			// periodically invalid collection cache
-			globalMetaCache.RemoveCollection(ctx, dbName, "collection1")
+			globalMetaCache.RemoveCollection(ctx, dbName, "collection1", 0)
 			time.Sleep(10 * time.Millisecond)
 		}
 	}
@@ -1497,7 +1497,7 @@ func TestMetaCache_RemoveCollection(t *testing.T) {
 	// shouldn't access RootCoord again
 	assert.Equal(t, rootCoord.GetAccessCount(), 1)
 
-	globalMetaCache.RemoveCollection(ctx, dbName, "collection1")
+	globalMetaCache.RemoveCollection(ctx, dbName, "collection1", 0)
 	// no collectionInfo of collection2, should access RootCoord
 	_, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1)
 	assert.NoError(t, err)
diff --git a/internal/proxy/mock_cache.go b/internal/proxy/mock_cache.go
index cbc4c2f721..401e2c934c 100644
--- a/internal/proxy/mock_cache.go
+++ b/internal/proxy/mock_cache.go
@@ -660,9 +660,9 @@ func (_c *MockCache_HasDatabase_Call) RunAndReturn(run func(context.Context, str
 	return _c
 }
 
-// RemoveCollection provides a mock function with given fields: ctx, database, collectionName
-func (_m *MockCache) RemoveCollection(ctx context.Context, database string, collectionName string) {
-	_m.Called(ctx, database, collectionName)
+// RemoveCollection provides a mock function with given fields: ctx, database, collectionName, version
+func (_m *MockCache) RemoveCollection(ctx context.Context, database string, collectionName string, version uint64) {
+	_m.Called(ctx, database, collectionName, version)
 }
 
 // MockCache_RemoveCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveCollection'
@@ -674,13 +674,14 @@ type MockCache_RemoveCollection_Call struct {
 //   - ctx context.Context
 //   - database string
 //   - collectionName string
-func (_e *MockCache_Expecter) RemoveCollection(ctx interface{}, database interface{}, collectionName interface{}) *MockCache_RemoveCollection_Call {
-	return &MockCache_RemoveCollection_Call{Call: _e.mock.On("RemoveCollection", ctx, database, collectionName)}
+//   - version uint64
+func (_e *MockCache_Expecter) RemoveCollection(ctx interface{}, database interface{}, collectionName interface{}, version interface{}) *MockCache_RemoveCollection_Call {
+	return &MockCache_RemoveCollection_Call{Call: _e.mock.On("RemoveCollection", ctx, database, collectionName, version)}
 }
 
-func (_c *MockCache_RemoveCollection_Call) Run(run func(ctx context.Context, database string, collectionName string)) *MockCache_RemoveCollection_Call {
+func (_c *MockCache_RemoveCollection_Call) Run(run func(ctx context.Context, database string, collectionName string, version uint64)) *MockCache_RemoveCollection_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(string), args[2].(string))
+		run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(uint64))
 	})
 	return _c
 }
@@ -690,7 +691,7 @@ func (_c *MockCache_RemoveCollection_Call) Return() *MockCache_RemoveCollection_
 	return _c
 }
 
-func (_c *MockCache_RemoveCollection_Call) RunAndReturn(run func(context.Context, string, string)) *MockCache_RemoveCollection_Call {
+func (_c *MockCache_RemoveCollection_Call) RunAndReturn(run func(context.Context, string, string, uint64)) *MockCache_RemoveCollection_Call {
 	_c.Run(run)
 	return _c
 }
diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go
index d46ec1e6ac..d59332331c 100644
--- a/internal/proxy/task_test.go
+++ b/internal/proxy/task_test.go
@@ -1671,7 +1671,7 @@ func TestHasCollectionTask(t *testing.T) {
 	task.CollectionName = collectionName
 
 	// invalidate collection cache, trigger rootcoord rpc
-	globalMetaCache.RemoveCollection(ctx, dbName, collectionName)
+	globalMetaCache.RemoveCollection(ctx, dbName, collectionName, 0)
 
 	// rc return collection not found error
 	mixc.describeCollectionFunc = func(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
diff --git a/internal/rootcoord/ddl_callbacks.go b/internal/rootcoord/ddl_callbacks.go
index 9b74c20dd0..638e0114a7 100644
--- a/internal/rootcoord/ddl_callbacks.go
+++ b/internal/rootcoord/ddl_callbacks.go
@@ -97,7 +97,7 @@ type CacheExpirationsGetter interface {
 }
 
 // ExpireCaches handles the cache
-func (c *DDLCallback) ExpireCaches(ctx context.Context, expirations any, timetick uint64) error {
+func (c *DDLCallback) ExpireCaches(ctx context.Context, expirations any) error {
 	var cacheExpirations *message.CacheExpirations
 	if g, ok := expirations.(CacheExpirationsGetter); ok {
 		cacheExpirations = g.GetCacheExpirations()
@@ -109,18 +109,29 @@ func (c *DDLCallback) ExpireCaches(ctx context.Context, expirations any, timetic
 		panic(fmt.Sprintf("invalid getter type: %T", expirations))
 	}
 	for _, cacheExpiration := range cacheExpirations.CacheExpirations {
-		if err := c.expireCache(ctx, cacheExpiration, timetick); err != nil {
+		if err := c.expireCache(ctx, cacheExpiration); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (c *DDLCallback) expireCache(ctx context.Context, cacheExpiration *message.CacheExpiration, timetick uint64) error {
+func (c *DDLCallback) expireCache(ctx context.Context, cacheExpiration *message.CacheExpiration) error {
+	ts, err := c.tsoAllocator.GenerateTSO(1)
+	if err != nil {
+		return errors.Wrap(err, "failed to generate timestamp")
+	}
 	switch cacheExpiration.Cache.(type) {
 	case *messagespb.CacheExpiration_LegacyProxyCollectionMetaCache:
 		legacyProxyCollectionMetaCache := cacheExpiration.GetLegacyProxyCollectionMetaCache()
-		return c.Core.ExpireMetaCache(ctx, legacyProxyCollectionMetaCache.DbName, []string{legacyProxyCollectionMetaCache.CollectionName}, legacyProxyCollectionMetaCache.CollectionId, legacyProxyCollectionMetaCache.PartitionName, timetick, proxyutil.SetMsgType(legacyProxyCollectionMetaCache.MsgType))
+		return c.Core.ExpireMetaCache(
+			ctx,
+			legacyProxyCollectionMetaCache.DbName,
+			[]string{legacyProxyCollectionMetaCache.CollectionName},
+			legacyProxyCollectionMetaCache.CollectionId,
+			legacyProxyCollectionMetaCache.PartitionName,
+			ts,
+			proxyutil.SetMsgType(legacyProxyCollectionMetaCache.MsgType))
 	}
 	return nil
 }
diff --git a/internal/rootcoord/ddl_callbacks_alter_alias.go b/internal/rootcoord/ddl_callbacks_alter_alias.go
index 8c15e8d5de..64adc07f97 100644
--- a/internal/rootcoord/ddl_callbacks_alter_alias.go
+++ b/internal/rootcoord/ddl_callbacks_alter_alias.go
@@ -112,6 +112,5 @@ func (c *DDLCallback) alterAliasV2AckCallback(ctx context.Context, result messag
 		ce.NewBuilder().WithLegacyProxyCollectionMetaCache(
 			ce.OptLPCMDBName(result.Message.Header().DbName),
 			ce.OptLPCMCollectionName(result.Message.Header().Alias),
-			ce.OptLPCMMsgType(commonpb.MsgType_AlterAlias)),
-		result.GetControlChannelResult().TimeTick)
+			ce.OptLPCMMsgType(commonpb.MsgType_AlterAlias)))
 }
diff --git a/internal/rootcoord/ddl_callbacks_alter_collection_properties.go b/internal/rootcoord/ddl_callbacks_alter_collection_properties.go
index 9ce3aa76d0..c11a64588a 100644
--- a/internal/rootcoord/ddl_callbacks_alter_collection_properties.go
+++ b/internal/rootcoord/ddl_callbacks_alter_collection_properties.go
@@ -296,5 +296,5 @@ func (c *DDLCallback) alterCollectionV2AckCallback(ctx context.Context, result m
 	if err := c.broker.BroadcastAlteredCollection(ctx, header.CollectionId); err != nil {
 		return errors.Wrap(err, "failed to broadcast altered collection")
 	}
-	return c.ExpireCaches(ctx, header, result.GetControlChannelResult().TimeTick)
+	return c.ExpireCaches(ctx, header)
 }
diff --git a/internal/rootcoord/ddl_callbacks_alter_database.go b/internal/rootcoord/ddl_callbacks_alter_database.go
index fa73b97c40..891479c097 100644
--- a/internal/rootcoord/ddl_callbacks_alter_database.go
+++ b/internal/rootcoord/ddl_callbacks_alter_database.go
@@ -155,8 +155,7 @@ func (c *DDLCallback) alterDatabaseV1AckCallback(ctx context.Context, result mes
 		WithLegacyProxyCollectionMetaCache(
 			ce.OptLPCMDBName(header.DbName),
 			ce.OptLPCMMsgType(commonpb.MsgType_AlterDatabase),
-		),
-		result.GetControlChannelResult().TimeTick)
+		))
 }
 
 func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
diff --git a/internal/rootcoord/ddl_callbacks_create_collection.go b/internal/rootcoord/ddl_callbacks_create_collection.go
index 5a09e15456..481d6737a3 100644
--- a/internal/rootcoord/ddl_callbacks_create_collection.go
+++ b/internal/rootcoord/ddl_callbacks_create_collection.go
@@ -117,9 +117,7 @@ func (c *DDLCallback) createCollectionV1AckCallback(ctx context.Context, result
 			ce.OptLPCMDBName(body.DbName),
 			ce.OptLPCMCollectionName(body.CollectionName),
 			ce.OptLPCMCollectionID(header.CollectionId),
-			ce.OptLPCMMsgType(commonpb.MsgType_DropCollection)),
-		newCollInfo.UpdateTimestamp,
-	)
+			ce.OptLPCMMsgType(commonpb.MsgType_DropCollection)))
 }
 
 func (c *DDLCallback) createCollectionShard(ctx context.Context, header *message.CreateCollectionMessageHeader, body *message.CreateCollectionRequest, vchannel string, appendResult *message.AppendResult) error {
diff --git a/internal/rootcoord/ddl_callbacks_create_database.go b/internal/rootcoord/ddl_callbacks_create_database.go
index 889bad78d8..2cda570f38 100644
--- a/internal/rootcoord/ddl_callbacks_create_database.go
+++ b/internal/rootcoord/ddl_callbacks_create_database.go
@@ -86,6 +86,5 @@ func (c *DDLCallback) createDatabaseV1AckCallback(ctx context.Context, result me
 		WithLegacyProxyCollectionMetaCache(
 			ce.OptLPCMDBName(header.DbName),
 			ce.OptLPCMMsgType(commonpb.MsgType_DropDatabase),
-		),
-		result.GetControlChannelResult().TimeTick)
+		))
 }
diff --git a/internal/rootcoord/ddl_callbacks_create_partition.go b/internal/rootcoord/ddl_callbacks_create_partition.go
index 7e3f52eba0..9a7ba8a395 100644
--- a/internal/rootcoord/ddl_callbacks_create_partition.go
+++ b/internal/rootcoord/ddl_callbacks_create_partition.go
@@ -109,6 +109,5 @@ func (c *DDLCallback) createPartitionV1AckCallback(ctx context.Context, result m
 			ce.OptLPCMCollectionID(header.CollectionId),
 			ce.OptLPCMPartitionName(body.PartitionName),
 			ce.OptLPCMMsgType(commonpb.MsgType_CreatePartition),
-		),
-		result.GetControlChannelResult().TimeTick)
+		))
 }
diff --git a/internal/rootcoord/ddl_callbacks_drop_alias.go b/internal/rootcoord/ddl_callbacks_drop_alias.go
index c6beecee23..aaba09d06b 100644
--- a/internal/rootcoord/ddl_callbacks_drop_alias.go
+++ b/internal/rootcoord/ddl_callbacks_drop_alias.go
@@ -68,7 +68,5 @@ func (c *DDLCallback) dropAliasV2AckCallback(ctx context.Context, result message
 		ce.OptLPCMDBName(result.Message.Header().DbName),
 		ce.OptLPCMCollectionName(result.Message.Header().Alias),
 		ce.OptLPCMMsgType(commonpb.MsgType_DropAlias),
-	),
-		result.GetControlChannelResult().TimeTick,
-	)
+	))
 }
diff --git a/internal/rootcoord/ddl_callbacks_drop_collection.go b/internal/rootcoord/ddl_callbacks_drop_collection.go
index b3031e071b..223cb68b1c 100644
--- a/internal/rootcoord/ddl_callbacks_drop_collection.go
+++ b/internal/rootcoord/ddl_callbacks_drop_collection.go
@@ -131,8 +131,7 @@ func (c *DDLCallback) dropCollectionV1AckCallback(ctx context.Context, result me
 		ce.OptLPCMDBName(body.DbName),
 		ce.OptLPCMCollectionName(body.CollectionName),
 		ce.OptLPCMCollectionID(header.CollectionId),
-		ce.OptLPCMMsgType(commonpb.MsgType_DropCollection)).Build(),
-		result.GetControlChannelResult().TimeTick)
+		ce.OptLPCMMsgType(commonpb.MsgType_DropCollection)).Build())
 }
 
 // newCollectionTombstone creates a new collection tombstone.
diff --git a/internal/rootcoord/ddl_callbacks_drop_database.go b/internal/rootcoord/ddl_callbacks_drop_database.go
index cf769f2315..5c2b6e79e1 100644
--- a/internal/rootcoord/ddl_callbacks_drop_database.go
+++ b/internal/rootcoord/ddl_callbacks_drop_database.go
@@ -68,6 +68,5 @@ func (c *DDLCallback) dropDatabaseV1AckCallback(ctx context.Context, result mess
 		WithLegacyProxyCollectionMetaCache(
 			ce.OptLPCMDBName(header.DbName),
 			ce.OptLPCMMsgType(commonpb.MsgType_DropDatabase),
-		),
-		result.GetControlChannelResult().TimeTick)
+		))
 }
diff --git a/internal/rootcoord/ddl_callbacks_drop_partition.go b/internal/rootcoord/ddl_callbacks_drop_partition.go
index d90a5446b1..8a5d032198 100644
--- a/internal/rootcoord/ddl_callbacks_drop_partition.go
+++ b/internal/rootcoord/ddl_callbacks_drop_partition.go
@@ -114,8 +114,7 @@ func (c *DDLCallback) dropPartitionV1AckCallback(ctx context.Context, result mes
 			ce.OptLPCMCollectionID(header.CollectionId),
 			ce.OptLPCMPartitionName(body.PartitionName),
 			ce.OptLPCMMsgType(commonpb.MsgType_DropPartition),
-		),
-		result.GetControlChannelResult().TimeTick)
+		))
 }
 
 // newPartitionTombstone creates a new partition tombstone.
diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go
index a79c71564e..f0844e8e36 100644
--- a/internal/rootcoord/root_coord_test.go
+++ b/internal/rootcoord/root_coord_test.go
@@ -46,6 +46,7 @@ import (
 	"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/channel"
 	"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/broadcast"
 	"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
+	mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
 	kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/pkg/v2/log"
@@ -82,6 +83,8 @@ func initStreamingSystemAndCore(t *testing.T) *Core {
 	require.NoError(t, err)
 	testDB := newNameDb()
 	collID2Meta := make(map[typeutil.UniqueID]*model.Collection)
+	tso := mocktso.NewAllocator(t)
+	tso.EXPECT().GenerateTSO(mock.Anything).Return(uint64(1), nil).Maybe()
 	core := newTestCore(withHealthyCode(),
 		withMeta(&MetaTable{
 			catalog: rootcoord.NewCatalog(catalogKV, ss),
@@ -93,6 +96,7 @@ func initStreamingSystemAndCore(t *testing.T) *Core {
 		withValidMixCoord(),
 		withValidProxyManager(),
 		withValidIDAllocator(),
+		withTsoAllocator(tso),
 		withBroker(newValidMockBroker()),
 	)
 	registry.ResetRegistration()
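
Taken together, these changes thread a version into RemoveCollection (the request timestamp on the proxy side, a freshly allocated TSO in the rootcoord DDL callbacks) and route the name-based removal through removeCollectionByID, so both removal paths share the same collectionCacheVersion bookkeeping. The actual guard lives in removeCollectionByID, which is only partially shown in the hunks above; the standalone Go sketch below only illustrates the general idea of version-guarded cache invalidation, and every name in it (versionedCache, Remove) is hypothetical rather than a Milvus API.

// Standalone sketch (hypothetical names, not the MetaCache implementation):
// a cache that ignores invalidations older than the newest version it has
// already applied for a given collection name.
package main

import (
	"fmt"
	"sync"
)

type versionedCache struct {
	mu       sync.Mutex
	entries  map[string]string // collection name -> cached info (placeholder)
	versions map[string]uint64 // collection name -> last applied invalidation version
}

// Remove drops the cached entry only when the supplied version is at least as
// new as the last invalidation seen, mirroring the idea behind passing a DDL
// timestamp through RemoveCollection(ctx, db, name, version).
func (c *versionedCache) Remove(name string, version uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if version < c.versions[name] {
		return // stale invalidation; a newer one was already applied
	}
	c.versions[name] = version
	delete(c.entries, name)
}

func main() {
	c := &versionedCache{
		entries:  map[string]string{"collection1": "schema-v1"},
		versions: map[string]uint64{},
	}
	c.Remove("collection1", 100) // applied: entry dropped, version recorded
	c.Remove("collection1", 90)  // ignored: older than the recorded version
	fmt.Println(len(c.entries))  // prints 0
}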