diff --git a/core/src/config/ConfigInit.cpp b/core/src/config/ConfigInit.cpp index 7123dbfa5b..f63627dd9c 100644 --- a/core/src/config/ConfigInit.cpp +++ b/core/src/config/ConfigInit.cpp @@ -103,6 +103,7 @@ InitConfig() { /* wal */ {"wal.enable", CreateBoolConfig("wal.enable", &config.wal.enable.value, true)}, + {"wal.sync_mode", CreateBoolConfig("wal.sync_mode", &config.wal.sync_mode.value, false)}, {"wal.recovery_error_ignore", CreateBoolConfig("wal.recovery_error_ignore", &config.wal.recovery_error_ignore.value, false)}, {"wal.buffer_size", diff --git a/core/src/config/ServerConfig.h b/core/src/config/ServerConfig.h index 0ac3c83969..2f70bb5454 100644 --- a/core/src/config/ServerConfig.h +++ b/core/src/config/ServerConfig.h @@ -136,6 +136,7 @@ struct ServerConfig { struct WAL { Bool enable{false}; + Bool sync_mode{false}; Bool recovery_error_ignore{false}; Integer buffer_size{0}; String path{"unknown"}; diff --git a/core/src/db/wal/WalFile.cpp b/core/src/db/wal/WalFile.cpp index 9ab66f7ce6..e0d2b71074 100644 --- a/core/src/db/wal/WalFile.cpp +++ b/core/src/db/wal/WalFile.cpp @@ -21,6 +21,9 @@ namespace milvus { namespace engine { +WalFile::WalFile(bool sync) : sync_(sync) { +} + WalFile::~WalFile() { CloseFile(); } diff --git a/core/src/db/wal/WalFile.h b/core/src/db/wal/WalFile.h index 0b0b22adc2..5194685d6a 100644 --- a/core/src/db/wal/WalFile.h +++ b/core/src/db/wal/WalFile.h @@ -14,6 +14,7 @@ #include "db/Types.h" #include "utils/Status.h" +#include #include #include #include @@ -24,7 +25,7 @@ namespace engine { class WalFile { public: - WalFile() = default; + explicit WalFile(bool sync = false); virtual ~WalFile(); bool @@ -107,7 +108,12 @@ class WalFile { inline void Flush() { if (file_ && mode_ != OpenMode::READ) { - fflush(file_); + if (sync_) { + int fd = fileno(file_); + fsync(fd); + } else { + fflush(file_); + } } } @@ -136,6 +142,11 @@ class WalFile { OpenMode mode_ = OpenMode::NA; int64_t file_size_ = 0; std::string file_path_; + + // fflush() or fsync() to flush data into storage + // fflush() flush data to system kernel buffer, ensure data safety even milvus crashed + // fsync() flush data into to hard disk(much slower), ensure data safety even the machine shutdown or blackout + bool sync_ = false; }; using WalFilePtr = std::shared_ptr; diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index c6fbc3df86..a877c6aca7 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/wal/WalManager.h" +#include "config/ServerConfig.h" #include "db/Utils.h" #include "db/wal/WalOperationCodec.h" #include "utils/CommonUtil.h" @@ -224,7 +225,7 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) { // id_files arrange id in assendent, we know which file should be read for (auto& pair : id_files) { - WalFilePtr file = std::make_shared(); + WalFilePtr file = std::make_shared(sync_mode_); file->OpenFile(pair.second.c_str(), WalFile::READ); idx_t last_id = 0; file->ReadLastOpId(last_id); @@ -259,6 +260,8 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) { Status WalManager::Init() { + sync_mode_ = config.wal.sync_mode(); + try { using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; DirectoryIterator iter(wal_path_); @@ -312,7 +315,7 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con std::lock_guard lock(file_map_mutex_); WalFilePtr file = file_map_[operation->collection_name_]; if (file == nullptr) { - file = std::make_shared(); + file = std::make_shared(sync_mode_); file_map_[operation->collection_name_] = file; file->OpenFile(path, WalFile::APPEND_WRITE); } else if (!file->IsOpened() || file->ExceedMaxSize(chunk_size)) { @@ -353,7 +356,7 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con std::lock_guard lock(file_map_mutex_); WalFilePtr file = file_map_[operation->collection_name_]; if (file == nullptr) { - file = std::make_shared(); + file = std::make_shared(sync_mode_); file_map_[operation->collection_name_] = file; file->OpenFile(path, WalFile::APPEND_WRITE); } else if (!file->IsOpened() || file->ExceedMaxSize(append_size)) { diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h index b50f66e87f..2af8e512f6 100644 --- a/core/src/db/wal/WalManager.h +++ b/core/src/db/wal/WalManager.h @@ -96,6 +96,7 @@ class WalManager { SafeIDGenerator id_gen_; bool enable_ = false; + bool sync_mode_ = false; std::string wal_path_; int64_t insert_buffer_size_ = 0;