mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
parent
45ecc9ae9a
commit
5e60e61042
@ -103,6 +103,7 @@ InitConfig() {
|
||||
|
||||
/* wal */
|
||||
{"wal.enable", CreateBoolConfig("wal.enable", &config.wal.enable.value, true)},
|
||||
{"wal.sync_mode", CreateBoolConfig("wal.sync_mode", &config.wal.sync_mode.value, false)},
|
||||
{"wal.recovery_error_ignore",
|
||||
CreateBoolConfig("wal.recovery_error_ignore", &config.wal.recovery_error_ignore.value, false)},
|
||||
{"wal.buffer_size",
|
||||
|
||||
@ -136,6 +136,7 @@ struct ServerConfig {
|
||||
|
||||
struct WAL {
|
||||
Bool enable{false};
|
||||
Bool sync_mode{false};
|
||||
Bool recovery_error_ignore{false};
|
||||
Integer buffer_size{0};
|
||||
String path{"unknown"};
|
||||
|
||||
@ -21,6 +21,9 @@
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
WalFile::WalFile(bool sync) : sync_(sync) {
|
||||
}
|
||||
|
||||
WalFile::~WalFile() {
|
||||
CloseFile();
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#include "db/Types.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
#include <unistd.h>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
@ -24,7 +25,7 @@ namespace engine {
|
||||
|
||||
class WalFile {
|
||||
public:
|
||||
WalFile() = default;
|
||||
explicit WalFile(bool sync = false);
|
||||
virtual ~WalFile();
|
||||
|
||||
bool
|
||||
@ -107,7 +108,12 @@ class WalFile {
|
||||
inline void
|
||||
Flush() {
|
||||
if (file_ && mode_ != OpenMode::READ) {
|
||||
fflush(file_);
|
||||
if (sync_) {
|
||||
int fd = fileno(file_);
|
||||
fsync(fd);
|
||||
} else {
|
||||
fflush(file_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -136,6 +142,11 @@ class WalFile {
|
||||
OpenMode mode_ = OpenMode::NA;
|
||||
int64_t file_size_ = 0;
|
||||
std::string file_path_;
|
||||
|
||||
// fflush() or fsync() to flush data into storage
|
||||
// fflush() flush data to system kernel buffer, ensure data safety even milvus crashed
|
||||
// fsync() flush data into to hard disk(much slower), ensure data safety even the machine shutdown or blackout
|
||||
bool sync_ = false;
|
||||
};
|
||||
|
||||
using WalFilePtr = std::shared_ptr<WalFile>;
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include "db/wal/WalManager.h"
|
||||
#include "config/ServerConfig.h"
|
||||
#include "db/Utils.h"
|
||||
#include "db/wal/WalOperationCodec.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
@ -224,7 +225,7 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
|
||||
|
||||
// id_files arrange id in assendent, we know which file should be read
|
||||
for (auto& pair : id_files) {
|
||||
WalFilePtr file = std::make_shared<WalFile>();
|
||||
WalFilePtr file = std::make_shared<WalFile>(sync_mode_);
|
||||
file->OpenFile(pair.second.c_str(), WalFile::READ);
|
||||
idx_t last_id = 0;
|
||||
file->ReadLastOpId(last_id);
|
||||
@ -259,6 +260,8 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
|
||||
|
||||
Status
|
||||
WalManager::Init() {
|
||||
sync_mode_ = config.wal.sync_mode();
|
||||
|
||||
try {
|
||||
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
|
||||
DirectoryIterator iter(wal_path_);
|
||||
@ -312,7 +315,7 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con
|
||||
std::lock_guard<std::mutex> lock(file_map_mutex_);
|
||||
WalFilePtr file = file_map_[operation->collection_name_];
|
||||
if (file == nullptr) {
|
||||
file = std::make_shared<WalFile>();
|
||||
file = std::make_shared<WalFile>(sync_mode_);
|
||||
file_map_[operation->collection_name_] = file;
|
||||
file->OpenFile(path, WalFile::APPEND_WRITE);
|
||||
} else if (!file->IsOpened() || file->ExceedMaxSize(chunk_size)) {
|
||||
@ -353,7 +356,7 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con
|
||||
std::lock_guard<std::mutex> lock(file_map_mutex_);
|
||||
WalFilePtr file = file_map_[operation->collection_name_];
|
||||
if (file == nullptr) {
|
||||
file = std::make_shared<WalFile>();
|
||||
file = std::make_shared<WalFile>(sync_mode_);
|
||||
file_map_[operation->collection_name_] = file;
|
||||
file->OpenFile(path, WalFile::APPEND_WRITE);
|
||||
} else if (!file->IsOpened() || file->ExceedMaxSize(append_size)) {
|
||||
|
||||
@ -96,6 +96,7 @@ class WalManager {
|
||||
SafeIDGenerator id_gen_;
|
||||
|
||||
bool enable_ = false;
|
||||
bool sync_mode_ = false;
|
||||
std::string wal_path_;
|
||||
int64_t insert_buffer_size_ = 0;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user