diff --git a/internal/util/rocksmq/client/rocksmq/client_impl_test.go b/internal/util/rocksmq/client/rocksmq/client_impl_test.go index f1c5c4e2a6..6221959644 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl_test.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl_test.go @@ -155,7 +155,8 @@ func TestClient_consume(t *testing.T) { msg := &ProducerMessage{ Payload: make([]byte, 10), } - producer.Send(msg) + _, err = producer.Send(msg) + assert.Nil(t, err) <-consumer.Chan() diff --git a/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go b/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go index 11ad472fab..cb0a0835e9 100644 --- a/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go +++ b/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go @@ -137,5 +137,6 @@ func TestConsumer_Seek(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, consumer) - consumer.Seek(0) + err = consumer.Seek(0) + assert.NotNil(t, err) } diff --git a/internal/util/rocksmq/client/rocksmq/test_helper.go b/internal/util/rocksmq/client/rocksmq/test_helper.go index 316d39d9f4..9098502288 100644 --- a/internal/util/rocksmq/client/rocksmq/test_helper.go +++ b/internal/util/rocksmq/client/rocksmq/test_helper.go @@ -18,8 +18,10 @@ import ( "github.com/milvus-io/milvus/internal/allocator" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" + "github.com/milvus-io/milvus/internal/log" rocksmq "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" + "go.uber.org/zap" ) func newTopicName() string { @@ -64,9 +66,18 @@ func newRocksMQ(rmqPath string) server.RocksMQ { func removePath(rmqPath string) { kvPath := rmqPath + "_kv" - os.RemoveAll(kvPath) + err := os.RemoveAll(kvPath) + if err != nil { + log.Error("os removeAll failed.", zap.Any("path", kvPath)) + } rocksdbPath := rmqPath + "_db" - os.RemoveAll(rocksdbPath) + err = os.RemoveAll(rocksdbPath) + if err != nil { + log.Error("os removeAll failed.", zap.Any("path", kvPath)) + } metaPath := rmqPath + "_meta_kv" - os.RemoveAll(metaPath) + err = os.RemoveAll(metaPath) + if err != nil { + log.Error("os removeAll failed.", zap.Any("path", kvPath)) + } } diff --git a/internal/util/rocksmq/server/rocksmq/global_rmq_test.go b/internal/util/rocksmq/server/rocksmq/global_rmq_test.go index 7641ee22a1..db0e91ac33 100644 --- a/internal/util/rocksmq/server/rocksmq/global_rmq_test.go +++ b/internal/util/rocksmq/server/rocksmq/global_rmq_test.go @@ -46,9 +46,10 @@ func Test_InitRmq(t *testing.T) { func Test_InitRocksMQ(t *testing.T) { // Params.Init() rmqPath := "/tmp/milvus/rdb_data_global" - os.Setenv("ROCKSMQ_PATH", rmqPath) + err := os.Setenv("ROCKSMQ_PATH", rmqPath) + assert.Nil(t, err) defer os.RemoveAll(rmqPath) - err := InitRocksMQ() + err = InitRocksMQ() defer Rmq.stopRetention() assert.NoError(t, err) defer CloseRocksMQ() diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index 0d9d2c2948..559a3aaf3b 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -125,7 +125,10 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf // Because loadRetentionInfo may need some time, so do this asynchronously. Finally start retention goroutine. func (ri *retentionInfo) startRetentionInfo() { var wg sync.WaitGroup - ri.kv.ResetPrefixLength(FixedChannelNameLen) + err := ri.kv.ResetPrefixLength(FixedChannelNameLen) + if err != nil { + log.Warn("Start load retention info", zap.Error(err)) + } for _, topic := range ri.topics { log.Debug("Start load retention info", zap.Any("topic", topic)) // Load all page infos @@ -304,7 +307,7 @@ func (ri *retentionInfo) retention() error { // 4. Do delete by range of [start_msg_id, end_msg_id) in rocksdb // 5. Delete corresponding data in retentionInfo func (ri *retentionInfo) expiredCleanUp(topic string) error { - // log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) + log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) var ackedInfo *topicAckedInfo if info, ok := ri.ackedInfo.Load(topic); ok { ackedInfo = info.(*topicAckedInfo) @@ -428,7 +431,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } else if pageStartID < pageEndID { pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey)) } - ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch) + err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch) + if err != nil { + log.Error("rocksdb write error", zap.Error(err)) + return err + } pageInfo.pageEndID = pageInfo.pageEndID[pageRetentionOffset:] } @@ -456,7 +463,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } else { ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) } - ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), ackedTsWriteBatch) + err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), ackedTsWriteBatch) + if err != nil { + log.Error("rocksdb write error", zap.Error(err)) + return err + } // Update acked_size in rocksdb_kv diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go index ad265f734c..b1cb5384c2 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go @@ -29,7 +29,11 @@ import ( var retentionPath string = "/tmp/rmq_retention/" func TestMain(m *testing.M) { - os.MkdirAll(retentionPath, os.ModePerm) + err := os.MkdirAll(retentionPath, os.ModePerm) + if err != nil { + log.Error("MkdirALl error for path", zap.Any("path", retentionPath)) + return + } code := m.Run() os.Exit(code) } @@ -196,7 +200,8 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) { defer lock.Unlock() rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - initRetentionInfo(rmq.retentionInfo.kv, rmq.store) + _, err = initRetentionInfo(rmq.retentionInfo.kv, rmq.store) + assert.Nil(t, err) dummyTopic := strings.Repeat(topicName, 100) err = DeleteMessages(rmq.store, dummyTopic, 0, 0) @@ -208,90 +213,115 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) { ////////////////////////////////////////////////// ackedTsPrefix, _ := constructKey(AckedTsTitle, topicName) ackedTsKey0 := ackedTsPrefix + "/1" - rmq.retentionInfo.kv.Save(ackedTsKey0, "dummy") + err = rmq.retentionInfo.kv.Save(ackedTsKey0, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(ackedTsKey0) + err = rmq.retentionInfo.kv.Remove(ackedTsKey0) + assert.Nil(t, err) ////////////////////////////////////////////////// ackedTsKey1 := ackedTsPrefix + "/dummy" - rmq.retentionInfo.kv.Save(ackedTsKey1, "dummy") + err = rmq.retentionInfo.kv.Save(ackedTsKey1, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(ackedTsKey1) + err = rmq.retentionInfo.kv.Remove(ackedTsKey1) + assert.Nil(t, err) ////////////////////////////////////////////////// lastRetentionTsKey := LastRetTsTitle + topicName - rmq.retentionInfo.kv.Save(lastRetentionTsKey, strconv.FormatInt(1, 10)) + err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, strconv.FormatInt(1, 10)) + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(lastRetentionTsKey) + err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey) + assert.Nil(t, err) ////////////////////////////////////////////////// - rmq.retentionInfo.kv.Save(lastRetentionTsKey, "dummy") + err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(lastRetentionTsKey) + err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey) + assert.Nil(t, err) ////////////////////////////////////////////////// ackedSizeKey := AckedSizeTitle + topicName - rmq.retentionInfo.kv.Save(ackedSizeKey, strconv.FormatInt(1, 10)) + err = rmq.retentionInfo.kv.Save(ackedSizeKey, strconv.FormatInt(1, 10)) + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(ackedSizeKey) + err = rmq.retentionInfo.kv.Remove(ackedSizeKey) + assert.Nil(t, err) ////////////////////////////////////////////////// - rmq.retentionInfo.kv.Save(ackedSizeKey, "") + err = rmq.retentionInfo.kv.Save(ackedSizeKey, "") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(ackedSizeKey) + err = rmq.retentionInfo.kv.Remove(ackedSizeKey) + assert.Nil(t, err) ////////////////////////////////////////////////// - rmq.retentionInfo.kv.Save(ackedSizeKey, "dummy") + err = rmq.retentionInfo.kv.Save(ackedSizeKey, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(ackedSizeKey) + err = rmq.retentionInfo.kv.Remove(ackedSizeKey) + assert.Nil(t, err) ////////////////////////////////////////////////// topicBeginIDKey := TopicBeginIDTitle + topicName - rmq.retentionInfo.kv.Save(topicBeginIDKey, "dummy") + err = rmq.retentionInfo.kv.Save(topicBeginIDKey, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(topicBeginIDKey) + err = rmq.retentionInfo.kv.Remove(topicBeginIDKey) + assert.Nil(t, err) //////////////////////////////////////////////////// fixedPageSizeKey0, _ := constructKey(PageMsgSizeTitle, topicName) pageMsgSizeKey0 := fixedPageSizeKey0 + "/" + "1" - rmq.retentionInfo.kv.Save(pageMsgSizeKey0, "dummy") + err = rmq.retentionInfo.kv.Save(pageMsgSizeKey0, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(pageMsgSizeKey0) + err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey0) + assert.Nil(t, err) ////////////////////////////////////////////////// fixedPageSizeKey1, _ := constructKey(PageMsgSizeTitle, topicName) pageMsgSizeKey1 := fixedPageSizeKey1 + "/" + "dummy" - rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy") + err = rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy") + assert.Nil(t, err) wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) + err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) + assert.Nil(t, err) + + ////////////////////////////////////////////////// + topicMu.Delete(topicName) + topicMu.Store(topicName, &sync.Mutex{}) + err = rmq.retentionInfo.expiredCleanUp(topicName) + assert.Nil(t, err) + + ////////////////////////////////////////////////// + topicMu.Delete(topicName) + err = rmq.retentionInfo.expiredCleanUp(topicName) + // TopicName has been deleted in line 310 + assert.NotNil(t, err) + + ////////////////////////////////////////////////// + rmq.retentionInfo.ackedInfo.Delete(topicName) + err = rmq.retentionInfo.expiredCleanUp(topicName) + assert.Nil(t, err) ////////////////////////////////////////////////// rmq.retentionInfo.kv.DB = nil wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - ////////////////////////////////////////////////// - topicMu.Delete(topicName) - topicMu.Store(topicName, topicName) - rmq.retentionInfo.expiredCleanUp(topicName) - - ////////////////////////////////////////////////// - topicMu.Delete(topicName) - rmq.retentionInfo.expiredCleanUp(topicName) - - ////////////////////////////////////////////////// - rmq.retentionInfo.ackedInfo.Delete(topicName) - rmq.retentionInfo.expiredCleanUp(topicName) } func TestRmqRetention_Complex(t *testing.T) {