From ecfc868dcb7beb2e0607c66f07e91ebeaaece9e1 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 24 Apr 2025 19:38:38 +0800 Subject: [PATCH] fix: write buffer not unregistered when datasyncservice is gone (#41496) issue: #41495 Signed-off-by: chyezh --- .../flusherimpl/data_service_wrapper.go | 22 +++++++++++++------ .../flusher/flusherimpl/flusher_components.go | 4 ++-- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/streamingnode/server/flusher/flusherimpl/data_service_wrapper.go b/internal/streamingnode/server/flusher/flusherimpl/data_service_wrapper.go index 62a9a35907..6c17b1d211 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/data_service_wrapper.go +++ b/internal/streamingnode/server/flusher/flusherimpl/data_service_wrapper.go @@ -4,26 +4,33 @@ import ( "context" "github.com/milvus-io/milvus/internal/flushcommon/pipeline" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor" ) // newDataSyncServiceWrapper creates a new data sync service wrapper. -func newDataSyncServiceWrapper(input chan<- *msgstream.MsgPack, ds *pipeline.DataSyncService) *dataSyncServiceWrapper { +func newDataSyncServiceWrapper( + channelName string, + input chan<- *msgstream.MsgPack, + ds *pipeline.DataSyncService, +) *dataSyncServiceWrapper { handler := adaptor.NewBaseMsgPackAdaptorHandler() return &dataSyncServiceWrapper{ - input: input, - handler: handler, - ds: ds, + channelName: channelName, + input: input, + handler: handler, + ds: ds, } } // dataSyncServiceWrapper wraps DataSyncService and related input channel. type dataSyncServiceWrapper struct { - input chan<- *msgstream.MsgPack - handler *adaptor.BaseMsgPackAdaptorHandler - ds *pipeline.DataSyncService + channelName string + input chan<- *msgstream.MsgPack + handler *adaptor.BaseMsgPackAdaptorHandler + ds *pipeline.DataSyncService } // Start starts the data sync service. @@ -51,4 +58,5 @@ func (ds *dataSyncServiceWrapper) Close() { // The input channel should be closed first, otherwise the flowgraph in datasync service will be blocked. close(ds.input) ds.ds.GracefullyClose() + resource.Resource().WriteBufferManager().RemoveChannel(ds.channelName) } diff --git a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go index 8f9ca989fd..66e5e83379 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go @@ -141,7 +141,7 @@ func (impl *flusherComponents) addNewDataSyncService( ds *pipeline.DataSyncService, ) { impl.checkpointManager.AddVChannel(createCollectionMsg.VChannel(), createCollectionMsg.LastConfirmedMessageID()) - newDS := newDataSyncServiceWrapper(input, ds) + newDS := newDataSyncServiceWrapper(createCollectionMsg.VChannel(), input, ds) newDS.Start() impl.dataServices[createCollectionMsg.VChannel()] = newDS impl.logger.Info("create data sync service done", zap.String("vchannel", createCollectionMsg.VChannel())) @@ -268,5 +268,5 @@ func (impl *flusherComponents) buildDataSyncService(ctx context.Context, recover if err != nil { return nil, err } - return newDataSyncServiceWrapper(input, ds), nil + return newDataSyncServiceWrapper(recoverInfo.Info.ChannelName, input, ds), nil }