mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: add bitmap offset cache to speed up retrieve raw data (#35498)
#35458 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
75da36d1aa
commit
42f7800b5b
@ -255,7 +255,7 @@ BitmapIndex<T>::Serialize(const Config& config) {
|
||||
ret_set.Append(BITMAP_INDEX_META, index_meta.first, index_meta.second);
|
||||
|
||||
LOG_INFO("build bitmap index with cardinality = {}, num_rows = {}",
|
||||
Cardinality(),
|
||||
data_.size(),
|
||||
total_num_rows_);
|
||||
|
||||
Disassemble(ret_set);
|
||||
@ -345,6 +345,31 @@ BitmapIndex<T>::DeserializeIndexData(const uint8_t* data_ptr,
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
BitmapIndex<T>::BuildOffsetCache() {
|
||||
if (build_mode_ == BitmapIndexBuildMode::ROARING) {
|
||||
data_offsets_cache_.resize(total_num_rows_);
|
||||
for (auto it = data_.begin(); it != data_.end(); it++) {
|
||||
for (const auto& v : it->second) {
|
||||
data_offsets_cache_[v] = it;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (auto it = bitsets_.begin(); it != bitsets_.end(); it++) {
|
||||
bitsets_offsets_cache_.resize(total_num_rows_);
|
||||
const auto& bits = it->second;
|
||||
for (int i = 0; i < bits.size(); i++) {
|
||||
if (bits[i]) {
|
||||
bitsets_offsets_cache_[i] = it;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
use_offset_cache_ = true;
|
||||
LOG_INFO("build offset cache for bitmap index");
|
||||
}
|
||||
|
||||
template <>
|
||||
void
|
||||
BitmapIndex<std::string>::DeserializeIndexData(const uint8_t* data_ptr,
|
||||
@ -377,6 +402,9 @@ template <typename T>
|
||||
void
|
||||
BitmapIndex<T>::LoadWithoutAssemble(const BinarySet& binary_set,
|
||||
const Config& config) {
|
||||
auto enable_offset_cache =
|
||||
GetValueFromConfig<bool>(config, ENABLE_OFFSET_CACHE);
|
||||
|
||||
auto index_meta_buffer = binary_set.GetByName(BITMAP_INDEX_META);
|
||||
auto index_meta = DeserializeIndexMeta(index_meta_buffer->data.get(),
|
||||
index_meta_buffer->size);
|
||||
@ -387,6 +415,10 @@ BitmapIndex<T>::LoadWithoutAssemble(const BinarySet& binary_set,
|
||||
auto index_data_buffer = binary_set.GetByName(BITMAP_INDEX_DATA);
|
||||
DeserializeIndexData(index_data_buffer->data.get(), index_length);
|
||||
|
||||
if (enable_offset_cache.has_value() && enable_offset_cache.value()) {
|
||||
BuildOffsetCache();
|
||||
}
|
||||
|
||||
LOG_INFO("load bitmap index with cardinality = {}, num_rows = {}",
|
||||
Cardinality(),
|
||||
total_num_rows_);
|
||||
@ -575,7 +607,6 @@ BitmapIndex<T>::RangeForRoaring(const T value, const OpType op) {
|
||||
}
|
||||
auto lb = data_.begin();
|
||||
auto ub = data_.end();
|
||||
|
||||
switch (op) {
|
||||
case OpType::LessThan: {
|
||||
ub = std::lower_bound(data_.begin(),
|
||||
@ -758,12 +789,26 @@ BitmapIndex<T>::RangeForRoaring(const T lower_value,
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T
|
||||
BitmapIndex<T>::Reverse_Lookup_InCache(size_t idx) const {
|
||||
if (build_mode_ == BitmapIndexBuildMode::ROARING) {
|
||||
return data_offsets_cache_[idx]->first;
|
||||
} else {
|
||||
return bitsets_offsets_cache_[idx]->first;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T
|
||||
BitmapIndex<T>::Reverse_Lookup(size_t idx) const {
|
||||
AssertInfo(is_built_, "index has not been built");
|
||||
AssertInfo(idx < total_num_rows_, "out of range of total coun");
|
||||
|
||||
if (use_offset_cache_) {
|
||||
return Reverse_Lookup_InCache(idx);
|
||||
}
|
||||
|
||||
if (build_mode_ == BitmapIndexBuildMode::ROARING) {
|
||||
for (auto it = data_.begin(); it != data_.end(); it++) {
|
||||
for (const auto& v : it->second) {
|
||||
|
||||
@ -175,6 +175,12 @@ class BitmapIndex : public ScalarIndex<T> {
|
||||
void
|
||||
DeserializeIndexData(const uint8_t* data_ptr, size_t index_length);
|
||||
|
||||
void
|
||||
BuildOffsetCache();
|
||||
|
||||
T
|
||||
Reverse_Lookup_InCache(size_t idx) const;
|
||||
|
||||
void
|
||||
ChooseIndexLoadMode(int64_t index_length);
|
||||
|
||||
@ -210,6 +216,11 @@ class BitmapIndex : public ScalarIndex<T> {
|
||||
std::map<T, TargetBitmap> bitsets_;
|
||||
size_t total_num_rows_{0};
|
||||
proto::schema::FieldSchema schema_;
|
||||
bool use_offset_cache_{false};
|
||||
std::vector<typename std::map<T, roaring::Roaring>::iterator>
|
||||
data_offsets_cache_;
|
||||
std::vector<typename std::map<T, TargetBitmap>::iterator>
|
||||
bitsets_offsets_cache_;
|
||||
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
|
||||
|
||||
// generate valid_bitset to speed up NotIn and IsNull and IsNotNull operate
|
||||
|
||||
@ -25,9 +25,6 @@
|
||||
#include "common/Tracer.h"
|
||||
#include "common/Types.h"
|
||||
|
||||
const std::string kMmapFilepath = "mmap_filepath";
|
||||
const std::string kEnableMmap = "enable_mmap";
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
class IndexBase {
|
||||
|
||||
@ -58,6 +58,12 @@ constexpr const char* INDEX_ENGINE_VERSION = "index_engine_version";
|
||||
constexpr const char* BITMAP_INDEX_CARDINALITY_LIMIT =
|
||||
"bitmap_cardinality_limit";
|
||||
|
||||
// index config key
|
||||
constexpr const char* MMAP_FILE_PATH = "mmap_filepath";
|
||||
constexpr const char* ENABLE_MMAP = "enable_mmap";
|
||||
constexpr const char* INDEX_FILES = "index_files";
|
||||
constexpr const char* ENABLE_OFFSET_CACHE = "indexoffsetcache.enabled";
|
||||
|
||||
// VecIndex file metas
|
||||
constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix";
|
||||
constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path";
|
||||
|
||||
@ -201,7 +201,7 @@ StringIndexMarisa::LoadWithoutAssemble(const BinarySet& set,
|
||||
}
|
||||
|
||||
file.Seek(0, SEEK_SET);
|
||||
if (config.contains(kEnableMmap)) {
|
||||
if (config.contains(ENABLE_MMAP)) {
|
||||
trie_.mmap(file_name.c_str());
|
||||
} else {
|
||||
trie_.read(file.Descriptor());
|
||||
|
||||
@ -26,6 +26,7 @@
|
||||
#include <tuple>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include "common/Types.h"
|
||||
#include "common/FieldData.h"
|
||||
@ -79,7 +80,12 @@ void inline CheckParameter(Config& conf,
|
||||
template <typename T>
|
||||
inline std::optional<T>
|
||||
GetValueFromConfig(const Config& cfg, const std::string& key) {
|
||||
// cfg value are all string type
|
||||
if (cfg.contains(key)) {
|
||||
if constexpr (std::is_same_v<T, bool>) {
|
||||
return boost::algorithm::to_lower_copy(
|
||||
cfg.at(key).get<std::string>()) == "true";
|
||||
}
|
||||
return cfg.at(key).get<T>();
|
||||
}
|
||||
return std::nullopt;
|
||||
|
||||
@ -406,9 +406,9 @@ VectorDiskAnnIndex<T>::update_load_json(const Config& config) {
|
||||
}
|
||||
}
|
||||
|
||||
if (config.contains(kMmapFilepath)) {
|
||||
load_config.erase(kMmapFilepath);
|
||||
load_config[kEnableMmap] = true;
|
||||
if (config.contains(MMAP_FILE_PATH)) {
|
||||
load_config.erase(MMAP_FILE_PATH);
|
||||
load_config[ENABLE_MMAP] = true;
|
||||
}
|
||||
|
||||
return load_config;
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
|
||||
#include "index/Index.h"
|
||||
#include "index/IndexInfo.h"
|
||||
#include "index/Meta.h"
|
||||
#include "index/Utils.h"
|
||||
#include "common/EasyAssert.h"
|
||||
#include "config/ConfigKnowhere.h"
|
||||
@ -142,7 +143,7 @@ template <typename T>
|
||||
void
|
||||
VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
|
||||
const Config& config) {
|
||||
if (config.contains(kMmapFilepath)) {
|
||||
if (config.contains(MMAP_FILE_PATH)) {
|
||||
return LoadFromFile(config);
|
||||
}
|
||||
|
||||
@ -483,7 +484,7 @@ VectorMemIndex<T>::GetSparseVector(const DatasetPtr dataset) const {
|
||||
|
||||
template <typename T>
|
||||
void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
auto filepath = GetValueFromConfig<std::string>(config, kMmapFilepath);
|
||||
auto filepath = GetValueFromConfig<std::string>(config, MMAP_FILE_PATH);
|
||||
AssertInfo(filepath.has_value(), "mmap filepath is empty when load index");
|
||||
|
||||
std::filesystem::create_directories(
|
||||
@ -598,8 +599,8 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
|
||||
LOG_INFO("load index into Knowhere...");
|
||||
auto conf = config;
|
||||
conf.erase(kMmapFilepath);
|
||||
conf[kEnableMmap] = true;
|
||||
conf.erase(MMAP_FILE_PATH);
|
||||
conf[ENABLE_MMAP] = true;
|
||||
auto start_deserialize = std::chrono::system_clock::now();
|
||||
auto stat = index_.DeserializeFromFile(filepath.value(), conf);
|
||||
auto deserialize_duration =
|
||||
|
||||
@ -221,7 +221,6 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
|
||||
static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
|
||||
auto& index_params = load_index_info->index_params;
|
||||
auto field_type = load_index_info->field_type;
|
||||
|
||||
auto engine_version = load_index_info->index_engine_version;
|
||||
|
||||
milvus::index::CreateIndexInfo index_info;
|
||||
@ -271,7 +270,7 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
|
||||
|
||||
auto config = milvus::index::ParseConfigFromIndexParams(
|
||||
load_index_info->index_params);
|
||||
config["index_files"] = load_index_info->index_files;
|
||||
config[milvus::index::INDEX_FILES] = load_index_info->index_files;
|
||||
|
||||
milvus::storage::FileManagerContext fileManagerContext(
|
||||
field_meta, index_meta, remote_chunk_manager);
|
||||
@ -289,9 +288,10 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
|
||||
std::to_string(load_index_info->field_id) /
|
||||
std::to_string(load_index_info->index_id);
|
||||
|
||||
config[kMmapFilepath] = filepath.string();
|
||||
config[milvus::index::MMAP_FILE_PATH] = filepath.string();
|
||||
}
|
||||
|
||||
LOG_DEBUG("load index with configs: {}", config.dump());
|
||||
load_index_info->index->Load(ctx, config);
|
||||
|
||||
span->End();
|
||||
|
||||
@ -272,13 +272,19 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
||||
func ValidateIndexParams(index *model.Index) error {
|
||||
indexType := GetIndexType(index.IndexParams)
|
||||
indexParams := funcutil.KeyValuePair2Map(index.IndexParams)
|
||||
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
|
||||
if err := indexparamcheck.ValidateMmapIndexParams(indexType, indexParams); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid mmap index params", err.Error())
|
||||
}
|
||||
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
|
||||
if err := indexparamcheck.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid mmap user index params", err.Error())
|
||||
}
|
||||
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, indexParams); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
|
||||
}
|
||||
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, userIndexParams); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -167,10 +167,11 @@ const (
|
||||
|
||||
// common properties
|
||||
const (
|
||||
MmapEnabledKey = "mmap.enabled"
|
||||
LazyLoadEnableKey = "lazyload.enabled"
|
||||
PartitionKeyIsolationKey = "partitionkey.isolation"
|
||||
FieldSkipLoadKey = "field.skipLoad"
|
||||
MmapEnabledKey = "mmap.enabled"
|
||||
LazyLoadEnableKey = "lazyload.enabled"
|
||||
PartitionKeyIsolationKey = "partitionkey.isolation"
|
||||
FieldSkipLoadKey = "field.skipLoad"
|
||||
IndexOffsetCacheEnabledKey = "indexoffsetcache.enabled"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@ -29,4 +29,5 @@ func Test_BitmapIndexChecker(t *testing.T) {
|
||||
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Double}))
|
||||
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Float}))
|
||||
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Double}))
|
||||
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Double, IsPrimaryKey: true}))
|
||||
}
|
||||
|
||||
@ -16,6 +16,9 @@ func (c *BITMAPChecker) CheckTrain(params map[string]string) error {
|
||||
}
|
||||
|
||||
func (c *BITMAPChecker) CheckValidDataType(field *schemapb.FieldSchema) error {
|
||||
if field.IsPrimaryKey {
|
||||
return fmt.Errorf("create bitmap index on primary key not supported")
|
||||
}
|
||||
mainType := field.GetDataType()
|
||||
elemType := field.GetElementType()
|
||||
if !typeutil.IsBoolType(mainType) && !typeutil.IsIntegerType(mainType) &&
|
||||
|
||||
@ -73,6 +73,10 @@ func IsVectorMmapIndex(indexType IndexType) bool {
|
||||
indexType == IndexSparseWand
|
||||
}
|
||||
|
||||
func IsOffsetCacheSupported(indexType IndexType) bool {
|
||||
return indexType == IndexBitmap
|
||||
}
|
||||
|
||||
func IsDiskIndex(indexType IndexType) bool {
|
||||
return indexType == IndexDISKANN
|
||||
}
|
||||
@ -96,3 +100,18 @@ func ValidateMmapIndexParams(indexType IndexType, indexParams map[string]string)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ValidateOffsetCacheIndexParams(indexType IndexType, indexParams map[string]string) error {
|
||||
offsetCacheEnable, ok := indexParams[common.IndexOffsetCacheEnabledKey]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
enable, err := strconv.ParseBool(offsetCacheEnable)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid %s value: %s, expected: true, false", common.IndexOffsetCacheEnabledKey, offsetCacheEnable)
|
||||
}
|
||||
if enable && IsOffsetCacheSupported(indexType) {
|
||||
return fmt.Errorf("only bitmap index support %s now", common.IndexOffsetCacheEnabledKey)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -54,6 +54,7 @@ var configableIndexParams = typeutil.NewSet[string]()
|
||||
|
||||
func init() {
|
||||
configableIndexParams.Insert(common.MmapEnabledKey)
|
||||
configableIndexParams.Insert(common.IndexOffsetCacheEnabledKey)
|
||||
}
|
||||
|
||||
func IsConfigableIndexParam(key string) bool {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user