diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index df1bb3719e..6f6354dcea 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -442,17 +442,17 @@ type TsMsg interface { Ts() Timestamp } -type TsMsgMarshaler interface { - Marshal(input *TsMsg) ([]byte, Status) - Unmarshal(input []byte) (*TsMsg, Status) -} - type MsgPack struct { BeginTs Timestamp EndTs Timestamp Msgs []*TsMsg } +type TsMsgMarshaler interface { + Marshal(input *TsMsg) ([]byte, Status) + Unmarshal(input []byte) (*TsMsg, Status) +} + type MsgStream interface { SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) Produce(*MsgPack) Status @@ -461,14 +461,17 @@ type MsgStream interface { type PulsarMsgStream struct { client *pulsar.Client - produceChannels []string - consumeChannels []string - + msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack + producers []*pulsar.Producer + consumers []*pulsar.Consumer msgMarshaler *TsMsgMarshaler msgUnmarshaler *TsMsgMarshaler } +func (ms *PulsarMsgStream) SetProducerChannels(channels []string) +func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarMsgStream) SetMsgHashFunc(XXX) func (ms *PulsarMsgStream) Produce(*MsgPack) Status func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick