diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 774d3834cf..d76ecd9999 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -111,8 +111,9 @@ rocksmq: # please adjust in embedded Milvus: /tmp/milvus/rdb_data path: /var/lib/milvus/rdb_data # The path where the message is stored in rocksmq rocksmqPageSize: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024 bytes, The size of each page of messages in rocksmq - retentionTimeInMinutes: 10080 # 7 days, 7 * 24 * 60 minutes, The retention time of the message in rocksmq. + retentionTimeInMinutes: 7200 # 5 days, 5 * 24 * 60 minutes, The retention time of the message in rocksmq. retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq. + compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data lrucacheratio: 0.06 # rocksdb cache memory ratio # Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests diff --git a/internal/mq/mqimpl/rocksmq/client/producer_impl.go b/internal/mq/mqimpl/rocksmq/client/producer_impl.go index 4c59e9e2f6..ab588c6722 100644 --- a/internal/mq/mqimpl/rocksmq/client/producer_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/producer_impl.go @@ -65,6 +65,6 @@ func (p *producer) Send(message *ProducerMessage) (UniqueID, error) { func (p *producer) Close() { err := p.c.server.DestroyTopic(p.topic) if err != nil { - log.Debug("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err)) + log.Warn("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err)) } } diff --git a/internal/mq/mqimpl/rocksmq/server/global_rmq.go b/internal/mq/mqimpl/rocksmq/server/global_rmq.go index 7ff73a3f53..ac0ca8a6e3 100644 --- a/internal/mq/mqimpl/rocksmq/server/global_rmq.go +++ b/internal/mq/mqimpl/rocksmq/server/global_rmq.go @@ -77,26 +77,7 @@ func InitRocksMQ(path string) error { log.Warn("rocksmq.rocksmqPageSize is invalid, using default value 2G") } } - rawRmqRetentionTimeInMinutes, err := params.Load("rocksmq.retentionTimeInMinutes") - if err == nil && rawRmqRetentionTimeInMinutes != "" { - rawRmqRetentionTimeInMinutes, err := strconv.ParseInt(rawRmqRetentionTimeInMinutes, 10, 64) - if err == nil { - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60) - } else { - log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value") - } - } - rawRmqRetentionSizeInMB, err := params.Load("rocksmq.retentionSizeInMB") - if err == nil && rawRmqRetentionSizeInMB != "" { - rawRmqRetentionSizeInMB, err := strconv.ParseInt(rawRmqRetentionSizeInMB, 10, 64) - if err == nil { - atomic.StoreInt64(&RocksmqRetentionSizeInMB, rawRmqRetentionSizeInMB) - } else { - log.Warn("rocksmq.retentionSizeInMB is invalid, using default value 0") - } - } - log.Debug("", zap.Any("RocksmqRetentionTimeInMinutes", rawRmqRetentionTimeInMinutes), - zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB), zap.Any("RocksmqPageSize", RocksmqPageSize)) + Rmq, finalErr = NewRocksMQ(params, path, nil) }) return finalErr diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 366bbd3e2a..bbdb0f8721 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" - "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/tecbot/gorocksdb" @@ -114,19 +113,6 @@ func checkRetention() bool { return RocksmqRetentionTimeInSecs != -1 || RocksmqRetentionSizeInMB != -1 } -func getNowTs(idAllocator allocator.GIDAllocator) (int64, error) { - err := idAllocator.UpdateID() - if err != nil { - return 0, err - } - newID, err := idAllocator.AllocOne() - if err != nil { - return 0, err - } - nowTs, _ := tsoutil.ParseTS(uint64(newID)) - return nowTs.Unix(), err -} - var topicMu = sync.Map{} type rocksmq struct { @@ -227,7 +213,7 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator. readers: sync.Map{}, } - ri, err := initRetentionInfo(kv, db) + ri, err := initRetentionInfo(params, kv, db) if err != nil { return nil, err } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 011bf7c9d1..0203496adf 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -77,7 +77,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) { params.Init() rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) assert.NoError(t, err) - defer rmq.stopRetention() + defer rmq.Close() topicName := "topic_register" groupName := "group_register" @@ -231,7 +231,6 @@ func TestRocksmq_Dummy(t *testing.T) { err = rmq.Seek(channelName1, groupName1, 0) assert.Error(t, err) - rmq.stopRetention() channelName2 := strings.Repeat(channelName1, 100) err = rmq.CreateTopic(string(channelName2)) assert.NoError(t, err) diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go index 1cbc0a8c62..75ce1a4765 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go @@ -21,16 +21,23 @@ import ( rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/tecbot/gorocksdb" "go.uber.org/zap" ) // RocksmqRetentionTimeInMinutes is the time of retention -var RocksmqRetentionTimeInSecs int64 = 10080 * 60 +var RocksmqRetentionTimeInSecs int64 +var DefaultRocksmqRetentionTimeInMins int64 = 7200 // RocksmqRetentionSizeInMB is the size of retention -var RocksmqRetentionSizeInMB int64 = 8192 +var RocksmqRetentionSizeInMB int64 +var DefaultRocksmqRetentionSizeInMB int64 = 8192 + +// RocksmqRetentionCompactionInterval is the Interval we trigger compaction, +var RocksmqRetentionCompactionInterval int64 +var DefaultRocksmqRetentionCompactionInterval int64 = 86400 // Const value that used to convert unit const ( @@ -38,7 +45,7 @@ const ( ) // TickerTimeInSeconds is the time of expired check, default 10 minutes -var TickerTimeInSeconds int64 = 60 +var TickerTimeInSeconds int64 = 600 type retentionInfo struct { // key is topic name, value is last retention time @@ -53,7 +60,11 @@ type retentionInfo struct { closeOnce sync.Once } -func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { +func initRetentionInfo(params paramtable.BaseTable, kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { + rawRmqRetentionTimeInMinutes := params.ParseInt64WithDefault("rocksmq.retentionTimeInMinutes", DefaultRocksmqRetentionTimeInMins) + atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60) + atomic.StoreInt64(&RocksmqRetentionSizeInMB, params.ParseInt64WithDefault("rocksmq.retentionSizeInMB", DefaultRocksmqRetentionSizeInMB)) + atomic.StoreInt64(&RocksmqRetentionCompactionInterval, params.ParseInt64WithDefault("rocksmq.compactionInterval", DefaultRocksmqRetentionCompactionInterval)) ri := &retentionInfo{ topicRetetionTime: sync.Map{}, mutex: sync.RWMutex{}, @@ -86,15 +97,22 @@ func (ri *retentionInfo) startRetentionInfo() { // retention do time ticker and trigger retention check and operation for each topic func (ri *retentionInfo) retention() error { log.Debug("Rocksmq retention goroutine start!") - // Do retention check every 6s + // Do retention check every 10 mins ticker := time.NewTicker(time.Duration(atomic.LoadInt64(&TickerTimeInSeconds) * int64(time.Second))) + defer ticker.Stop() + compactionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&RocksmqRetentionCompactionInterval) * int64(time.Second))) + defer compactionTicker.Stop() defer ri.closeWg.Done() for { select { case <-ri.closeCh: - log.Debug("Rocksmq retention finish!") + log.Warn("Rocksmq retention finish!") return nil + case <-compactionTicker.C: + log.Info("trigger rocksdb compaction, should trigger rocksdb data clean") + go ri.db.CompactRange(gorocksdb.Range{Start: nil, Limit: nil}) + go ri.kv.DB.CompactRange(gorocksdb.Range{Start: nil, Limit: nil}) case t := <-ticker.C: timeNow := t.Unix() checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInSecs) / 10 @@ -354,7 +372,6 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err if err != nil { return err } - log.Debug("Delete message for topic", zap.String("topic", topic), zap.Int64("startID", startID), zap.Int64("endID", endID)) return nil } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go index 50f2b90978..71036cf1cf 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go @@ -39,11 +39,6 @@ func TestRmqRetention_Basic(t *testing.T) { return } defer os.RemoveAll(retentionPath) - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 2) - rocksdbPath := retentionPath defer os.RemoveAll(rocksdbPath) metaPath := retentionPath + metaPathSuffix @@ -52,9 +47,12 @@ func TestRmqRetention_Basic(t *testing.T) { var params paramtable.BaseTable params.Init() rmq, err := NewRocksMQ(params, rocksdbPath, nil) - defer rmq.Close() assert.Nil(t, err) - defer rmq.stopRetention() + defer rmq.Close() + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) + atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0) + atomic.StoreInt64(&RocksmqPageSize, 10) + atomic.StoreInt64(&TickerTimeInSeconds, 2) topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -133,10 +131,6 @@ func TestRmqRetention_NotConsumed(t *testing.T) { return } defer os.RemoveAll(retentionPath) - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 2) rocksdbPath := retentionPath defer os.RemoveAll(rocksdbPath) @@ -146,9 +140,13 @@ func TestRmqRetention_NotConsumed(t *testing.T) { var params paramtable.BaseTable params.Init() rmq, err := NewRocksMQ(params, rocksdbPath, nil) - defer rmq.Close() assert.Nil(t, err) - defer rmq.stopRetention() + defer rmq.Close() + + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) + atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0) + atomic.StoreInt64(&RocksmqPageSize, 10) + atomic.StoreInt64(&TickerTimeInSeconds, 2) topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -238,12 +236,6 @@ func TestRmqRetention_MultipleTopic(t *testing.T) { return } defer os.RemoveAll(retentionPath) - // no retention by size - atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1) - // retention by secs - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 1) kvPath := retentionPath + "kv_multi_topic" os.RemoveAll(kvPath) idAllocator := InitIDAllocator(kvPath) @@ -258,6 +250,13 @@ func TestRmqRetention_MultipleTopic(t *testing.T) { assert.Nil(t, err) defer rmq.Close() + // no retention by size + atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1) + // retention by secs + atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1) + atomic.StoreInt64(&RocksmqPageSize, 10) + atomic.StoreInt64(&TickerTimeInSeconds, 1) + topicName := "topic_a" err = rmq.CreateTopic(topicName) assert.Nil(t, err) @@ -456,12 +455,7 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) { return } defer os.RemoveAll(retentionPath) - // no retention by size - atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1) - // retention by secs - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 1) + kvPath := retentionPath + "kv_com1" os.RemoveAll(kvPath) idAllocator := InitIDAllocator(kvPath) @@ -477,6 +471,13 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) { assert.Nil(t, err) defer rmq.Close() + // no retention by size + atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1) + // retention by secs + atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5) + atomic.StoreInt64(&RocksmqPageSize, 10) + atomic.StoreInt64(&TickerTimeInSeconds, 1) + topicName := "topic_a" err = rmq.CreateTopic(topicName) assert.Nil(t, err) @@ -579,10 +580,6 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) { return } defer os.RemoveAll(retentionPath) - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 1) kvPath := retentionPath + "kv_com2" os.RemoveAll(kvPath) idAllocator := InitIDAllocator(kvPath) @@ -598,6 +595,12 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) { assert.Nil(t, err) defer rmq.Close() + // update some configrocksmq_retentions to make cleanup trigger faster + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1) + atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1) + atomic.StoreInt64(&RocksmqPageSize, 10) + atomic.StoreInt64(&TickerTimeInSeconds, 1) + topicName := "topic_a" err = rmq.CreateTopic(topicName) assert.Nil(t, err)