diff --git a/internal/streamingnode/server/builder.go b/internal/streamingnode/server/builder.go index bde9a10cfb..11d013b8c6 100644 --- a/internal/streamingnode/server/builder.go +++ b/internal/streamingnode/server/builder.go @@ -67,13 +67,12 @@ func (b *ServerBuilder) WithMetaKV(kv kv.MetaKv) *ServerBuilder { // Build builds a streaming node server. func (b *ServerBuilder) Build() *Server { - resource.Apply( + resource.Init( resource.OptETCD(b.etcdClient), resource.OptChunkManager(b.chunkManager), resource.OptMixCoordClient(b.mixc), resource.OptStreamingNodeCatalog(streamingnode.NewCataLog(b.kv)), ) - resource.Done() s := &Server{ session: b.session, grpcServer: b.grpcServer, diff --git a/internal/streamingnode/server/resource/resource.go b/internal/streamingnode/server/resource/resource.go index c69fcd497c..366b13634f 100644 --- a/internal/streamingnode/server/resource/resource.go +++ b/internal/streamingnode/server/resource/resource.go @@ -19,9 +19,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -var r = &resourceImpl{ - logger: log.With(log.FieldModule(typeutil.StreamingNodeRole)), -} // singleton resource instance +var r *resourceImpl // singleton resource instance // optResourceInit is the option to initialize the resource. type optResourceInit func(r *resourceImpl) @@ -66,20 +64,27 @@ func Apply(opts ...optResourceInit) { } // Done finish all initialization of resources. -func Done() { - r.segmentStatsManager = stats.NewStatsManager() - r.timeTickInspector = tinspector.NewTimeTickSyncInspector() - r.syncMgr = syncmgr.NewSyncManager(r.chunkManager) - r.wbMgr = writebuffer.NewManager(r.syncMgr) - r.wbMgr.Start() - assertNotNil(r.ChunkManager()) - assertNotNil(r.TSOAllocator()) - assertNotNil(r.MixCoordClient()) - assertNotNil(r.StreamingNodeCatalog()) - assertNotNil(r.SegmentStatsManager()) - assertNotNil(r.TimeTickInspector()) - assertNotNil(r.SyncManager()) - assertNotNil(r.WriteBufferManager()) +func Init(opts ...optResourceInit) { + newR := &resourceImpl{} + for _, opt := range opts { + opt(newR) + } + + newR.logger = log.With(log.FieldModule(typeutil.StreamingNodeRole)) + newR.segmentStatsManager = stats.NewStatsManager() + newR.timeTickInspector = tinspector.NewTimeTickSyncInspector() + newR.syncMgr = syncmgr.NewSyncManager(newR.chunkManager) + newR.wbMgr = writebuffer.NewManager(newR.syncMgr) + newR.wbMgr.Start() + assertNotNil(newR.ChunkManager()) + assertNotNil(newR.TSOAllocator()) + assertNotNil(newR.MixCoordClient()) + assertNotNil(newR.StreamingNodeCatalog()) + assertNotNil(newR.SegmentStatsManager()) + assertNotNil(newR.TimeTickInspector()) + assertNotNil(newR.SyncManager()) + assertNotNil(newR.WriteBufferManager()) + r = newR } // Release releases the singleton of resources. diff --git a/internal/streamingnode/server/resource/resource_test.go b/internal/streamingnode/server/resource/resource_test.go index 9ac6e7954d..a9abf49ea3 100644 --- a/internal/streamingnode/server/resource/resource_test.go +++ b/internal/streamingnode/server/resource/resource_test.go @@ -19,23 +19,18 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func TestApply(t *testing.T) { - Apply() - Apply(OptETCD(&clientv3.Client{})) - Apply(OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]())) - +func TestInit(t *testing.T) { assert.Panics(t, func() { - Done() + Init(OptETCD(&clientv3.Client{}), + OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]())) }) - Apply( + Init( OptChunkManager(mock_storage.NewMockChunkManager(t)), OptETCD(&clientv3.Client{}), OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]()), OptStreamingNodeCatalog(mock_metastore.NewMockStreamingNodeCataLog(t)), ) - Done() - assert.NotNil(t, Resource().TSOAllocator()) assert.NotNil(t, Resource().ETCD()) assert.NotNil(t, Resource().MixCoordClient()) diff --git a/internal/streamingnode/server/service/handler/consumer/consume_server_test.go b/internal/streamingnode/server/service/handler/consumer/consume_server_test.go index d50ccda837..0d3c96f7b6 100644 --- a/internal/streamingnode/server/service/handler/consumer/consume_server_test.go +++ b/internal/streamingnode/server/service/handler/consumer/consume_server_test.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" "github.com/milvus-io/milvus/internal/util/streamingutil/status" @@ -32,6 +33,7 @@ func TestMain(m *testing.M) { } func TestCreateConsumeServer(t *testing.T) { + resource.InitForTest(t) manager := mock_walmanager.NewMockManager(t) grpcConsumeServer := mock_streamingpb.NewMockStreamingNodeHandlerService_ConsumeServer(t) diff --git a/internal/streamingnode/server/service/handler/producer/produce_server_test.go b/internal/streamingnode/server/service/handler/producer/produce_server_test.go index a15e956510..af825f92c0 100644 --- a/internal/streamingnode/server/service/handler/producer/produce_server_test.go +++ b/internal/streamingnode/server/service/handler/producer/produce_server_test.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" @@ -35,6 +36,7 @@ func TestMain(m *testing.M) { } func TestCreateProduceServer(t *testing.T) { + resource.InitForTest(t) manager := mock_walmanager.NewMockManager(t) grpcProduceServer := mock_streamingpb.NewMockStreamingNodeHandlerService_ProduceServer(t)