enhance: Extend SkipIndex with IN/Match support and BloomFilter (#44581)

issue: #44584

---------

Signed-off-by: thekingking <1677273255@qq.com>
This commit is contained in:
Jingsong Yin 2025-10-31 22:39:32 +08:00 committed by GitHub
parent 950e8f1f92
commit 0cc79772e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 3423 additions and 464 deletions

View File

@ -50,6 +50,7 @@ class MilvusConan(ConanFile):
"unordered_dense/4.4.0#6a855c992618cc4c63019109a2e47298", "unordered_dense/4.4.0#6a855c992618cc4c63019109a2e47298",
"mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01", "mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01",
"geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37", "geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37",
"icu/74.2#cd1937b9561b8950a2ae6311284c5813",
) )
generators = ("cmake", "cmake_find_package") generators = ("cmake", "cmake_find_package")
default_options = { default_options = {
@ -90,6 +91,8 @@ class MilvusConan(ConanFile):
"onetbb:tbbproxy": False, "onetbb:tbbproxy": False,
"gdal:shared": True, "gdal:shared": True,
"gdal:fPIC": True, "gdal:fPIC": True,
"icu:shared": False,
"icu:data_packaging": "library",
} }
def configure(self): def configure(self):

View File

@ -0,0 +1,363 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include "nlohmann/json.hpp"
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <stdexcept>
#include "log/Log.h"
#include "xxhash.h" // from xxhash/xxhash
namespace milvus {
// Bloom-filter type names as persisted in serialized JSON ("type" field);
// BFTypeToString/StringToBFType convert between these and BFType.
// `inline` (C++17) so every translation unit that includes this header
// shares one instance instead of getting its own dynamically-initialized
// copy (plain namespace-scope `const std::string` in a header duplicates).
inline const std::string UNSUPPORTED_BF_NAME = "Unsupported BloomFilter";
inline const std::string BLOCKED_BF_NAME = "BlockedBloomFilter";
inline const std::string ALWAYS_TRUE_BF_NAME = "AlwaysTrueBloomFilter";
// Discriminator for the concrete BloomFilter implementation. The string
// form (see BFTypeToString) is what gets written into serialized JSON.
enum class BFType {
    Unsupported = 0,
    AlwaysTrue,  // stateless filter that reports every key as present
    Blocked,     // real bloom filter backed by a bit array (BlockedBloomFilter)
};
// Map a filter kind to its persisted name; anything unrecognized is
// reported as the "unsupported" marker string.
inline std::string
BFTypeToString(BFType type) {
    if (type == BFType::Blocked) {
        return BLOCKED_BF_NAME;
    }
    if (type == BFType::AlwaysTrue) {
        return ALWAYS_TRUE_BF_NAME;
    }
    return UNSUPPORTED_BF_NAME;
}
// Inverse of BFTypeToString: resolve a persisted name back to its BFType;
// unknown names map to BFType::Unsupported.
inline BFType
StringToBFType(std::string_view name) {
    return name == BLOCKED_BF_NAME        ? BFType::Blocked
           : name == ALWAYS_TRUE_BF_NAME  ? BFType::AlwaysTrue
                                          : BFType::Unsupported;
}
// Abstract interface for the bloom filters used by SkipIndex.
// Keys can be supplied as raw bytes, as string views, or as pre-computed
// hash "locations" (see the free function Locations below) so that a key
// can be hashed once and tested against many filters.
class BloomFilter {
 public:
    virtual ~BloomFilter() = default;

    // Concrete implementation kind; used to pick the JSON deserializer.
    virtual BFType
    Type() const = 0;

    // Capacity metric of the filter. NOTE: BlockedBloomFilter returns its
    // bit count here, not the element capacity it was sized for.
    virtual uint64_t
    Cap() const = 0;

    // Number of hash probes per key.
    virtual uint32_t
    K() const = 0;

    // Insert a key given as raw bytes.
    virtual void
    Add(const unsigned char* data, size_t len) = 0;

    // Insert a key given as a string view (hashed over its bytes).
    virtual void
    Add(std::string_view data) = 0;

    // Membership query: false means definitely absent; true means possibly
    // present (false positives allowed).
    virtual bool
    Test(const unsigned char* data, size_t len) const = 0;

    virtual bool
    Test(std::string_view data) const = 0;

    // Membership query over pre-computed hash locations.
    virtual bool
    TestLocations(const std::vector<uint64_t>& locs) const = 0;

    // Batched TestLocations. Only the first hits.size() entries are
    // evaluated; entries with hits[i] == true are skipped and their result
    // is left false (see BlockedBloomFilter's implementation).
    virtual std::vector<bool>
    BatchTestLocations(const std::vector<std::vector<uint64_t>>& locs,
                       const std::vector<bool>& hits) const = 0;

    // Serialize the filter; BloomFilterFromJson() is the inverse.
    virtual nlohmann::json
    ToJson() const = 0;
};

using BloomFilterPtr = std::shared_ptr<BloomFilter>;
class BlockedBloomFilter : public BloomFilter {
public:
BlockedBloomFilter(uint64_t capacity, double fp) {
double m = -static_cast<double>(capacity) * std::log(fp) /
(std::log(2.0) * std::log(2.0));
num_bits_ = static_cast<uint64_t>(std::ceil(m));
double k = (m / capacity) * std::log(2.0);
k_ = std::max(1u, static_cast<uint32_t>(std::round(k)));
size_t num_blocks = (num_bits_ + 63) / 64;
bits_.resize(num_blocks, 0);
LOG_DEBUG(
"Created BlockedBloomFilter: capacity={}, fp={}, bits={}, k={}",
capacity,
fp,
num_bits_,
k_);
}
explicit BlockedBloomFilter(const nlohmann::json& data) {
if (!data.contains("bits") || !data.contains("num_bits") ||
!data.contains("k")) {
throw std::runtime_error(
"Invalid JSON for BlockedBloomFilter: missing required fields");
}
bits_ = data["bits"].get<std::vector<uint64_t>>();
num_bits_ = data["num_bits"].get<uint64_t>();
k_ = data["k"].get<uint32_t>();
}
BFType
Type() const override {
return BFType::Blocked;
}
uint64_t
Cap() const override {
return num_bits_;
}
uint32_t
K() const override {
return k_;
}
void
Add(const uint8_t* data, size_t len) override {
uint64_t hash = XXH3_64bits(data, len);
AddHash(hash);
}
void
Add(std::string_view data) override {
uint64_t hash = XXH3_64bits(data.data(), data.size());
AddHash(hash);
}
bool
Test(const uint8_t* data, size_t len) const override {
uint64_t hash = XXH3_64bits(data, len);
return TestHash(hash);
}
bool
Test(std::string_view data) const override {
uint64_t hash = XXH3_64bits(data.data(), data.size());
return TestHash(hash);
}
bool
TestLocations(const std::vector<uint64_t>& locs) const override {
if (locs.size() != 1) {
return true;
}
return TestHash(locs[0]);
}
std::vector<bool>
BatchTestLocations(const std::vector<std::vector<uint64_t>>& locs,
const std::vector<bool>& hits) const override {
std::vector<bool> ret(locs.size(), false);
for (size_t i = 0; i < hits.size(); ++i) {
if (!hits[i]) {
if (locs[i].size() != 1) {
ret[i] = true;
continue;
}
ret[i] = TestHash(locs[i][0]);
}
}
return ret;
}
nlohmann::json
ToJson() const override {
nlohmann::json data;
data["type"] = BFTypeToString(Type());
data["bits"] = bits_;
data["num_bits"] = num_bits_;
data["k"] = k_;
return data;
}
private:
void
AddHash(uint64_t hash) {
uint64_t h1 = hash;
uint64_t h2 = hash >> 32;
for (uint32_t i = 0; i < k_; ++i) {
uint64_t combined_hash = h1 + i * h2;
uint64_t bit_pos = combined_hash % num_bits_;
uint64_t block_idx = bit_pos / 64;
uint64_t bit_idx = bit_pos % 64;
bits_[block_idx] |= (1ULL << bit_idx);
}
}
bool
TestHash(uint64_t hash) const {
uint64_t h1 = hash;
uint64_t h2 = hash >> 32;
for (uint32_t i = 0; i < k_; ++i) {
uint64_t combined_hash = h1 + i * h2;
uint64_t bit_pos = combined_hash % num_bits_;
uint64_t block_idx = bit_pos / 64;
uint64_t bit_idx = bit_pos % 64;
if ((bits_[block_idx] & (1ULL << bit_idx)) == 0) {
return false;
}
}
return true;
}
private:
std::vector<uint64_t> bits_;
uint64_t num_bits_;
uint32_t k_;
};
// Degenerate filter that reports every key as possibly present.
// Used as the "never skip" fallback when no real statistics exist: Add() is
// a no-op, the filter holds no state, and all queries return true.
class AlwaysTrueBloomFilter : public BloomFilter {
 public:
    BFType
    Type() const override {
        return BFType::AlwaysTrue;
    }
    uint64_t
    Cap() const override {
        return 0;
    }
    uint32_t
    K() const override {
        return 0;
    }
    // Insertions are ignored — membership is unconditionally true.
    void
    Add(const unsigned char* data, size_t len) override {
    }
    void
    Add(std::string_view data) override {
    }
    bool
    Test(const unsigned char* data, size_t len) const override {
        return true;
    }
    bool
    Test(std::string_view data) const override {
        return true;
    }
    bool
    TestLocations(const std::vector<uint64_t>& locs) const override {
        return true;
    }
    std::vector<bool>
    BatchTestLocations(const std::vector<std::vector<uint64_t>>& locs,
                       const std::vector<bool>& hits) const override {
        return std::vector<bool>(locs.size(), true);  // return true for everything
    }
    // Only the type tag is persisted; there is no other state.
    nlohmann::json
    ToJson() const override {
        nlohmann::json data;
        data["type"] = BFTypeToString(Type());
        return data;
    }
};
// Shared singleton: AlwaysTrueBloomFilter is stateless, so a single
// instance can serve every caller. `inline` (C++17) guarantees one
// program-wide object; the previous `static` in this header created a
// separate copy (and a separate allocation) per translation unit.
inline const BloomFilterPtr g_always_true_bf =
    std::make_shared<AlwaysTrueBloomFilter>();
// Factory keyed on BFType. AlwaysTrue is stateless and shared via the
// singleton; any other request — including an unsupported type, after a
// warning — produces a fresh BlockedBloomFilter sized for capacity/fp.
inline BloomFilterPtr
NewBloomFilterWithType(uint64_t capacity, double fp, BFType type) {
    if (type == BFType::AlwaysTrue) {
        return g_always_true_bf;
    }
    if (type != BFType::Blocked) {
        LOG_WARN(
            "Unsupported bloom filter type {}, falling back to BlockedBF",
            static_cast<int>(type));
    }
    return std::make_shared<BlockedBloomFilter>(capacity, fp);
}
// Convenience overload: resolve the persisted type name, then delegate to
// the BFType-based factory.
inline BloomFilterPtr
NewBloomFilterWithType(uint64_t capacity,
                       double fp,
                       std::string_view type_name) {
    return NewBloomFilterWithType(capacity, fp, StringToBFType(type_name));
}
// Deserialize a filter produced by BloomFilter::ToJson(); the "type" field
// selects the concrete implementation. Throws std::runtime_error when the
// field is absent or names an unsupported type.
inline BloomFilterPtr
BloomFilterFromJson(const nlohmann::json& data) {
    if (!data.contains("type")) {
        throw std::runtime_error(
            "JSON data for bloom filter missing 'type' field");
    }
    const auto type_str = data["type"].get<std::string>();
    const auto bf_type = StringToBFType(type_str);
    if (bf_type == BFType::Blocked) {
        return std::make_shared<BlockedBloomFilter>(data);
    }
    if (bf_type == BFType::AlwaysTrue) {
        return g_always_true_bf;
    }
    throw std::runtime_error("Unsupported bloom filter type: " + type_str);
}
// Pre-hash a key for later TestLocations()/BatchTestLocations() calls.
// BlockedBF derives all of its probe positions from a single 64-bit XXH3
// hash, so exactly one element is returned (the `k` argument is unused by
// that scheme); AlwaysTrue needs no locations at all.
inline std::vector<uint64_t>
Locations(const uint8_t* data, size_t len, uint32_t k, BFType bf_type) {
    if (bf_type == BFType::Blocked) {
        return {XXH3_64bits(data, len)};
    }
    if (bf_type != BFType::AlwaysTrue) {
        LOG_WARN(
            "Unsupported bloom filter type in Locations, returning empty");
    }
    return {};
}
inline std::vector<uint64_t>
Locations(std::string_view data, uint32_t k, BFType bf_type) {
return Locations(
reinterpret_cast<const uint8_t*>(data.data()), data.size(), k, bf_type);
}
} // namespace milvus

View File

@ -0,0 +1,297 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "common/BloomFilter.h"
using namespace milvus;
// Smoke test: a freshly sized BlockedBF reports sane geometry (non-zero
// bit count and hash count) and finds every key that was added.
TEST(BloomFilterTest, BlockedBF_BasicOperations) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    ASSERT_EQ(bf->Type(), BFType::Blocked);
    ASSERT_GT(bf->Cap(), 0);
    ASSERT_GT(bf->K(), 0);
    bf->Add("hello");
    bf->Add("world");
    bf->Add("milvus");
    EXPECT_TRUE(bf->Test("hello"));
    EXPECT_TRUE(bf->Test("world"));
    EXPECT_TRUE(bf->Test("milvus"));
}
// Locations() must produce probe positions usable via TestLocations(): an
// added key matches; an absent key does not.
// NOTE(review): the EXPECT_FALSE is probabilistic — a false positive is
// possible (but very unlikely) at fp = 0.01.
TEST(BloomFilterTest, BlockedBF_Locations) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    bf->Add("test_key");
    auto locs = Locations("test_key", bf->K(), bf->Type());
    ASSERT_EQ(locs.size(), 1);
    EXPECT_TRUE(bf->TestLocations(locs));
    auto locs2 = Locations("non_existent", bf->K(), bf->Type());
    EXPECT_FALSE(bf->TestLocations(locs2));
}

// BatchTestLocations evaluates many pre-hashed keys at once: only the two
// added keys should match; the `hits` flags are all false so every entry
// is actually tested.
TEST(BloomFilterTest, BlockedBF_BatchTestLocations) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    std::vector<std::string> keys = {"key1", "key2", "key3", "key4"};
    std::vector<std::vector<uint64_t>> locs;
    for (size_t i = 0; i < 2; ++i) {
        bf->Add(keys[i]);
    }
    for (const auto& key : keys) {
        locs.push_back(Locations(key, bf->K(), bf->Type()));
    }
    std::vector<bool> hits(keys.size(), false);
    auto results = bf->BatchTestLocations(locs, hits);
    EXPECT_TRUE(results[0]);
    EXPECT_TRUE(results[1]);
    EXPECT_FALSE(results[2]);
    EXPECT_FALSE(results[3]);
}
// A BlockedBF must survive a ToJson()/BloomFilterFromJson() round trip:
// the restored filter keeps membership for added keys and (modulo the
// false-positive rate) absence for others.
TEST(BloomFilterTest, BlockedBF_Serialization) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    bf->Add("serialize_test");
    bf->Add("data");
    nlohmann::json json_data = bf->ToJson();
    auto bf2 = BloomFilterFromJson(json_data);
    ASSERT_EQ(bf2->Type(), BFType::Blocked);
    EXPECT_TRUE(bf2->Test("serialize_test"));
    EXPECT_TRUE(bf2->Test("data"));
    // NOTE(review): probabilistic — could in principle false-positive.
    EXPECT_FALSE(bf2->Test("not_added"));
}
// AlwaysTrueBF is the stateless "never skip" fallback: it accepts any key,
// reports everything (including the empty string) as present, and exposes
// zero capacity and zero hash count.
TEST(BloomFilterTest, AlwaysTrueBF_BasicOperations) {
    auto bf = NewBloomFilterWithType(0, 0.0, BFType::AlwaysTrue);
    ASSERT_EQ(bf->Type(), BFType::AlwaysTrue);
    ASSERT_EQ(bf->Cap(), 0);
    ASSERT_EQ(bf->K(), 0);
    bf->Add("anything");
    EXPECT_TRUE(bf->Test("anything"));
    EXPECT_TRUE(bf->Test("something"));
    EXPECT_TRUE(bf->Test(""));
}

// Locations() yields no probe positions for AlwaysTrueBF, and
// TestLocations() returns true regardless of input shape.
TEST(BloomFilterTest, AlwaysTrueBF_Locations) {
    auto bf = NewBloomFilterWithType(0, 0.0, BFType::AlwaysTrue);
    auto locs = Locations("test", bf->K(), bf->Type());
    EXPECT_TRUE(locs.empty());
    EXPECT_TRUE(bf->TestLocations({}));
    EXPECT_TRUE(bf->TestLocations({1, 2, 3}));
}

// Round-tripping through JSON preserves the AlwaysTrue type and semantics.
TEST(BloomFilterTest, AlwaysTrueBF_Serialization) {
    auto bf = NewBloomFilterWithType(0, 0.0, BFType::AlwaysTrue);
    nlohmann::json json_data = bf->ToJson();
    auto bf2 = BloomFilterFromJson(json_data);
    ASSERT_EQ(bf2->Type(), BFType::AlwaysTrue);
    EXPECT_TRUE(bf2->Test("anything"));
}
// The empty string is a legal key and must be distinguishable from
// non-empty keys.
TEST(BloomFilterTest, BlockedBF_EmptyString) {
    auto bf = NewBloomFilterWithType(100, 0.01, BFType::Blocked);
    bf->Add("");
    EXPECT_TRUE(bf->Test(""));
    // NOTE(review): probabilistic — could false-positive at fp = 0.01.
    EXPECT_FALSE(bf->Test("non_empty"));
}
// Keys with embedded NULs, control characters, multi-byte UTF-8 and emoji
// must round-trip through Add/Test, since the filter hashes raw bytes.
TEST(BloomFilterTest, BlockedBF_SpecialCharacters) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    // The length-taking constructor is required for the NUL-bytes key: the
    // plain C-string constructor stops at the first '\0', which silently
    // turned this entry into an empty string (duplicating the "" case).
    std::vector<std::string> special_strings = {std::string("\0\0\0", 3),
                                                "\n\r\t",
                                                "无懈可击",
                                                "🚀🎉🔥",
                                                "\\x00\\xff\\xaa",
                                                "\"'`",
                                                "<>&",
                                                ""};
    for (const auto& s : special_strings) {
        bf->Add(s);
    }
    for (const auto& s : special_strings) {
        EXPECT_TRUE(bf->Test(s)) << "Failed for: " << s;
    }
}
// Raw byte buffers (including 0x00 and high-bit patterns) are hashed by
// (pointer, length), so arbitrary binary keys round-trip through Add/Test.
TEST(BloomFilterTest, BlockedBF_BinaryData) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    uint8_t binary_data1[] = {0x00, 0x01, 0x02, 0xff, 0xfe};
    uint8_t binary_data2[] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee};
    uint8_t binary_data3[] = {0x00, 0x00, 0x00, 0x00};
    bf->Add(binary_data1, sizeof(binary_data1));
    bf->Add(binary_data2, sizeof(binary_data2));
    bf->Add(binary_data3, sizeof(binary_data3));
    EXPECT_TRUE(bf->Test(binary_data1, sizeof(binary_data1)));
    EXPECT_TRUE(bf->Test(binary_data2, sizeof(binary_data2)));
    EXPECT_TRUE(bf->Test(binary_data3, sizeof(binary_data3)));
    uint8_t not_added[] = {0x11, 0x22, 0x33};
    // NOTE(review): probabilistic — could false-positive at fp = 0.01.
    EXPECT_FALSE(bf->Test(not_added, sizeof(not_added)));
}

// Integer keys are added via their in-memory byte representation; 32-bit
// and 64-bit extremes (and zero/negatives) must all be found again.
TEST(BloomFilterTest, BlockedBF_IntegerTypes) {
    auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked);
    int32_t int32_vals[] = {0, -1, 1, INT32_MAX, INT32_MIN};
    for (auto val : int32_vals) {
        bf->Add(reinterpret_cast<const uint8_t*>(&val), sizeof(val));
    }
    for (auto val : int32_vals) {
        EXPECT_TRUE(
            bf->Test(reinterpret_cast<const uint8_t*>(&val), sizeof(val)));
    }
    int64_t int64_vals[] = {0L, -1L, 1L, INT64_MAX, INT64_MIN};
    for (auto val : int64_vals) {
        bf->Add(reinterpret_cast<const uint8_t*>(&val), sizeof(val));
    }
    for (auto val : int64_vals) {
        EXPECT_TRUE(
            bf->Test(reinterpret_cast<const uint8_t*>(&val), sizeof(val)));
    }
}
// Degenerate sizing: capacity 1 must still produce a working filter.
TEST(BloomFilterTest, BlockedBF_VerySmallCapacity) {
    auto bf = NewBloomFilterWithType(1, 0.01, BFType::Blocked);
    bf->Add("test");
    EXPECT_TRUE(bf->Test("test"));
}

// Large sizing: one million elements at 1% fp still yields sane geometry
// and a functioning filter.
TEST(BloomFilterTest, BlockedBF_VeryLargeCapacity) {
    auto bf = NewBloomFilterWithType(1000000, 0.01, BFType::Blocked);
    ASSERT_GT(bf->Cap(), 0);
    ASSERT_GT(bf->K(), 0);
    bf->Add("test");
    EXPECT_TRUE(bf->Test("test"));
}
// Fill the filter to its design capacity, verify zero false negatives,
// then probe 10k absent keys and check the measured false-positive rate
// stays within 3x of the configured target (the slack absorbs hash
// variance so the test is not flaky).
TEST(BloomFilterTest, BlockedBF_FalsePositiveRate) {
    const int capacity = 10000;
    const double expected_fp_rate = 0.01;
    auto bf =
        NewBloomFilterWithType(capacity, expected_fp_rate, BFType::Blocked);
    std::vector<std::string> added_keys;
    for (int i = 0; i < capacity; ++i) {
        std::string key = "key_" + std::to_string(i);
        added_keys.push_back(key);
        bf->Add(key);
    }
    // Bloom filters never produce false negatives.
    for (const auto& key : added_keys) {
        EXPECT_TRUE(bf->Test(key));
    }
    int false_positives = 0;
    const int test_count = 10000;
    for (int i = 0; i < test_count; ++i) {
        // Offset by `capacity` so probe keys are disjoint from added keys.
        std::string test_key = "test_key_" + std::to_string(i + capacity);
        if (bf->Test(test_key)) {
            false_positives++;
        }
    }
    double actual_fp_rate = static_cast<double>(false_positives) / test_count;
    EXPECT_LT(actual_fp_rate, expected_fp_rate * 3);
}
// Serialization must preserve exact filter state: the JSON carries all
// required fields, and the restored filter matches in type, geometry (bit
// count, hash count) and membership of every previously added key.
TEST(BloomFilterTest, BlockedBF_SerializationIntegrity) {
    auto bf1 = NewBloomFilterWithType(5000, 0.01, BFType::Blocked);
    std::vector<std::string> test_data;
    for (int i = 0; i < 1000; ++i) {
        test_data.push_back("key_" + std::to_string(i));
        bf1->Add(test_data.back());
    }
    nlohmann::json json_data = bf1->ToJson();
    ASSERT_TRUE(json_data.contains("type"));
    ASSERT_TRUE(json_data.contains("bits"));
    ASSERT_TRUE(json_data.contains("num_bits"));
    ASSERT_TRUE(json_data.contains("k"));
    auto bf2 = BloomFilterFromJson(json_data);
    EXPECT_EQ(bf1->Type(), bf2->Type());
    EXPECT_EQ(bf1->Cap(), bf2->Cap());
    EXPECT_EQ(bf1->K(), bf2->K());
    for (const auto& key : test_data) {
        EXPECT_TRUE(bf2->Test(key));
    }
}
// Malformed JSON must be rejected with std::runtime_error: missing "type"
// field, missing "bits" field, and an unrecognized type name.
TEST(BloomFilterTest, BlockedBF_InvalidJsonDeserialization) {
    // Valid payload fields but no "type" discriminator.
    nlohmann::json invalid_json1;
    invalid_json1["bits"] = std::vector<uint64_t>{1, 2, 3};
    invalid_json1["num_bits"] = 192;
    invalid_json1["k"] = 7;
    EXPECT_THROW(BloomFilterFromJson(invalid_json1), std::runtime_error);
    // Correct type but missing the "bits" payload.
    nlohmann::json invalid_json2;
    invalid_json2["type"] = "BlockedBloomFilter";
    invalid_json2["num_bits"] = 192;
    invalid_json2["k"] = 7;
    EXPECT_THROW(BloomFilterFromJson(invalid_json2), std::runtime_error);
    // Unknown type name.
    nlohmann::json invalid_json3;
    invalid_json3["type"] = "UnknownType";
    EXPECT_THROW(BloomFilterFromJson(invalid_json3), std::runtime_error);
}
// Locations() must be deterministic: equal inputs produce identical probe
// positions, and different inputs (almost surely, given a 64-bit hash)
// produce different ones.
TEST(BloomFilterTest, LocationsConsistency) {
    std::string data1 = "consistent_test";
    std::string data2 = "consistent_test";
    auto locs1 = Locations(data1, 7, BFType::Blocked);
    auto locs2 = Locations(data2, 7, BFType::Blocked);
    ASSERT_EQ(locs1.size(), locs2.size());
    for (size_t i = 0; i < locs1.size(); ++i) {
        EXPECT_EQ(locs1[i], locs2[i]);
    }
    std::string data3 = "different_test";
    auto locs3 = Locations(data3, 7, BFType::Blocked);
    EXPECT_NE(locs1, locs3);
}

View File

@ -104,6 +104,10 @@ const bool DEFAULT_GROWING_JSON_KEY_STATS_ENABLED = false;
const bool DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED = true; const bool DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED = true;
const bool DEFAULT_ENABLE_PARQUET_STATS_SKIP_INDEX = false; const bool DEFAULT_ENABLE_PARQUET_STATS_SKIP_INDEX = false;
// skipindex stats related
const double DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
const int64_t DEFAULT_SKIPINDEX_MIN_NGRAM_LENGTH = 3;
// index config related // index config related
const std::string SEGMENT_INSERT_FILES_KEY = "segment_insert_files"; const std::string SEGMENT_INSERT_FILES_KEY = "segment_insert_files";
const std::string INSERT_FILES_KEY = "insert_files"; const std::string INSERT_FILES_KEY = "insert_files";

View File

@ -1815,10 +1815,18 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForData(
} }
} }
}; };
auto skip_index_func =
[op_type, arith_type, value, right_operand](
const SkipIndex& skip_index, FieldId field_id, int64_t chunk_id) {
return skip_index.CanSkipBinaryArithRange<T>(
field_id, chunk_id, op_type, arith_type, value, right_operand);
};
int64_t processed_size; int64_t processed_size;
if (has_offset_input_) { if (has_offset_input_) {
processed_size = ProcessDataByOffsets<T>(execute_sub_batch, processed_size = ProcessDataByOffsets<T>(execute_sub_batch,
std::nullptr_t{}, skip_index_func,
input, input,
res, res,
valid_res, valid_res,
@ -1826,7 +1834,7 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForData(
right_operand); right_operand);
} else { } else {
processed_size = ProcessDataChunks<T>(execute_sub_batch, processed_size = ProcessDataChunks<T>(execute_sub_batch,
std::nullptr_t{}, skip_index_func,
res, res,
valid_res, valid_res,
value, value,

View File

@ -951,17 +951,25 @@ PhyTermFilterExpr::ExecVisitorImplForData(EvalCtx& context) {
} }
processed_cursor += size; processed_cursor += size;
}; };
auto set = std::static_pointer_cast<SetElement<T>>(arg_set_);
auto skip_index_func =
[set](const SkipIndex& skip_index, FieldId field_id, int64_t chunk_id) {
return skip_index.CanSkipInQuery<T>(
field_id, chunk_id, set->GetElements());
};
int64_t processed_size; int64_t processed_size;
if (has_offset_input_) { if (has_offset_input_) {
processed_size = ProcessDataByOffsets<T>(execute_sub_batch, processed_size = ProcessDataByOffsets<T>(execute_sub_batch,
std::nullptr_t{}, skip_index_func,
input, input,
res, res,
valid_res, valid_res,
arg_set_); arg_set_);
} else { } else {
processed_size = ProcessDataChunks<T>( processed_size = ProcessDataChunks<T>(
execute_sub_batch, std::nullptr_t{}, res, valid_res, arg_set_); execute_sub_batch, skip_index_func, res, valid_res, arg_set_);
} }
AssertInfo(processed_size == real_batch_size, AssertInfo(processed_size == real_batch_size,
"internal error: expr processed rows {} not equal " "internal error: expr processed rows {} not equal "

View File

@ -16,9 +16,9 @@
namespace milvus { namespace milvus {
static const FieldChunkMetrics defaultFieldChunkMetrics; static const index::NoneFieldChunkMetrics defaultFieldChunkMetrics{};
const cachinglayer::PinWrapper<const FieldChunkMetrics*> const cachinglayer::PinWrapper<const index::FieldChunkMetrics*>
SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const { SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const {
// skip index structure must be setup before using, thus we do not lock here. // skip index structure must be setup before using, thus we do not lock here.
auto field_metrics = fieldChunkMetrics_.find(field_id); auto field_metrics = fieldChunkMetrics_.find(field_id);
@ -27,115 +27,25 @@ SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const {
auto ca = cachinglayer::SemiInlineGet( auto ca = cachinglayer::SemiInlineGet(
field_chunk_metrics->PinCells(nullptr, {chunk_id})); field_chunk_metrics->PinCells(nullptr, {chunk_id}));
auto metrics = ca->get_cell_of(chunk_id); auto metrics = ca->get_cell_of(chunk_id);
return cachinglayer::PinWrapper<const FieldChunkMetrics*>(ca, metrics); return cachinglayer::PinWrapper<const index::FieldChunkMetrics*>(
ca, metrics);
} }
return cachinglayer::PinWrapper<const FieldChunkMetrics*>( return cachinglayer::PinWrapper<const index::FieldChunkMetrics*>(
&defaultFieldChunkMetrics); &defaultFieldChunkMetrics);
} }
std::vector< std::vector<std::pair<milvus::cachinglayer::cid_t,
std::pair<milvus::cachinglayer::cid_t, std::unique_ptr<FieldChunkMetrics>>> std::unique_ptr<index::FieldChunkMetrics>>>
FieldChunkMetricsTranslator::get_cells( FieldChunkMetricsTranslator::get_cells(
const std::vector<milvus::cachinglayer::cid_t>& cids) { const std::vector<milvus::cachinglayer::cid_t>& cids) {
std::vector<std::pair<milvus::cachinglayer::cid_t, std::vector<std::pair<milvus::cachinglayer::cid_t,
std::unique_ptr<FieldChunkMetrics>>> std::unique_ptr<index::FieldChunkMetrics>>>
cells; cells;
cells.reserve(cids.size()); cells.reserve(cids.size());
if (data_type_ == milvus::DataType::VARCHAR) { for (auto chunk_id : cids) {
for (auto chunk_id : cids) { auto pw = column_->GetChunk(nullptr, chunk_id);
auto pw = column_->GetChunk(nullptr, chunk_id); auto chunk_metrics = builder_.Build(data_type_, pw.get());
auto chunk = static_cast<StringChunk*>(pw.get()); cells.emplace_back(chunk_id, std::move(chunk_metrics));
int num_rows = chunk->RowNums();
auto chunk_metrics = std::make_unique<FieldChunkMetrics>();
if (num_rows > 0) {
auto info = ProcessStringFieldMetrics(chunk);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
}
chunk_metrics->hasValue_ = chunk_metrics->null_count_ != num_rows;
cells.emplace_back(chunk_id, std::move(chunk_metrics));
}
} else {
for (auto chunk_id : cids) {
auto pw = column_->GetChunk(nullptr, chunk_id);
auto chunk = static_cast<FixedWidthChunk*>(pw.get());
auto span = chunk->Span();
const void* chunk_data = span.data();
const bool* valid_data = span.valid_data();
int64_t count = span.row_count();
auto chunk_metrics = std::make_unique<FieldChunkMetrics>();
if (count > 0) {
switch (data_type_) {
case DataType::INT8: {
const int8_t* typedData =
static_cast<const int8_t*>(chunk_data);
auto info = ProcessFieldMetrics<int8_t>(
typedData, valid_data, count);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
break;
}
case DataType::INT16: {
const int16_t* typedData =
static_cast<const int16_t*>(chunk_data);
auto info = ProcessFieldMetrics<int16_t>(
typedData, valid_data, count);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
break;
}
case DataType::INT32: {
const int32_t* typedData =
static_cast<const int32_t*>(chunk_data);
auto info = ProcessFieldMetrics<int32_t>(
typedData, valid_data, count);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
break;
}
case DataType::INT64: {
const int64_t* typedData =
static_cast<const int64_t*>(chunk_data);
auto info = ProcessFieldMetrics<int64_t>(
typedData, valid_data, count);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
break;
}
case DataType::FLOAT: {
const float* typedData =
static_cast<const float*>(chunk_data);
auto info = ProcessFieldMetrics<float>(
typedData, valid_data, count);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
break;
}
case DataType::DOUBLE: {
const double* typedData =
static_cast<const double*>(chunk_data);
auto info = ProcessFieldMetrics<double>(
typedData, valid_data, count);
chunk_metrics->min_ = Metrics(info.min_);
chunk_metrics->max_ = Metrics(info.max_);
chunk_metrics->null_count_ = info.null_count_;
break;
}
}
}
chunk_metrics->hasValue_ = chunk_metrics->null_count_ != count;
cells.emplace_back(chunk_id, std::move(chunk_metrics));
}
} }
return cells; return cells;
} }

View File

@ -13,65 +13,20 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <unordered_map>
#include "cachinglayer/CacheSlot.h" #include "cachinglayer/CacheSlot.h"
#include "cachinglayer/Manager.h" #include "cachinglayer/Manager.h"
#include "cachinglayer/Translator.h" #include "cachinglayer/Translator.h"
#include "cachinglayer/Utils.h" #include "cachinglayer/Utils.h"
#include "common/Chunk.h" #include "common/FieldDataInterface.h"
#include "common/Types.h" #include "common/Types.h"
#include "common/type_c.h"
#include "mmap/ChunkedColumnInterface.h" #include "mmap/ChunkedColumnInterface.h"
#include "parquet/statistics.h" #include "parquet/statistics.h"
#include "parquet/types.h" #include "index/skipindex_stats/SkipIndexStats.h"
namespace milvus { namespace milvus {
using Metrics =
std::variant<int8_t, int16_t, int32_t, int64_t, float, double, std::string>;
// MetricsDataType is used to avoid copy when get min/max value from FieldChunkMetrics
template <typename T>
using MetricsDataType =
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
// ReverseMetricsDataType is used to avoid copy when get min/max value from FieldChunkMetrics
template <typename T>
using ReverseMetricsDataType =
std::conditional_t<std::is_same_v<T, std::string_view>, std::string, T>;
struct FieldChunkMetrics {
Metrics min_;
Metrics max_;
bool hasValue_;
int64_t null_count_;
FieldChunkMetrics() : hasValue_(false){};
template <typename T>
std::pair<MetricsDataType<T>, MetricsDataType<T>>
GetMinMax() const {
AssertInfo(hasValue_,
"GetMinMax should never be called when hasValue_ is false");
MetricsDataType<T> lower_bound;
MetricsDataType<T> upper_bound;
try {
lower_bound = std::get<ReverseMetricsDataType<T>>(min_);
upper_bound = std::get<ReverseMetricsDataType<T>>(max_);
} catch (const std::bad_variant_access& e) {
return {};
}
return {lower_bound, upper_bound};
}
cachinglayer::ResourceUsage
CellByteSize() const {
return {0, 0};
}
};
class FieldChunkMetricsTranslatorFromStatistics class FieldChunkMetricsTranslatorFromStatistics
: public cachinglayer::Translator<FieldChunkMetrics> { : public cachinglayer::Translator<index::FieldChunkMetrics> {
public: public:
FieldChunkMetricsTranslatorFromStatistics( FieldChunkMetricsTranslatorFromStatistics(
int64_t segment_id, int64_t segment_id,
@ -86,51 +41,8 @@ class FieldChunkMetricsTranslatorFromStatistics
CacheWarmupPolicy::CacheWarmupPolicy_Disable, CacheWarmupPolicy::CacheWarmupPolicy_Disable,
false) { false) {
for (auto& statistic : statistics) { for (auto& statistic : statistics) {
auto chunk_metrics = std::make_unique<FieldChunkMetrics>(); cells_.emplace_back(
switch (data_type) { std::move(builder_.Build(data_type_, statistic)));
case milvus::DataType::INT8: {
SetMinMaxFromStatistics<parquet::Int32Type, int8_t>(
statistic, chunk_metrics.get());
break;
}
case milvus::DataType::INT16: {
SetMinMaxFromStatistics<parquet::Int32Type, int16_t>(
statistic, chunk_metrics.get());
break;
}
case milvus::DataType::INT32: {
SetMinMaxFromStatistics<parquet::Int32Type, int32_t>(
statistic, chunk_metrics.get());
break;
}
case milvus::DataType::INT64: {
SetMinMaxFromStatistics<parquet::Int64Type, int64_t>(
statistic, chunk_metrics.get());
break;
}
case milvus::DataType::FLOAT: {
SetMinMaxFromStatistics<parquet::FloatType, float>(
statistic, chunk_metrics.get());
break;
}
case milvus::DataType::DOUBLE: {
SetMinMaxFromStatistics<parquet::DoubleType, double>(
statistic, chunk_metrics.get());
break;
}
case milvus::DataType::VARCHAR: {
SetMinMaxFromStatistics<parquet::ByteArrayType,
std::string>(statistic,
chunk_metrics.get());
break;
}
default: {
ThrowInfo(
ErrorCode::UnexpectedError,
fmt::format("Unsupported data type: {}", data_type));
}
}
cells_.emplace_back(std::move(chunk_metrics));
} }
} }
@ -158,19 +70,14 @@ class FieldChunkMetricsTranslatorFromStatistics
} }
std::vector<std::pair<milvus::cachinglayer::cid_t, std::vector<std::pair<milvus::cachinglayer::cid_t,
std::unique_ptr<FieldChunkMetrics>>> std::unique_ptr<index::FieldChunkMetrics>>>
get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override { get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override {
std::vector<std::pair<milvus::cachinglayer::cid_t, std::vector<std::pair<milvus::cachinglayer::cid_t,
std::unique_ptr<FieldChunkMetrics>>> std::unique_ptr<index::FieldChunkMetrics>>>
cells; cells;
cells.reserve(cids.size()); cells.reserve(cids.size());
for (auto cid : cids) { for (auto cid : cids) {
auto chunk_metrics = std::make_unique<FieldChunkMetrics>(); cells.emplace_back(cid, cells_[cid]->Clone());
chunk_metrics->min_ = cells_[cid]->min_;
chunk_metrics->max_ = cells_[cid]->max_;
chunk_metrics->null_count_ = cells_[cid]->null_count_;
chunk_metrics->hasValue_ = cells_[cid]->hasValue_;
cells.emplace_back(cid, std::move(chunk_metrics));
} }
return cells; return cells;
} }
@ -187,51 +94,15 @@ class FieldChunkMetricsTranslatorFromStatistics
} }
private: private:
template <typename ParquetType,
typename OutType,
typename std::enable_if<
!std::is_same<ParquetType, parquet::ByteArrayType>::value,
int>::type = 0>
static void
SetMinMaxFromStatistics(
const std::shared_ptr<parquet::Statistics>& statistic,
FieldChunkMetrics* chunk_metrics) {
auto typed_statistics =
std::dynamic_pointer_cast<parquet::TypedStatistics<ParquetType>>(
statistic);
chunk_metrics->min_ = static_cast<OutType>(typed_statistics->min());
chunk_metrics->max_ = static_cast<OutType>(typed_statistics->max());
chunk_metrics->null_count_ = typed_statistics->null_count();
chunk_metrics->hasValue_ = true;
}
template <typename ParquetType,
typename OutType,
typename std::enable_if<
std::is_same<ParquetType, parquet::ByteArrayType>::value,
int>::type = 0>
static void
SetMinMaxFromStatistics(
const std::shared_ptr<parquet::Statistics>& statistic,
FieldChunkMetrics* chunk_metrics) {
auto typed_statistics = std::dynamic_pointer_cast<
parquet::TypedStatistics<parquet::ByteArrayType>>(statistic);
chunk_metrics->min_ =
std::string(std::string_view(typed_statistics->min()));
chunk_metrics->max_ =
std::string(std::string_view(typed_statistics->max()));
chunk_metrics->null_count_ = typed_statistics->null_count();
chunk_metrics->hasValue_ = true;
}
std::string key_; std::string key_;
milvus::DataType data_type_; milvus::DataType data_type_;
index::SkipIndexStatsBuilder builder_;
cachinglayer::Meta meta_; cachinglayer::Meta meta_;
std::vector<std::unique_ptr<FieldChunkMetrics>> cells_; std::vector<std::unique_ptr<index::FieldChunkMetrics>> cells_;
}; };
class FieldChunkMetricsTranslator class FieldChunkMetricsTranslator
: public cachinglayer::Translator<FieldChunkMetrics> { : public cachinglayer::Translator<index::FieldChunkMetrics> {
public: public:
FieldChunkMetricsTranslator(int64_t segment_id, FieldChunkMetricsTranslator(int64_t segment_id,
FieldId field_id, FieldId field_id,
@ -267,7 +138,7 @@ class FieldChunkMetricsTranslator
return key_; return key_;
} }
std::vector<std::pair<milvus::cachinglayer::cid_t, std::vector<std::pair<milvus::cachinglayer::cid_t,
std::unique_ptr<FieldChunkMetrics>>> std::unique_ptr<index::FieldChunkMetrics>>>
get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override; get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override;
milvus::cachinglayer::Meta* milvus::cachinglayer::Meta*
@ -282,112 +153,60 @@ class FieldChunkMetricsTranslator
} }
private: private:
// todo: support some null_count_ skip
template <typename T>
struct metricInfo {
T min_;
T max_;
int64_t null_count_;
};
metricInfo<std::string>
ProcessStringFieldMetrics(const StringChunk* chunk) {
// all captured by reference
bool has_first_valid = false;
std::string_view min_string;
std::string_view max_string;
int64_t null_count = 0;
auto row_count = chunk->RowNums();
for (int64_t i = 0; i < row_count; ++i) {
bool is_valid = chunk->isValid(i);
if (!is_valid) {
null_count++;
continue;
}
auto value = chunk->operator[](i);
if (!has_first_valid) {
min_string = value;
max_string = value;
has_first_valid = true;
} else {
if (value < min_string) {
min_string = value;
}
if (value > max_string) {
max_string = value;
}
}
}
// The field data may later be released, so we need to copy the string to avoid invalid memory access.
return {std::string(min_string), std::string(max_string), null_count};
}
template <typename T>
metricInfo<T>
ProcessFieldMetrics(const T* data, const bool* valid_data, int64_t count) {
//double check to avoid crash
if (data == nullptr || count == 0) {
return {T(), T()};
}
// find first not null value
int64_t start = 0;
for (int64_t i = start; i < count; i++) {
if (valid_data != nullptr && !valid_data[i]) {
start++;
continue;
}
break;
}
if (start > count - 1) {
return {T(), T(), count};
}
T minValue = data[start];
T maxValue = data[start];
int64_t null_count = start;
for (int64_t i = start; i < count; i++) {
T value = data[i];
if (valid_data != nullptr && !valid_data[i]) {
null_count++;
continue;
}
if (value < minValue) {
minValue = value;
}
if (value > maxValue) {
maxValue = value;
}
}
return {minValue, maxValue, null_count};
}
std::string key_; std::string key_;
milvus::DataType data_type_; milvus::DataType data_type_;
cachinglayer::Meta meta_; cachinglayer::Meta meta_;
std::shared_ptr<ChunkedColumnInterface> column_; std::shared_ptr<ChunkedColumnInterface> column_;
index::SkipIndexStatsBuilder builder_;
}; };
class SkipIndex { class SkipIndex {
private:
template <typename T>
struct IsAllowedType {
static constexpr bool isAllowedType =
std::is_integral<T>::value || std::is_floating_point<T>::value ||
std::is_same<T, std::string>::value ||
std::is_same<T, std::string_view>::value;
static constexpr bool isDisabledType =
std::is_same<T, milvus::Json>::value ||
std::is_same<T, bool>::value;
static constexpr bool value = isAllowedType && !isDisabledType;
static constexpr bool arith_value =
std::is_integral<T>::value && !std::is_same<T, bool>::value;
static constexpr bool in_value = isAllowedType;
};
template <typename T>
using HighPrecisionType =
std::conditional_t<std::is_integral_v<T> && !std::is_same_v<bool, T>,
int64_t,
T>;
public: public:
template <typename T> template <typename T>
bool std::enable_if_t<SkipIndex::IsAllowedType<T>::value, bool>
CanSkipUnaryRange(FieldId field_id, CanSkipUnaryRange(FieldId field_id,
int64_t chunk_id, int64_t chunk_id,
OpType op_type, OpType op_type,
const T& val) const { const T& val) const {
auto pw = GetFieldChunkMetrics(field_id, chunk_id); auto pw = GetFieldChunkMetrics(field_id, chunk_id);
auto field_chunk_metrics = pw.get(); auto field_chunk_metrics = pw.get();
if (MinMaxUnaryFilter<T>(field_chunk_metrics, op_type, val)) { return field_chunk_metrics->CanSkipUnaryRange(op_type,
return true; index::Metrics{val});
} }
//further more filters for skip, like ngram filter, bf and so on
template <typename T>
std::enable_if_t<!SkipIndex::IsAllowedType<T>::value, bool>
CanSkipUnaryRange(FieldId field_id,
int64_t chunk_id,
OpType op_type,
const T& val) const {
return false; return false;
} }
template <typename T> template <typename T>
bool std::enable_if_t<SkipIndex::IsAllowedType<T>::value, bool>
CanSkipBinaryRange(FieldId field_id, CanSkipBinaryRange(FieldId field_id,
int64_t chunk_id, int64_t chunk_id,
const T& lower_val, const T& lower_val,
@ -396,13 +215,116 @@ class SkipIndex {
bool upper_inclusive) const { bool upper_inclusive) const {
auto pw = GetFieldChunkMetrics(field_id, chunk_id); auto pw = GetFieldChunkMetrics(field_id, chunk_id);
auto field_chunk_metrics = pw.get(); auto field_chunk_metrics = pw.get();
if (MinMaxBinaryFilter<T>(field_chunk_metrics, return field_chunk_metrics->CanSkipBinaryRange(
lower_val, index::Metrics{lower_val},
upper_val, index::Metrics{upper_val},
lower_inclusive, lower_inclusive,
upper_inclusive)) { upper_inclusive);
return true; }
template <typename T>
std::enable_if_t<!SkipIndex::IsAllowedType<T>::value, bool>
CanSkipBinaryRange(FieldId field_id,
int64_t chunk_id,
const T& lower_val,
const T& upper_val,
bool lower_inclusive,
bool upper_inclusive) const {
return false;
}
template <typename T>
std::enable_if_t<SkipIndex::IsAllowedType<T>::arith_value, bool>
CanSkipBinaryArithRange(FieldId field_id,
int64_t chunk_id,
OpType op_type,
ArithOpType arith_type,
const HighPrecisionType<T> value,
const HighPrecisionType<T> right_operand) const {
auto check_and_skip = [&](HighPrecisionType<T> new_value_hp,
OpType new_op_type) {
if constexpr (std::is_integral_v<T>) {
if (new_value_hp > std::numeric_limits<T>::max() ||
new_value_hp < std::numeric_limits<T>::min()) {
// Overflow detected. The transformed value cannot be represented by T.
// We cannot make a safe comparison with the chunk's min/max.
return false;
}
}
return CanSkipUnaryRange<T>(
field_id, chunk_id, new_op_type, static_cast<T>(new_value_hp));
};
switch (arith_type) {
case ArithOpType::Add: {
// field + C > V => field > V - C
return check_and_skip(value - right_operand, op_type);
}
case ArithOpType::Sub: {
// field - C > V => field > V + C
return check_and_skip(value + right_operand, op_type);
}
case ArithOpType::Mul: {
// field * C > V
if (right_operand == 0) {
// field * 0 > V => 0 > V. This doesn't depend on the field's range.
return false;
}
OpType new_op_type = op_type;
if (right_operand < 0) {
new_op_type = FlipComparisonOperator(op_type);
}
return check_and_skip(value / right_operand, new_op_type);
}
case ArithOpType::Div: {
// field / C > V
if (right_operand == 0) {
// Division by zero. Cannot evaluate, so cannot skip.
return false;
}
OpType new_op_type = op_type;
if (right_operand < 0) {
new_op_type = FlipComparisonOperator(op_type);
}
return check_and_skip(value * right_operand, new_op_type);
}
default:
return false;
} }
}
template <typename T>
std::enable_if_t<!SkipIndex::IsAllowedType<T>::arith_value, bool>
CanSkipBinaryArithRange(FieldId field_id,
int64_t chunk_id,
OpType op_type,
ArithOpType arith_type,
const HighPrecisionType<T> value,
const HighPrecisionType<T> right_operand) const {
return false;
}
template <typename T>
std::enable_if_t<SkipIndex::IsAllowedType<T>::in_value, bool>
CanSkipInQuery(FieldId field_id,
int64_t chunk_id,
const std::vector<T>& values) const {
auto pw = GetFieldChunkMetrics(field_id, chunk_id);
auto field_chunk_metrics = pw.get();
auto vals = std::vector<index::Metrics>{};
vals.reserve(values.size());
for (const auto& v : values) {
vals.emplace_back(v);
}
return field_chunk_metrics->CanSkipIn(vals);
}
template <typename T>
std::enable_if_t<!SkipIndex::IsAllowedType<T>::in_value, bool>
CanSkipInQuery(FieldId field_id,
int64_t chunk_id,
const std::vector<T>& values) const {
return false; return false;
} }
@ -413,9 +335,9 @@ class SkipIndex {
std::shared_ptr<ChunkedColumnInterface> column) { std::shared_ptr<ChunkedColumnInterface> column) {
auto translator = std::make_unique<FieldChunkMetricsTranslator>( auto translator = std::make_unique<FieldChunkMetricsTranslator>(
segment_id, field_id, data_type, column); segment_id, field_id, data_type, column);
auto cache_slot = auto cache_slot = cachinglayer::Manager::GetInstance()
cachinglayer::Manager::GetInstance() .CreateCacheSlot<index::FieldChunkMetrics>(
.CreateCacheSlot<FieldChunkMetrics>(std::move(translator)); std::move(translator));
std::unique_lock lck(mutex_); std::unique_lock lck(mutex_);
fieldChunkMetrics_[field_id] = std::move(cache_slot); fieldChunkMetrics_[field_id] = std::move(cache_slot);
@ -430,137 +352,39 @@ class SkipIndex {
auto translator = auto translator =
std::make_unique<FieldChunkMetricsTranslatorFromStatistics>( std::make_unique<FieldChunkMetricsTranslatorFromStatistics>(
segment_id, field_id, data_type, statistics); segment_id, field_id, data_type, statistics);
auto cache_slot = auto cache_slot = cachinglayer::Manager::GetInstance()
cachinglayer::Manager::GetInstance() .CreateCacheSlot<index::FieldChunkMetrics>(
.CreateCacheSlot<FieldChunkMetrics>(std::move(translator)); std::move(translator));
std::unique_lock lck(mutex_); std::unique_lock lck(mutex_);
fieldChunkMetrics_[field_id] = std::move(cache_slot); fieldChunkMetrics_[field_id] = std::move(cache_slot);
} }
private: private:
const cachinglayer::PinWrapper<const FieldChunkMetrics*> OpType
FlipComparisonOperator(OpType op) const {
switch (op) {
case OpType::GreaterThan:
return OpType::LessThan;
case OpType::GreaterEqual:
return OpType::LessEqual;
case OpType::LessThan:
return OpType::GreaterThan;
case OpType::LessEqual:
return OpType::GreaterEqual;
// OpType::Equal and OpType::NotEqual do not flip
default:
return op;
}
}
const cachinglayer::PinWrapper<const index::FieldChunkMetrics*>
GetFieldChunkMetrics(FieldId field_id, int chunk_id) const; GetFieldChunkMetrics(FieldId field_id, int chunk_id) const;
template <typename T>
struct IsAllowedType {
static constexpr bool isAllowedType =
std::is_integral<T>::value || std::is_floating_point<T>::value ||
std::is_same<T, std::string>::value ||
std::is_same<T, std::string_view>::value;
static constexpr bool isDisabledType =
std::is_same<T, milvus::Json>::value ||
std::is_same<T, bool>::value;
static constexpr bool value = isAllowedType && !isDisabledType;
};
template <typename T>
std::enable_if_t<SkipIndex::IsAllowedType<T>::value, bool>
MinMaxUnaryFilter(const FieldChunkMetrics* field_chunk_metrics,
OpType op_type,
const T& val) const {
if (!field_chunk_metrics->hasValue_) {
return false;
}
auto [lower_bound, upper_bound] = field_chunk_metrics->GetMinMax<T>();
if (lower_bound == MetricsDataType<T>() ||
upper_bound == MetricsDataType<T>()) {
return false;
}
return RangeShouldSkip<T>(val, lower_bound, upper_bound, op_type);
}
template <typename T>
std::enable_if_t<!SkipIndex::IsAllowedType<T>::value, bool>
MinMaxUnaryFilter(const FieldChunkMetrics* field_chunk_metrics,
OpType op_type,
const T& val) const {
return false;
}
template <typename T>
std::enable_if_t<SkipIndex::IsAllowedType<T>::value, bool>
MinMaxBinaryFilter(const FieldChunkMetrics* field_chunk_metrics,
const T& lower_val,
const T& upper_val,
bool lower_inclusive,
bool upper_inclusive) const {
if (!field_chunk_metrics->hasValue_) {
return false;
}
auto [lower_bound, upper_bound] = field_chunk_metrics->GetMinMax<T>();
if (lower_bound == MetricsDataType<T>() ||
upper_bound == MetricsDataType<T>()) {
return false;
}
bool should_skip = false;
if (lower_inclusive && upper_inclusive) {
should_skip =
(lower_val > upper_bound) || (upper_val < lower_bound);
} else if (lower_inclusive && !upper_inclusive) {
should_skip =
(lower_val > upper_bound) || (upper_val <= lower_bound);
} else if (!lower_inclusive && upper_inclusive) {
should_skip =
(lower_val >= upper_bound) || (upper_val < lower_bound);
} else {
should_skip =
(lower_val >= upper_bound) || (upper_val <= lower_bound);
}
return should_skip;
}
template <typename T>
std::enable_if_t<!SkipIndex::IsAllowedType<T>::value, bool>
MinMaxBinaryFilter(const FieldChunkMetrics* field_chunk_metrics,
const T& lower_val,
const T& upper_val,
bool lower_inclusive,
bool upper_inclusive) const {
return false;
}
template <typename T>
bool
RangeShouldSkip(const T& value,
const MetricsDataType<T> lower_bound,
const MetricsDataType<T> upper_bound,
OpType op_type) const {
bool should_skip = false;
switch (op_type) {
case OpType::Equal: {
should_skip = value > upper_bound || value < lower_bound;
break;
}
case OpType::LessThan: {
should_skip = value <= lower_bound;
break;
}
case OpType::LessEqual: {
should_skip = value < lower_bound;
break;
}
case OpType::GreaterThan: {
should_skip = value >= upper_bound;
break;
}
case OpType::GreaterEqual: {
should_skip = value > upper_bound;
break;
}
default: {
should_skip = false;
}
}
return should_skip;
}
private:
std::unordered_map< std::unordered_map<
FieldId, FieldId,
std::shared_ptr<cachinglayer::CacheSlot<FieldChunkMetrics>>> std::shared_ptr<cachinglayer::CacheSlot<index::FieldChunkMetrics>>>
fieldChunkMetrics_; fieldChunkMetrics_;
mutable std::shared_mutex mutex_; mutable std::shared_mutex mutex_;
}; };
} // namespace milvus } // namespace milvus

View File

@ -0,0 +1,207 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "index/skipindex_stats/SkipIndexStats.h"
#include <cstring>
#include "parquet/types.h"
namespace milvus::index {
// Builds per-chunk skip-index metrics from parquet column statistics.
//
// Each supported numeric type reads min/max out of the typed parquet
// statistics object; VARCHAR copies the byte-array bounds into owned
// strings. Any other data type falls back to NoneFieldChunkMetrics,
// which never allows a chunk to be skipped.
// NOTE(review): DataType::STRING / TIMESTAMPTZ are reported as supported
// by SupportsSkipIndex but take the None fallback here — confirm intended.
std::unique_ptr<FieldChunkMetrics>
SkipIndexStatsBuilder::Build(
    DataType data_type,
    const std::shared_ptr<parquet::Statistics>& statistic) const {
    switch (data_type) {
        case DataType::INT8: {
            auto metrics =
                ProcessFieldMetrics<parquet::Int32Type, int8_t>(statistic);
            return std::make_unique<IntFieldChunkMetrics<int8_t>>(
                metrics.min_, metrics.max_, nullptr);
        }
        case DataType::INT16: {
            auto metrics =
                ProcessFieldMetrics<parquet::Int32Type, int16_t>(statistic);
            return std::make_unique<IntFieldChunkMetrics<int16_t>>(
                metrics.min_, metrics.max_, nullptr);
        }
        case DataType::INT32: {
            auto metrics =
                ProcessFieldMetrics<parquet::Int32Type, int32_t>(statistic);
            return std::make_unique<IntFieldChunkMetrics<int32_t>>(
                metrics.min_, metrics.max_, nullptr);
        }
        case DataType::INT64: {
            auto metrics =
                ProcessFieldMetrics<parquet::Int64Type, int64_t>(statistic);
            return std::make_unique<IntFieldChunkMetrics<int64_t>>(
                metrics.min_, metrics.max_, nullptr);
        }
        case DataType::FLOAT: {
            auto metrics =
                ProcessFieldMetrics<parquet::FloatType, float>(statistic);
            return std::make_unique<FloatFieldChunkMetrics<float>>(
                metrics.min_, metrics.max_);
        }
        case DataType::DOUBLE: {
            auto metrics =
                ProcessFieldMetrics<parquet::DoubleType, double>(statistic);
            return std::make_unique<FloatFieldChunkMetrics<double>>(
                metrics.min_, metrics.max_);
        }
        case DataType::VARCHAR: {
            auto metrics =
                ProcessFieldMetrics<parquet::ByteArrayType, std::string>(
                    statistic);
            // Copy the bounds: the statistics object may be released later.
            return std::make_unique<StringFieldChunkMetrics>(
                std::string(metrics.min_),
                std::string(metrics.max_),
                nullptr,
                nullptr);
        }
        default:
            return std::make_unique<NoneFieldChunkMetrics>();
    }
}
// Builds per-chunk skip-index metrics by scanning a set of Arrow record
// batches at column `col_idx`.
//
// Supported Arrow primitive types are scanned with the matching typed
// accessor; unsupported types (and empty batch lists) yield
// NoneFieldChunkMetrics, which never allows skipping.
std::unique_ptr<FieldChunkMetrics>
SkipIndexStatsBuilder::Build(
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
    int col_idx,
    arrow::Type::type data_type) const {
    if (batches.empty()) {
        return std::make_unique<NoneFieldChunkMetrics>();
    }
    switch (data_type) {
        case arrow::Type::BOOL:
            return LoadMetrics<bool>(
                ProcessFieldMetrics<bool, arrow::BooleanArray>(batches,
                                                               col_idx));
        case arrow::Type::INT8:
            return LoadMetrics<int8_t>(
                ProcessFieldMetrics<int8_t, arrow::Int8Array>(batches,
                                                              col_idx));
        case arrow::Type::INT16:
            return LoadMetrics<int16_t>(
                ProcessFieldMetrics<int16_t, arrow::Int16Array>(batches,
                                                                col_idx));
        case arrow::Type::INT32:
            return LoadMetrics<int32_t>(
                ProcessFieldMetrics<int32_t, arrow::Int32Array>(batches,
                                                                col_idx));
        case arrow::Type::INT64:
            return LoadMetrics<int64_t>(
                ProcessFieldMetrics<int64_t, arrow::Int64Array>(batches,
                                                                col_idx));
        case arrow::Type::FLOAT:
            return LoadMetrics<float>(
                ProcessFieldMetrics<float, arrow::FloatArray>(batches,
                                                              col_idx));
        case arrow::Type::DOUBLE:
            return LoadMetrics<double>(
                ProcessFieldMetrics<double, arrow::DoubleArray>(batches,
                                                                col_idx));
        case arrow::Type::STRING:
            return LoadMetrics<std::string>(
                ProcessStringFieldMetrics(batches, col_idx));
        default:
            return std::make_unique<NoneFieldChunkMetrics>();
    }
}
// Builds per-chunk skip-index metrics from an in-memory Chunk.
//
// VARCHAR chunks are scanned through the StringChunk interface; every
// other supported type is read as a fixed-width span of raw values plus
// an optional validity bitmap. Null/empty chunks and unsupported types
// produce NoneFieldChunkMetrics (never skips).
std::unique_ptr<FieldChunkMetrics>
SkipIndexStatsBuilder::Build(DataType data_type, const Chunk* chunk) const {
    if (chunk == nullptr || chunk->RowNums() == 0) {
        return std::make_unique<NoneFieldChunkMetrics>();
    }
    if (data_type == DataType::VARCHAR) {
        auto string_chunk = static_cast<const StringChunk*>(chunk);
        return LoadMetrics<std::string>(
            ProcessStringFieldMetrics(string_chunk));
    }
    auto fixed_chunk = static_cast<const FixedWidthChunk*>(chunk);
    auto span = fixed_chunk->Span();
    const void* raw = span.data();
    const bool* valid = span.valid_data();
    const int64_t rows = span.row_count();
    switch (data_type) {
        case DataType::BOOL:
            return LoadMetrics<bool>(ProcessFieldMetrics<bool>(
                static_cast<const bool*>(raw), valid, rows));
        case DataType::INT8:
            return LoadMetrics<int8_t>(ProcessFieldMetrics<int8_t>(
                static_cast<const int8_t*>(raw), valid, rows));
        case DataType::INT16:
            return LoadMetrics<int16_t>(ProcessFieldMetrics<int16_t>(
                static_cast<const int16_t*>(raw), valid, rows));
        case DataType::INT32:
            return LoadMetrics<int32_t>(ProcessFieldMetrics<int32_t>(
                static_cast<const int32_t*>(raw), valid, rows));
        case DataType::INT64:
            return LoadMetrics<int64_t>(ProcessFieldMetrics<int64_t>(
                static_cast<const int64_t*>(raw), valid, rows));
        case DataType::FLOAT:
            return LoadMetrics<float>(ProcessFieldMetrics<float>(
                static_cast<const float*>(raw), valid, rows));
        case DataType::DOUBLE:
            return LoadMetrics<double>(ProcessFieldMetrics<double>(
                static_cast<const double*>(raw), valid, rows));
        default:
            return std::make_unique<NoneFieldChunkMetrics>();
    }
}
} // namespace milvus::index

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,116 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include "unicode/brkiter.h"
#include "unicode/unistr.h"
#include "unicode/utypes.h"
#include "ankerl/unordered_dense.h"
#include "common/Types.h"
#include "arrow/api.h"
#include "log/Log.h"
#include <cstring>
namespace milvus::index {
// Returns true when skip-index statistics can be collected for the given
// Arrow physical type (fixed-width primitives and UTF-8 strings only).
inline bool
SupportsSkipIndex(arrow::Type::type type) {
    return type == arrow::Type::BOOL || type == arrow::Type::INT8 ||
           type == arrow::Type::INT16 || type == arrow::Type::INT32 ||
           type == arrow::Type::INT64 || type == arrow::Type::FLOAT ||
           type == arrow::Type::DOUBLE || type == arrow::Type::STRING;
}
// Returns true when skip-index statistics can be collected for the given
// Milvus data type. Scalars and string-like types qualify; complex types
// (JSON, arrays, vectors, ...) do not.
inline bool
SupportsSkipIndex(DataType type) {
    return type == DataType::BOOL || type == DataType::INT8 ||
           type == DataType::INT16 || type == DataType::INT32 ||
           type == DataType::INT64 || type == DataType::FLOAT ||
           type == DataType::DOUBLE || type == DataType::VARCHAR ||
           type == DataType::STRING || type == DataType::TIMESTAMPTZ;
}
// Inserts every n-gram of `text` into `ngrams_set`, where n counts
// user-perceived characters (grapheme clusters, as segmented by an ICU
// character BreakIterator) rather than bytes. Each n-gram is re-encoded
// as UTF-8 before insertion.
//
// No-op when n == 0, when the text cannot possibly hold n characters,
// or when the ICU break iterator cannot be created (logged and ignored).
inline void
ExtractNgrams(ankerl::unordered_dense::set<std::string>& ngrams_set,
              const std::string_view& text,
              size_t n) {
    // Fast reject: the byte length is an upper bound on the character
    // count, so fewer than n bytes implies fewer than n characters.
    if (n == 0 || text.size() < n) {
        return;
    }
    UErrorCode err = U_ZERO_ERROR;
    std::unique_ptr<icu::BreakIterator> breaker(
        icu::BreakIterator::createCharacterInstance(icu::Locale::getUS(),
                                                    err));
    if (U_FAILURE(err)) {
        LOG_WARN("Failed to create ICU BreakIterator: {}", u_errorName(err));
        return;
    }
    const icu::UnicodeString utext = icu::UnicodeString::fromUTF8(
        icu::StringPiece(text.data(), text.size()));
    breaker->setText(utext);
    // Collect every grapheme boundary: 0, ..., utext.length(). With k
    // characters there are k + 1 boundaries.
    std::vector<int32_t> bounds;
    bounds.reserve(utext.length() + 1);
    for (int32_t pos = breaker->first(); pos != icu::BreakIterator::DONE;
         pos = breaker->next()) {
        bounds.push_back(pos);
    }
    const size_t num_chars = bounds.empty() ? 0 : bounds.size() - 1;
    if (num_chars < n) {
        return;
    }
    // Each window spans n consecutive clusters: [bounds[i], bounds[i + n]).
    for (size_t i = 0; i + n < bounds.size(); ++i) {
        const int32_t begin = bounds[i];
        const int32_t len = bounds[i + n] - begin;
        if (len <= 0) {
            continue;
        }
        icu::UnicodeString window(utext, begin, len);
        std::string utf8;
        window.toUTF8String(utf8);
        if (!utf8.empty()) {
            ngrams_set.insert(std::move(utf8));
        }
    }
}
} // namespace milvus::index