From eb413f1396f57672ced482025f4f802cde5f645c Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Thu, 23 Sep 2021 21:57:55 +0800 Subject: [PATCH] Add comments for exposed structures and functions (#8435) Signed-off-by: Xiangyu Wang --- internal/msgstream/mq_factory.go | 12 ++++++++++++ internal/msgstream/mq_msgstream.go | 5 +++++ internal/msgstream/msgstream.go | 3 +++ 3 files changed, 20 insertions(+) diff --git a/internal/msgstream/mq_factory.go b/internal/msgstream/mq_factory.go index e60bf80ad8..f4f6099dd6 100644 --- a/internal/msgstream/mq_factory.go +++ b/internal/msgstream/mq_factory.go @@ -22,6 +22,7 @@ import ( rocksmqserver "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" ) +// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go) type PmsFactory struct { dispatcherFactory ProtoUDFactory // the following members must be public, so that mapstructure.Decode() can access them @@ -30,6 +31,7 @@ type PmsFactory struct { PulsarBufSize int64 } +// SetParams is used to set parameters for PmsFactory func (f *PmsFactory) SetParams(params map[string]interface{}) error { err := mapstructure.Decode(params, f) if err != nil { @@ -38,6 +40,7 @@ func (f *PmsFactory) SetParams(params map[string]interface{}) error { return nil } +// NewMsgStream is used to generate a new Msgstream object func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: f.PulsarAddress}) if err != nil { @@ -46,6 +49,7 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { return NewMqMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } +// NewTtMsgStream is used to generate a new TtMsgstream object func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: f.PulsarAddress}) if err != nil { @@ -54,10 +58,12 @@ func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } +// NewQueryMsgStream is used to generate a new QueryMsgstream object func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { return f.NewMsgStream(ctx) } +// NewPmsFactory is used to generate a new PmsFactory object func NewPmsFactory() Factory { f := &PmsFactory{ dispatcherFactory: ProtoUDFactory{}, @@ -67,6 +73,7 @@ func NewPmsFactory() Factory { return f } +// RmsFactory is a rocksmq msgstream factory that implemented Factory interface(msgstream.go) type RmsFactory struct { dispatcherFactory ProtoUDFactory // the following members must be public, so that mapstructure.Decode() can access them @@ -74,6 +81,7 @@ type RmsFactory struct { RmqBufSize int64 } +// SetParams is used to set parameters for RmsFactory func (f *RmsFactory) SetParams(params map[string]interface{}) error { err := mapstructure.Decode(params, f) if err != nil { @@ -82,6 +90,7 @@ func (f *RmsFactory) SetParams(params map[string]interface{}) error { return nil } +// NewMsgStream is used to generate a new Msgstream object func (f *RmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq}) if err != nil { @@ -90,6 +99,7 @@ func (f *RmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } +// NewTtMsgStream is used to generate a new TtMsgstream object func (f *RmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq}) if err != nil { @@ -98,6 +108,7 @@ func (f *RmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } +// NewQueryMsgStream is used to generate a new QueryMsgstream object func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq}) if err != nil { @@ -106,6 +117,7 @@ func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } +// NewRmsFactory is used to generate a new RmsFactory object func NewRmsFactory() Factory { f := &RmsFactory{ dispatcherFactory: ProtoUDFactory{}, diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 7e18052e43..0db0105e90 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -49,6 +49,7 @@ type mqMsgStream struct { consumerLock *sync.Mutex } +// NewMqMsgStream is used to generate a new mqMsgStream object func NewMqMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, @@ -417,6 +418,7 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { return nil } +// MqTtMsgStream is a msgstream that contains timeticks type MqTtMsgStream struct { mqMsgStream chanMsgBuf map[mqclient.Consumer][]TsMsg @@ -430,6 +432,7 @@ type MqTtMsgStream struct { syncConsumer chan int } +// NewMqTtMsgStream is used to generate a new MqTtMsgStream object func NewMqTtMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, @@ -509,6 +512,7 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) { } } +// Start will start a goroutine which keep carrying msg from pulsar/rocksmq to golang chan func (ms *MqTtMsgStream) Start() { if ms.consumers != nil { ms.wait.Add(1) @@ -516,6 +520,7 @@ func (ms *MqTtMsgStream) Start() { } } +// Close will stop goroutine and free internal producers and consumers func (ms *MqTtMsgStream) Close() { ms.streamCancel() close(ms.syncConsumer) diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 035ba68f46..cc8a9b5f94 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -23,6 +23,7 @@ type Timestamp = typeutil.Timestamp type IntPrimaryKey = typeutil.IntPrimaryKey type MsgPosition = internalpb.MsgPosition +// MsgPack represents a batch of msg in msgstream type MsgPack struct { BeginTs Timestamp EndTs Timestamp @@ -33,6 +34,7 @@ type MsgPack struct { type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) +// MsgStream is an interface that can be used to produce and consume message on message queue type MsgStream interface { Start() Close() @@ -48,6 +50,7 @@ type MsgStream interface { Seek(offset []*MsgPosition) error } +// Factory is an interface that can be used to generate a new msgstream object type Factory interface { SetParams(params map[string]interface{}) error NewMsgStream(ctx context.Context) (MsgStream, error)