diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ed513dc778..7eb4c85931 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -492,7 +492,6 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { log.Info("DataCoord creates the timetick channel consumer", zap.String("timeTickChannel", Params.CommonCfg.DataCoordTimeTick), zap.String("subscription", subName)) - ttMsgStream.Start() go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 32d850dbf2..f49a67c78c 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1487,7 +1487,6 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.factory.NewMsgStream(context.TODO()) assert.Nil(t, err) ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) - ttMsgStream.Start() defer ttMsgStream.Close() info := &NodeInfo{ Address: "localhost:7777", @@ -1555,7 +1554,6 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.factory.NewMsgStream(context.TODO()) assert.Nil(t, err) ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) - ttMsgStream.Start() defer ttMsgStream.Close() info := &NodeInfo{ Address: "localhost:7777", @@ -1637,7 +1635,6 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.factory.NewMsgStream(context.TODO()) assert.Nil(t, err) ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) - ttMsgStream.Start() defer ttMsgStream.Close() node := &NodeInfo{ NodeID: 0, diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index b9e1e6eedf..1ea656a7b5 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -293,13 +293,12 @@ func TestDataNode(t *testing.T) { insertStream, err := factory.NewMsgStream(node1.ctx) assert.NoError(t, err) insertStream.AsProducer([]string{dmChannelName}) - insertStream.Start() defer insertStream.Close() - err = insertStream.Broadcast(&timeTickMsgPack) + _, err = insertStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - err = insertStream.Broadcast(&timeTickMsgPack) + _, err = insertStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) }() diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 7507bdaee0..26afe37c18 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -304,17 +304,14 @@ func TestDataSyncService_Start(t *testing.T) { ddStream.AsProducer([]string{ddlChannelName}) var insertMsgStream msgstream.MsgStream = insertStream - insertMsgStream.Start() - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() err = insertMsgStream.Produce(&msgPack) assert.NoError(t, err) - err = insertMsgStream.Broadcast(&timeTickMsgPack) + _, err = insertMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) + _, err = ddMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) // dataSync @@ -487,10 +484,6 @@ func TestGetChannelLatestMsgID(t *testing.T) { insertStream, _ := factory.NewMsgStream(ctx) insertStream.AsProducer([]string{dmlChannelName}) - - var insertMsgStream = insertStream - insertMsgStream.Start() - id, err := dsService.getChannelLatestMsgID(ctx, dmlChannelName, 0) assert.NoError(t, err) assert.NotNil(t, id) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index d0a5ee6b37..9ee4d03258 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -369,7 +369,6 @@ func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppe metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() log.Info("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName)) var deltaMsgStream msgstream.MsgStream = deltaStream - deltaMsgStream.Start() dd := &ddNode{ ctx: ctx, diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index 4d1ad5c365..2b3570aeb3 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -65,7 +65,6 @@ func (mm *mockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([ type mockTtMsgStream struct { } -func (mtm *mockTtMsgStream) Start() {} func (mtm *mockTtMsgStream) Close() {} func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack { return make(chan *msgstream.MsgPack, 100) @@ -75,9 +74,6 @@ func (mtm *mockTtMsgStream) AsProducer(channels []string) {} func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) { } func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {} -func (mtm *mockTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { - return make([][]int32, 0) -} func (mtm *mockTtMsgStream) GetProduceChannels() []string { return make([]string, 0) @@ -85,14 +81,8 @@ func (mtm *mockTtMsgStream) GetProduceChannels() []string { func (mtm *mockTtMsgStream) Produce(*msgstream.MsgPack) error { return nil } -func (mtm *mockTtMsgStream) ProduceMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { - return map[string][]msgstream.MessageID{}, nil -} -func (mtm *mockTtMsgStream) Broadcast(*msgstream.MsgPack) error { - return nil -} -func (mtm *mockTtMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { - return map[string][]msgstream.MessageID{}, nil +func (mtm *mockTtMsgStream) Broadcast(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { + return nil, nil } func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error { return nil diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 0f1fef4de9..37cb653409 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -615,7 +615,6 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick)) var wTtMsgStream msgstream.MsgStream = wTt - wTtMsgStream.Start() mt := newMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error { stats := make([]*datapb.SegmentStats, 0, len(segmentIDs)) diff --git a/internal/mq/msgstream/mock_msgstream.go b/internal/mq/msgstream/mock_msgstream.go index e4d7f0a2f8..1e86fce52d 100644 --- a/internal/mq/msgstream/mock_msgstream.go +++ b/internal/mq/msgstream/mock_msgstream.go @@ -15,10 +15,6 @@ func (m MockMsgStream) AsProducer(channels []string) { m.AsProducerFunc(channels) } -func (m MockMsgStream) BroadcastMark(pack *MsgPack) (map[string][]MessageID, error) { +func (m MockMsgStream) Broadcast(pack *MsgPack) (map[string][]MessageID, error) { return m.BroadcastMarkFunc(pack) } - -func (m MockMsgStream) Broadcast(pack *MsgPack) error { - return m.BroadcastFunc(pack) -} diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go index caff88b29e..1493aab51c 100644 --- a/internal/mq/msgstream/mq_kafka_msgstream_test.go +++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go @@ -150,7 +150,6 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { err = outputStream2.Seek([]*internalpb.MsgPosition{seekPosition}) assert.Nil(t, err) - outputStream2.Start() cnt := 0 var value int64 = 6 @@ -223,21 +222,21 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) assert.Nil(t, err) err = inputStream.Produce(&msgPack1) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) assert.Nil(t, err) err = inputStream.Produce(&msgPack3) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack4) + _, err = inputStream.Broadcast(&msgPack4) assert.Nil(t, err) err = inputStream.Produce(&msgPack5) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack6) + _, err = inputStream.Broadcast(&msgPack6) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack7) + _, err = inputStream.Broadcast(&msgPack7) assert.Nil(t, err) receivedMsg := consumer(ctx, outputStream) @@ -413,7 +412,6 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest) - outputStream.Start() var wg sync.WaitGroup wg.Add(1) @@ -460,7 +458,6 @@ func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChann for _, opt := range opts { inputStream.SetRepackFunc(opt) } - inputStream.Start() return inputStream } @@ -469,7 +466,6 @@ func getKafkaOutputStream(ctx context.Context, kafkaAddress string, consumerChan kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) outputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, position) - outputStream.Start() return outputStream } @@ -478,7 +474,6 @@ func getKafkaTtOutputStream(ctx context.Context, kafkaAddress string, consumerCh kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() return outputStream } @@ -492,6 +487,5 @@ func getKafkaTtOutputStreamAndSeek(ctx context.Context, kafkaAddress string, pos } outputStream.AsConsumer(consumerName, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown) outputStream.Seek(positions) - outputStream.Start() return outputStream } diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 225404a6e8..77fbca2834 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -175,9 +175,6 @@ func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) { ms.repackFunc = repackFunc } -func (ms *mqMsgStream) Start() { -} - func (ms *mqMsgStream) Close() { log.Info("start to close mq msg stream", zap.Int("producer num", len(ms.producers)), @@ -205,7 +202,7 @@ func (ms *mqMsgStream) Close() { } -func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 { +func (ms *mqMsgStream) computeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 { if len(tsMsgs) <= 0 { return nil } @@ -239,7 +236,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { return errors.New("nil producer in msg stream") } tsMsgs := msgPack.Msgs - reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs) + reBucketValues := ms.computeProduceChannelIndexes(msgPack.Msgs) var result map[int32]*MsgPack var err error if ms.repackFunc != nil { @@ -294,119 +291,9 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { return nil } -// ProduceMark send msg pack to all producers and returns corresponding msg id -// the returned message id serves as marking -func (ms *mqMsgStream) ProduceMark(msgPack *MsgPack) (map[string][]MessageID, error) { - ids := make(map[string][]MessageID) - if msgPack == nil || len(msgPack.Msgs) <= 0 { - return ids, errors.New("empty msgs") - } - if len(ms.producers) <= 0 { - return ids, errors.New("nil producer in msg stream") - } - tsMsgs := msgPack.Msgs - reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs) - var result map[int32]*MsgPack - var err error - if ms.repackFunc != nil { - result, err = ms.repackFunc(tsMsgs, reBucketValues) - } else { - msgType := (tsMsgs[0]).Type() - switch msgType { - case commonpb.MsgType_Insert: - result, err = InsertRepackFunc(tsMsgs, reBucketValues) - case commonpb.MsgType_Delete: - result, err = DeleteRepackFunc(tsMsgs, reBucketValues) - default: - result, err = DefaultRepackFunc(tsMsgs, reBucketValues) - } - } - if err != nil { - return ids, err - } - for k, v := range result { - channel := ms.producerChannels[k] - for i, tsMsg := range v.Msgs { - sp, spanCtx := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), tsMsg) - - mb, err := tsMsg.Marshal(tsMsg) - if err != nil { - return ids, err - } - - m, err := convertToByteArray(mb) - if err != nil { - return ids, err - } - - msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - - trace.InjectContextToMsgProperties(sp.Context(), msg.Properties) - - ms.producerLock.Lock() - id, err := ms.producers[channel].Send( - spanCtx, - msg, - ) - if err != nil { - ms.producerLock.Unlock() - trace.LogError(sp, err) - sp.Finish() - return ids, err - } - ids[channel] = append(ids[channel], id) - sp.Finish() - ms.producerLock.Unlock() - } - } - return ids, nil -} - -// Broadcast put msgPack to all producer in current msgstream +// Broadcast put msgPack to all producer in current msgstream and returned message id // which ignores repackFunc logic -func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error { - if msgPack == nil || len(msgPack.Msgs) <= 0 { - log.Debug("Warning: Receive empty msgPack") - return nil - } - for _, v := range msgPack.Msgs { - sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v) - - mb, err := v.Marshal(v) - if err != nil { - return err - } - - m, err := convertToByteArray(mb) - if err != nil { - return err - } - - msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - - trace.InjectContextToMsgProperties(sp.Context(), msg.Properties) - - ms.producerLock.Lock() - for _, producer := range ms.producers { - if _, err := producer.Send( - spanCtx, - msg, - ); err != nil { - ms.producerLock.Unlock() - trace.LogError(sp, err) - sp.Finish() - return err - } - } - ms.producerLock.Unlock() - sp.Finish() - } - return nil -} - -// BroadcastMark broadcast msg pack to all producers and returns corresponding msg id -// the returned message id serves as marking -func (ms *mqMsgStream) BroadcastMark(msgPack *MsgPack) (map[string][]MessageID, error) { +func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, error) { ids := make(map[string][]MessageID) if msgPack == nil || len(msgPack.Msgs) <= 0 { return ids, errors.New("empty msgs") @@ -653,9 +540,6 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string, position } } -// Start will start a goroutine which keep carrying msg from pulsar/rocksmq to golang chan -func (ms *MqTtMsgStream) Start() {} - // Close will stop goroutine and free internal producers and consumers func (ms *MqTtMsgStream) Close() { close(ms.syncConsumer) diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index 177231a58d..f8436165ce 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -202,7 +202,7 @@ func TestMqMsgStream_ComputeProduceChannelIndexes(t *testing.T) { assert.Nil(t, err) // empty parameters - reBucketValues := m.ComputeProduceChannelIndexes([]TsMsg{}) + reBucketValues := m.computeProduceChannelIndexes([]TsMsg{}) assert.Nil(t, reBucketValues) // not called AsProducer yet @@ -228,7 +228,7 @@ func TestMqMsgStream_ComputeProduceChannelIndexes(t *testing.T) { RowData: []*commonpb.Blob{}, }, } - reBucketValues = m.ComputeProduceChannelIndexes([]TsMsg{insertMsg}) + reBucketValues = m.computeProduceChannelIndexes([]TsMsg{insertMsg}) assert.Nil(t, reBucketValues) }(parameters[i].client) } @@ -312,8 +312,8 @@ func TestMqMsgStream_Broadcast(t *testing.T) { assert.Nil(t, err) // Broadcast nil pointer - err = m.Broadcast(nil) - assert.Nil(t, err) + _, err = m.Broadcast(nil) + assert.NotNil(t, err) }(parameters[i].client) } } @@ -481,7 +481,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack) + _, err := inputStream.Broadcast(&msgPack) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) receiveMsg(ctx, outputStream, len(consumerChannels)*len(msgPack.Msgs)) @@ -552,12 +552,10 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) - inputStream.Start() pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() var output MsgStream = outputStream err := (*inputStream).Produce(&msgPack) @@ -607,12 +605,10 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) - inputStream.Start() pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() var output MsgStream = outputStream err := (*inputStream).Produce(&msgPack) @@ -641,12 +637,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) - inputStream.Start() pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() var output MsgStream = outputStream err := (*inputStream).Produce(&msgPack) @@ -677,13 +671,13 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) err = inputStream.Produce(&msgPack1) require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) @@ -722,17 +716,17 @@ func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) { inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) assert.Nil(t, err) err = inputStream.Produce(&msgPack1) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) assert.Nil(t, err) err = inputStream.Produce(&msgPack3) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack4) + _, err = inputStream.Broadcast(&msgPack4) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack5) + _, err = inputStream.Broadcast(&msgPack5) assert.Nil(t, err) o1 := consumer(ctx, outputStream) @@ -802,7 +796,6 @@ func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) { err = outputStream2.Seek([]*internalpb.MsgPosition{seekPosition}) assert.Nil(t, err) - outputStream2.Start() cnt := 0 var value int64 = 6 @@ -874,21 +867,21 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) assert.Nil(t, err) err = inputStream.Produce(&msgPack1) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) assert.Nil(t, err) err = inputStream.Produce(&msgPack3) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack4) + _, err = inputStream.Broadcast(&msgPack4) assert.Nil(t, err) err = inputStream.Produce(&msgPack5) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack6) + _, err = inputStream.Broadcast(&msgPack6) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack7) + _, err = inputStream.Broadcast(&msgPack7) assert.Nil(t, err) receivedMsg := consumer(ctx, outputStream) @@ -968,13 +961,13 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) err = inputStream.Produce(&msgPack1) require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) @@ -1025,7 +1018,7 @@ func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error { } } else { // tt msg use Broadcast - if err := ms.Broadcast(msgPacks[i]); err != nil { + if _, err := ms.Broadcast(msgPacks[i]); err != nil { return err } } @@ -1033,23 +1026,22 @@ func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error { return nil } -// // This testcase will generate MsgPacks as following: // -// Insert Insert Insert Insert Insert Insert -// c1 |----------|----------|----------|----------|----------|----------| -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) +// Insert Insert Insert Insert Insert Insert +// c1 |----------|----------|----------|----------|----------|----------| +// ^ ^ ^ ^ ^ ^ +// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) +// +// Insert Insert Insert Insert Insert Insert +// c2 |----------|----------|----------|----------|----------|----------| +// ^ ^ ^ ^ ^ ^ +// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) // -// Insert Insert Insert Insert Insert Insert -// c2 |----------|----------|----------|----------|----------|----------| -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) // Then check: -// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be -// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs -// 2. The count of consumed msg should be equal to the count of produced msg -// +// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be +// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs +// 2. The count of consumed msg should be equal to the count of produced msg func TestStream_PulsarTtMsgStream_1(t *testing.T) { pulsarAddr := getPulsarAddress() c1 := funcutil.RandomString(8) @@ -1097,22 +1089,25 @@ func TestStream_PulsarTtMsgStream_1(t *testing.T) { outputStream.Close() } -// // This testcase will generate MsgPacks as following: // -// Insert Insert Insert Insert Insert Insert +// Insert Insert Insert Insert Insert Insert +// // c1 |----------|----------|----------|----------|----------|----------| -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) // -// Insert Insert Insert Insert Insert Insert +// ^ ^ ^ ^ ^ ^ +// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) +// +// Insert Insert Insert Insert Insert Insert +// // c2 |----------|----------|----------|----------|----------|----------| -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) -// Then check: -// 1. ttMsgStream consumer can seek to the right position and resume -// 2. The count of consumed msg should be equal to the count of produced msg // +// ^ ^ ^ ^ ^ ^ +// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) +// +// Then check: +// 1. ttMsgStream consumer can seek to the right position and resume +// 2. The count of consumed msg should be equal to the count of produced msg func TestStream_PulsarTtMsgStream_2(t *testing.T) { pulsarAddr := getPulsarAddress() c1 := funcutil.RandomString(8) @@ -1203,7 +1198,6 @@ func TestStream_MqMsgStream_Seek(t *testing.T) { outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream2.Seek([]*internalpb.MsgPosition{seekPosition}) - outputStream2.Start() for i := 6; i < 10; i++ { result := consumer(ctx, outputStream2) @@ -1261,7 +1255,6 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) { err = outputStream2.Seek(p) assert.Nil(t, err) - outputStream2.Start() for i := 10; i < 20; i++ { insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) @@ -1318,7 +1311,6 @@ func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) { err = outputStream2.Seek(p) assert.Nil(t, err) - outputStream2.Start() for i := 10; i < 20; i++ { insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) @@ -1356,7 +1348,6 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) { pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest) - outputStream2.Start() msgPack.Msgs = nil // produce another 10 tsMs @@ -1423,13 +1414,11 @@ func initRmqStream(ctx context.Context, for _, opt := range opts { inputStream.SetRepackFunc(opt) } - inputStream.Start() var input MsgStream = inputStream rmqClient2, _ := rmq.NewClientWithDefaultOptions() outputStream, _ := NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() var output MsgStream = outputStream return input, output @@ -1448,13 +1437,11 @@ func initRmqTtStream(ctx context.Context, for _, opt := range opts { inputStream.SetRepackFunc(opt) } - inputStream.Start() var input MsgStream = inputStream rmqClient2, _ := rmq.NewClientWithDefaultOptions() outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() var output MsgStream = outputStream return input, output @@ -1500,13 +1487,13 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) { ctx := context.Background() inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) err = inputStream.Produce(&msgPack1) require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) @@ -1541,13 +1528,13 @@ func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) { ctx := context.Background() inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) assert.Nil(t, err) err = inputStream.Produce(&msgPack1) assert.Nil(t, err) err = inputStream.Produce(&msgPack2) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack3) + _, err = inputStream.Broadcast(&msgPack3) assert.Nil(t, err) receivedMsg := consumer(ctx, outputStream) @@ -1564,7 +1551,6 @@ func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) { consumerSubName = funcutil.RandomString(8) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown) outputStream.Seek(receivedMsg.StartPositions) - outputStream.Start() seekMsg := consumer(ctx, outputStream) assert.Equal(t, len(seekMsg.Msgs), 1+2) assert.EqualValues(t, seekMsg.Msgs[0].BeginTs(), 1) @@ -1614,21 +1600,21 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) { ctx := context.Background() inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) - err := inputStream.Broadcast(&msgPack0) + _, err := inputStream.Broadcast(&msgPack0) assert.Nil(t, err) err = inputStream.Produce(&msgPack1) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack2) + _, err = inputStream.Broadcast(&msgPack2) assert.Nil(t, err) err = inputStream.Produce(&msgPack3) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack4) + _, err = inputStream.Broadcast(&msgPack4) assert.Nil(t, err) err = inputStream.Produce(&msgPack5) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack6) + _, err = inputStream.Broadcast(&msgPack6) assert.Nil(t, err) - err = inputStream.Broadcast(&msgPack7) + _, err = inputStream.Broadcast(&msgPack7) assert.Nil(t, err) receivedMsg := consumer(ctx, outputStream) @@ -1670,7 +1656,6 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) { outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown) outputStream.Seek(receivedMsg3.StartPositions) - outputStream.Start() seekMsg := consumer(ctx, outputStream) assert.Equal(t, len(seekMsg.Msgs), 3) result := []uint64{14, 12, 13} @@ -1701,12 +1686,11 @@ func TestStream_BroadcastMark(t *testing.T) { // add producer channels outputStream.AsProducer(producerChannels) - outputStream.Start() msgPack0 := MsgPack{} msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - ids, err := outputStream.BroadcastMark(&msgPack0) + ids, err := outputStream.Broadcast(&msgPack0) assert.Nil(t, err) assert.NotNil(t, ids) assert.Equal(t, len(producerChannels), len(ids)) @@ -1720,7 +1704,7 @@ func TestStream_BroadcastMark(t *testing.T) { msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - ids, err = outputStream.BroadcastMark(&msgPack1) + ids, err = outputStream.Broadcast(&msgPack1) assert.Nil(t, err) assert.NotNil(t, ids) assert.Equal(t, len(producerChannels), len(ids)) @@ -1731,86 +1715,19 @@ func TestStream_BroadcastMark(t *testing.T) { } // edge cases - _, err = outputStream.BroadcastMark(nil) + _, err = outputStream.Broadcast(nil) assert.NotNil(t, err) msgPack2 := MsgPack{} msgPack2.Msgs = append(msgPack2.Msgs, &MarshalFailTsMsg{}) - _, err = outputStream.BroadcastMark(&msgPack2) + _, err = outputStream.Broadcast(&msgPack2) assert.NotNil(t, err) // mock send fail for k, p := range outputStream.producers { outputStream.producers[k] = &mockSendFailProducer{Producer: p} } - _, err = outputStream.BroadcastMark(&msgPack1) - assert.NotNil(t, err) - - outputStream.Close() -} - -func TestStream_ProduceMark(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1 := funcutil.RandomString(8) - c2 := funcutil.RandomString(8) - producerChannels := []string{c1, c2} - - factory := ProtoUDFactory{} - pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - assert.Nil(t, err) - outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - assert.Nil(t, err) - - // add producer channels - outputStream.AsProducer(producerChannels) - outputStream.Start() - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - ids, err := outputStream.ProduceMark(&msgPack0) - assert.Nil(t, err) - assert.NotNil(t, ids) - assert.Equal(t, len(msgPack0.Msgs), len(ids)) - for _, c := range producerChannels { - if id, ok := ids[c]; ok { - assert.Equal(t, len(msgPack0.Msgs), len(id)) - } - } - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 2)) - - ids, err = outputStream.ProduceMark(&msgPack1) - assert.Nil(t, err) - assert.NotNil(t, ids) - assert.Equal(t, len(producerChannels), len(ids)) - for _, c := range producerChannels { - ids, ok := ids[c] - assert.True(t, ok) - assert.Equal(t, 1, len(ids)) - } - - // edge cases - _, err = outputStream.ProduceMark(nil) - assert.NotNil(t, err) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, &MarshalFailTsMsg{BaseMsg: BaseMsg{HashValues: []uint32{1}}}) - _, err = outputStream.ProduceMark(&msgPack2) - assert.NotNil(t, err) - - // mock send fail - for k, p := range outputStream.producers { - outputStream.producers[k] = &mockSendFailProducer{Producer: p} - } - _, err = outputStream.ProduceMark(&msgPack1) - assert.NotNil(t, err) - - // mock producers is nil - outputStream.producers = nil - _, err = outputStream.ProduceMark(&msgPack1) + _, err = outputStream.Broadcast(&msgPack1) assert.NotNil(t, err) outputStream.Close() @@ -2054,7 +1971,6 @@ func getPulsarInputStream(ctx context.Context, pulsarAddress string, producerCha for _, opt := range opts { inputStream.SetRepackFunc(opt) } - inputStream.Start() return inputStream } @@ -2063,7 +1979,6 @@ func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerCh pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() return outputStream } @@ -2072,7 +1987,6 @@ func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumer pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) - outputStream.Start() return outputStream } @@ -2086,7 +2000,6 @@ func getPulsarTtOutputStreamAndSeek(ctx context.Context, pulsarAddress string, p } outputStream.AsConsumer(consumerName, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown) outputStream.Seek(positions) - outputStream.Start() return outputStream } @@ -2140,12 +2053,10 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) { otherInputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) otherInputStream.AsProducer([]string{"root_timetick"}) - otherInputStream.Start() otherInputStream.Produce(getTimeTickMsgPack(999)) inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) - inputStream.Start() for i := 0; i < 100; i++ { inputStream.Produce(getTimeTickMsgPack(int64(i))) @@ -2154,7 +2065,6 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) { rmqClient2, _ := rmq.NewClientWithDefaultOptions() outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest) - outputStream.Start() inputStream.Produce(getTimeTickMsgPack(1000)) pack := <-outputStream.Chan() diff --git a/internal/mq/msgstream/msgstream.go b/internal/mq/msgstream/msgstream.go index ac5838b8db..7906f49046 100644 --- a/internal/mq/msgstream/msgstream.go +++ b/internal/mq/msgstream/msgstream.go @@ -53,17 +53,13 @@ type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, erro // MsgStream is an interface that can be used to produce and consume message on message queue type MsgStream interface { - Start() Close() AsProducer(channels []string) Produce(*MsgPack) error SetRepackFunc(repackFunc RepackFunc) - ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 GetProduceChannels() []string - ProduceMark(*MsgPack) (map[string][]MessageID, error) - Broadcast(*MsgPack) error - BroadcastMark(*MsgPack) (map[string][]MessageID, error) + Broadcast(*MsgPack) (map[string][]MessageID, error) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) Chan() <-chan *MsgPack diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index b8c9fb5074..0e1c0e7a02 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -236,9 +236,6 @@ type simpleMockMsgStream struct { msgCountMtx sync.RWMutex } -func (ms *simpleMockMsgStream) Start() { -} - func (ms *simpleMockMsgStream) Close() { } @@ -259,26 +256,6 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) { func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) { } -func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { - if len(tsMsgs) <= 0 { - return nil - } - reBucketValues := make([][]int32, len(tsMsgs)) - channelNum := uint32(1) - if channelNum == 0 { - return nil - } - for idx, tsMsg := range tsMsgs { - hashValues := tsMsg.HashKeys() - bucketValues := make([]int32, len(hashValues)) - for index, hashValue := range hashValues { - bucketValues[index] = int32(hashValue % channelNum) - } - reBucketValues[idx] = bucketValues - } - return reBucketValues -} - func (ms *simpleMockMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) { } @@ -308,18 +285,7 @@ func (ms *simpleMockMsgStream) Produce(pack *msgstream.MsgPack) error { return nil } -func (ms *simpleMockMsgStream) ProduceMark(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { - defer ms.increaseMsgCount(1) - ms.msgChan <- pack - - return map[string][]msgstream.MessageID{}, nil -} - -func (ms *simpleMockMsgStream) Broadcast(pack *msgstream.MsgPack) error { - return nil -} - -func (ms *simpleMockMsgStream) BroadcastMark(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { +func (ms *simpleMockMsgStream) Broadcast(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { return map[string][]msgstream.MessageID{}, nil } diff --git a/internal/querynode/load_segment_task_test.go b/internal/querynode/load_segment_task_test.go index 5e03100b5f..165e60a24a 100644 --- a/internal/querynode/load_segment_task_test.go +++ b/internal/querynode/load_segment_task_test.go @@ -279,7 +279,7 @@ func TestTask_loadSegmentsTask(t *testing.T) { }, } - pos1, err := stream.ProduceMark(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{timeTickMsg}}) + pos1, err := stream.Broadcast(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{timeTickMsg}}) assert.NoError(t, err) msgIDs, ok := pos1[pDmChannel] assert.True(t, ok) diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 0d577b4a5f..cce4320348 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -751,7 +751,6 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection if err != nil { return err } - stream.Start() delData := &deleteData{ deleteIDs: make(map[UniqueID][]primaryKey), diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index 512f7851d5..3e86771dad 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -934,8 +934,6 @@ func (ms *LoadDeleteMsgStream) GetLatestMsgID(channel string) (msgstream.Message return msg.(msgstream.MessageID), err.(error) } -func (ms *LoadDeleteMsgStream) Start() {} - type getCollectionByIDFunc func(collectionID UniqueID) (*Collection, error) type mockReplicaInterface struct { diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 8634207e8c..9801107261 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -231,7 +231,7 @@ func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) err dms.mutex.RLock() if dms.refcnt > 0 { - if err := dms.ms.Broadcast(pack); err != nil { + if _, err := dms.ms.Broadcast(pack); err != nil { log.Error("Broadcast failed", zap.Error(err), zap.String("chanName", chanName)) dms.mutex.RUnlock() return err @@ -254,7 +254,7 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) dms.mutex.RLock() if dms.refcnt > 0 { - ids, err := dms.ms.BroadcastMark(pack) + ids, err := dms.ms.Broadcast(pack) if err != nil { log.Error("BroadcastMark failed", zap.Error(err), zap.String("chanName", chanName)) dms.mutex.RUnlock() diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index 82ea8be254..045c519fca 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -209,29 +209,18 @@ type FailMsgStream struct { errBroadcast bool } -func (ms *FailMsgStream) Start() {} func (ms *FailMsgStream) Close() {} func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil } func (ms *FailMsgStream) AsProducer(channels []string) {} func (ms *FailMsgStream) AsReader(channels []string, subName string) {} func (ms *FailMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) { } -func (ms *FailMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {} -func (ms *FailMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { return nil } -func (ms *FailMsgStream) GetProduceChannels() []string { return nil } -func (ms *FailMsgStream) Produce(*msgstream.MsgPack) error { return nil } -func (ms *FailMsgStream) ProduceMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { - return nil, nil -} -func (ms *FailMsgStream) Broadcast(*msgstream.MsgPack) error { +func (ms *FailMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {} +func (ms *FailMsgStream) GetProduceChannels() []string { return nil } +func (ms *FailMsgStream) Produce(*msgstream.MsgPack) error { return nil } +func (ms *FailMsgStream) Broadcast(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { if ms.errBroadcast { - return errors.New("broadcast error") - } - return nil -} -func (ms *FailMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) { - if ms.errBroadcast { - return nil, errors.New("broadcastMark error") + return nil, errors.New("broadcast error") } return nil, nil } diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index e62a331110..a56149062d 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -52,7 +52,6 @@ func (inNode *InputNode) IsInputNode() bool { // Start is used to start input msgstream func (inNode *InputNode) Start() { - inNode.inStream.Start() } // Close implements node diff --git a/internal/util/flowgraph/input_node_test.go b/internal/util/flowgraph/input_node_test.go index 0f1d93ce3e..3c537393ae 100644 --- a/internal/util/flowgraph/input_node_test.go +++ b/internal/util/flowgraph/input_node_test.go @@ -33,7 +33,6 @@ func TestInputNode(t *testing.T) { msgStream, _ := factory.NewMsgStream(context.TODO()) channels := []string{"cc"} msgStream.AsConsumer(channels, "sub", mqwrapper.SubscriptionPositionEarliest) - msgStream.Start() msgPack := generateMsgPack() produceStream, _ := factory.NewMsgStream(context.TODO())