From 2dca3688a003d25d6ea147829cc1a60d0783bb50 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 27 Sep 2022 18:04:53 +0800 Subject: [PATCH] Fix data race in DataCoord unit test (#19469) Refine the DataCoord initialization Signed-off-by: yah01 Signed-off-by: yah01 --- internal/datacoord/server.go | 36 +++++++++++-------- internal/datacoord/server_test.go | 60 +++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 25a6c0a2b7..20779adb14 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -255,20 +255,10 @@ func (s *Server) initSession() error { // Init change server state to Initializing func (s *Server) Init() error { + var err error atomic.StoreInt64(&s.isServing, ServerStateInitializing) s.factory.Init(&Params) - return s.initSession() -} -// Start initialize `Server` members and start loops, follow steps are taken: -// 1. initialize message factory parameters -// 2. initialize root coord client, meta, datanode cluster, segment info channel, -// allocator, segment manager -// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt) -// datanodes etcd watch, etcd alive check and flush completed status check -// 4. set server state to Healthy -func (s *Server) Start() error { - var err error if err = s.initRootCoordClient(); err != nil { return err } @@ -290,6 +280,10 @@ func (s *Server) Start() error { s.allocator = newRootCoordAllocator(s.rootCoordClient) + if err = s.initSession(); err != nil { + return err + } + if err = s.initServiceDiscovery(); err != nil { return err } @@ -298,10 +292,24 @@ func (s *Server) Start() error { s.createCompactionHandler() s.createCompactionTrigger() } - s.startSegmentManager() + s.initSegmentManager() s.initGarbageCollection(storageCli) + return nil +} + +// Start initialize `Server` members and start loops, follow steps are taken: +// 1. 
initialize message factory parameters +// 2. initialize root coord client, meta, datanode cluster, segment info channel, +// allocator, segment manager +// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt) +// datanodes etcd watch, etcd alive check and flush completed status check +// 4. set server state to Healthy +func (s *Server) Start() error { + s.compactionHandler.start() + s.compactionTrigger.start() + s.startServerLoop() Params.DataCoordCfg.CreatedTime = time.Now() Params.DataCoordCfg.UpdatedTime = time.Now() @@ -344,7 +352,6 @@ func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) { func (s *Server) createCompactionHandler() { s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh, s.segReferManager) - s.compactionHandler.start() } func (s *Server) stopCompactionHandler() { @@ -353,7 +360,6 @@ func (s *Server) stopCompactionHandler() { func (s *Server) createCompactionTrigger() { s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord) - s.compactionTrigger.start() } func (s *Server) stopCompactionTrigger() { @@ -446,7 +452,7 @@ func (s *Server) initServiceDiscovery() error { return err } -func (s *Server) startSegmentManager() { +func (s *Server) initSegmentManager() { if s.segmentManager == nil { s.segmentManager = newSegmentManager(s.meta, s.allocator, s.rootCoordClient) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 97fc106641..eb240cf46b 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1337,7 +1337,7 @@ func TestDataNodeTtChannel(t *testing.T) { } } t.Run("Test segment flush after tt", func(t *testing.T) { - ch := make(chan interface{}, 1) + ch := make(chan any, 1) svr := newTestServer(t, ch) defer closeTestServer(t, svr) @@ -1407,7 +1407,7 @@ func TestDataNodeTtChannel(t 
*testing.T) { }) t.Run("flush segment with different channels", func(t *testing.T) { - ch := make(chan interface{}, 1) + ch := make(chan any, 1) svr := newTestServer(t, ch) defer closeTestServer(t, svr) svr.meta.AddCollection(&datapb.CollectionInfo{ @@ -1484,7 +1484,7 @@ func TestDataNodeTtChannel(t *testing.T) { }) t.Run("test expire allocation after receiving tt msg", func(t *testing.T) { - ch := make(chan interface{}, 1) + ch := make(chan any, 1) helper := ServerHelper{ eventAfterHandleDataNodeTt: func() { ch <- struct{}{} }, } @@ -2616,12 +2616,9 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) { }) t.Run("dataCoord meta set state error", func(t *testing.T) { - svr := newTestServer(t, nil) - svr.meta.Lock() - func() { - defer svr.meta.Unlock() - svr.meta, _ = newMeta(context.TODO(), &mockTxnKVext{}, "") - }() + meta, err := newMeta(context.TODO(), &mockTxnKVext{}, "") + assert.NoError(t, err) + svr := newTestServerWithMeta(t, nil, meta) defer closeTestServer(t, svr) segment := &datapb.SegmentInfo{ ID: 1000, @@ -2961,7 +2958,7 @@ func (ms *MockClosePanicMsgstream) Chan() <-chan *msgstream.MsgPack { return args.Get(0).(chan *msgstream.MsgPack) } -func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { +func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { var err error Params.Init() Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) @@ -3000,6 +2997,47 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se return svr } +func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ...Option) *Server { + var err error + Params.Init() + Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) + factory := dependency.NewDefaultFactory(true) + + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + sessKey := path.Join(Params.EtcdCfg.MetaRootPath, 
sessionutil.DefaultServiceRoot) + _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) + assert.Nil(t, err) + + svr := CreateServer(context.TODO(), factory, opts...) + svr.SetEtcdClient(etcdCli) + svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) { + return newMockDataNodeClient(0, receiveCh) + } + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + indexCoord := mocks.NewMockIndexCoord(t) + indexCoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + svr.indexCoord = indexCoord + + err = svr.Init() + assert.Nil(t, err) + svr.meta = meta + + err = svr.Start() + assert.Nil(t, err) + err = svr.Register() + assert.Nil(t, err) + + // Stop channel watch state watcher in tests + if svr.channelManager != nil && svr.channelManager.stopChecker != nil { + svr.channelManager.stopChecker() + } + + return svr +} + func closeTestServer(t *testing.T, svr *Server) { err := svr.Stop() assert.Nil(t, err) @@ -3007,7 +3045,7 @@ func closeTestServer(t *testing.T, svr *Server) { assert.Nil(t, err) } -func newTestServer2(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { +func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { var err error Params.Init() Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())