mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-04 18:02:08 +08:00
MS-626 Refactor DataObj to support cache any type data
Former-commit-id: d3ac029a909bfc0a98f3f968ba52326243da40d4
This commit is contained in:
parent
504965412d
commit
729ccc8c25
@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-609 - Update task construct function
|
||||
- MS-611 - Add resources validity check in ResourceMgr
|
||||
- MS-619 - Add optimizer class in scheduler
|
||||
- MS-626 - Refactor DataObj to support cache any type data
|
||||
|
||||
## New Feature
|
||||
|
||||
|
||||
12
cpp/src/cache/Cache.inl
vendored
12
cpp/src/cache/Cache.inl
vendored
@ -86,11 +86,11 @@ Cache<ItemObj>::insert(const std::string &key, const ItemObj &item) {
|
||||
//if key already exist, subtract old item size
|
||||
if (lru_.exists(key)) {
|
||||
const ItemObj &old_item = lru_.get(key);
|
||||
usage_ -= old_item->size();
|
||||
usage_ -= old_item->Size();
|
||||
}
|
||||
|
||||
//plus new item size
|
||||
usage_ += item->size();
|
||||
usage_ += item->Size();
|
||||
}
|
||||
|
||||
//if usage exceed capacity, free some items
|
||||
@ -106,7 +106,7 @@ Cache<ItemObj>::insert(const std::string &key, const ItemObj &item) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
lru_.put(key, item);
|
||||
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->size()
|
||||
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size()
|
||||
<< " bytes into cache, usage: " << usage_ << " bytes";
|
||||
}
|
||||
}
|
||||
@ -120,9 +120,9 @@ Cache<ItemObj>::erase(const std::string &key) {
|
||||
}
|
||||
|
||||
const ItemObj &old_item = lru_.get(key);
|
||||
usage_ -= old_item->size();
|
||||
usage_ -= old_item->Size();
|
||||
|
||||
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->size();
|
||||
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size();
|
||||
|
||||
lru_.erase(key);
|
||||
}
|
||||
@ -160,7 +160,7 @@ Cache<ItemObj>::free_memory() {
|
||||
auto &obj_ptr = it->second;
|
||||
|
||||
key_array.emplace(key);
|
||||
released_size += obj_ptr->size();
|
||||
released_size += obj_ptr->Size();
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
8
cpp/src/cache/CpuCacheMgr.cpp
vendored
8
cpp/src/cache/CpuCacheMgr.cpp
vendored
@ -59,14 +59,10 @@ CpuCacheMgr::GetInstance() {
|
||||
return &s_mgr;
|
||||
}
|
||||
|
||||
engine::VecIndexPtr
|
||||
DataObjPtr
|
||||
CpuCacheMgr::GetIndex(const std::string& key) {
|
||||
DataObjPtr obj = GetItem(key);
|
||||
if (obj != nullptr) {
|
||||
return obj->data();
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
return obj;
|
||||
}
|
||||
|
||||
} // namespace cache
|
||||
|
||||
2
cpp/src/cache/CpuCacheMgr.h
vendored
2
cpp/src/cache/CpuCacheMgr.h
vendored
@ -35,7 +35,7 @@ class CpuCacheMgr : public CacheMgr<DataObjPtr> {
|
||||
static CpuCacheMgr*
|
||||
GetInstance();
|
||||
|
||||
engine::VecIndexPtr
|
||||
DataObjPtr
|
||||
GetIndex(const std::string& key);
|
||||
};
|
||||
|
||||
|
||||
34
cpp/src/cache/DataObj.h
vendored
34
cpp/src/cache/DataObj.h
vendored
@ -17,7 +17,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "src/wrapper/VecIndex.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
@ -26,38 +25,9 @@ namespace cache {
|
||||
|
||||
class DataObj {
|
||||
public:
|
||||
explicit DataObj(const engine::VecIndexPtr& index) : index_(index) {
|
||||
}
|
||||
virtual int64_t
|
||||
Size() = 0;
|
||||
|
||||
DataObj(const engine::VecIndexPtr& index, int64_t size) : index_(index), size_(size) {
|
||||
}
|
||||
|
||||
engine::VecIndexPtr
|
||||
data() {
|
||||
return index_;
|
||||
}
|
||||
|
||||
const engine::VecIndexPtr&
|
||||
data() const {
|
||||
return index_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
size() const {
|
||||
if (index_ == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (size_ > 0) {
|
||||
return size_;
|
||||
}
|
||||
|
||||
return index_->Count() * index_->Dimension() * sizeof(float);
|
||||
}
|
||||
|
||||
private:
|
||||
engine::VecIndexPtr index_ = nullptr;
|
||||
int64_t size_ = 0;
|
||||
};
|
||||
|
||||
using DataObjPtr = std::shared_ptr<DataObj>;
|
||||
|
||||
8
cpp/src/cache/GpuCacheMgr.cpp
vendored
8
cpp/src/cache/GpuCacheMgr.cpp
vendored
@ -71,14 +71,10 @@ GpuCacheMgr::GetInstance(uint64_t gpu_id) {
|
||||
}
|
||||
}
|
||||
|
||||
engine::VecIndexPtr
|
||||
DataObjPtr
|
||||
GpuCacheMgr::GetIndex(const std::string& key) {
|
||||
DataObjPtr obj = GetItem(key);
|
||||
if (obj != nullptr) {
|
||||
return obj->data();
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
return obj;
|
||||
}
|
||||
|
||||
} // namespace cache
|
||||
|
||||
2
cpp/src/cache/GpuCacheMgr.h
vendored
2
cpp/src/cache/GpuCacheMgr.h
vendored
@ -35,7 +35,7 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr> {
|
||||
static GpuCacheMgr*
|
||||
GetInstance(uint64_t gpu_id);
|
||||
|
||||
engine::VecIndexPtr
|
||||
DataObjPtr
|
||||
GetIndex(const std::string& key);
|
||||
|
||||
private:
|
||||
|
||||
@ -91,6 +91,60 @@ ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
|
||||
return index;
|
||||
}
|
||||
|
||||
void
|
||||
ExecutionEngineImpl::HybridLoad() {
|
||||
// if (index_type_ != EngineType::FAISS_IVFSQ8Hybrid) {
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// const std::string key = location_ + ".quantizer";
|
||||
// std::vector<uint64_t> gpus;
|
||||
//
|
||||
// // cache hit
|
||||
// {
|
||||
// int64_t selected = -1;
|
||||
// void* quantizer = nullptr;
|
||||
// for (auto& gpu : gpus) {
|
||||
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
|
||||
// if (auto quan = cache->GetIndex(key)) {
|
||||
// selected = gpu;
|
||||
// quantizer = quan;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (selected != -1) {
|
||||
// // set quantizer into index;
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // cache miss
|
||||
// {
|
||||
// std::vector<int64_t> all_free_mem;
|
||||
// for (auto& gpu : gpus) {
|
||||
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
|
||||
// auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
|
||||
// all_free_mem.push_back(free_mem);
|
||||
// }
|
||||
//
|
||||
// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
|
||||
// auto best = std::distance(all_free_mem.begin(), max_e);
|
||||
//
|
||||
// // load to best device;
|
||||
// // cache quantizer
|
||||
// }
|
||||
//
|
||||
// // if index_type == Hybrid
|
||||
//
|
||||
// // 1. quantizer in which gpu
|
||||
//
|
||||
// // 2.1 which gpu cache best
|
||||
//
|
||||
// // 2.2 load to that gpu cache
|
||||
//
|
||||
// // set quantizer into index
|
||||
}
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) {
|
||||
auto status = index_->Add(n, xdata, xids);
|
||||
@ -133,7 +187,7 @@ ExecutionEngineImpl::Serialize() {
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::Load(bool to_cache) {
|
||||
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
|
||||
index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
|
||||
bool already_in_cache = (index_ != nullptr);
|
||||
if (!already_in_cache) {
|
||||
try {
|
||||
@ -161,7 +215,7 @@ ExecutionEngineImpl::Load(bool to_cache) {
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
|
||||
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
|
||||
auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
|
||||
bool already_in_cache = (index != nullptr);
|
||||
if (already_in_cache) {
|
||||
index_ = index;
|
||||
@ -189,7 +243,7 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::CopyToCpu() {
|
||||
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
|
||||
auto index = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
|
||||
bool already_in_cache = (index != nullptr);
|
||||
if (already_in_cache) {
|
||||
index_ = index;
|
||||
@ -322,7 +376,7 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::Cache() {
|
||||
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
|
||||
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
|
||||
milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
|
||||
|
||||
return Status::OK();
|
||||
@ -330,7 +384,7 @@ ExecutionEngineImpl::Cache() {
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
|
||||
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
|
||||
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
|
||||
milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
|
||||
|
||||
return Status::OK();
|
||||
|
||||
@ -104,6 +104,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
|
||||
VecIndexPtr
|
||||
Load(const std::string& location);
|
||||
|
||||
void
|
||||
HybridLoad();
|
||||
|
||||
protected:
|
||||
VecIndexPtr index_ = nullptr;
|
||||
EngineType index_type_;
|
||||
|
||||
@ -34,6 +34,11 @@
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
int64_t
|
||||
VecIndex::Size() {
|
||||
return Count() * Dimension() * sizeof(float);
|
||||
}
|
||||
|
||||
struct FileIOReader {
|
||||
std::fstream fs;
|
||||
std::string name;
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "cache/DataObj.h"
|
||||
#include "knowhere/common/BinarySet.h"
|
||||
#include "knowhere/common/Config.h"
|
||||
#include "utils/Status.h"
|
||||
@ -48,7 +49,7 @@ class VecIndex;
|
||||
|
||||
using VecIndexPtr = std::shared_ptr<VecIndex>;
|
||||
|
||||
class VecIndex {
|
||||
class VecIndex : public cache::DataObj {
|
||||
public:
|
||||
virtual Status
|
||||
BuildAll(const int64_t& nb, const float* xb, const int64_t* ids, const Config& cfg, const int64_t& nt = 0,
|
||||
@ -81,6 +82,9 @@ class VecIndex {
|
||||
virtual int64_t
|
||||
Count() = 0;
|
||||
|
||||
int64_t
|
||||
Size() override;
|
||||
|
||||
virtual knowhere::BinarySet
|
||||
Serialize() = 0;
|
||||
|
||||
|
||||
@ -141,7 +141,7 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) {
|
||||
mock_index->ntotal_ = 1000;
|
||||
engine::VecIndexPtr index(mock_index);
|
||||
|
||||
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
|
||||
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index);
|
||||
|
||||
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
|
||||
}
|
||||
|
||||
@ -145,7 +145,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
|
||||
for (uint64_t i = 0; i < item_count; i++) {
|
||||
//each vector is 1k byte, total size less than 1G
|
||||
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000000);
|
||||
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index);
|
||||
ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
|
||||
cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
|
||||
}
|
||||
ASSERT_LT(cpu_mgr->ItemCount(), g_num);
|
||||
@ -169,7 +169,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
|
||||
|
||||
//each vector is 1k byte, total size less than 6G
|
||||
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 6000000);
|
||||
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index);
|
||||
ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
|
||||
cpu_mgr->InsertItem("index_6g", data_obj);
|
||||
ASSERT_TRUE(cpu_mgr->ItemExists("index_6g"));
|
||||
}
|
||||
@ -183,7 +183,7 @@ TEST(CacheTest, GPU_CACHE_TEST) {
|
||||
for (int i = 0; i < 20; i++) {
|
||||
//each vector is 1k byte
|
||||
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000);
|
||||
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index);
|
||||
ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
|
||||
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
|
||||
}
|
||||
|
||||
@ -196,8 +196,8 @@ TEST(CacheTest, GPU_CACHE_TEST) {
|
||||
// TODO(myh): use gpu index to mock
|
||||
//each vector is 1k byte, total size less than 2G
|
||||
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2000000);
|
||||
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index);
|
||||
std::cout << data_obj->size() << std::endl;
|
||||
ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
|
||||
std::cout << data_obj->Size() << std::endl;
|
||||
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
|
||||
}
|
||||
|
||||
@ -227,7 +227,7 @@ TEST(CacheTest, INVALID_TEST) {
|
||||
for (int i = 0; i < 20; i++) {
|
||||
//each vector is 1k byte
|
||||
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2);
|
||||
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index);
|
||||
ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
|
||||
mgr.InsertItem("index_" + std::to_string(i), data_obj);
|
||||
}
|
||||
ASSERT_EQ(mgr.GetItem("index_0"), nullptr);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user