diff --git a/docs/developer_guides/chap04_message_stream.md b/docs/developer_guides/chap04_message_stream.md index 31fa4aa10d..85eda4b37e 100644 --- a/docs/developer_guides/chap04_message_stream.md +++ b/docs/developer_guides/chap04_message_stream.md @@ -195,14 +195,14 @@ type Channel struct { endOffset MessageID } -type ComsumerGroupContext struct { +type ConsumerGroupContext struct { currentOffset MessageID } // Every collection has its RocksMQ type RocksMQ struct { channels map[string]Channel - cgCtxs map[string]ComsumerGroupContext + cgCtxs map[string]ConsumerGroupContext mu sync.Mutex } diff --git a/internal/msgstream/rmq/rmq_msgstream.go b/internal/msgstream/rmq/rmq_msgstream.go new file mode 100644 index 0000000000..c0090f8737 --- /dev/null +++ b/internal/msgstream/rmq/rmq_msgstream.go @@ -0,0 +1,30 @@ +package rmqmsgstream + +import "github.com/zilliztech/milvus-distributed/internal/msgstream" + +type RmqMsgStream struct { +} + +func NewRmqMsgStream() *RmqMsgStream { + return nil +} + +func (ms *RmqMsgStream) Start() { + +} + +func (ms *RmqMsgStream) Close() { + +} + +func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error { + return nil +} + +func (ms *RmqMsgStream) Consume() *msgstream.MsgPack { + return nil +} + +func (ms *RmqMsgStream) Chan() <-chan *msgstream.MsgPack { + return nil +} diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go new file mode 100644 index 0000000000..91a7a9f10f --- /dev/null +++ b/internal/util/rocksmq/rocksmq.go @@ -0,0 +1,130 @@ +package rocksmq + +import ( + "sync" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/kv" + memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) + +type UniqueID = typeutil.UniqueID + +type ProducerMessage struct { + payload []byte +} + +type ConsumerMessage struct { + msgID UniqueID + payload []byte +} + +type Channel struct { + beginOffset UniqueID + endOffset UniqueID +} + +type ConsumerGroupContext struct { + currentOffset UniqueID +} + +type RocksMQ struct { + kv kv.Base + channels map[string]*Channel + cgCtxs map[string]ConsumerGroupContext + mu sync.Mutex +} + +func NewRocksMQ() *RocksMQ { + mkv := memkv.NewMemoryKV() + rmq := &RocksMQ{ + kv: mkv, + } + return rmq +} + +func (rmq *RocksMQ) checkKeyExist(key string) bool { + _, err := rmq.kv.Load(key) + return err == nil +} + +func (rmq *RocksMQ) CreateChannel(channelName string) error { + beginKey := channelName + "/begin_id" + endKey := channelName + "/end_id" + + // Check if channel exist + if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) { + return errors.New("Channel " + channelName + " already exists.") + } + + err := rmq.kv.Save(beginKey, "0") + if err != nil { + return err + } + + err = rmq.kv.Save(endKey, "0") + if err != nil { + return err + } + + channel := &Channel{ + beginOffset: 0, + endOffset: 0, + } + rmq.channels[channelName] = channel + return nil +} + +func (rmq *RocksMQ) DestroyChannel(channelName string) error { + beginKey := channelName + "/begin_id" + endKey := channelName + "/end_id" + + err := rmq.kv.Remove(beginKey) + if err != nil { + return err + } + + err = rmq.kv.Remove(endKey) + if err != nil { + return err + } + + return nil +} + +func (rmq *RocksMQ) CreateConsumerGroup(groupName string, channelName string) error { + key := groupName + "/" + channelName + "/current_id" + if rmq.checkKeyExist(key) { + return errors.New("ConsumerGroup " + groupName + " already exists.") + } + err := rmq.kv.Save(key, "0") + if err != nil { + return err + } + + return nil +} + +func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) error { + key := groupName + "/" + channelName + "/current_id" + + err := rmq.kv.Remove(key) + if err != nil { + return err + } + + return nil +} + +func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error { + return nil +} + +func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) { + return nil, nil +} + +func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID UniqueID) error { + return nil +}