diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 04b37b2652..19d393be5a 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -62,6 +62,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref log.Error("Failed to add msgstream", zap.String("name", name), zap.Error(err)) panic("Failed to add msgstream") } + ms.AsProducer([]string{name}) d.pool.Store(name, &dmlMsgStream{ ms: ms, mutex: sync.RWMutex{}, @@ -159,9 +160,6 @@ func (d *dmlChannels) addChannels(names ...string) { dms := v.(*dmlMsgStream) dms.mutex.Lock() - if dms.refcnt == 0 { - dms.ms.AsProducer([]string{name}) - } dms.refcnt++ dms.mutex.Unlock() } @@ -180,9 +178,8 @@ func (d *dmlChannels) removeChannels(names ...string) { dms.mutex.Lock() if dms.refcnt > 0 { dms.refcnt-- - if dms.refcnt == 0 { - dms.ms.Close() - } + } else { + log.Warn("Try to remove channel with no ref count", zap.String("channel name", name)) } dms.mutex.Unlock() }