fix: dynamic log level for streaming node (#42964)

issue: #42963

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-06-26 19:12:50 +08:00 committed by GitHub
parent 5dd1f841d2
commit 3602817c53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 31 additions and 28 deletions

View File

@ -67,13 +67,12 @@ func (b *ServerBuilder) WithMetaKV(kv kv.MetaKv) *ServerBuilder {
// Build builds a streaming node server. // Build builds a streaming node server.
func (b *ServerBuilder) Build() *Server { func (b *ServerBuilder) Build() *Server {
resource.Apply( resource.Init(
resource.OptETCD(b.etcdClient), resource.OptETCD(b.etcdClient),
resource.OptChunkManager(b.chunkManager), resource.OptChunkManager(b.chunkManager),
resource.OptMixCoordClient(b.mixc), resource.OptMixCoordClient(b.mixc),
resource.OptStreamingNodeCatalog(streamingnode.NewCataLog(b.kv)), resource.OptStreamingNodeCatalog(streamingnode.NewCataLog(b.kv)),
) )
resource.Done()
s := &Server{ s := &Server{
session: b.session, session: b.session,
grpcServer: b.grpcServer, grpcServer: b.grpcServer,

View File

@ -19,9 +19,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
var r = &resourceImpl{ var r *resourceImpl // singleton resource instance
logger: log.With(log.FieldModule(typeutil.StreamingNodeRole)),
} // singleton resource instance
// optResourceInit is the option to initialize the resource. // optResourceInit is the option to initialize the resource.
type optResourceInit func(r *resourceImpl) type optResourceInit func(r *resourceImpl)
@ -66,20 +64,27 @@ func Apply(opts ...optResourceInit) {
} }
// Done finish all initialization of resources. // Done finish all initialization of resources.
func Done() { func Init(opts ...optResourceInit) {
r.segmentStatsManager = stats.NewStatsManager() newR := &resourceImpl{}
r.timeTickInspector = tinspector.NewTimeTickSyncInspector() for _, opt := range opts {
r.syncMgr = syncmgr.NewSyncManager(r.chunkManager) opt(newR)
r.wbMgr = writebuffer.NewManager(r.syncMgr) }
r.wbMgr.Start()
assertNotNil(r.ChunkManager()) newR.logger = log.With(log.FieldModule(typeutil.StreamingNodeRole))
assertNotNil(r.TSOAllocator()) newR.segmentStatsManager = stats.NewStatsManager()
assertNotNil(r.MixCoordClient()) newR.timeTickInspector = tinspector.NewTimeTickSyncInspector()
assertNotNil(r.StreamingNodeCatalog()) newR.syncMgr = syncmgr.NewSyncManager(newR.chunkManager)
assertNotNil(r.SegmentStatsManager()) newR.wbMgr = writebuffer.NewManager(newR.syncMgr)
assertNotNil(r.TimeTickInspector()) newR.wbMgr.Start()
assertNotNil(r.SyncManager()) assertNotNil(newR.ChunkManager())
assertNotNil(r.WriteBufferManager()) assertNotNil(newR.TSOAllocator())
assertNotNil(newR.MixCoordClient())
assertNotNil(newR.StreamingNodeCatalog())
assertNotNil(newR.SegmentStatsManager())
assertNotNil(newR.TimeTickInspector())
assertNotNil(newR.SyncManager())
assertNotNil(newR.WriteBufferManager())
r = newR
} }
// Release releases the singleton of resources. // Release releases the singleton of resources.

View File

@ -19,23 +19,18 @@ func TestMain(m *testing.M) {
os.Exit(m.Run()) os.Exit(m.Run())
} }
func TestApply(t *testing.T) { func TestInit(t *testing.T) {
Apply()
Apply(OptETCD(&clientv3.Client{}))
Apply(OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]()))
assert.Panics(t, func() { assert.Panics(t, func() {
Done() Init(OptETCD(&clientv3.Client{}),
OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]()))
}) })
Apply( Init(
OptChunkManager(mock_storage.NewMockChunkManager(t)), OptChunkManager(mock_storage.NewMockChunkManager(t)),
OptETCD(&clientv3.Client{}), OptETCD(&clientv3.Client{}),
OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]()), OptMixCoordClient(syncutil.NewFuture[types.MixCoordClient]()),
OptStreamingNodeCatalog(mock_metastore.NewMockStreamingNodeCataLog(t)), OptStreamingNodeCatalog(mock_metastore.NewMockStreamingNodeCataLog(t)),
) )
Done()
assert.NotNil(t, Resource().TSOAllocator()) assert.NotNil(t, Resource().TSOAllocator())
assert.NotNil(t, Resource().ETCD()) assert.NotNil(t, Resource().ETCD())
assert.NotNil(t, Resource().MixCoordClient()) assert.NotNil(t, Resource().MixCoordClient())

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/internal/util/streamingutil/status"
@ -32,6 +33,7 @@ func TestMain(m *testing.M) {
} }
func TestCreateConsumeServer(t *testing.T) { func TestCreateConsumeServer(t *testing.T) {
resource.InitForTest(t)
manager := mock_walmanager.NewMockManager(t) manager := mock_walmanager.NewMockManager(t)
grpcConsumeServer := mock_streamingpb.NewMockStreamingNodeHandlerService_ConsumeServer(t) grpcConsumeServer := mock_streamingpb.NewMockStreamingNodeHandlerService_ConsumeServer(t)

View File

@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
@ -35,6 +36,7 @@ func TestMain(m *testing.M) {
} }
func TestCreateProduceServer(t *testing.T) { func TestCreateProduceServer(t *testing.T) {
resource.InitForTest(t)
manager := mock_walmanager.NewMockManager(t) manager := mock_walmanager.NewMockManager(t)
grpcProduceServer := mock_streamingpb.NewMockStreamingNodeHandlerService_ProduceServer(t) grpcProduceServer := mock_streamingpb.NewMockStreamingNodeHandlerService_ProduceServer(t)