Fix data race in DataCoord unit test (#19469)

Refine the DataCoord initialization

Signed-off-by: yah01 <yang.cen@zilliz.com>

parent 289e55ec52
commit 2dca3688a0
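What the race was: the old "dataCoord meta set state error" test swapped svr.meta on a server whose background loops were already running, so the pointer write raced with reads from those goroutines even though meta's own mutex was held. Below is a minimal, self-contained sketch of that pattern and of the inject-before-Start fix this commit adopts; the names (server, meta, Start) are illustrative, not Milvus's actual types.

// sketch.go — hypothetical illustration, not Milvus code.
package main

import (
	"fmt"
	"sync"
	"time"
)

type meta struct{ segments map[int64]string }

type server struct {
	meta *meta // read by background loops after Start
	wg   sync.WaitGroup
	done chan struct{}
}

func (s *server) Start() {
	s.wg.Add(1)
	go func() { // stands in for DataCoord's server loops
		defer s.wg.Done()
		for {
			select {
			case <-s.done:
				return
			default:
				_ = s.meta.segments // concurrent read of s.meta
				time.Sleep(time.Millisecond)
			}
		}
	}()
}

func (s *server) Stop() { close(s.done); s.wg.Wait() }

func main() {
	// Old pattern: swap the field while loops are running.
	// `go run -race` reports a data race on the meta pointer here.
	racy := &server{meta: &meta{}, done: make(chan struct{})}
	racy.Start()
	racy.meta = &meta{} // unsynchronized write, races with the loop's read
	racy.Stop()

	// New pattern (what newTestServerWithMeta does): inject the mock meta
	// before Start, so no goroutine can observe the swap.
	fixed := &server{done: make(chan struct{})}
	fixed.meta = &meta{} // set before any loop starts
	fixed.Start()
	fixed.Stop()

	fmt.Println("done; run with -race to see the first pattern flagged")
}

The commit applies the same idea throughout: member construction moves from Start() into Init(), so a test can override a member in the gap between the two calls, before any goroutine can observe it.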
@@ -255,20 +255,10 @@ func (s *Server) initSession() error {
 
 // Init change server state to Initializing
 func (s *Server) Init() error {
+	var err error
 	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
 	s.factory.Init(&Params)
-	return s.initSession()
-}
-
-// Start initialize `Server` members and start loops, follow steps are taken:
-// 1. initialize message factory parameters
-// 2. initialize root coord client, meta, datanode cluster, segment info channel,
-//    allocator, segment manager
-// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
-//    datanodes etcd watch, etcd alive check and flush completed status check
-// 4. set server state to Healthy
-func (s *Server) Start() error {
-	var err error
 	if err = s.initRootCoordClient(); err != nil {
 		return err
 	}
@@ -290,6 +280,10 @@ func (s *Server) Start() error {
 
 	s.allocator = newRootCoordAllocator(s.rootCoordClient)
 
+	if err = s.initSession(); err != nil {
+		return err
+	}
+
 	if err = s.initServiceDiscovery(); err != nil {
 		return err
 	}
@@ -298,10 +292,24 @@ func (s *Server) Start() error {
 		s.createCompactionHandler()
 		s.createCompactionTrigger()
 	}
-	s.startSegmentManager()
+	s.initSegmentManager()
 
 	s.initGarbageCollection(storageCli)
 
+	return nil
+}
+
+// Start initialize `Server` members and start loops, follow steps are taken:
+// 1. initialize message factory parameters
+// 2. initialize root coord client, meta, datanode cluster, segment info channel,
+//    allocator, segment manager
+// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
+//    datanodes etcd watch, etcd alive check and flush completed status check
+// 4. set server state to Healthy
+func (s *Server) Start() error {
+	s.compactionHandler.start()
+	s.compactionTrigger.start()
+
 	s.startServerLoop()
 	Params.DataCoordCfg.CreatedTime = time.Now()
 	Params.DataCoordCfg.UpdatedTime = time.Now()
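With this split, Init() now does everything that allocates and wires members (root coord client, session, service discovery, compaction handler/trigger, segment manager, garbage collection), while Start() is reduced to launching goroutines: starting compaction, starting the server loops, and flipping the state to Healthy per step 4 of the comment. Note the hunk headers still show the old function name, since the moved lines originally lived in Start().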
@@ -344,7 +352,6 @@ func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) {
 
 func (s *Server) createCompactionHandler() {
 	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh, s.segReferManager)
-	s.compactionHandler.start()
 }
 
 func (s *Server) stopCompactionHandler() {
@@ -353,7 +360,6 @@ func (s *Server) stopCompactionHandler() {
 
 func (s *Server) createCompactionTrigger() {
 	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord)
-	s.compactionTrigger.start()
 }
 
 func (s *Server) stopCompactionTrigger() {
@@ -446,7 +452,7 @@ func (s *Server) initServiceDiscovery() error {
 		return err
 	}
 
-func (s *Server) startSegmentManager() {
+func (s *Server) initSegmentManager() {
 	if s.segmentManager == nil {
 		s.segmentManager = newSegmentManager(s.meta, s.allocator, s.rootCoordClient)
 	}
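The startSegmentManager → initSegmentManager rename is purely semantic: the function only constructs the manager when it is nil and starts nothing, so under the new naming it belongs to the Init() phase.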
@@ -1337,7 +1337,7 @@ func TestDataNodeTtChannel(t *testing.T) {
 		}
 	}
 	t.Run("Test segment flush after tt", func(t *testing.T) {
-		ch := make(chan interface{}, 1)
+		ch := make(chan any, 1)
 		svr := newTestServer(t, ch)
 		defer closeTestServer(t, svr)
 
@@ -1407,7 +1407,7 @@ func TestDataNodeTtChannel(t *testing.T) {
 	})
 
 	t.Run("flush segment with different channels", func(t *testing.T) {
-		ch := make(chan interface{}, 1)
+		ch := make(chan any, 1)
 		svr := newTestServer(t, ch)
 		defer closeTestServer(t, svr)
 		svr.meta.AddCollection(&datapb.CollectionInfo{
@@ -1484,7 +1484,7 @@ func TestDataNodeTtChannel(t *testing.T) {
 	})
 
 	t.Run("test expire allocation after receiving tt msg", func(t *testing.T) {
-		ch := make(chan interface{}, 1)
+		ch := make(chan any, 1)
 		helper := ServerHelper{
 			eventAfterHandleDataNodeTt: func() { ch <- struct{}{} },
 		}
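The chan interface{} → chan any changes here and below are cosmetic: since Go 1.18, any is a predeclared alias for interface{}, so the channel types are identical and callers need no change. A tiny standalone check:

package main

import "fmt"

func main() {
	// `any` is an alias for `interface{}` (Go 1.18+), so the two
	// channel types below are the same type, not merely convertible.
	var ch chan interface{} = make(chan any, 1)
	ch <- struct{}{}
	fmt.Printf("%T\n", <-ch) // prints: struct {}
}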
@@ -2616,12 +2616,9 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) {
 	})
 
 	t.Run("dataCoord meta set state error", func(t *testing.T) {
-		svr := newTestServer(t, nil)
-		svr.meta.Lock()
-		func() {
-			defer svr.meta.Unlock()
-			svr.meta, _ = newMeta(context.TODO(), &mockTxnKVext{}, "")
-		}()
+		meta, err := newMeta(context.TODO(), &mockTxnKVext{}, "")
+		assert.NoError(t, err)
+		svr := newTestServerWithMeta(t, nil, meta)
 		defer closeTestServer(t, svr)
 		segment := &datapb.SegmentInfo{
 			ID:        1000,
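This is the racy spot itself: svr.meta.Lock() guarded meta's internal state, not the Server.meta pointer, so reassigning svr.meta while the server loops were reading it was an unsynchronized pointer write. Building the mock meta first and handing it to newTestServerWithMeta removes the concurrent write entirely.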
@@ -2961,7 +2958,7 @@ func (ms *MockClosePanicMsgstream) Chan() <-chan *msgstream.MsgPack {
 	return args.Get(0).(chan *msgstream.MsgPack)
 }
 
-func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
+func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
 	var err error
 	Params.Init()
 	Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
@@ -3000,6 +2997,47 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
 	return svr
 }
 
+func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ...Option) *Server {
+	var err error
+	Params.Init()
+	Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
+	factory := dependency.NewDefaultFactory(true)
+
+	etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
+	assert.Nil(t, err)
+	sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
+	_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
+	assert.Nil(t, err)
+
+	svr := CreateServer(context.TODO(), factory, opts...)
+	svr.SetEtcdClient(etcdCli)
+	svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
+		return newMockDataNodeClient(0, receiveCh)
+	}
+	svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
+		return newMockRootCoordService(), nil
+	}
+	indexCoord := mocks.NewMockIndexCoord(t)
+	indexCoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
+	svr.indexCoord = indexCoord
+
+	err = svr.Init()
+	assert.Nil(t, err)
+	svr.meta = meta
+
+	err = svr.Start()
+	assert.Nil(t, err)
+	err = svr.Register()
+	assert.Nil(t, err)
+
+	// Stop channel watch state watcher in tests
+	if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
+		svr.channelManager.stopChecker()
+	}
+
+	return svr
+}
+
 func closeTestServer(t *testing.T, svr *Server) {
 	err := svr.Stop()
 	assert.Nil(t, err)
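newTestServerWithMeta mirrors newTestServer except for one assignment: it sets svr.meta = meta after svr.Init() and before svr.Start(), i.e. in the window where no server goroutine is running yet. This ordering only works because the earlier hunks moved member construction into Init().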
@@ -3007,7 +3045,7 @@ func closeTestServer(t *testing.T, svr *Server) {
 	assert.Nil(t, err)
 }
 
-func newTestServer2(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
+func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
 	var err error
 	Params.Init()
 	Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())