From a3d621cb5ea0e52d10a4e41c1206baccb31c7b56 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 24 Apr 2025 20:36:38 +0800 Subject: [PATCH] fix: remove the concurrent limits for streaming service (#41484) issue: #41479 Signed-off-by: chyezh --- internal/distributed/streaming/wal.go | 4 ++-- internal/streamingnode/server/wal/adaptor/wal_adaptor.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index ed036bf8b5..a29103adad 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -41,8 +41,8 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl { producers: make(map[string]*producer.ResumableProducer), // TODO: optimize the pool size, use the streaming api but not goroutines. - appendExecutionPool: conc.NewPool[struct{}](10), - dispatchExecutionPool: conc.NewPool[struct{}](10), + appendExecutionPool: conc.NewPool[struct{}](0), + dispatchExecutionPool: conc.NewPool[struct{}](0), } } diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index 318f539b09..c47f437f91 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -62,8 +62,8 @@ func adaptImplsToWAL( wal := &walAdaptorImpl{ roWALAdaptorImpl: roWAL, rwWALImpls: basicWAL, - // TODO: make the pool size configurable. - appendExecutionPool: conc.NewPool[struct{}](10), + // TODO: remove the pool, use a queue instead. + appendExecutionPool: conc.NewPool[struct{}](0), param: param, interceptorBuildResult: buildInterceptor(builders, param), writeMetrics: metricsutil.NewWriteMetrics(basicWAL.Channel(), basicWAL.WALName()),