diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index c04fce54b8..d6e2cfd298 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -243,15 +243,12 @@ func TestMasterService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - msFactory := msgstream.NewPmsFactory() + coreFactory := msgstream.NewPmsFactory() Params.Init() - core, err := NewCore(ctx, msFactory) + core, err := NewCore(ctx, coreFactory) assert.Nil(t, err) randVal := rand.Int() - err = core.Register() - assert.Nil(t, err) - Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) @@ -259,6 +256,9 @@ func TestMasterService(t *testing.T) { Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + err = core.Register() + assert.Nil(t, err) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) assert.Nil(t, err) sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot) @@ -306,6 +306,43 @@ func TestMasterService(t *testing.T) { err = core.SetQueryService(qm) assert.Nil(t, err) + tmpFactory := msgstream.NewPmsFactory() + + m := map[string]interface{}{ + "pulsarAddress": Params.PulsarAddress, + "receiveBufSize": 1024, + "pulsarBufSize": 1024} + err = tmpFactory.SetParams(m) + assert.Nil(t, err) + + dataServiceSegmentStream, _ := tmpFactory.NewMsgStream(ctx) + dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel}) + + timeTickStream, _ := tmpFactory.NewMsgStream(ctx) + timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName) + timeTickStream.Start() + + ddStream, _ := tmpFactory.NewMsgStream(ctx) + ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) + ddStream.Start() + + // test dataServiceSegmentStream seek + dataNodeSubName := Params.MsgChannelSubName + "dn" + flushedSegStream, _ := tmpFactory.NewMsgStream(ctx) + flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName) + flushedSegStream.Start() + msgPack := GenFlushedSegMsgPack(9999) + err = dataServiceSegmentStream.Produce(msgPack) + assert.Nil(t, err) + + flushedSegMsgPack := flushedSegStream.Consume() + flushedSegStream.Close() + + flushedSegPosStr, _ := EncodeMsgPositions(flushedSegMsgPack.EndPositions) + + _, err = etcdCli.Put(ctx, path.Join(Params.MetaRootPath, FlushedSegMsgEndPosPrefix), flushedSegPosStr) + assert.Nil(t, err) + err = core.Init() assert.Nil(t, err) @@ -321,43 +358,6 @@ func TestMasterService(t *testing.T) { err = core.Start() assert.Nil(t, err) - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarAddress, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) - assert.Nil(t, err) - - dataServiceSegmentStream, _ := msFactory.NewMsgStream(ctx) - dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel}) - - timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName) - timeTickStream.Start() - - ddStream, _ := msFactory.NewMsgStream(ctx) - ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) - ddStream.Start() - - // test dataServiceSegmentStream seek - dataNodeSubName := Params.MsgChannelSubName + "dn" - flushedSegStream, _ := msFactory.NewMsgStream(ctx) - flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName) - flushedSegStream.Start() - msgPack := GenFlushedSegMsgPack(9999) - err = dataServiceSegmentStream.Produce(msgPack) - assert.Nil(t, err) - flushedSegMsgPack := flushedSegStream.Consume() - flushedSegPosStr, _ := EncodeMsgPositions(flushedSegMsgPack.EndPositions) - _, err = etcdCli.Put(ctx, path.Join(Params.MetaRootPath, FlushedSegMsgEndPosPrefix), flushedSegPosStr) - assert.Nil(t, err) - - err = core.Init() - assert.Nil(t, err) - - err = core.Start() - assert.Nil(t, err) - time.Sleep(time.Second) getNotTtMsg := func(n int, ch <-chan *msgstream.MsgPack) []msgstream.TsMsg { @@ -1733,9 +1733,6 @@ func TestMasterService2(t *testing.T) { assert.Nil(t, err) randVal := rand.Int() - err = core.Register() - assert.Nil(t, err) - Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) @@ -1743,6 +1740,9 @@ func TestMasterService2(t *testing.T) { Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + err = core.Register() + assert.Nil(t, err) + dm := &dataMock{randVal: randVal} err = core.SetDataService(ctx, dm) assert.Nil(t, err) diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index 0e9234c62d..bed6e112c1 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -58,7 +58,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } - msgChannel := make(chan ConsumerMessage, 1) + msgChannel := make(chan ConsumerMessage) pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel} go func() { @@ -67,6 +67,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { case msg, ok := <-pConsumer.c.Chan(): if !ok { close(msgChannel) + log.Debug("pulsar consumer channel closed") return } msgChannel <- &pulsarMessage{msg: msg}