mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
enhance: make sure stream closed (#29456)
relate: https://github.com/milvus-io/milvus/issues/28367 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
parent
a76e3b2813
commit
033456ea2c
@ -235,6 +235,8 @@ func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) (msgstr
|
|||||||
zap.Strings("physical_channels", channelInfos.pchans))
|
zap.Strings("physical_channels", channelInfos.pchans))
|
||||||
mgr.infos[collectionID] = streamInfos{channelInfos: channelInfos, stream: stream}
|
mgr.infos[collectionID] = streamInfos{channelInfos: channelInfos, stream: stream}
|
||||||
incPChansMetrics(channelInfos.pchans)
|
incPChansMetrics(channelInfos.pchans)
|
||||||
|
} else {
|
||||||
|
stream.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
return mgr.infos[collectionID].stream, nil
|
return mgr.infos[collectionID].stream, nil
|
||||||
|
|||||||
@ -18,6 +18,7 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
@ -251,6 +252,43 @@ func Test_singleTypeChannelsMgr_createMsgStream(t *testing.T) {
|
|||||||
assert.NotNil(t, stream)
|
assert.NotNil(t, stream)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("concurrent create", func(t *testing.T) {
|
||||||
|
factory := newMockMsgStreamFactory()
|
||||||
|
factory.f = func(ctx context.Context) (msgstream.MsgStream, error) {
|
||||||
|
return newMockMsgStream(), nil
|
||||||
|
}
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
readyCh := make(chan struct{})
|
||||||
|
m := &singleTypeChannelsMgr{
|
||||||
|
infos: make(map[UniqueID]streamInfos),
|
||||||
|
getChannelsFunc: func(collectionID UniqueID) (channelInfos, error) {
|
||||||
|
close(readyCh)
|
||||||
|
<-stopCh
|
||||||
|
return channelInfos{vchans: []string{"111", "222"}, pchans: []string{"111"}}, nil
|
||||||
|
},
|
||||||
|
msgStreamFactory: factory,
|
||||||
|
repackFunc: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
firstStream := streamInfos{stream: newMockMsgStream()}
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
stream, err := m.createMsgStream(100)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotNil(t, stream)
|
||||||
|
}()
|
||||||
|
// make sure create msg stream has run at getchannels
|
||||||
|
<-readyCh
|
||||||
|
// mock create stream for same collection in same time.
|
||||||
|
m.mu.Lock()
|
||||||
|
m.infos[100] = firstStream
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
close(stopCh)
|
||||||
|
wg.Wait()
|
||||||
|
})
|
||||||
t.Run("failed to get channels", func(t *testing.T) {
|
t.Run("failed to get channels", func(t *testing.T) {
|
||||||
m := &singleTypeChannelsMgr{
|
m := &singleTypeChannelsMgr{
|
||||||
getChannelsFunc: func(collectionID UniqueID) (channelInfos, error) {
|
getChannelsFunc: func(collectionID UniqueID) (channelInfos, error) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user