diff --git a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go index d3f96485d7..94554d7947 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go @@ -168,13 +168,24 @@ func (impl *flusherComponents) recover(ctx context.Context, recoverInfos map[str futures[vchannel] = future } dataServices := make(map[string]*dataSyncServiceWrapper, len(futures)) + var lastErr error for vchannel, future := range futures { ds, err := future.Await() if err != nil { - return err + lastErr = err + continue } dataServices[vchannel] = ds.(*dataSyncServiceWrapper) } + if lastErr != nil { + // release all the data sync services if operation is canceled. + // otherwise, the write buffer will leak. + for _, ds := range dataServices { + ds.Close() + } + impl.logger.Warn("failed to build data sync service, may be canceled when recovering", zap.Error(lastErr)) + return lastErr + } impl.dataServices = dataServices for vchannel, ds := range dataServices { ds.Start() diff --git a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go index e9eea31efb..10d99c70cb 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go +++ b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go @@ -29,7 +29,7 @@ func RecoverWALFlusher(param *interceptors.InterceptorBuildParam) *WALFlusherImp wal: param.WAL, logger: resource.Resource().Logger().With( log.FieldComponent("flusher"), - zap.String("pchannel", param.ChannelInfo.Name)), + zap.String("pchannel", param.ChannelInfo.String())), metrics: newFlusherMetrics(param.ChannelInfo), } go flusher.Execute()