mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
feat(db): add local meta
Former-commit-id: 30814a42efc7ad53847fc00d93903e1201c125d2
This commit is contained in:
parent
f5348ed57c
commit
2bc205b791
@ -15,7 +15,7 @@ class Env;
|
||||
|
||||
class DB {
|
||||
public:
|
||||
static DB* Open(const Options& options_, const std::string& name_);
|
||||
static DB* Open(const Options& options);
|
||||
|
||||
virtual Status add_group(const GroupOptions& options_,
|
||||
const std::string& group_id_,
|
||||
|
||||
@ -15,16 +15,15 @@ namespace zilliz {
|
||||
namespace vecwise {
|
||||
namespace engine {
|
||||
|
||||
DBImpl::DBImpl(const Options& options_, const std::string& name_)
|
||||
: _dbname(name_),
|
||||
_env(options_.env),
|
||||
_options(options_),
|
||||
DBImpl::DBImpl(const Options& options)
|
||||
: _env(options.env),
|
||||
_options(options),
|
||||
_bg_compaction_scheduled(false),
|
||||
_shutting_down(false),
|
||||
bg_build_index_started_(false),
|
||||
_pMeta(new meta::DBMetaImpl(_options.meta)),
|
||||
_pMemMgr(new MemManager(_pMeta)) {
|
||||
start_timer_task(options_.memory_sync_interval);
|
||||
start_timer_task(_options.memory_sync_interval);
|
||||
}
|
||||
|
||||
Status DBImpl::add_group(const GroupOptions& options,
|
||||
@ -249,8 +248,8 @@ DBImpl::~DBImpl() {
|
||||
|
||||
DB::~DB() {}
|
||||
|
||||
DB* DB::Open(const Options& options_, const std::string& name_) {
|
||||
DBImpl* impl = new DBImpl(options_, name_);
|
||||
DB* DB::Open(const Options& options) {
|
||||
DBImpl* impl = new DBImpl(options);
|
||||
return impl;
|
||||
}
|
||||
|
||||
|
||||
@ -20,7 +20,7 @@ namespace meta {
|
||||
|
||||
class DBImpl : public DB {
|
||||
public:
|
||||
DBImpl(const Options& options_, const std::string& name_);
|
||||
DBImpl(const Options& options);
|
||||
|
||||
virtual Status add_group(const GroupOptions& options_,
|
||||
const std::string& group_id_,
|
||||
@ -57,7 +57,6 @@ private:
|
||||
void background_call();
|
||||
void background_compaction();
|
||||
|
||||
const std::string& _dbname;
|
||||
Env* const _env;
|
||||
const Options _options;
|
||||
|
||||
|
||||
@ -19,8 +19,8 @@ long GetFileSize(const std::string& filename)
|
||||
return rc == 0 ? stat_buf.st_size : -1;
|
||||
}
|
||||
|
||||
DBMetaImpl::DBMetaImpl(const MetaOptions& options_)
|
||||
: _options(static_cast<const DBMetaOptions&>(options_)) {
|
||||
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
|
||||
: _options(options_) {
|
||||
initialize();
|
||||
}
|
||||
|
||||
|
||||
@ -11,7 +11,7 @@ namespace meta {
|
||||
|
||||
class DBMetaImpl : public Meta {
|
||||
public:
|
||||
DBMetaImpl(const MetaOptions& options_);
|
||||
DBMetaImpl(const DBMetaOptions& options_);
|
||||
|
||||
virtual Status add_group(const GroupOptions& options_,
|
||||
const std::string& group_id_,
|
||||
|
||||
217
cpp/src/db/LocalMetaImpl.cpp
Normal file
217
cpp/src/db/LocalMetaImpl.cpp
Normal file
@ -0,0 +1,217 @@
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
#include <sstream>
|
||||
#include <iostream>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
|
||||
#include <fstream>
|
||||
#include "LocalMetaImpl.h"
|
||||
#include "IDGenerator.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace vecwise {
|
||||
namespace engine {
|
||||
namespace meta {
|
||||
|
||||
long GetFileSize(const std::string& filename)
|
||||
{
|
||||
struct stat stat_buf;
|
||||
int rc = stat(filename.c_str(), &stat_buf);
|
||||
return rc == 0 ? stat_buf.st_size : -1;
|
||||
}
|
||||
|
||||
std::string LocalMetaImpl::GetGroupPath(const std::string& group_id) {
|
||||
return _options.path + "/" + group_id;
|
||||
}
|
||||
|
||||
std::string LocalMetaImpl::GetGroupMetaPath(const std::string& group_id) {
|
||||
return GetGroupPath(group_id) + "/" + "meta";
|
||||
}
|
||||
|
||||
LocalMetaImpl::LocalMetaImpl(const DBMetaOptions& options_)
|
||||
: _options(options_) {
|
||||
initialize();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::initialize() {
|
||||
if (boost::filesystem::is_directory(_options.path)) {
|
||||
}
|
||||
else if (!boost::filesystem::create_directory(_options.path)) {
|
||||
return Status::InvalidDBPath("Cannot Create " + _options.path);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::add_group(const GroupOptions& options,
|
||||
const std::string& group_id,
|
||||
GroupSchema& group_info) {
|
||||
std::string real_gid;
|
||||
size_t id = SimpleIDGenerator().getNextIDNumber();
|
||||
if (group_id == "") {
|
||||
std::stringstream ss;
|
||||
ss << id;
|
||||
real_gid = ss.str();
|
||||
} else {
|
||||
real_gid = group_id;
|
||||
}
|
||||
|
||||
bool group_exist;
|
||||
has_group(real_gid, group_exist);
|
||||
if (group_exist) {
|
||||
return Status::GroupError("Group Already Existed " + real_gid);
|
||||
}
|
||||
if (!boost::filesystem::create_directory(GetGroupPath(real_gid))) {
|
||||
return Status::GroupError("Cannot Create Group " + real_gid);
|
||||
}
|
||||
|
||||
group_info.group_id = real_gid;
|
||||
group_info.files_cnt = 0;
|
||||
group_info.id = 0;
|
||||
group_info.location = GetGroupPath(real_gid);
|
||||
group_info.dimension = options.dimension;
|
||||
|
||||
boost::property_tree::ptree out;
|
||||
out.put("files_cnt", group_info.files_cnt);
|
||||
out.put("dimension", group_info.dimension);
|
||||
boost::property_tree::write_json(GetGroupMetaPath(real_gid), out);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::get_group(const std::string& group_id, GroupSchema& group_info) {
|
||||
bool group_exist;
|
||||
has_group(group_id, group_exist);
|
||||
if (!group_exist) {
|
||||
return Status::NotFound("Group " + group_id + " Not Found");
|
||||
}
|
||||
|
||||
boost::property_tree::ptree ptree;
|
||||
boost::property_tree::read_json(GetGroupMetaPath(group_id), ptree);
|
||||
auto files_cnt = ptree.get_child("files_cnt").data();
|
||||
auto dimension = ptree.get_child("dimension").data();
|
||||
std::cout << dimension << std::endl;
|
||||
std::cout << files_cnt << std::endl;
|
||||
|
||||
group_info.id = std::stoi(group_id);
|
||||
group_info.group_id = group_id;
|
||||
group_info.files_cnt = std::stoi(files_cnt);
|
||||
group_info.dimension = std::stoi(dimension);
|
||||
group_info.location = GetGroupPath(group_id);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
|
||||
has_or_not = boost::filesystem::is_directory(GetGroupPath(group_id));
|
||||
/* if (!has_or_not) return Status::OK(); */
|
||||
/* boost::filesystem::is_regular_file() */
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::add_group_file(const std::string& group_id,
|
||||
GroupFileSchema& group_file_info,
|
||||
GroupFileSchema::FILE_TYPE file_type) {
|
||||
return add_group_file(group_id, Meta::GetDate(), group_file_info);
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::add_group_file(const std::string& group_id,
|
||||
DateT date,
|
||||
GroupFileSchema& group_file_info,
|
||||
GroupFileSchema::FILE_TYPE file_type) {
|
||||
//PXU TODO
|
||||
std::stringstream ss;
|
||||
SimpleIDGenerator g;
|
||||
std::string suffix = (file_type == GroupFileSchema::RAW) ? ".raw" : ".index";
|
||||
ss << "/tmp/test/" << date
|
||||
<< "/" << g.getNextIDNumber()
|
||||
<< suffix;
|
||||
group_file_info.group_id = "1";
|
||||
group_file_info.dimension = 64;
|
||||
group_file_info.location = ss.str();
|
||||
group_file_info.date = date;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::files_to_index(GroupFilesSchema& files) {
|
||||
// PXU TODO
|
||||
files.clear();
|
||||
std::stringstream ss;
|
||||
ss << "/tmp/test/" << Meta::GetDate();
|
||||
boost::filesystem::path path(ss.str().c_str());
|
||||
boost::filesystem::directory_iterator end_itr;
|
||||
for (boost::filesystem::directory_iterator itr(path); itr != end_itr; ++itr) {
|
||||
/* std::cout << itr->path().string() << std::endl; */
|
||||
GroupFileSchema f;
|
||||
f.location = itr->path().string();
|
||||
std::string suffixStr = f.location.substr(f.location.find_last_of('.') + 1);
|
||||
if (suffixStr == "index") continue;
|
||||
if (1024*1024*1000 >= GetFileSize(f.location)) continue;
|
||||
std::cout << "[About to index] " << f.location << std::endl;
|
||||
f.date = Meta::GetDate();
|
||||
files.push_back(f);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) {
|
||||
//PXU TODO
|
||||
files.clear();
|
||||
std::stringstream ss;
|
||||
ss << "/tmp/test/" << Meta::GetDate();
|
||||
boost::filesystem::path path(ss.str().c_str());
|
||||
boost::filesystem::directory_iterator end_itr;
|
||||
GroupFilesSchema gfiles;
|
||||
DateT date = Meta::GetDate();
|
||||
files[date] = gfiles;
|
||||
for (boost::filesystem::directory_iterator itr(path); itr != end_itr; ++itr) {
|
||||
/* std::cout << itr->path().string() << std::endl; */
|
||||
GroupFileSchema f;
|
||||
f.location = itr->path().string();
|
||||
std::string suffixStr = f.location.substr(f.location.find_last_of('.') + 1);
|
||||
if (suffixStr == "index") continue;
|
||||
if (1024*1024*1000 < GetFileSize(f.location)) continue;
|
||||
std::cout << "About to merge " << f.location << std::endl;
|
||||
files[date].push_back(f);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::has_group_file(const std::string& group_id_,
|
||||
const std::string& file_id_,
|
||||
bool& has_or_not_) {
|
||||
//PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::get_group_file(const std::string& group_id_,
|
||||
const std::string& file_id_,
|
||||
GroupFileSchema& group_file_info_) {
|
||||
//PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) {
|
||||
// PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::update_group_file(const GroupFileSchema& group_file_) {
|
||||
//PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::update_files(const GroupFilesSchema& files) {
|
||||
//PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace vecwise
|
||||
} // namespace zilliz
|
||||
68
cpp/src/db/LocalMetaImpl.h
Normal file
68
cpp/src/db/LocalMetaImpl.h
Normal file
@ -0,0 +1,68 @@
|
||||
#ifndef VECENGINE_DB_META_IMPL_H_
|
||||
#define VECENGINE_DB_META_IMPL_H_
|
||||
|
||||
#include "Meta.h"
|
||||
#include "Options.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace vecwise {
|
||||
namespace engine {
|
||||
namespace meta {
|
||||
|
||||
class LocalMetaImpl : public Meta {
|
||||
public:
|
||||
LocalMetaImpl(const DBMetaOptions& options_);
|
||||
|
||||
virtual Status add_group(const GroupOptions& options_,
|
||||
const std::string& group_id_,
|
||||
GroupSchema& group_info_) override;
|
||||
virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override;
|
||||
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
|
||||
|
||||
virtual Status add_group_file(const std::string& group_id,
|
||||
DateT date,
|
||||
GroupFileSchema& group_file_info,
|
||||
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) override;
|
||||
|
||||
virtual Status add_group_file(const std::string& group_id_,
|
||||
GroupFileSchema& group_file_info_,
|
||||
GroupFileSchema::FILE_TYPE file_type=GroupFileSchema::RAW) override;
|
||||
|
||||
virtual Status has_group_file(const std::string& group_id_,
|
||||
const std::string& file_id_,
|
||||
bool& has_or_not_) override;
|
||||
virtual Status get_group_file(const std::string& group_id_,
|
||||
const std::string& file_id_,
|
||||
GroupFileSchema& group_file_info_) override;
|
||||
virtual Status update_group_file(const GroupFileSchema& group_file_) override;
|
||||
|
||||
virtual Status get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) override;
|
||||
|
||||
virtual Status update_files(const GroupFilesSchema& files) override;
|
||||
|
||||
virtual Status files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) override;
|
||||
|
||||
virtual Status files_to_index(GroupFilesSchema&) override;
|
||||
|
||||
private:
|
||||
|
||||
std::string GetGroupPath(const std::string& group_id);
|
||||
std::string GetGroupMetaPath(const std::string& group_id);
|
||||
|
||||
Status CreateGroupMeta(const GroupSchema& group_schema);
|
||||
|
||||
Status initialize();
|
||||
|
||||
const DBMetaOptions _options;
|
||||
|
||||
}; // LocalMetaImpl
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace vecwise
|
||||
} // namespace zilliz
|
||||
|
||||
#endif // VECENGINE_DB_META_IMPL_H_
|
||||
@ -21,7 +21,6 @@ struct GroupSchema {
|
||||
size_t files_cnt = 0;
|
||||
uint16_t dimension;
|
||||
std::string location = "";
|
||||
std::string next_file_location = "";
|
||||
}; // GroupSchema
|
||||
|
||||
|
||||
|
||||
@ -10,6 +10,11 @@ Options::Options()
|
||||
: env(Env::Default()) {
|
||||
}
|
||||
|
||||
/* DBMetaOptions::DBMetaOptions(const std::string& dbpath, */
|
||||
/* const std::string& uri) */
|
||||
/* : path(dbpath), backend_uri(uri) { */
|
||||
/* } */
|
||||
|
||||
} // namespace engine
|
||||
} // namespace vecwise
|
||||
} // namespace zilliz
|
||||
|
||||
@ -9,13 +9,10 @@ namespace engine {
|
||||
|
||||
class Env;
|
||||
|
||||
struct MetaOptions {
|
||||
}; // MetaOptions
|
||||
|
||||
|
||||
struct DBMetaOptions : public MetaOptions {
|
||||
struct DBMetaOptions {
|
||||
/* DBMetaOptions(const std::string&, const std::string&); */
|
||||
std::string path;
|
||||
std::string backend_uri;
|
||||
std::string dbname;
|
||||
}; // DBMetaOptions
|
||||
|
||||
|
||||
|
||||
@ -22,10 +22,20 @@ public:
|
||||
return Status(kNotFound, msg, msg2);
|
||||
}
|
||||
|
||||
static Status InvalidDBPath(const std::string& msg, const std::string& msg2="") {
|
||||
return Status(kInvalidDBPath, msg, msg2);
|
||||
}
|
||||
static Status GroupError(const std::string& msg, const std::string& msg2="") {
|
||||
return Status(kGroupError, msg, msg2);
|
||||
}
|
||||
|
||||
bool ok() const { return state_ == nullptr; }
|
||||
|
||||
bool IsNotFound() const { return code() == kNotFound; }
|
||||
|
||||
bool IsInvalidDBPath() const { return code() == kInvalidDBPath; }
|
||||
bool IsGroupError() const { return code() == kGroupError; }
|
||||
|
||||
std::string ToString() const;
|
||||
|
||||
private:
|
||||
@ -34,6 +44,9 @@ private:
|
||||
enum Code {
|
||||
kOK = 0,
|
||||
kNotFound,
|
||||
|
||||
kInvalidDBPath,
|
||||
kGroupError,
|
||||
};
|
||||
|
||||
Code code() const {
|
||||
|
||||
@ -21,13 +21,12 @@ namespace {
|
||||
if(s_db == nullptr) {
|
||||
engine::Options opt;
|
||||
|
||||
std::string db_path = "/tmp/test";
|
||||
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
|
||||
opt.meta.backend_uri = config.GetValue(CONFIG_SERVER_DB_URL);
|
||||
opt.meta.dbname = config.GetValue(CONFIG_SERVER_DB_NAME);
|
||||
opt.meta.path = db_path;
|
||||
|
||||
std::string db_path = "/tmp/test";
|
||||
CommonUtil::CreateDirectory(db_path);
|
||||
s_db = engine::DB::Open(opt, db_path);
|
||||
s_db = engine::DB::Open(opt);
|
||||
}
|
||||
return s_db;
|
||||
}
|
||||
@ -210,4 +209,4 @@ VecServiceHandler::search_vector_batch(VecSearchResultList &_return,
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user