diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 1606c4e801..ca8dde78a4 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -38,9 +38,9 @@ import ( "github.com/milvus-io/milvus/internal/util/trace" ) -func newMsgFactory(localMsg bool, rocksmqPath string) msgstream.Factory { +func newMsgFactory(localMsg bool) msgstream.Factory { if localMsg { - return msgstream.NewRmsFactory(rocksmqPath) + return msgstream.NewRmsFactory() } return msgstream.NewPmsFactory() } @@ -87,7 +87,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone defer log.Sync() } - factory := newMsgFactory(localMsg, rootcoord.Params.RocksmqPath) + factory := newMsgFactory(localMsg) var err error rc, err = components.NewRootCoord(ctx, factory) if err != nil { @@ -116,7 +116,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string defer log.Sync() } - factory := newMsgFactory(localMsg, proxy.Params.RocksmqPath) + factory := newMsgFactory(localMsg) var err error pn, err = components.NewProxy(ctx, factory) if err != nil { @@ -144,9 +144,7 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon defer log.Sync() } - // FIXME(yukun): newMsgFactory requires parameter rocksmqPath, but won't be used here - // so hardcode the path to /tmp/invalid_milvus_rdb - factory := newMsgFactory(localMsg, "/tmp/invalid_milvus_rdb") + factory := newMsgFactory(localMsg) var err error qs, err = components.NewQueryCoord(ctx, factory) if err != nil { @@ -175,7 +173,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st defer log.Sync() } - factory := newMsgFactory(localMsg, querynode.Params.RocksmqPath) + factory := newMsgFactory(localMsg) var err error qn, err = components.NewQueryNode(ctx, factory) if err != nil { @@ -203,7 +201,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone defer log.Sync() } - factory := newMsgFactory(localMsg, 
datacoord.Params.RocksmqPath) + factory := newMsgFactory(localMsg) var err error ds, err = components.NewDataCoord(ctx, factory) if err != nil { @@ -232,7 +230,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str defer log.Sync() } - factory := newMsgFactory(localMsg, datanode.Params.RocksmqPath) + factory := newMsgFactory(localMsg) var err error dn, err = components.NewDataNode(ctx, factory) if err != nil { diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 092d5dda90..c85c4e1217 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -36,6 +36,8 @@ pulsar: rocksmq: path: /var/lib/milvus/rdb_data + retentionTimeInMinutes: 4320 + retentionSizeInMB: 0 rootCoord: address: localhost diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index 1c6a19271d..762420cacb 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -382,6 +382,9 @@ type GlobalParamsTable struct { MasterAddress string PulsarAddress string RocksmqPath string + + RocksmqRetentionTimeInMinutes int64 + RocksmqRetentionSizeInMB int64 ProxyID UniqueID TimeTickInterval time.Duration diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go index d2baf7dde0..0bec4ba438 100644 --- a/internal/kv/rocksdb/rocksdb_kv.go +++ b/internal/kv/rocksdb/rocksdb_kv.go @@ -16,10 +16,10 @@ import ( ) type RocksdbKV struct { - opts *gorocksdb.Options - db *gorocksdb.DB - writeOptions *gorocksdb.WriteOptions - readOptions *gorocksdb.ReadOptions + Opts *gorocksdb.Options + DB *gorocksdb.DB + WriteOptions *gorocksdb.WriteOptions + ReadOptions *gorocksdb.ReadOptions name string } @@ -39,16 +39,16 @@ func NewRocksdbKV(name string) (*RocksdbKV, error) { return nil, err } return &RocksdbKV{ - opts: opts, - db: db, - writeOptions: wo, - readOptions: ro, + Opts: opts, + DB: db, + WriteOptions: wo, + ReadOptions: ro, name: name, }, nil } func (kv *RocksdbKV) Close() { - 
kv.db.Close() + kv.DB.Close() } func (kv *RocksdbKV) GetName() string { @@ -56,22 +56,22 @@ func (kv *RocksdbKV) GetName() string { } func (kv *RocksdbKV) Load(key string) (string, error) { - value, err := kv.db.Get(kv.readOptions, []byte(key)) + value, err := kv.DB.Get(kv.ReadOptions, []byte(key)) defer value.Free() return string(value.Data()), err } func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) { - kv.readOptions.SetPrefixSameAsStart(true) - kv.db.Close() - kv.opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(key))) + kv.ReadOptions.SetPrefixSameAsStart(true) + kv.DB.Close() + kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(key))) var err error - kv.db, err = gorocksdb.OpenDb(kv.opts, kv.GetName()) + kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName()) if err != nil { return nil, nil, err } - iter := kv.db.NewIterator(kv.readOptions) + iter := kv.DB.NewIterator(kv.ReadOptions) defer iter.Close() keys := make([]string, 0) values := make([]string, 0) @@ -93,7 +93,7 @@ func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) { func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) { values := make([]string, 0, len(keys)) for _, key := range keys { - value, err := kv.db.Get(kv.readOptions, []byte(key)) + value, err := kv.DB.Get(kv.ReadOptions, []byte(key)) if err != nil { return []string{}, err } @@ -103,7 +103,7 @@ func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) { } func (kv *RocksdbKV) Save(key, value string) error { - err := kv.db.Put(kv.writeOptions, []byte(key), []byte(value)) + err := kv.DB.Put(kv.WriteOptions, []byte(key), []byte(value)) return err } @@ -113,26 +113,26 @@ func (kv *RocksdbKV) MultiSave(kvs map[string]string) error { for k, v := range kvs { writeBatch.Put([]byte(k), []byte(v)) } - err := kv.db.Write(kv.writeOptions, writeBatch) + err := kv.DB.Write(kv.WriteOptions, writeBatch) return err } func (kv *RocksdbKV) RemoveWithPrefix(prefix string) 
error { - kv.readOptions.SetPrefixSameAsStart(true) - kv.db.Close() - kv.opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(prefix))) + kv.ReadOptions.SetPrefixSameAsStart(true) + kv.DB.Close() + kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len(prefix))) var err error - kv.db, err = gorocksdb.OpenDb(kv.opts, kv.GetName()) + kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName()) if err != nil { return err } - iter := kv.db.NewIterator(kv.readOptions) + iter := kv.DB.NewIterator(kv.ReadOptions) defer iter.Close() iter.Seek([]byte(prefix)) for ; iter.Valid(); iter.Next() { key := iter.Key() - err := kv.db.Delete(kv.writeOptions, key.Data()) + err := kv.DB.Delete(kv.WriteOptions, key.Data()) if err != nil { return nil } @@ -144,7 +144,7 @@ func (kv *RocksdbKV) RemoveWithPrefix(prefix string) error { } func (kv *RocksdbKV) Remove(key string) error { - err := kv.db.Delete(kv.writeOptions, []byte(key)) + err := kv.DB.Delete(kv.WriteOptions, []byte(key)) return err } @@ -154,7 +154,7 @@ func (kv *RocksdbKV) MultiRemove(keys []string) error { for _, key := range keys { writeBatch.Delete([]byte(key)) } - err := kv.db.Write(kv.writeOptions, writeBatch) + err := kv.DB.Write(kv.WriteOptions, writeBatch) return err } @@ -167,13 +167,29 @@ func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []stri for _, key := range removals { writeBatch.Delete([]byte(key)) } - err := kv.db.Write(kv.writeOptions, writeBatch) + err := kv.DB.Write(kv.WriteOptions, writeBatch) + return err +} + +func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error { + writeBatch := gorocksdb.NewWriteBatch() + defer writeBatch.Clear() + if len(startKey) == 0 { + iter := kv.DB.NewIterator(kv.ReadOptions) + defer iter.Close() + iter.SeekToFirst() + startKey = string(iter.Key().Data()) + } + + writeBatch.DeleteRange([]byte(startKey), []byte(endKey)) + err := kv.DB.Write(kv.WriteOptions, writeBatch) return err } func (kv *RocksdbKV) MultiRemoveWithPrefix(keys 
[]string) error { panic("not implement") } + func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { panic("not implement") } diff --git a/internal/kv/rocksdb/rocksdb_kv_test.go b/internal/kv/rocksdb/rocksdb_kv_test.go index f72b69f267..89c91207a0 100644 --- a/internal/kv/rocksdb/rocksdb_kv_test.go +++ b/internal/kv/rocksdb/rocksdb_kv_test.go @@ -12,6 +12,8 @@ package rocksdbkv_test import ( + "strconv" + "sync" "testing" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" @@ -103,3 +105,28 @@ func TestRocksdbKV_Prefix(t *testing.T) { assert.Nil(t, err) assert.Equal(t, val, "1234555") } + +func TestRocksdbKV_Goroutines(t *testing.T) { + name := "/tmp/rocksdb" + rocksdbkv, err := rocksdbkv.NewRocksdbKV(name) + assert.Nil(t, err) + defer rocksdbkv.Close() + defer rocksdbkv.RemoveWithPrefix("") + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + key := "key_" + strconv.Itoa(i) + val := "val_" + strconv.Itoa(i) + err := rocksdbkv.Save(key, val) + assert.Nil(t, err) + + getVal, err := rocksdbkv.Load(key) + assert.Nil(t, err) + assert.Equal(t, getVal, val) + }(i) + } + wg.Wait() +} diff --git a/internal/msgstream/mq_factory.go b/internal/msgstream/mq_factory.go index f1d22c6953..0c33bb0b70 100644 --- a/internal/msgstream/mq_factory.go +++ b/internal/msgstream/mq_factory.go @@ -15,7 +15,6 @@ import ( "context" "github.com/apache/pulsar-client-go/pulsar" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/mqclient" "github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq" rocksmqserver "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" @@ -106,14 +105,13 @@ func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } -func NewRmsFactory(rocksmqPath string) Factory { +func 
NewRmsFactory() Factory { f := &RmsFactory{ dispatcherFactory: ProtoUDFactory{}, ReceiveBufSize: 1024, RmqBufSize: 1024, } - log.Debug("RocksmqPath=" + rocksmqPath) - rocksmqserver.InitRocksMQ(rocksmqPath) + rocksmqserver.InitRocksMQ() return f } diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index 1272927734..03ae5813b7 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -1043,10 +1043,12 @@ func initRmq(name string) *etcdkv.EtcdKV { } func Close(rocksdbName string, intputStream, outputStream MsgStream, etcdKV *etcdkv.EtcdKV) { + rocksmq.CloseRocksMQ() intputStream.Close() outputStream.Close() etcdKV.Close() err := os.RemoveAll(rocksdbName) + _ = os.RemoveAll(rocksdbName + "_meta_kv") log.Println(err) } diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 10a9db8537..12537e437d 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -29,14 +29,16 @@ type ParamTable struct { Address string Port int - PulsarAddress string - RocksmqPath string - EtcdEndpoints []string - MetaRootPath string - KvRootPath string - MsgChannelSubName string - TimeTickChannel string - StatisticsChannel string + PulsarAddress string + RocksmqPath string + RocksmqRetentionSizeInMinutes int64 + RocksmqRetentionSizeInMB int64 + EtcdEndpoints []string + MetaRootPath string + KvRootPath string + MsgChannelSubName string + TimeTickChannel string + StatisticsChannel string MaxPartitionNum int64 DefaultPartitionName string @@ -99,6 +101,10 @@ func (p *ParamTable) initRocksmqPath() { p.RocksmqPath = path } +func (p *ParamTable) initRocksmqRetentionTimeInMinutes() { + p.RocksmqRetentionSizeInMinutes = p.ParseInt64("rootcoord.RocksmqRetentionSizeInMinutes") +} + func (p *ParamTable) initEtcdEndpoints() { endpoints, err := p.Load("_EtcdEndpoints") if err != nil { diff --git a/internal/util/mqclient/rmq_client_test.go 
b/internal/util/mqclient/rmq_client_test.go index 4eff116435..a8c357012a 100644 --- a/internal/util/mqclient/rmq_client_test.go +++ b/internal/util/mqclient/rmq_client_test.go @@ -27,8 +27,8 @@ var Params paramtable.BaseTable func TestMain(m *testing.M) { Params.Init() - rocksdbName := "/tmp/rocksdb_mqclient" - _ = rocksmq1.InitRocksMQ(rocksdbName) + os.Setenv("ROCKSMQ_PATH", "/tmp/milvus/rdb_data") + _ = rocksmq1.InitRocksMQ() exitCode := m.Run() defer rocksmq1.CloseRocksMQ() os.Exit(exitCode) diff --git a/internal/util/mqclient/rmq_producer.go b/internal/util/mqclient/rmq_producer.go index 17e8d9aeab..09cb3cfc1b 100644 --- a/internal/util/mqclient/rmq_producer.go +++ b/internal/util/mqclient/rmq_producer.go @@ -31,4 +31,5 @@ func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) error } func (rp *rmqProducer) Close() { + } diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index 5e73c3e8f2..772d8ed293 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -147,7 +147,7 @@ func (c *client) Close() { log.Debug("Close" + opt.Topic + "+" + opt.SubscriptionName) _ = c.server.DestroyConsumerGroup(opt.Topic, opt.SubscriptionName) //TODO(yukun): Should topic be closed? 
- //_ = c.server.DestroyTopic(opt.Topic) + _ = c.server.DestroyTopic(opt.Topic) } c.cancel() } diff --git a/internal/util/rocksmq/client/rocksmq/consumer.go b/internal/util/rocksmq/client/rocksmq/consumer.go index 304f759d31..3c661a2083 100644 --- a/internal/util/rocksmq/client/rocksmq/consumer.go +++ b/internal/util/rocksmq/client/rocksmq/consumer.go @@ -63,4 +63,7 @@ type Consumer interface { // Seek to the uniqueID position Seek(UniqueID) error //nolint:govet + + // Close consumer + Close() } diff --git a/internal/util/rocksmq/client/rocksmq/consumer_impl.go b/internal/util/rocksmq/client/rocksmq/consumer_impl.go index 2554f38f6f..bb872d04d5 100644 --- a/internal/util/rocksmq/client/rocksmq/consumer_impl.go +++ b/internal/util/rocksmq/client/rocksmq/consumer_impl.go @@ -11,6 +11,11 @@ package rocksmq +import ( + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + type consumer struct { topic string client *client @@ -101,3 +106,10 @@ func (c *consumer) Seek(id UniqueID) error { //nolint:govet c.client.server.Notify(c.topic, c.consumerName) return nil } + +func (c *consumer) Close() { + err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName) + if err != nil { + log.Debug("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err)) + } +} diff --git a/internal/util/rocksmq/client/rocksmq/producer.go b/internal/util/rocksmq/client/rocksmq/producer.go index 35be82defa..ab150d8bb1 100644 --- a/internal/util/rocksmq/client/rocksmq/producer.go +++ b/internal/util/rocksmq/client/rocksmq/producer.go @@ -25,4 +25,7 @@ type Producer interface { // publish a message Send(message *ProducerMessage) error + + // Close a producer + Close() } diff --git a/internal/util/rocksmq/client/rocksmq/producer_impl.go b/internal/util/rocksmq/client/rocksmq/producer_impl.go index 32f5aa1c4a..3476e67b54 100644 --- a/internal/util/rocksmq/client/rocksmq/producer_impl.go +++ 
b/internal/util/rocksmq/client/rocksmq/producer_impl.go @@ -12,7 +12,9 @@ package rocksmq import ( + "github.com/milvus-io/milvus/internal/log" server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" + "go.uber.org/zap" ) type producer struct { @@ -47,3 +49,10 @@ func (p *producer) Send(message *ProducerMessage) error { }, }) } + +func (p *producer) Close() { + err := p.c.server.DestroyTopic(p.topic) + if err != nil { + log.Debug("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err)) + } +} diff --git a/internal/util/rocksmq/server/rocksmq/global_rmq.go b/internal/util/rocksmq/server/rocksmq/global_rmq.go index 3350230897..d4cd47297a 100644 --- a/internal/util/rocksmq/server/rocksmq/global_rmq.go +++ b/internal/util/rocksmq/server/rocksmq/global_rmq.go @@ -16,12 +16,16 @@ import ( "sync" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/paramtable" + "go.uber.org/zap" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" ) var Rmq *rocksmq var once sync.Once +var params paramtable.BaseTable func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error { var err error @@ -29,10 +33,13 @@ func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error { return err } -func InitRocksMQ(rocksdbName string) error { +func InitRocksMQ() error { var err error once.Do(func() { - _, err := os.Stat(rocksdbName) + params.Init() + rocksdbName, _ := params.Load("_RocksmqPath") + log.Debug("RocksmqPath=" + rocksdbName) + _, err = os.Stat(rocksdbName) if os.IsNotExist(err) { err = os.MkdirAll(rocksdbName, os.ModePerm) if err != nil { @@ -49,6 +56,9 @@ func InitRocksMQ(rocksdbName string) error { idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) _ = idAllocator.Initialize() + RocksmqRetentionTimeInMinutes = params.ParseInt64("rocksmq.retentionTimeInMinutes") + RocksmqRetentionSizeInMB = 
params.ParseInt64("rocksmq.retentionSizeInMB") + log.Debug("Rocksmq retention: ", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes), zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB)) Rmq, err = NewRocksMQ(rocksdbName, idAllocator) if err != nil { panic(err) @@ -58,7 +68,11 @@ func InitRocksMQ(rocksdbName string) error { } func CloseRocksMQ() { - if Rmq != nil && Rmq.store != nil { - Rmq.store.Close() + log.Debug("Close Rocksmq!") + if Rmq != nil { + Rmq.stopRetention() + if Rmq.store != nil { + Rmq.store.Close() + } } } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index e1a1305739..3170cb77eb 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -16,6 +16,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/kv" @@ -23,7 +24,7 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/tecbot/gorocksdb" - memkv "github.com/milvus-io/milvus/internal/kv/mem" + rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" ) type UniqueID = typeutil.UniqueID @@ -32,6 +33,17 @@ const ( DefaultMessageID = "-1" FixedChannelNameLen = 320 RocksDBLRUCacheCapacity = 3 << 30 + RocksmqPageSize = 2 << 30 + + kvSuffix = "_meta_kv" + + MessageSizeTitle = "message_size/" + PageMsgSizeTitle = "page_message_size/" + TopicBeginIDTitle = "topic_begin_id/" + BeginIDTitle = "begin_id/" + AckedTsTitle = "acked_ts/" + AckedSizeTitle = "acked_size/" + LastRetTsTitle = "last_retention_ts/" ) /** @@ -63,13 +75,35 @@ func combKey(channelName string, id UniqueID) (string, error) { return fixName + "/" + strconv.FormatInt(id, 10), nil } +/** + * Construct table name and fixed channel name to be a key with length of FixedChannelNameLen, + * used for meta infos + */ +func constructKey(metaName, topic string) (string, error) 
{ + // Check metaName/topic + oldLen := len(metaName + topic) + if oldLen > FixedChannelNameLen { + return "", errors.New("Topic name exceeds limit") + } + + nameBytes := make([]byte, FixedChannelNameLen-oldLen) + + for i := 0; i < len(nameBytes); i++ { + nameBytes[i] = byte('*') + } + return metaName + topic + string(nameBytes), nil +} + +var topicMu sync.Map = sync.Map{} + type rocksmq struct { store *gorocksdb.DB kv kv.BaseKV idAllocator allocator.GIDAllocator - channelMu sync.Map + consumers sync.Map + ackedMu sync.Map - consumers sync.Map + retentionInfo *retentionInfo } func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) { @@ -85,18 +119,38 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro return nil, err } - mkv := memkv.NewMemoryKV() + kvName := name + kvSuffix + kv, err := rocksdbkv.NewRocksdbKV(kvName) + if err != nil { + return nil, err + } rmq := &rocksmq{ store: db, - kv: mkv, + kv: kv, idAllocator: idAllocator, + consumers: sync.Map{}, + ackedMu: sync.Map{}, } - rmq.channelMu = sync.Map{} - rmq.consumers = sync.Map{} + + ri, err := initRetentionInfo(kv, db) + if err != nil { + return nil, err + } + rmq.retentionInfo = ri + + err = rmq.retentionInfo.startRetentionInfo() + if err != nil { + return nil, err + } + return rmq, nil } +func (rmq *rocksmq) stopRetention() { + rmq.retentionInfo.ctx.Done() +} + func (rmq *rocksmq) checkKeyExist(key string) bool { val, _ := rmq.kv.Load(key) return val != "" @@ -123,12 +177,56 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { log.Debug("RocksMQ: save " + endKey + " failed.") return err } - rmq.channelMu.Store(topicName, new(sync.Mutex)) + if _, ok := topicMu.Load(topicName); !ok { + topicMu.Store(topicName, new(sync.Mutex)) + } + if _, ok := rmq.ackedMu.Load(topicName); !ok { + rmq.ackedMu.Store(topicName, new(sync.Mutex)) + } + // Initialize retention infos + // Initialize acked size to 0 for topic + ackedSizeKey := AckedSizeTitle + 
topicName + err = rmq.kv.Save(ackedSizeKey, "0") + if err != nil { + return err + } + + // Initialize topic begin id to defaultMessageID + topicBeginIDKey := TopicBeginIDTitle + topicName + err = rmq.kv.Save(topicBeginIDKey, DefaultMessageID) + if err != nil { + return err + } + + // Initialize topic message size to 0 + msgSizeKey := MessageSizeTitle + topicName + err = rmq.kv.Save(msgSizeKey, "0") + if err != nil { + return err + } + + // Initialize last retention timestamp to time_now + lastRetentionTsKey := LastRetTsTitle + topicName + timeNow := time.Now().Unix() + err = rmq.kv.Save(lastRetentionTsKey, strconv.FormatInt(timeNow, 10)) + if err != nil { + return nil + } + rmq.retentionInfo.topics = append(rmq.retentionInfo.topics, topicName) + rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{ + pageEndID: make([]UniqueID, 0), + pageMsgSize: map[UniqueID]int64{}, + }) + rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow) + rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{ + ackedTs: map[UniqueID]int64{}, + }) return nil } func (rmq *rocksmq) DestroyTopic(topicName string) error { + log.Debug("In DestroyTopic") beginKey := topicName + "/begin_id" endKey := topicName + "/end_id" @@ -145,13 +243,50 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { } rmq.consumers.Delete(topicName) - log.Debug("DestroyTopic: " + topicName) + + ackedSizeKey := AckedSizeTitle + topicName + err = rmq.kv.Remove(ackedSizeKey) + if err != nil { + return err + } + topicBeginIDKey := TopicBeginIDTitle + topicName + err = rmq.kv.Remove(topicBeginIDKey) + if err != nil { + return err + } + lastRetTsKey := LastRetTsTitle + topicName + err = rmq.kv.Remove(lastRetTsKey) + if err != nil { + return err + } + msgSizeKey := MessageSizeTitle + topicName + err = rmq.kv.Remove(msgSizeKey) + if err != nil { + return err + } + + topicMu.Delete(topicName) + rmq.retentionInfo.ackedInfo.Delete(topicName) + rmq.retentionInfo.lastRetentionTime.Delete(topicName) + 
rmq.retentionInfo.pageInfo.Delete(topicName) return nil } func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) { key := groupName + "/" + topicName + "/current_id" + // keyExist := false + // if ll, ok := topicMu.Load(topicName); !ok { + // keyExist = rmq.checkKeyExist(key) + // } else { + // if lock, lok := ll.(*sync.Mutex); lok { + // lock.Lock() + // defer lock.Unlock() + // keyExist = rmq.checkKeyExist(key) + // } else { + // keyExist = rmq.checkKeyExist(key) + // } + // } if rmq.checkKeyExist(key) { if vals, ok := rmq.consumers.Load(topicName); ok { for _, v := range vals.([]*Consumer) { @@ -216,13 +351,11 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { } } - log.Debug("DestroyConsumerGroup: " + topicName + "+" + groupName) - return nil } func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error { - ll, ok := rmq.channelMu.Load(topicName) + ll, ok := topicMu.Load(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName) } @@ -248,6 +381,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error /* Step I: Insert data to store system */ batch := gorocksdb.NewWriteBatch() + msgSizes := make(map[UniqueID]int64) for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { key, err := combKey(topicName, idStart+UniqueID(i)) if err != nil { @@ -256,6 +390,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error } batch.Put([]byte(key), messages[i].Payload) + msgSizes[idStart+UniqueID(i)] = int64(len(messages[i].Payload)) } err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch) @@ -299,11 +434,68 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error } } } + + // Update message page info + // TODO(yukun): Should this be in a go routine + err = rmq.UpdatePageInfo(topicName, msgSizes) + if err != nil { + return err + } + return nil +} + +func (rmq *rocksmq) 
UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64) error { + msgSizeKey := MessageSizeTitle + topicName + msgSizeVal, err := rmq.kv.Load(msgSizeKey) + if err != nil { + return err + } + curMsgSize, err := strconv.ParseInt(msgSizeVal, 10, 64) + if err != nil { + return err + } + fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topicName) + if err != nil { + return err + } + for k, v := range msgSizes { + if curMsgSize+v > RocksmqPageSize { + // Current page is full + newPageSize := curMsgSize + v + pageEndID := k + // Update page message size for current page. key is page end ID + pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10) + err := rmq.kv.Save(pageMsgSizeKey, strconv.FormatInt(newPageSize, 10)) + if err != nil { + return err + } + + if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok { + pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID) + pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize + rmq.retentionInfo.pageInfo.Store(topicName, pageInfo) + } + + // Update message size to 0 + err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(0, 10)) + if err != nil { + return err + } + curMsgSize = 0 + } else { + curMsgSize += v + // Update message size to current message size + err := rmq.kv.Save(msgSizeKey, strconv.FormatInt(curMsgSize, 10)) + if err != nil { + return err + } + } + } return nil } func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) { - ll, ok := rmq.channelMu.Load(topicName) + ll, ok := topicMu.Load(topicName) if !ok { return nil, fmt.Errorf("topic name = %s not exist", topicName) } @@ -372,7 +564,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum // When already consume to last mes, an empty slice will be returned if len(consumerMessage) == 0 { - //log.Debug("RocksMQ: consumerMessage is empty") + // log.Debug("RocksMQ: consumerMessage is empty") return 
consumerMessage, nil } @@ -383,6 +575,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum return nil, err } + msgSize := len(consumerMessage[len(consumerMessage)-1].Payload) + go rmq.UpdateAckedInfo(topicName, groupName, newID, int64(msgSize)) + return consumerMessage, nil } @@ -431,3 +626,86 @@ func (rmq *rocksmq) Notify(topicName, groupName string) { } } } + +func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error { + ll, ok := rmq.ackedMu.Load(topicName) + if !ok { + return fmt.Errorf("topic name = %s not exist", topicName) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return fmt.Errorf("get mutex failed, topic name = %s", topicName) + } + lock.Lock() + defer lock.Unlock() + + fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName) + if err != nil { + return err + } + // Update begin_id for the consumer_group + beginIDKey := fixedBeginIDKey + "/" + groupName + err = rmq.kv.Save(beginIDKey, strconv.FormatInt(newID, 10)) + if err != nil { + return err + } + + // Update begin_id for topic + if vals, ok := rmq.consumers.Load(topicName); ok { + var minBeginID int64 = -1 + for _, v := range vals.([]*Consumer) { + curBeginIDKey := fixedBeginIDKey + "/" + v.GroupName + curBeginIDVal, err := rmq.kv.Load(curBeginIDKey) + if err != nil { + return err + } + curBeginID, err := strconv.ParseInt(curBeginIDVal, 10, 64) + if err != nil { + return err + } + if curBeginID > minBeginID { + minBeginID = curBeginID + } + } + topicBeginIDKey := TopicBeginIDTitle + topicName + err = rmq.kv.Save(topicBeginIDKey, strconv.FormatInt(minBeginID, 10)) + if err != nil { + return err + } + + // Update acked info for msg of begin id + fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName) + if err != nil { + return err + } + ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(minBeginID, 10) + ts := time.Now().Unix() + err = rmq.kv.Save(ackedTsKey, strconv.FormatInt(ts, 10)) + if err != nil { + 
return err + } + if minBeginID == newID { + // Means the begin_id of topic update to newID, so needs to update acked size + ackedSizeKey := AckedSizeTitle + topicName + ackedSizeVal, err := rmq.kv.Load(ackedSizeKey) + if err != nil { + return err + } + ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64) + if err != nil { + return err + } + ackedSize += msgSize + err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10)) + if err != nil { + return err + } + if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { + ackedInfo := info.(*topicAckedInfo) + ackedInfo.ackedSize = ackedSize + rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) + } + } + } + return nil +} diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index f158475767..d25a7de065 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -53,6 +53,9 @@ func TestRocksMQ(t *testing.T) { name := "/tmp/rocksmq" _ = os.RemoveAll(name) defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) @@ -93,6 +96,7 @@ func TestRocksMQ(t *testing.T) { assert.Equal(t, len(cMsgs), 2) assert.Equal(t, string(cMsgs[0].Payload), "b_message") assert.Equal(t, string(cMsgs[1].Payload), "c_message") + rmq.stopRetention() } func TestRocksMQ_Loop(t *testing.T) { @@ -106,6 +110,9 @@ func TestRocksMQ_Loop(t *testing.T) { name := "/tmp/rocksmq_1" _ = os.RemoveAll(name) defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) @@ -157,6 +164,7 @@ func TestRocksMQ_Loop(t *testing.T) { cMsgs, err = rmq.Consume(channelName, groupName, 1) assert.Nil(t, err) assert.Equal(t, len(cMsgs), 0) + rmq.stopRetention() } func 
TestRocksMQ_Goroutines(t *testing.T) { @@ -169,6 +177,9 @@ func TestRocksMQ_Goroutines(t *testing.T) { name := "/tmp/rocksmq_2" defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) @@ -214,6 +225,7 @@ func TestRocksMQ_Goroutines(t *testing.T) { }(&wg, rmq) } wg.Wait() + rmq.stopRetention() } /** @@ -236,6 +248,9 @@ func TestRocksMQ_Throughout(t *testing.T) { name := "/tmp/rocksmq_3" defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) @@ -274,6 +289,7 @@ func TestRocksMQ_Throughout(t *testing.T) { ct1 := time.Now().UnixNano() / int64(time.Millisecond) cDuration := ct1 - ct0 log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration) + rmq.stopRetention() } func TestRocksMQ_MultiChan(t *testing.T) { @@ -286,6 +302,9 @@ func TestRocksMQ_MultiChan(t *testing.T) { name := "/tmp/rocksmq_multichan" defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) @@ -319,4 +338,5 @@ func TestRocksMQ_MultiChan(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(cMsgs), 1) assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0)) + rmq.stopRetention() } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go new file mode 100644 index 0000000000..3a5d2f383c --- /dev/null +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -0,0 +1,495 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package rocksmq
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/tecbot/gorocksdb"
+	"go.uber.org/zap"
+)
+
+// RocksmqRetentionTimeInMinutes is the retention period in minutes. msgTimeExpiredCheck
+// treats an acked timestamp older than this many minutes as expired.
+var RocksmqRetentionTimeInMinutes int64
+
+// RocksmqRetentionSizeInMB is the size threshold in megabytes that msgSizeExpiredCheck
+// compares the remaining acked size against.
+var RocksmqRetentionSizeInMB int64
+
+// TickerTimeInMinutes scales the period of the retention ticker: retention() ticks every
+// TickerTimeInMinutes/10 minutes, i.e. every 6 seconds with the default of 1.
+var TickerTimeInMinutes int64 = 1
+
+// CheckTimeInterval is a number of seconds; the retention test sleeps CheckTimeInterval+1
+// seconds before checking the clean-up result.
+// NOTE(review): presumably meant to mirror the default ticker period - confirm.
+var CheckTimeInterval int64 = 6
+
+const (
+	// MB is the number of bytes in one megabyte (1 MiB). It was previously 2 << 20,
+	// which is 2 MiB and doubled every size threshold derived from it.
+	MB = 1 << 20
+	// MINUTE is the number of seconds in one minute.
+	MINUTE = 60
+)
+
+// topicPageInfo is the in-memory page index of one topic.
+type topicPageInfo struct {
+	// pageEndID lists the end message ID of every known page, in load order.
+	pageEndID []UniqueID
+	// pageMsgSize maps a page end ID to the message size recorded for that page.
+	pageMsgSize map[UniqueID]int64
+}
+
+// topicAckedInfo is the in-memory acknowledgement state of one topic.
+type topicAckedInfo struct {
+	topicBeginID UniqueID
+	// TODO(yukun): may need to delete ackedTs
+	// ackedTs maps an acked message ID to its acked timestamp; the timestamp is compared
+	// against time.Now().Unix() in msgTimeExpiredCheck.
+	ackedTs map[UniqueID]UniqueID
+	// ackedSize is the accumulated size of acked messages.
+	ackedSize int64
+}
+
+// retentionInfo holds the state used by the retention goroutine.
+type retentionInfo struct {
+	ctx    context.Context
+	topics []string
+	// pageInfo maps topic (string) -> *topicPageInfo
+	pageInfo sync.Map
+	// ackedInfo maps topic (string) -> *topicAckedInfo
+	ackedInfo sync.Map
+	// lastRetentionTime maps topic (string) -> last retention time (int64).
+	// The value is loaded from the kv entry stored under LastRetTsTitle + topic.
+	lastRetentionTime sync.Map
+
+	// kv stores the retention metadata; db is the handle passed to DeleteMessages.
+	kv *rocksdbkv.RocksdbKV
+	db *gorocksdb.DB
+}
+
+// LoadWithPrefix() in rocksdbkv needs to close the db instance first and then reopen it,
+// which crashes when other goroutines are operating on the same db instance. prefixLoad
+// is therefore implemented here to scan a prefix without reopening the db instance.
+func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
+	readOpts := gorocksdb.NewDefaultReadOptions()
+	defer readOpts.Destroy()
+	readOpts.SetPrefixSameAsStart(true)
+	iter := db.NewIterator(readOpts)
+	defer iter.Close()
+	keys := make([]string, 0)
+	values := make([]string, 0)
+	iter.Seek([]byte(prefix))
+	for ; iter.Valid(); iter.Next() {
+		key := iter.Key()
+		value := iter.Value()
+		// string() copies the bytes, so both slices can be released right away
+		// instead of queueing one deferred Free per entry until the function returns.
+		keys = append(keys, string(key.Data()))
+		values = append(values, string(value.Data()))
+		key.Free()
+		value.Free()
+	}
+	if err := iter.Err(); err != nil {
+		return nil, nil, err
+	}
+	return keys, values, nil
+}
+
+// initRetentionInfo builds a retentionInfo on top of kv and db. It discovers the existing
+// topics from the kv keys prefixed with TopicBeginIDTitle and registers a mutex for each
+// of them in topicMu.
+func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
+	ri := &retentionInfo{
+		ctx:               context.Background(),
+		topics:            make([]string, 0),
+		pageInfo:          sync.Map{},
+		ackedInfo:         sync.Map{},
+		lastRetentionTime: sync.Map{},
+		kv:                kv,
+		db:                db,
+	}
+	// Get topic from topic begin id
+	beginIDKeys, _, err := ri.kv.LoadWithPrefix(TopicBeginIDTitle)
+	if err != nil {
+		return nil, err
+	}
+	for _, key := range beginIDKeys {
+		topic := key[len(TopicBeginIDTitle):]
+		ri.topics = append(ri.topics, topic)
+		topicMu.Store(topic, new(sync.Mutex))
+	}
+	return ri, nil
+}
+
+// startRetentionInfo loads the retention state of every known topic concurrently, waits
+// for all loads to finish and then starts the retention goroutine. It always returns nil.
+func (ri *retentionInfo) startRetentionInfo() error {
+	var wg sync.WaitGroup
+	for _, topic := range ri.topics {
+		// Load all page infos
+		wg.Add(1)
+		go ri.loadRetentionInfo(topic, &wg)
+	}
+	wg.Wait()
+	go ri.retention()
+
+	return nil
+}
+
+// loadRetentionInfo reads the page sizes, acked timestamps, acked size and last retention
+// timestamp of topic from kv and publishes them in ri.pageInfo, ri.ackedInfo and
+// ri.lastRetentionTime. On any failure it logs and returns without storing anything for
+// the topic. wg.Done is always called.
+func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
+	// TODO(yukun): If there needs to add lock (the per-topic mutex in topicMu is not taken here)
+	defer wg.Done()
+	pageEndID := make([]UniqueID, 0)
+	pageMsgSize := make(map[UniqueID]int64)
+
+	fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic)
+	if err != nil {
+		log.Debug("ConstructKey failed", zap.Any("error", err))
+		return
+	}
+	pageMsgSizePrefix := fixedPageSizeKey + "/"
+	pageMsgSizeKeys, pageMsgSizeVals, err := prefixLoad(ri.kv.DB, pageMsgSizePrefix)
+	if err != nil {
+		log.Debug("PrefixLoad failed", zap.Any("error", err))
+		return
+	}
+	for i, key := range pageMsgSizeKeys {
+		// The page end ID follows the fixed-length key prefix and a separator.
+		endID, err := strconv.ParseInt(key[FixedChannelNameLen+1:], 10, 64)
+		if err != nil {
+			log.Debug("ParseInt failed", zap.Any("error", err))
+			return
+		}
+		pageEndID = append(pageEndID, endID)
+
+		msgSize, err := strconv.ParseInt(pageMsgSizeVals[i], 10, 64)
+		if err != nil {
+			log.Debug("ParseInt failed", zap.Any("error", err))
+			return
+		}
+		pageMsgSize[endID] = msgSize
+	}
+	pageInfo := &topicPageInfo{
+		pageEndID:   pageEndID,
+		pageMsgSize: pageMsgSize,
+	}
+
+	// Load all acked infos
+	ackedTs := make(map[UniqueID]UniqueID)
+
+	topicBeginIDKey := TopicBeginIDTitle + topic
+	topicBeginIDVal, err := ri.kv.Load(topicBeginIDKey)
+	if err != nil {
+		log.Debug("Load failed", zap.Any("error", err))
+		return
+	}
+	topicBeginID, err := strconv.ParseInt(topicBeginIDVal, 10, 64)
+	if err != nil {
+		log.Debug("ParseInt failed", zap.Any("error", err))
+		return
+	}
+
+	ackedTsPrefix, err := constructKey(AckedTsTitle, topic)
+	if err != nil {
+		log.Debug("ConstructKey failed", zap.Any("error", err))
+		return
+	}
+	keys, vals, err := prefixLoad(ri.kv.DB, ackedTsPrefix)
+	if err != nil {
+		log.Debug("PrefixLoad failed", zap.Any("error", err))
+		return
+	}
+	if len(keys) != len(vals) {
+		log.Debug("LoadWithPrefix return unequal value length of keys and values")
+		return
+	}
+
+	offset := FixedChannelNameLen + 1
+	for i, key := range keys {
+		ackedID, err := strconv.ParseInt(key[offset:], 10, 64)
+		if err != nil {
+			log.Debug("RocksMQ: parse int " + key[offset:] + " failed")
+			return
+		}
+
+		ts, err := strconv.ParseInt(vals[i], 10, 64)
+		if err != nil {
+			log.Debug("ParseInt failed", zap.Any("error", err))
+			return
+		}
+		ackedTs[ackedID] = ts
+	}
+
+	ackedSizeKey := AckedSizeTitle + topic
+	ackedSizeVal, err := ri.kv.Load(ackedSizeKey)
+	if err != nil {
+		log.Debug("Load failed", zap.Any("error", err))
+		return
+	}
+	ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64)
+	if err != nil {
+		log.Debug("ParseInt failed", zap.Any("error", err))
+		return
+	}
+
+	ackedInfo := &topicAckedInfo{
+		topicBeginID: topicBeginID,
+		ackedTs:      ackedTs,
+		ackedSize:    ackedSize,
+	}
+
+	// Load last retention timestamp
+	lastRetentionTsKey := LastRetTsTitle + topic
+	lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey)
+	if err != nil {
+		log.Debug("Load failed", zap.Any("error", err))
+		return
+	}
+	lastRetentionTs, err := strconv.ParseInt(lastRetentionTsVal, 10, 64)
+	if err != nil {
+		log.Debug("ParseInt failed", zap.Any("error", err))
+		return
+	}
+
+	// Publish only after everything was read successfully.
+	ri.ackedInfo.Store(topic, ackedInfo)
+	ri.pageInfo.Store(topic, pageInfo)
+	ri.lastRetentionTime.Store(topic, lastRetentionTs)
+}
+
+// retention is the body of the retention goroutine. On every tick (TickerTimeInMinutes/10
+// minutes) it runs expiredCleanUp for each topic whose last retention time is older than a
+// tenth of the retention period. It returns nil once ri.ctx is done.
+func (ri *retentionInfo) retention() error {
+	log.Debug("Rocksmq retention goroutine start!")
+	ticker := time.NewTicker(time.Duration(TickerTimeInMinutes * int64(time.Minute) / 10))
+	// Release the ticker when the loop exits.
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ri.ctx.Done():
+			return nil
+		case t := <-ticker.C:
+			timeNow := t.Unix()
+			// A tenth of the retention period, in seconds.
+			checkTime := RocksmqRetentionTimeInMinutes * 60 / 10
+			log.Debug("In ticker: ", zap.Any("ticker", timeNow))
+			ri.lastRetentionTime.Range(func(k, v interface{}) bool {
+				if v.(int64)+checkTime < timeNow {
+					// NOTE(review): a clean-up failure aborts the whole process here.
+					if err := ri.expiredCleanUp(k.(string)); err != nil {
+						panic(err)
+					}
+				}
+				return true
+			})
+		}
+	}
+}
+
+// expiredCleanUp removes the expired part of topic while holding the topic mutex from
+// topicMu. It determines the range [startID, endID] of acked messages to drop, first by
+// acked time (whole pages, then single messages) and then by acked size (whole pages),
+// deletes the matching page-size and acked-ts entries from kv, persists the new retention
+// timestamp and acked size, trims the in-memory state and finally deletes the messages
+// through DeleteMessages.
+//
+// It returns nil without touching anything when the topic has no acked info, no acked-ts
+// entry, or nothing expired; otherwise it returns the first error encountered.
+func (ri *retentionInfo) expiredCleanUp(topic string) error {
+	info, ok := ri.ackedInfo.Load(topic)
+	if !ok {
+		log.Debug("Topic " + topic + " doesn't have acked infos")
+		return nil
+	}
+	ackedInfo := info.(*topicAckedInfo)
+
+	ll, ok := topicMu.Load(topic)
+	if !ok {
+		return fmt.Errorf("topic name = %s not exist", topic)
+	}
+	lock, ok := ll.(*sync.Mutex)
+	if !ok {
+		return fmt.Errorf("get mutex failed, topic name = %s", topic)
+	}
+	lock.Lock()
+	defer lock.Unlock()
+
+	readOpts := gorocksdb.NewDefaultReadOptions()
+	defer readOpts.Destroy()
+	readOpts.SetPrefixSameAsStart(true)
+	iter := ri.kv.DB.NewIterator(readOpts)
+	defer iter.Close()
+	ackedTsPrefix, err := constructKey(AckedTsTitle, topic)
+	if err != nil {
+		return err
+	}
+	iter.Seek([]byte(ackedTsPrefix))
+	if !iter.Valid() {
+		return nil
+	}
+	var startID UniqueID
+	var endID UniqueID
+	// The first acked-ts entry marks the start of the range to delete.
+	startID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
+	if err != nil {
+		return err
+	}
+
+	var deletedAckedSize int64
+	pageRetentionOffset := 0
+	var pageInfo *topicPageInfo
+	if info, ok := ri.pageInfo.Load(topic); ok {
+		pageInfo = info.(*topicPageInfo)
+	}
+	if pageInfo != nil {
+		for i, pageEndID := range pageInfo.pageEndID {
+			// Clean by RocksmqRetentionTimeInMinutes
+			// NOTE(review): a page end ID without an entry in ackedTs reads as timestamp 0
+			// and is therefore treated as expired - confirm this is intended.
+			if msgTimeExpiredCheck(ackedInfo.ackedTs[pageEndID]) {
+				// All of the page expired, set the pageEndID to current endID
+				endID = pageEndID
+				newKey := ackedTsPrefix + "/" + strconv.FormatInt(pageEndID, 10)
+				iter.Seek([]byte(newKey))
+				pageRetentionOffset = i + 1
+
+				deletedAckedSize += pageInfo.pageMsgSize[pageEndID]
+				delete(pageInfo.pageMsgSize, pageEndID)
+			}
+		}
+	}
+	log.Debug("In expiredCleanUp: ", zap.Any("topic", topic), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
+
+	pageEndID := endID
+	// The end msg of the page is not expired, find the last expired msg in this page
+	for ; iter.Valid(); iter.Next() {
+		ackedTs, err := strconv.ParseInt(string(iter.Value().Data()), 10, 64)
+		if err != nil {
+			return err
+		}
+		if !msgTimeExpiredCheck(ackedTs) {
+			break
+		}
+		endID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
+		if err != nil {
+			return err
+		}
+	}
+	if endID == 0 {
+		log.Debug("All messages are not expired")
+		return nil
+	}
+
+	// One set of write options serves every batch below and is released on return.
+	writeOpts := gorocksdb.NewDefaultWriteOptions()
+	defer writeOpts.Destroy()
+
+	// Delete page message size in rocksdb_kv
+	if pageInfo != nil {
+		// Judge expire by ackedSize
+		if msgSizeExpiredCheck(deletedAckedSize, ackedInfo.ackedSize) {
+			// Walk the pages that survived the time-based pass. The previous slice
+			// expression [pageRetentionOffset:0] was either empty or out of range.
+			for _, pEndID := range pageInfo.pageEndID[pageRetentionOffset:] {
+				curDeletedSize := deletedAckedSize + pageInfo.pageMsgSize[pEndID]
+				if !msgSizeExpiredCheck(curDeletedSize, ackedInfo.ackedSize) {
+					break
+				}
+				endID = pEndID
+				pageEndID = pEndID
+				deletedAckedSize = curDeletedSize
+				delete(pageInfo.pageMsgSize, pEndID)
+				// Keep the offset in step so the page is also trimmed from pageEndID below.
+				pageRetentionOffset++
+			}
+		}
+
+		if pageEndID > 0 && len(pageInfo.pageEndID) > 0 {
+			pageStartID := pageInfo.pageEndID[0]
+			fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic)
+			if err != nil {
+				return err
+			}
+			pageStartKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageStartID, 10)
+			pageEndKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
+			pageWriteBatch := gorocksdb.NewWriteBatch()
+			defer pageWriteBatch.Destroy()
+			log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID))
+			if pageStartID == pageEndID {
+				pageWriteBatch.Delete([]byte(pageStartKey))
+			} else {
+				pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey))
+			}
+			if err := ri.kv.DB.Write(writeOpts, pageWriteBatch); err != nil {
+				return err
+			}
+
+			pageInfo.pageEndID = pageInfo.pageEndID[pageRetentionOffset:]
+		}
+		ri.pageInfo.Store(topic, pageInfo)
+	}
+
+	// Delete acked_ts in rocksdb_kv
+	// NOTE(review): RocksDB documents the end key of DeleteRange as exclusive, while the
+	// single-key branch deletes endID itself - confirm which one is intended.
+	ackedStartIDKey := ackedTsPrefix + "/" + strconv.FormatInt(startID, 10)
+	ackedEndIDKey := ackedTsPrefix + "/" + strconv.FormatInt(endID, 10)
+	ackedTsWriteBatch := gorocksdb.NewWriteBatch()
+	defer ackedTsWriteBatch.Destroy()
+	if startID == endID {
+		ackedTsWriteBatch.Delete([]byte(ackedStartIDKey))
+	} else {
+		ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
+	}
+	if err := ri.kv.DB.Write(writeOpts, ackedTsWriteBatch); err != nil {
+		return err
+	}
+
+	// Update last retention ts
+	// NOTE(review): only the persisted value is updated; ri.lastRetentionTime is not
+	// refreshed in this function - confirm that is intended.
+	lastRetentionTsKey := LastRetTsTitle + topic
+	err = ri.kv.Save(lastRetentionTsKey, strconv.FormatInt(time.Now().Unix(), 10))
+	if err != nil {
+		return err
+	}
+
+	// Update acked_size in rocksdb_kv
+	ackedInfo.ackedSize -= deletedAckedSize
+	ackedSizeKey := AckedSizeTitle + topic
+	err = ri.kv.Save(ackedSizeKey, strconv.FormatInt(ackedInfo.ackedSize, 10))
+	if err != nil {
+		return err
+	}
+
+	for k := range ackedInfo.ackedTs {
+		if k < endID {
+			delete(ackedInfo.ackedTs, k)
+		}
+	}
+	ri.ackedInfo.Store(topic, ackedInfo)
+
+	return DeleteMessages(ri.db, topic, startID, endID)
+}
+
+// DeleteMessages deletes the messages of topic between startID and endID from db. The keys
+// are built with combKey; a single Delete is issued when startID == endID and a
+// DeleteRange(startKey, endKey) otherwise. It returns the key-construction or write error,
+// if any.
+func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
+	// Delete msg by range of startID and endID
+	startKey, err := combKey(topic, startID)
+	if err != nil {
+		log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(startID, 10) + ")")
+		return err
+	}
+	endKey, err := combKey(topic, endID)
+	if err != nil {
+		log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(endID, 10) + ")")
+		return err
+	}
+
+	writeBatch := gorocksdb.NewWriteBatch()
+	defer writeBatch.Destroy()
+	writeOpts := gorocksdb.NewDefaultWriteOptions()
+	defer writeOpts.Destroy()
+	log.Debug("Delete messages by range", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID))
+	if startID == endID {
+		writeBatch.Delete([]byte(startKey))
+	} else {
+		writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
+	}
+	err = db.Write(writeOpts, writeBatch)
+	if err != nil {
+		return err
+	}
+
+	log.Debug("Delete message for topic: "+topic, zap.Any("startID", startID), zap.Any("endID", endID))
+
+	return nil
+}
+
+// msgTimeExpiredCheck reports whether ackedTs (compared against time.Now().Unix()) is
+// older than RocksmqRetentionTimeInMinutes minutes.
+func msgTimeExpiredCheck(ackedTs int64) bool {
+	return ackedTs+RocksmqRetentionTimeInMinutes*MINUTE < time.Now().Unix()
+}
+
+// msgSizeExpiredCheck reports whether the acked size that would remain after removing
+// deletedAckedSize from ackedSize still exceeds RocksmqRetentionSizeInMB megabytes.
+func msgSizeExpiredCheck(deletedAckedSize, ackedSize int64) bool {
+	return ackedSize-deletedAckedSize > RocksmqRetentionSizeInMB*MB
+}
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go
new file mode 100644
index 0000000000..09f0df64dd
--- /dev/null
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go
@@ -0,0 +1,91 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package rocksmq
+
+import (
+	"os"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/milvus-io/milvus/internal/allocator"
+	rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
+	"github.com/stretchr/testify/assert"
+)
+
+// InitIDAllocator creates a global ID allocator named "rmq_id" backed by a RocksDB kv
+// opened at kvPath. It is a test helper and panics if the kv cannot be opened or the
+// allocator cannot be initialized.
+func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
+	rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
+	if err != nil {
+		panic(err)
+	}
+	idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
+	// Fail loudly here rather than later, on the first ID allocation.
+	if err := idAllocator.Initialize(); err != nil {
+		panic(err)
+	}
+	return idAllocator
+}
+
+// TestRmqRetention produces and consumes a batch of messages, waits for more than one
+// CheckTimeInterval and then verifies that seeking back to an already consumed message
+// yields nothing, i.e. the consumed messages were cleaned up.
+func TestRmqRetention(t *testing.T) {
+	//RocksmqRetentionSizeInMB = 0
+	//RocksmqRetentionTimeInMinutes = 0
+	kvPath := "/tmp/rocksmq_idAllocator_kv"
+	// Start from a clean slate in case an earlier run left state behind.
+	_ = os.RemoveAll(kvPath)
+	defer os.RemoveAll(kvPath)
+	idAllocator := InitIDAllocator(kvPath)
+
+	rocksdbPath := "/tmp/rocksmq_test"
+	_ = os.RemoveAll(rocksdbPath)
+	defer os.RemoveAll(rocksdbPath)
+	metaPath := rocksdbPath + "_meta_kv"
+	_ = os.RemoveAll(metaPath)
+	defer os.RemoveAll(metaPath)
+
+	rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
+	if !assert.Nil(t, err) {
+		return
+	}
+	// Stop the retention goroutine like the other rocksmq tests do.
+	defer rmq.stopRetention()
+
+	topicName := "topic_a"
+	err = rmq.CreateTopic(topicName)
+	assert.Nil(t, err)
+	defer rmq.DestroyTopic(topicName)
+
+	msgNum := 100
+	pMsgs := make([]ProducerMessage, msgNum)
+	for i := 0; i < msgNum; i++ {
+		msg := "message_" + strconv.Itoa(i)
+		pMsgs[i] = ProducerMessage{Payload: []byte(msg)}
+	}
+	err = rmq.Produce(topicName, pMsgs)
+	assert.Nil(t, err)
+
+	groupName := "test_group"
+	_ = rmq.DestroyConsumerGroup(topicName, groupName)
+	err = rmq.CreateConsumerGroup(topicName, groupName)
+	assert.Nil(t, err)
+
+	consumer := &Consumer{
+		Topic:     topicName,
+		GroupName: groupName,
+	}
+	rmq.RegisterConsumer(consumer)
+
+	cMsgs := make([]ConsumerMessage, 0, msgNum)
+	for i := 0; i < msgNum; i++ {
+		cMsg, err := rmq.Consume(topicName, groupName, 1)
+		assert.Nil(t, err)
+		// Guard the index below: an empty result must fail the test, not panic.
+		if !assert.Equal(t, 1, len(cMsg)) {
+			return
+		}
+		cMsgs = append(cMsgs, cMsg[0])
+	}
+	assert.Equal(t, msgNum, len(cMsgs))
+
+	time.Sleep(time.Duration(CheckTimeInterval+1) * time.Second)
+	// Seek to a previously consumed message; the message should have been cleaned up
+	err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID)
+	assert.Nil(t, err)
+	newRes, err := rmq.Consume(topicName, groupName, 1)
+	assert.Nil(t, err)
+	assert.Equal(t, 0, len(newRes))
+}