fix: write buffer not unregistered when datasyncservice is gone (#41496)

issue: #41495

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-04-24 19:38:38 +08:00 committed by GitHub
parent b5fe6a5243
commit ecfc868dcb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 9 deletions

View File

@ -4,26 +4,33 @@ import (
"context"
"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor"
)
// newDataSyncServiceWrapper creates a new data sync service wrapper.
func newDataSyncServiceWrapper(input chan<- *msgstream.MsgPack, ds *pipeline.DataSyncService) *dataSyncServiceWrapper {
func newDataSyncServiceWrapper(
channelName string,
input chan<- *msgstream.MsgPack,
ds *pipeline.DataSyncService,
) *dataSyncServiceWrapper {
handler := adaptor.NewBaseMsgPackAdaptorHandler()
return &dataSyncServiceWrapper{
input: input,
handler: handler,
ds: ds,
channelName: channelName,
input: input,
handler: handler,
ds: ds,
}
}
// dataSyncServiceWrapper wraps DataSyncService and related input channel.
type dataSyncServiceWrapper struct {
input chan<- *msgstream.MsgPack
handler *adaptor.BaseMsgPackAdaptorHandler
ds *pipeline.DataSyncService
channelName string
input chan<- *msgstream.MsgPack
handler *adaptor.BaseMsgPackAdaptorHandler
ds *pipeline.DataSyncService
}
// Start starts the data sync service.
@ -51,4 +58,5 @@ func (ds *dataSyncServiceWrapper) Close() {
// The input channel should be closed first, otherwise the flowgraph in datasync service will be blocked.
close(ds.input)
ds.ds.GracefullyClose()
resource.Resource().WriteBufferManager().RemoveChannel(ds.channelName)
}

View File

@ -141,7 +141,7 @@ func (impl *flusherComponents) addNewDataSyncService(
ds *pipeline.DataSyncService,
) {
impl.checkpointManager.AddVChannel(createCollectionMsg.VChannel(), createCollectionMsg.LastConfirmedMessageID())
newDS := newDataSyncServiceWrapper(input, ds)
newDS := newDataSyncServiceWrapper(createCollectionMsg.VChannel(), input, ds)
newDS.Start()
impl.dataServices[createCollectionMsg.VChannel()] = newDS
impl.logger.Info("create data sync service done", zap.String("vchannel", createCollectionMsg.VChannel()))
@ -268,5 +268,5 @@ func (impl *flusherComponents) buildDataSyncService(ctx context.Context, recover
if err != nil {
return nil, err
}
return newDataSyncServiceWrapper(input, ds), nil
return newDataSyncServiceWrapper(recoverInfo.Info.ChannelName, input, ds), nil
}