mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
set wal lsn to meta (#3638)
* wal lsn Signed-off-by: groot <yihua.mo@zilliz.com> * temp comment Signed-off-by: groot <yihua.mo@zilliz.com> * recovery from meta lsn Signed-off-by: groot <yihua.mo@zilliz.com>
This commit is contained in:
parent
5564f7b6ec
commit
2fde7ea98f
@ -86,9 +86,17 @@ MemSegment::Serialize() {
|
||||
return status;
|
||||
}
|
||||
|
||||
// get max op id
|
||||
idx_t max_op_id = 0;
|
||||
for (auto& action : actions_) {
|
||||
if (action.op_id_ > max_op_id) {
|
||||
max_op_id = action.op_id_;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<snapshot::NewSegmentOperation> new_seg_operation;
|
||||
segment::SegmentWriterPtr segment_writer;
|
||||
status = CreateNewSegment(ss, new_seg_operation, segment_writer);
|
||||
status = CreateNewSegment(ss, new_seg_operation, segment_writer, max_op_id);
|
||||
if (!status.ok()) {
|
||||
LOG_ENGINE_ERROR_ << "Failed to create new segment";
|
||||
return status;
|
||||
@ -119,12 +127,6 @@ MemSegment::Serialize() {
|
||||
LOG_ENGINE_DEBUG_ << "New segment " << seg_id << " of collection " << collection_id_ << " serialized";
|
||||
|
||||
// notify wal the max operation id is done
|
||||
idx_t max_op_id = 0;
|
||||
for (auto& action : actions_) {
|
||||
if (action.op_id_ > max_op_id) {
|
||||
max_op_id = action.op_id_;
|
||||
}
|
||||
}
|
||||
WalManager::GetInstance().OperationDone(ss->GetName(), max_op_id);
|
||||
|
||||
return Status::OK();
|
||||
@ -132,10 +134,11 @@ MemSegment::Serialize() {
|
||||
|
||||
Status
|
||||
MemSegment::CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::NewSegmentOperation>& operation,
|
||||
segment::SegmentWriterPtr& writer) {
|
||||
segment::SegmentWriterPtr& writer, idx_t max_op_id) {
|
||||
// create segment
|
||||
snapshot::SegmentPtr segment;
|
||||
snapshot::OperationContext context;
|
||||
// context.lsn = max_op_id;
|
||||
context.prev_partition = ss->GetResource<snapshot::Partition>(partition_id_);
|
||||
operation = std::make_shared<snapshot::NewSegmentOperation>(context, ss);
|
||||
auto status = operation->CommitNewSegment(segment);
|
||||
|
||||
@ -63,7 +63,7 @@ class MemSegment {
|
||||
private:
|
||||
Status
|
||||
CreateNewSegment(snapshot::ScopedSnapshotT& ss, std::shared_ptr<snapshot::NewSegmentOperation>& operation,
|
||||
segment::SegmentWriterPtr& writer);
|
||||
segment::SegmentWriterPtr& writer, idx_t max_op_id);
|
||||
|
||||
Status
|
||||
ApplyDeleteToMem();
|
||||
|
||||
@ -104,8 +104,8 @@ WalManager::DropCollection(const std::string& collection_name) {
|
||||
if (!path.empty()) {
|
||||
WalFile file;
|
||||
file.OpenFile(path, WalFile::OVER_WRITE);
|
||||
bool del = true;
|
||||
file.Write<bool>(&del);
|
||||
idx_t op_id = id_gen_.GetNextIDNumber();
|
||||
file.Write<idx_t>(&op_id);
|
||||
|
||||
AddCleanupTask(collection_name);
|
||||
StartCleanupThread();
|
||||
@ -177,7 +177,7 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) {
|
||||
}
|
||||
|
||||
Status
|
||||
WalManager::Recovery(const DBPtr& db) {
|
||||
WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
|
||||
WaitCleanupFinish();
|
||||
|
||||
try {
|
||||
@ -205,6 +205,12 @@ WalManager::Recovery(const DBPtr& db) {
|
||||
}
|
||||
}
|
||||
|
||||
auto iter = max_op_ids.find(collection_name);
|
||||
if (iter != max_op_ids.end()) {
|
||||
idx_t outer_max_id = iter->second;
|
||||
max_op_id = outer_max_id > max_op_id ? outer_max_id : max_op_id;
|
||||
}
|
||||
|
||||
// id_files arrange id in assendent, we know which file should be read
|
||||
for (auto& pair : id_files) {
|
||||
WalFilePtr file = std::make_shared<WalFile>();
|
||||
|
||||
@ -32,6 +32,8 @@ namespace engine {
|
||||
extern const char* WAL_MAX_OP_FILE_NAME;
|
||||
extern const char* WAL_DEL_FILE_NAME;
|
||||
|
||||
using CollectionMaxOpIDMap = std::unordered_map<std::string, idx_t>;
|
||||
|
||||
class WalManager {
|
||||
public:
|
||||
static WalManager&
|
||||
@ -52,8 +54,10 @@ class WalManager {
|
||||
Status
|
||||
OperationDone(const std::string& collection_name, idx_t op_id);
|
||||
|
||||
// max_op_ids is from meta system, wal also record a max id for each collection
|
||||
// compare the two max id, use the max one as recovery base
|
||||
Status
|
||||
Recovery(const DBPtr& db);
|
||||
Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids);
|
||||
|
||||
private:
|
||||
WalManager();
|
||||
|
||||
@ -13,13 +13,35 @@
|
||||
#include "config/ServerConfig.h"
|
||||
#include "db/SnapshotUtils.h"
|
||||
#include "db/Utils.h"
|
||||
#include "db/snapshot/Snapshots.h"
|
||||
#include "db/wal/WalManager.h"
|
||||
#include "db/wal/WalOperation.h"
|
||||
#include "utils/Exception.h"
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
namespace {
|
||||
|
||||
Status
|
||||
CollectMaxOpIDFromMeta(CollectionMaxOpIDMap& max_op_ids) {
|
||||
std::vector<std::string> collection_names;
|
||||
snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
|
||||
for (auto& collection_name : collection_names) {
|
||||
snapshot::ScopedSnapshotT ss;
|
||||
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
|
||||
if (status.ok()) {
|
||||
max_op_ids.insert(std::make_pair(collection_name, ss->GetMaxLsn()));
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
WalProxy::WalProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
|
||||
// db must implemented
|
||||
if (db == nullptr) {
|
||||
@ -37,7 +59,10 @@ WalProxy::Start() {
|
||||
|
||||
if (options_.wal_enable_) {
|
||||
WalManager::GetInstance().Start(options_);
|
||||
WalManager::GetInstance().Recovery(db_);
|
||||
|
||||
CollectionMaxOpIDMap max_op_ids;
|
||||
CollectMaxOpIDFromMeta(max_op_ids);
|
||||
WalManager::GetInstance().Recovery(db_, max_op_ids);
|
||||
}
|
||||
|
||||
return status;
|
||||
@ -56,8 +81,12 @@ WalProxy::Stop() {
|
||||
|
||||
Status
|
||||
WalProxy::DropCollection(const std::string& collection_name) {
|
||||
WalManager::GetInstance().DropCollection(collection_name);
|
||||
return db_->DropCollection(collection_name);
|
||||
auto status = db_->DropCollection(collection_name);
|
||||
if (status.ok()) {
|
||||
WalManager::GetInstance().DropCollection(collection_name);
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
Status
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus {
|
||||
|
||||
@ -514,7 +514,8 @@ TEST_F(WalTest, WalManagerTest) {
|
||||
}
|
||||
|
||||
DummyDBPtr db_2 = std::make_shared<DummyDB>(options);
|
||||
WalManager::GetInstance().Recovery(db_2);
|
||||
milvus::engine::CollectionMaxOpIDMap max_op_ids;
|
||||
WalManager::GetInstance().Recovery(db_2, max_op_ids);
|
||||
ASSERT_EQ(db_2->InsertCount(), insert_count);
|
||||
ASSERT_EQ(db_2->DeleteCount(), delete_count);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user