diff --git a/internal/core/conanfile.py b/internal/core/conanfile.py index 5480ff6dce..36e0856fdb 100644 --- a/internal/core/conanfile.py +++ b/internal/core/conanfile.py @@ -50,6 +50,7 @@ class MilvusConan(ConanFile): "unordered_dense/4.4.0#6a855c992618cc4c63019109a2e47298", "mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01", "geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37", + "icu/74.2#cd1937b9561b8950a2ae6311284c5813", ) generators = ("cmake", "cmake_find_package") default_options = { @@ -90,6 +91,8 @@ class MilvusConan(ConanFile): "onetbb:tbbproxy": False, "gdal:shared": True, "gdal:fPIC": True, + "icu:shared": False, + "icu:data_packaging": "library", } def configure(self): diff --git a/internal/core/src/common/BloomFilter.h b/internal/core/src/common/BloomFilter.h new file mode 100644 index 0000000000..d5ec5d2005 --- /dev/null +++ b/internal/core/src/common/BloomFilter.h @@ -0,0 +1,363 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include "nlohmann/json.hpp" +#include +#include +#include +#include +#include "log/Log.h" +#include "xxhash.h" // from xxhash/xxhash + +namespace milvus { + +const std::string UNSUPPORTED_BF_NAME = "Unsupported BloomFilter"; +const std::string BLOCKED_BF_NAME = "BlockedBloomFilter"; +const std::string ALWAYS_TRUE_BF_NAME = "AlwaysTrueBloomFilter"; + +enum class BFType { + Unsupported = 0, + AlwaysTrue, // empty bloom filter + Blocked, +}; + +inline std::string +BFTypeToString(BFType type) { + switch (type) { + case BFType::Blocked: + return BLOCKED_BF_NAME; + case BFType::AlwaysTrue: + return ALWAYS_TRUE_BF_NAME; + default: + return UNSUPPORTED_BF_NAME; + } +} + +inline BFType +StringToBFType(std::string_view name) { + if (name == BLOCKED_BF_NAME) { + return BFType::Blocked; + } + if (name == ALWAYS_TRUE_BF_NAME) { + return BFType::AlwaysTrue; + } + return BFType::Unsupported; +} + +class BloomFilter { + public: + virtual ~BloomFilter() = default; + + virtual BFType + Type() const = 0; + + virtual uint64_t + Cap() const = 0; + + virtual uint32_t + K() const = 0; + + virtual void + Add(const unsigned char* data, size_t len) = 0; + + virtual void + Add(std::string_view data) = 0; + + virtual bool + Test(const unsigned char* data, size_t len) const = 0; + + virtual bool + Test(std::string_view data) const = 0; + + virtual bool + TestLocations(const std::vector& locs) const = 0; + + virtual std::vector + BatchTestLocations(const std::vector>& locs, + const std::vector& hits) const = 0; + + virtual nlohmann::json + ToJson() const = 0; +}; +using BloomFilterPtr = std::shared_ptr; + +class BlockedBloomFilter : public BloomFilter { + public: + BlockedBloomFilter(uint64_t capacity, double fp) { + double m = -static_cast(capacity) * std::log(fp) / + (std::log(2.0) * std::log(2.0)); + num_bits_ = 
static_cast(std::ceil(m)); + + double k = (m / capacity) * std::log(2.0); + k_ = std::max(1u, static_cast(std::round(k))); + + size_t num_blocks = (num_bits_ + 63) / 64; + bits_.resize(num_blocks, 0); + + LOG_DEBUG( + "Created BlockedBloomFilter: capacity={}, fp={}, bits={}, k={}", + capacity, + fp, + num_bits_, + k_); + } + + explicit BlockedBloomFilter(const nlohmann::json& data) { + if (!data.contains("bits") || !data.contains("num_bits") || + !data.contains("k")) { + throw std::runtime_error( + "Invalid JSON for BlockedBloomFilter: missing required fields"); + } + + bits_ = data["bits"].get>(); + num_bits_ = data["num_bits"].get(); + k_ = data["k"].get(); + } + + BFType + Type() const override { + return BFType::Blocked; + } + + uint64_t + Cap() const override { + return num_bits_; + } + + uint32_t + K() const override { + return k_; + } + + void + Add(const uint8_t* data, size_t len) override { + uint64_t hash = XXH3_64bits(data, len); + AddHash(hash); + } + + void + Add(std::string_view data) override { + uint64_t hash = XXH3_64bits(data.data(), data.size()); + AddHash(hash); + } + + bool + Test(const uint8_t* data, size_t len) const override { + uint64_t hash = XXH3_64bits(data, len); + return TestHash(hash); + } + + bool + Test(std::string_view data) const override { + uint64_t hash = XXH3_64bits(data.data(), data.size()); + return TestHash(hash); + } + + bool + TestLocations(const std::vector& locs) const override { + if (locs.size() != 1) { + return true; + } + return TestHash(locs[0]); + } + + std::vector + BatchTestLocations(const std::vector>& locs, + const std::vector& hits) const override { + std::vector ret(locs.size(), false); + for (size_t i = 0; i < hits.size(); ++i) { + if (!hits[i]) { + if (locs[i].size() != 1) { + ret[i] = true; + continue; + } + ret[i] = TestHash(locs[i][0]); + } + } + return ret; + } + + nlohmann::json + ToJson() const override { + nlohmann::json data; + data["type"] = BFTypeToString(Type()); + data["bits"] = bits_; + 
data["num_bits"] = num_bits_; + data["k"] = k_; + return data; + } + + private: + void + AddHash(uint64_t hash) { + uint64_t h1 = hash; + uint64_t h2 = hash >> 32; + + for (uint32_t i = 0; i < k_; ++i) { + uint64_t combined_hash = h1 + i * h2; + uint64_t bit_pos = combined_hash % num_bits_; + uint64_t block_idx = bit_pos / 64; + uint64_t bit_idx = bit_pos % 64; + bits_[block_idx] |= (1ULL << bit_idx); + } + } + + bool + TestHash(uint64_t hash) const { + uint64_t h1 = hash; + uint64_t h2 = hash >> 32; + + for (uint32_t i = 0; i < k_; ++i) { + uint64_t combined_hash = h1 + i * h2; + uint64_t bit_pos = combined_hash % num_bits_; + uint64_t block_idx = bit_pos / 64; + uint64_t bit_idx = bit_pos % 64; + if ((bits_[block_idx] & (1ULL << bit_idx)) == 0) { + return false; + } + } + return true; + } + + private: + std::vector bits_; + uint64_t num_bits_; + uint32_t k_; +}; + +class AlwaysTrueBloomFilter : public BloomFilter { + public: + BFType + Type() const override { + return BFType::AlwaysTrue; + } + + uint64_t + Cap() const override { + return 0; + } + + uint32_t + K() const override { + return 0; + } + + void + Add(const unsigned char* data, size_t len) override { + } + + void + Add(std::string_view data) override { + } + + bool + Test(const unsigned char* data, size_t len) const override { + return true; + } + + bool + Test(std::string_view data) const override { + return true; + } + + bool + TestLocations(const std::vector& locs) const override { + return true; + } + + std::vector + BatchTestLocations(const std::vector>& locs, + const std::vector& hits) const override { + return std::vector(locs.size(), true); // 全部返回 true + } + + nlohmann::json + ToJson() const override { + nlohmann::json data; + data["type"] = BFTypeToString(Type()); + return data; + } +}; + +static const BloomFilterPtr g_always_true_bf = + std::make_shared(); + +inline BloomFilterPtr +NewBloomFilterWithType(uint64_t capacity, double fp, BFType type) { + switch (type) { + case BFType::Blocked: + 
return std::make_shared(capacity, fp); + case BFType::AlwaysTrue: + return g_always_true_bf; + default: + LOG_WARN( + "Unsupported bloom filter type {}, falling back to BlockedBF", + static_cast(type)); + return std::make_shared(capacity, fp); + } +} + +inline BloomFilterPtr +NewBloomFilterWithType(uint64_t capacity, + double fp, + std::string_view type_name) { + BFType type = StringToBFType(type_name); + return NewBloomFilterWithType(capacity, fp, type); +} + +inline BloomFilterPtr +BloomFilterFromJson(const nlohmann::json& data) { + if (!data.contains("type")) { + throw std::runtime_error( + "JSON data for bloom filter missing 'type' field"); + } + + std::string type_str = data["type"].get(); + BFType type = StringToBFType(type_str); + + switch (type) { + case BFType::Blocked: + return std::make_shared(data); + case BFType::AlwaysTrue: + return g_always_true_bf; + default: + throw std::runtime_error("Unsupported bloom filter type: " + + type_str); + } +} + +inline std::vector +Locations(const uint8_t* data, size_t len, uint32_t k, BFType bf_type) { + switch (bf_type) { + case BFType::Blocked: + return {XXH3_64bits(data, len)}; + case BFType::AlwaysTrue: + return {}; + default: + LOG_WARN( + "Unsupported bloom filter type in Locations, returning empty"); + return {}; + } +} + +inline std::vector +Locations(std::string_view data, uint32_t k, BFType bf_type) { + return Locations( + reinterpret_cast(data.data()), data.size(), k, bf_type); +} + +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/BloomFilterTest.cpp b/internal/core/src/common/BloomFilterTest.cpp new file mode 100644 index 0000000000..70d9b81963 --- /dev/null +++ b/internal/core/src/common/BloomFilterTest.cpp @@ -0,0 +1,297 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "common/BloomFilter.h" + +using namespace milvus; + +TEST(BloomFilterTest, BlockedBF_BasicOperations) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + ASSERT_EQ(bf->Type(), BFType::Blocked); + ASSERT_GT(bf->Cap(), 0); + ASSERT_GT(bf->K(), 0); + + bf->Add("hello"); + bf->Add("world"); + bf->Add("milvus"); + + EXPECT_TRUE(bf->Test("hello")); + EXPECT_TRUE(bf->Test("world")); + EXPECT_TRUE(bf->Test("milvus")); +} + +TEST(BloomFilterTest, BlockedBF_Locations) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + bf->Add("test_key"); + + auto locs = Locations("test_key", bf->K(), bf->Type()); + + ASSERT_EQ(locs.size(), 1); + EXPECT_TRUE(bf->TestLocations(locs)); + + auto locs2 = Locations("non_existent", bf->K(), bf->Type()); + EXPECT_FALSE(bf->TestLocations(locs2)); +} + +TEST(BloomFilterTest, BlockedBF_BatchTestLocations) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + std::vector keys = {"key1", "key2", "key3", "key4"}; + std::vector> locs; + + for (size_t i = 0; i < 2; ++i) { + bf->Add(keys[i]); + } + + for (const auto& key : keys) { + locs.push_back(Locations(key, bf->K(), bf->Type())); + } + + std::vector hits(keys.size(), false); + auto results = bf->BatchTestLocations(locs, hits); + + EXPECT_TRUE(results[0]); + EXPECT_TRUE(results[1]); + EXPECT_FALSE(results[2]); + EXPECT_FALSE(results[3]); +} + +TEST(BloomFilterTest, BlockedBF_Serialization) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + bf->Add("serialize_test"); + bf->Add("data"); + + nlohmann::json 
json_data = bf->ToJson(); + + auto bf2 = BloomFilterFromJson(json_data); + + ASSERT_EQ(bf2->Type(), BFType::Blocked); + EXPECT_TRUE(bf2->Test("serialize_test")); + EXPECT_TRUE(bf2->Test("data")); + EXPECT_FALSE(bf2->Test("not_added")); +} + +TEST(BloomFilterTest, AlwaysTrueBF_BasicOperations) { + auto bf = NewBloomFilterWithType(0, 0.0, BFType::AlwaysTrue); + + ASSERT_EQ(bf->Type(), BFType::AlwaysTrue); + ASSERT_EQ(bf->Cap(), 0); + ASSERT_EQ(bf->K(), 0); + + bf->Add("anything"); + + EXPECT_TRUE(bf->Test("anything")); + EXPECT_TRUE(bf->Test("something")); + EXPECT_TRUE(bf->Test("")); +} + +TEST(BloomFilterTest, AlwaysTrueBF_Locations) { + auto bf = NewBloomFilterWithType(0, 0.0, BFType::AlwaysTrue); + + auto locs = Locations("test", bf->K(), bf->Type()); + EXPECT_TRUE(locs.empty()); + + EXPECT_TRUE(bf->TestLocations({})); + EXPECT_TRUE(bf->TestLocations({1, 2, 3})); +} + +TEST(BloomFilterTest, AlwaysTrueBF_Serialization) { + auto bf = NewBloomFilterWithType(0, 0.0, BFType::AlwaysTrue); + + nlohmann::json json_data = bf->ToJson(); + auto bf2 = BloomFilterFromJson(json_data); + + ASSERT_EQ(bf2->Type(), BFType::AlwaysTrue); + EXPECT_TRUE(bf2->Test("anything")); +} + +TEST(BloomFilterTest, BlockedBF_EmptyString) { + auto bf = NewBloomFilterWithType(100, 0.01, BFType::Blocked); + + bf->Add(""); + EXPECT_TRUE(bf->Test("")); + EXPECT_FALSE(bf->Test("non_empty")); +} + +TEST(BloomFilterTest, BlockedBF_SpecialCharacters) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + std::vector special_strings = {"\0\0\0", + "\n\r\t", + "无懈可击", + "🚀🎉🔥", + "\\x00\\xff\\xaa", + "\"'`", + "<>&", + ""}; + + for (const auto& s : special_strings) { + bf->Add(s); + } + + for (const auto& s : special_strings) { + EXPECT_TRUE(bf->Test(s)) << "Failed for: " << s; + } +} + +TEST(BloomFilterTest, BlockedBF_BinaryData) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + uint8_t binary_data1[] = {0x00, 0x01, 0x02, 0xff, 0xfe}; + uint8_t binary_data2[] = {0xaa, 
0xbb, 0xcc, 0xdd, 0xee}; + uint8_t binary_data3[] = {0x00, 0x00, 0x00, 0x00}; + + bf->Add(binary_data1, sizeof(binary_data1)); + bf->Add(binary_data2, sizeof(binary_data2)); + bf->Add(binary_data3, sizeof(binary_data3)); + + EXPECT_TRUE(bf->Test(binary_data1, sizeof(binary_data1))); + EXPECT_TRUE(bf->Test(binary_data2, sizeof(binary_data2))); + EXPECT_TRUE(bf->Test(binary_data3, sizeof(binary_data3))); + + uint8_t not_added[] = {0x11, 0x22, 0x33}; + EXPECT_FALSE(bf->Test(not_added, sizeof(not_added))); +} + +TEST(BloomFilterTest, BlockedBF_IntegerTypes) { + auto bf = NewBloomFilterWithType(1000, 0.01, BFType::Blocked); + + int32_t int32_vals[] = {0, -1, 1, INT32_MAX, INT32_MIN}; + for (auto val : int32_vals) { + bf->Add(reinterpret_cast(&val), sizeof(val)); + } + for (auto val : int32_vals) { + EXPECT_TRUE( + bf->Test(reinterpret_cast(&val), sizeof(val))); + } + + int64_t int64_vals[] = {0L, -1L, 1L, INT64_MAX, INT64_MIN}; + for (auto val : int64_vals) { + bf->Add(reinterpret_cast(&val), sizeof(val)); + } + for (auto val : int64_vals) { + EXPECT_TRUE( + bf->Test(reinterpret_cast(&val), sizeof(val))); + } +} + +TEST(BloomFilterTest, BlockedBF_VerySmallCapacity) { + auto bf = NewBloomFilterWithType(1, 0.01, BFType::Blocked); + + bf->Add("test"); + EXPECT_TRUE(bf->Test("test")); +} + +TEST(BloomFilterTest, BlockedBF_VeryLargeCapacity) { + auto bf = NewBloomFilterWithType(1000000, 0.01, BFType::Blocked); + + ASSERT_GT(bf->Cap(), 0); + ASSERT_GT(bf->K(), 0); + + bf->Add("test"); + EXPECT_TRUE(bf->Test("test")); +} + +TEST(BloomFilterTest, BlockedBF_FalsePositiveRate) { + const int capacity = 10000; + const double expected_fp_rate = 0.01; + auto bf = + NewBloomFilterWithType(capacity, expected_fp_rate, BFType::Blocked); + + std::vector added_keys; + for (int i = 0; i < capacity; ++i) { + std::string key = "key_" + std::to_string(i); + added_keys.push_back(key); + bf->Add(key); + } + + for (const auto& key : added_keys) { + EXPECT_TRUE(bf->Test(key)); + } + + int 
false_positives = 0; + const int test_count = 10000; + for (int i = 0; i < test_count; ++i) { + std::string test_key = "test_key_" + std::to_string(i + capacity); + if (bf->Test(test_key)) { + false_positives++; + } + } + + double actual_fp_rate = static_cast(false_positives) / test_count; + EXPECT_LT(actual_fp_rate, expected_fp_rate * 3); +} + +TEST(BloomFilterTest, BlockedBF_SerializationIntegrity) { + auto bf1 = NewBloomFilterWithType(5000, 0.01, BFType::Blocked); + + std::vector test_data; + for (int i = 0; i < 1000; ++i) { + test_data.push_back("key_" + std::to_string(i)); + bf1->Add(test_data.back()); + } + + nlohmann::json json_data = bf1->ToJson(); + + ASSERT_TRUE(json_data.contains("type")); + ASSERT_TRUE(json_data.contains("bits")); + ASSERT_TRUE(json_data.contains("num_bits")); + ASSERT_TRUE(json_data.contains("k")); + + auto bf2 = BloomFilterFromJson(json_data); + + EXPECT_EQ(bf1->Type(), bf2->Type()); + EXPECT_EQ(bf1->Cap(), bf2->Cap()); + EXPECT_EQ(bf1->K(), bf2->K()); + + for (const auto& key : test_data) { + EXPECT_TRUE(bf2->Test(key)); + } +} + +TEST(BloomFilterTest, BlockedBF_InvalidJsonDeserialization) { + nlohmann::json invalid_json1; + invalid_json1["bits"] = std::vector{1, 2, 3}; + invalid_json1["num_bits"] = 192; + invalid_json1["k"] = 7; + EXPECT_THROW(BloomFilterFromJson(invalid_json1), std::runtime_error); + + nlohmann::json invalid_json2; + invalid_json2["type"] = "BlockedBloomFilter"; + invalid_json2["num_bits"] = 192; + invalid_json2["k"] = 7; + EXPECT_THROW(BloomFilterFromJson(invalid_json2), std::runtime_error); + + nlohmann::json invalid_json3; + invalid_json3["type"] = "UnknownType"; + EXPECT_THROW(BloomFilterFromJson(invalid_json3), std::runtime_error); +} + +TEST(BloomFilterTest, LocationsConsistency) { + std::string data1 = "consistent_test"; + std::string data2 = "consistent_test"; + + auto locs1 = Locations(data1, 7, BFType::Blocked); + auto locs2 = Locations(data2, 7, BFType::Blocked); + + ASSERT_EQ(locs1.size(), 
locs2.size()); + for (size_t i = 0; i < locs1.size(); ++i) { + EXPECT_EQ(locs1[i], locs2[i]); + } + + std::string data3 = "different_test"; + auto locs3 = Locations(data3, 7, BFType::Blocked); + EXPECT_NE(locs1, locs3); +} diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 0b00887054..c67e8a5a76 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -104,6 +104,10 @@ const bool DEFAULT_GROWING_JSON_KEY_STATS_ENABLED = false; const bool DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED = true; const bool DEFAULT_ENABLE_PARQUET_STATS_SKIP_INDEX = false; +// skipindex stats related +const double DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01; +const int64_t DEFAULT_SKIPINDEX_MIN_NGRAM_LENGTH = 3; + // index config related const std::string SEGMENT_INSERT_FILES_KEY = "segment_insert_files"; const std::string INSERT_FILES_KEY = "insert_files"; diff --git a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp index 99083342f4..4a5ffef6f8 100644 --- a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp @@ -1815,10 +1815,18 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForData( } } }; + + auto skip_index_func = + [op_type, arith_type, value, right_operand]( + const SkipIndex& skip_index, FieldId field_id, int64_t chunk_id) { + return skip_index.CanSkipBinaryArithRange( + field_id, chunk_id, op_type, arith_type, value, right_operand); + }; + int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets(execute_sub_batch, - std::nullptr_t{}, + skip_index_func, input, res, valid_res, @@ -1826,7 +1834,7 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForData( right_operand); } else { processed_size = ProcessDataChunks(execute_sub_batch, - std::nullptr_t{}, + skip_index_func, res, valid_res, value, diff --git 
a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 8cef0b9df2..7a083ff595 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -951,17 +951,25 @@ PhyTermFilterExpr::ExecVisitorImplForData(EvalCtx& context) { } processed_cursor += size; }; + + auto set = std::static_pointer_cast>(arg_set_); + auto skip_index_func = + [set](const SkipIndex& skip_index, FieldId field_id, int64_t chunk_id) { + return skip_index.CanSkipInQuery( + field_id, chunk_id, set->GetElements()); + }; + int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets(execute_sub_batch, - std::nullptr_t{}, + skip_index_func, input, res, valid_res, arg_set_); } else { processed_size = ProcessDataChunks( - execute_sub_batch, std::nullptr_t{}, res, valid_res, arg_set_); + execute_sub_batch, skip_index_func, res, valid_res, arg_set_); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " diff --git a/internal/core/src/index/SkipIndex.cpp b/internal/core/src/index/SkipIndex.cpp index 3efdad35a9..a33d967051 100644 --- a/internal/core/src/index/SkipIndex.cpp +++ b/internal/core/src/index/SkipIndex.cpp @@ -16,9 +16,9 @@ namespace milvus { -static const FieldChunkMetrics defaultFieldChunkMetrics; +static const index::NoneFieldChunkMetrics defaultFieldChunkMetrics{}; -const cachinglayer::PinWrapper +const cachinglayer::PinWrapper SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const { // skip index structure must be setup before using, thus we do not lock here. 
auto field_metrics = fieldChunkMetrics_.find(field_id); @@ -27,115 +27,25 @@ SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const { auto ca = cachinglayer::SemiInlineGet( field_chunk_metrics->PinCells(nullptr, {chunk_id})); auto metrics = ca->get_cell_of(chunk_id); - return cachinglayer::PinWrapper(ca, metrics); + return cachinglayer::PinWrapper( + ca, metrics); } - return cachinglayer::PinWrapper( + return cachinglayer::PinWrapper( &defaultFieldChunkMetrics); } -std::vector< - std::pair>> +std::vector>> FieldChunkMetricsTranslator::get_cells( const std::vector& cids) { std::vector>> + std::unique_ptr>> cells; cells.reserve(cids.size()); - if (data_type_ == milvus::DataType::VARCHAR) { - for (auto chunk_id : cids) { - auto pw = column_->GetChunk(nullptr, chunk_id); - auto chunk = static_cast(pw.get()); - - int num_rows = chunk->RowNums(); - auto chunk_metrics = std::make_unique(); - if (num_rows > 0) { - auto info = ProcessStringFieldMetrics(chunk); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - } - chunk_metrics->hasValue_ = chunk_metrics->null_count_ != num_rows; - cells.emplace_back(chunk_id, std::move(chunk_metrics)); - } - } else { - for (auto chunk_id : cids) { - auto pw = column_->GetChunk(nullptr, chunk_id); - auto chunk = static_cast(pw.get()); - auto span = chunk->Span(); - - const void* chunk_data = span.data(); - const bool* valid_data = span.valid_data(); - int64_t count = span.row_count(); - - auto chunk_metrics = std::make_unique(); - - if (count > 0) { - switch (data_type_) { - case DataType::INT8: { - const int8_t* typedData = - static_cast(chunk_data); - auto info = ProcessFieldMetrics( - typedData, valid_data, count); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - break; - } - case DataType::INT16: { - const int16_t* typedData = - 
static_cast(chunk_data); - auto info = ProcessFieldMetrics( - typedData, valid_data, count); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - break; - } - case DataType::INT32: { - const int32_t* typedData = - static_cast(chunk_data); - auto info = ProcessFieldMetrics( - typedData, valid_data, count); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - break; - } - case DataType::INT64: { - const int64_t* typedData = - static_cast(chunk_data); - auto info = ProcessFieldMetrics( - typedData, valid_data, count); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - break; - } - case DataType::FLOAT: { - const float* typedData = - static_cast(chunk_data); - auto info = ProcessFieldMetrics( - typedData, valid_data, count); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - break; - } - case DataType::DOUBLE: { - const double* typedData = - static_cast(chunk_data); - auto info = ProcessFieldMetrics( - typedData, valid_data, count); - chunk_metrics->min_ = Metrics(info.min_); - chunk_metrics->max_ = Metrics(info.max_); - chunk_metrics->null_count_ = info.null_count_; - break; - } - } - } - chunk_metrics->hasValue_ = chunk_metrics->null_count_ != count; - cells.emplace_back(chunk_id, std::move(chunk_metrics)); - } + for (auto chunk_id : cids) { + auto pw = column_->GetChunk(nullptr, chunk_id); + auto chunk_metrics = builder_.Build(data_type_, pw.get()); + cells.emplace_back(chunk_id, std::move(chunk_metrics)); } return cells; } diff --git a/internal/core/src/index/SkipIndex.h b/internal/core/src/index/SkipIndex.h index 1f9b46b5a8..3de67b603a 100644 --- a/internal/core/src/index/SkipIndex.h +++ 
b/internal/core/src/index/SkipIndex.h @@ -13,65 +13,20 @@ #include #include -#include #include "cachinglayer/CacheSlot.h" #include "cachinglayer/Manager.h" #include "cachinglayer/Translator.h" #include "cachinglayer/Utils.h" -#include "common/Chunk.h" +#include "common/FieldDataInterface.h" #include "common/Types.h" -#include "common/type_c.h" #include "mmap/ChunkedColumnInterface.h" #include "parquet/statistics.h" -#include "parquet/types.h" +#include "index/skipindex_stats/SkipIndexStats.h" + namespace milvus { - -using Metrics = - std::variant; - -// MetricsDataType is used to avoid copy when get min/max value from FieldChunkMetrics -template -using MetricsDataType = - std::conditional_t, std::string_view, T>; - -// ReverseMetricsDataType is used to avoid copy when get min/max value from FieldChunkMetrics -template -using ReverseMetricsDataType = - std::conditional_t, std::string, T>; - -struct FieldChunkMetrics { - Metrics min_; - Metrics max_; - bool hasValue_; - int64_t null_count_; - - FieldChunkMetrics() : hasValue_(false){}; - - template - std::pair, MetricsDataType> - GetMinMax() const { - AssertInfo(hasValue_, - "GetMinMax should never be called when hasValue_ is false"); - MetricsDataType lower_bound; - MetricsDataType upper_bound; - try { - lower_bound = std::get>(min_); - upper_bound = std::get>(max_); - } catch (const std::bad_variant_access& e) { - return {}; - } - return {lower_bound, upper_bound}; - } - - cachinglayer::ResourceUsage - CellByteSize() const { - return {0, 0}; - } -}; - class FieldChunkMetricsTranslatorFromStatistics - : public cachinglayer::Translator { + : public cachinglayer::Translator { public: FieldChunkMetricsTranslatorFromStatistics( int64_t segment_id, @@ -86,51 +41,8 @@ class FieldChunkMetricsTranslatorFromStatistics CacheWarmupPolicy::CacheWarmupPolicy_Disable, false) { for (auto& statistic : statistics) { - auto chunk_metrics = std::make_unique(); - switch (data_type) { - case milvus::DataType::INT8: { - 
SetMinMaxFromStatistics( - statistic, chunk_metrics.get()); - break; - } - case milvus::DataType::INT16: { - SetMinMaxFromStatistics( - statistic, chunk_metrics.get()); - break; - } - case milvus::DataType::INT32: { - SetMinMaxFromStatistics( - statistic, chunk_metrics.get()); - break; - } - case milvus::DataType::INT64: { - SetMinMaxFromStatistics( - statistic, chunk_metrics.get()); - break; - } - case milvus::DataType::FLOAT: { - SetMinMaxFromStatistics( - statistic, chunk_metrics.get()); - break; - } - case milvus::DataType::DOUBLE: { - SetMinMaxFromStatistics( - statistic, chunk_metrics.get()); - break; - } - case milvus::DataType::VARCHAR: { - SetMinMaxFromStatistics(statistic, - chunk_metrics.get()); - break; - } - default: { - ThrowInfo( - ErrorCode::UnexpectedError, - fmt::format("Unsupported data type: {}", data_type)); - } - } - cells_.emplace_back(std::move(chunk_metrics)); + cells_.emplace_back( + std::move(builder_.Build(data_type_, statistic))); } } @@ -158,19 +70,14 @@ class FieldChunkMetricsTranslatorFromStatistics } std::vector>> + std::unique_ptr>> get_cells(const std::vector& cids) override { std::vector>> + std::unique_ptr>> cells; cells.reserve(cids.size()); for (auto cid : cids) { - auto chunk_metrics = std::make_unique(); - chunk_metrics->min_ = cells_[cid]->min_; - chunk_metrics->max_ = cells_[cid]->max_; - chunk_metrics->null_count_ = cells_[cid]->null_count_; - chunk_metrics->hasValue_ = cells_[cid]->hasValue_; - cells.emplace_back(cid, std::move(chunk_metrics)); + cells.emplace_back(cid, cells_[cid]->Clone()); } return cells; } @@ -187,51 +94,15 @@ class FieldChunkMetricsTranslatorFromStatistics } private: - template ::value, - int>::type = 0> - static void - SetMinMaxFromStatistics( - const std::shared_ptr& statistic, - FieldChunkMetrics* chunk_metrics) { - auto typed_statistics = - std::dynamic_pointer_cast>( - statistic); - chunk_metrics->min_ = static_cast(typed_statistics->min()); - chunk_metrics->max_ = 
static_cast(typed_statistics->max()); - chunk_metrics->null_count_ = typed_statistics->null_count(); - chunk_metrics->hasValue_ = true; - } - - template ::value, - int>::type = 0> - static void - SetMinMaxFromStatistics( - const std::shared_ptr& statistic, - FieldChunkMetrics* chunk_metrics) { - auto typed_statistics = std::dynamic_pointer_cast< - parquet::TypedStatistics>(statistic); - chunk_metrics->min_ = - std::string(std::string_view(typed_statistics->min())); - chunk_metrics->max_ = - std::string(std::string_view(typed_statistics->max())); - chunk_metrics->null_count_ = typed_statistics->null_count(); - chunk_metrics->hasValue_ = true; - } - std::string key_; milvus::DataType data_type_; + index::SkipIndexStatsBuilder builder_; cachinglayer::Meta meta_; - std::vector> cells_; + std::vector> cells_; }; class FieldChunkMetricsTranslator - : public cachinglayer::Translator { + : public cachinglayer::Translator { public: FieldChunkMetricsTranslator(int64_t segment_id, FieldId field_id, @@ -267,7 +138,7 @@ class FieldChunkMetricsTranslator return key_; } std::vector>> + std::unique_ptr>> get_cells(const std::vector& cids) override; milvus::cachinglayer::Meta* @@ -282,112 +153,60 @@ class FieldChunkMetricsTranslator } private: - // todo: support some null_count_ skip - - template - struct metricInfo { - T min_; - T max_; - int64_t null_count_; - }; - - metricInfo - ProcessStringFieldMetrics(const StringChunk* chunk) { - // all captured by reference - bool has_first_valid = false; - std::string_view min_string; - std::string_view max_string; - int64_t null_count = 0; - - auto row_count = chunk->RowNums(); - - for (int64_t i = 0; i < row_count; ++i) { - bool is_valid = chunk->isValid(i); - if (!is_valid) { - null_count++; - continue; - } - auto value = chunk->operator[](i); - if (!has_first_valid) { - min_string = value; - max_string = value; - has_first_valid = true; - } else { - if (value < min_string) { - min_string = value; - } - if (value > max_string) { - 
max_string = value; - } - } - } - // The field data may later be released, so we need to copy the string to avoid invalid memory access. - return {std::string(min_string), std::string(max_string), null_count}; - } - - template - metricInfo - ProcessFieldMetrics(const T* data, const bool* valid_data, int64_t count) { - //double check to avoid crash - if (data == nullptr || count == 0) { - return {T(), T()}; - } - // find first not null value - int64_t start = 0; - for (int64_t i = start; i < count; i++) { - if (valid_data != nullptr && !valid_data[i]) { - start++; - continue; - } - break; - } - if (start > count - 1) { - return {T(), T(), count}; - } - T minValue = data[start]; - T maxValue = data[start]; - int64_t null_count = start; - for (int64_t i = start; i < count; i++) { - T value = data[i]; - if (valid_data != nullptr && !valid_data[i]) { - null_count++; - continue; - } - if (value < minValue) { - minValue = value; - } - if (value > maxValue) { - maxValue = value; - } - } - return {minValue, maxValue, null_count}; - } - std::string key_; milvus::DataType data_type_; cachinglayer::Meta meta_; std::shared_ptr column_; + index::SkipIndexStatsBuilder builder_; }; class SkipIndex { + private: + template + struct IsAllowedType { + static constexpr bool isAllowedType = + std::is_integral::value || std::is_floating_point::value || + std::is_same::value || + std::is_same::value; + static constexpr bool isDisabledType = + std::is_same::value || + std::is_same::value; + static constexpr bool value = isAllowedType && !isDisabledType; + static constexpr bool arith_value = + std::is_integral::value && !std::is_same::value; + static constexpr bool in_value = isAllowedType; + }; + + template + using HighPrecisionType = + std::conditional_t && !std::is_same_v, + int64_t, + T>; + public: template - bool + std::enable_if_t::value, bool> CanSkipUnaryRange(FieldId field_id, int64_t chunk_id, OpType op_type, const T& val) const { auto pw = GetFieldChunkMetrics(field_id, 
chunk_id); auto field_chunk_metrics = pw.get(); - if (MinMaxUnaryFilter(field_chunk_metrics, op_type, val)) { - return true; - } - //further more filters for skip, like ngram filter, bf and so on + return field_chunk_metrics->CanSkipUnaryRange(op_type, + index::Metrics{val}); + } + + template + std::enable_if_t::value, bool> + CanSkipUnaryRange(FieldId field_id, + int64_t chunk_id, + OpType op_type, + const T& val) const { return false; } template - bool + std::enable_if_t::value, bool> CanSkipBinaryRange(FieldId field_id, int64_t chunk_id, const T& lower_val, @@ -396,13 +215,116 @@ class SkipIndex { bool upper_inclusive) const { auto pw = GetFieldChunkMetrics(field_id, chunk_id); auto field_chunk_metrics = pw.get(); - if (MinMaxBinaryFilter(field_chunk_metrics, - lower_val, - upper_val, - lower_inclusive, - upper_inclusive)) { - return true; + return field_chunk_metrics->CanSkipBinaryRange( + index::Metrics{lower_val}, + index::Metrics{upper_val}, + lower_inclusive, + upper_inclusive); + } + + template + std::enable_if_t::value, bool> + CanSkipBinaryRange(FieldId field_id, + int64_t chunk_id, + const T& lower_val, + const T& upper_val, + bool lower_inclusive, + bool upper_inclusive) const { + return false; + } + + template + std::enable_if_t::arith_value, bool> + CanSkipBinaryArithRange(FieldId field_id, + int64_t chunk_id, + OpType op_type, + ArithOpType arith_type, + const HighPrecisionType value, + const HighPrecisionType right_operand) const { + auto check_and_skip = [&](HighPrecisionType new_value_hp, + OpType new_op_type) { + if constexpr (std::is_integral_v) { + if (new_value_hp > std::numeric_limits::max() || + new_value_hp < std::numeric_limits::min()) { + // Overflow detected. The transformed value cannot be represented by T. + // We cannot make a safe comparison with the chunk's min/max. 
+ return false; + } + } + return CanSkipUnaryRange( + field_id, chunk_id, new_op_type, static_cast(new_value_hp)); + }; + switch (arith_type) { + case ArithOpType::Add: { + // field + C > V => field > V - C + return check_and_skip(value - right_operand, op_type); + } + case ArithOpType::Sub: { + // field - C > V => field > V + C + return check_and_skip(value + right_operand, op_type); + } + case ArithOpType::Mul: { + // field * C > V + if (right_operand == 0) { + // field * 0 > V => 0 > V. This doesn't depend on the field's range. + return false; + } + + OpType new_op_type = op_type; + if (right_operand < 0) { + new_op_type = FlipComparisonOperator(op_type); + } + return check_and_skip(value / right_operand, new_op_type); + } + case ArithOpType::Div: { + // field / C > V + if (right_operand == 0) { + // Division by zero. Cannot evaluate, so cannot skip. + return false; + } + + OpType new_op_type = op_type; + if (right_operand < 0) { + new_op_type = FlipComparisonOperator(op_type); + } + return check_and_skip(value * right_operand, new_op_type); + } + default: + return false; } + } + + template + std::enable_if_t::arith_value, bool> + CanSkipBinaryArithRange(FieldId field_id, + int64_t chunk_id, + OpType op_type, + ArithOpType arith_type, + const HighPrecisionType value, + const HighPrecisionType right_operand) const { + return false; + } + + template + std::enable_if_t::in_value, bool> + CanSkipInQuery(FieldId field_id, + int64_t chunk_id, + const std::vector& values) const { + auto pw = GetFieldChunkMetrics(field_id, chunk_id); + auto field_chunk_metrics = pw.get(); + auto vals = std::vector{}; + vals.reserve(values.size()); + for (const auto& v : values) { + vals.emplace_back(v); + } + return field_chunk_metrics->CanSkipIn(vals); + } + + template + std::enable_if_t::in_value, bool> + CanSkipInQuery(FieldId field_id, + int64_t chunk_id, + const std::vector& values) const { return false; } @@ -413,9 +335,9 @@ class SkipIndex { std::shared_ptr column) { auto 
translator = std::make_unique( segment_id, field_id, data_type, column); - auto cache_slot = - cachinglayer::Manager::GetInstance() - .CreateCacheSlot(std::move(translator)); + auto cache_slot = cachinglayer::Manager::GetInstance() + .CreateCacheSlot( + std::move(translator)); std::unique_lock lck(mutex_); fieldChunkMetrics_[field_id] = std::move(cache_slot); @@ -430,137 +352,39 @@ class SkipIndex { auto translator = std::make_unique( segment_id, field_id, data_type, statistics); - auto cache_slot = - cachinglayer::Manager::GetInstance() - .CreateCacheSlot(std::move(translator)); + auto cache_slot = cachinglayer::Manager::GetInstance() + .CreateCacheSlot( + std::move(translator)); std::unique_lock lck(mutex_); fieldChunkMetrics_[field_id] = std::move(cache_slot); } private: - const cachinglayer::PinWrapper + OpType + FlipComparisonOperator(OpType op) const { + switch (op) { + case OpType::GreaterThan: + return OpType::LessThan; + case OpType::GreaterEqual: + return OpType::LessEqual; + case OpType::LessThan: + return OpType::GreaterThan; + case OpType::LessEqual: + return OpType::GreaterEqual; + // OpType::Equal and OpType::NotEqual do not flip + default: + return op; + } + } + + const cachinglayer::PinWrapper GetFieldChunkMetrics(FieldId field_id, int chunk_id) const; - template - struct IsAllowedType { - static constexpr bool isAllowedType = - std::is_integral::value || std::is_floating_point::value || - std::is_same::value || - std::is_same::value; - static constexpr bool isDisabledType = - std::is_same::value || - std::is_same::value; - static constexpr bool value = isAllowedType && !isDisabledType; - }; - - template - std::enable_if_t::value, bool> - MinMaxUnaryFilter(const FieldChunkMetrics* field_chunk_metrics, - OpType op_type, - const T& val) const { - if (!field_chunk_metrics->hasValue_) { - return false; - } - auto [lower_bound, upper_bound] = field_chunk_metrics->GetMinMax(); - if (lower_bound == MetricsDataType() || - upper_bound == MetricsDataType()) 
{ - return false; - } - return RangeShouldSkip(val, lower_bound, upper_bound, op_type); - } - - template - std::enable_if_t::value, bool> - MinMaxUnaryFilter(const FieldChunkMetrics* field_chunk_metrics, - OpType op_type, - const T& val) const { - return false; - } - - template - std::enable_if_t::value, bool> - MinMaxBinaryFilter(const FieldChunkMetrics* field_chunk_metrics, - const T& lower_val, - const T& upper_val, - bool lower_inclusive, - bool upper_inclusive) const { - if (!field_chunk_metrics->hasValue_) { - return false; - } - auto [lower_bound, upper_bound] = field_chunk_metrics->GetMinMax(); - if (lower_bound == MetricsDataType() || - upper_bound == MetricsDataType()) { - return false; - } - bool should_skip = false; - if (lower_inclusive && upper_inclusive) { - should_skip = - (lower_val > upper_bound) || (upper_val < lower_bound); - } else if (lower_inclusive && !upper_inclusive) { - should_skip = - (lower_val > upper_bound) || (upper_val <= lower_bound); - } else if (!lower_inclusive && upper_inclusive) { - should_skip = - (lower_val >= upper_bound) || (upper_val < lower_bound); - } else { - should_skip = - (lower_val >= upper_bound) || (upper_val <= lower_bound); - } - return should_skip; - } - - template - std::enable_if_t::value, bool> - MinMaxBinaryFilter(const FieldChunkMetrics* field_chunk_metrics, - const T& lower_val, - const T& upper_val, - bool lower_inclusive, - bool upper_inclusive) const { - return false; - } - - template - bool - RangeShouldSkip(const T& value, - const MetricsDataType lower_bound, - const MetricsDataType upper_bound, - OpType op_type) const { - bool should_skip = false; - switch (op_type) { - case OpType::Equal: { - should_skip = value > upper_bound || value < lower_bound; - break; - } - case OpType::LessThan: { - should_skip = value <= lower_bound; - break; - } - case OpType::LessEqual: { - should_skip = value < lower_bound; - break; - } - case OpType::GreaterThan: { - should_skip = value >= upper_bound; - break; - } - 
case OpType::GreaterEqual: { - should_skip = value > upper_bound; - break; - } - default: { - should_skip = false; - } - } - return should_skip; - } - - private: std::unordered_map< FieldId, - std::shared_ptr>> + std::shared_ptr>> fieldChunkMetrics_; - mutable std::shared_mutex mutex_; }; } // namespace milvus diff --git a/internal/core/src/index/skipindex_stats/SkipIndexStats.cpp b/internal/core/src/index/skipindex_stats/SkipIndexStats.cpp new file mode 100644 index 0000000000..7c0696813c --- /dev/null +++ b/internal/core/src/index/skipindex_stats/SkipIndexStats.cpp @@ -0,0 +1,207 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "index/skipindex_stats/SkipIndexStats.h" +#include +#include "parquet/types.h" + +namespace milvus::index { + +std::unique_ptr +SkipIndexStatsBuilder::Build( + DataType data_type, + const std::shared_ptr& statistic) const { + std::unique_ptr chunk_metrics; + switch (data_type) { + case DataType::INT8: { + auto info = + ProcessFieldMetrics(statistic); + chunk_metrics = std::make_unique>( + info.min_, info.max_, nullptr); + break; + } + case milvus::DataType::INT16: { + auto info = + ProcessFieldMetrics(statistic); + chunk_metrics = std::make_unique>( + info.min_, info.max_, nullptr); + break; + } + case milvus::DataType::INT32: { + auto info = + ProcessFieldMetrics(statistic); + chunk_metrics = std::make_unique>( + info.min_, info.max_, nullptr); + break; + } + case milvus::DataType::INT64: { + auto info = + ProcessFieldMetrics(statistic); + chunk_metrics = std::make_unique>( + info.min_, info.max_, nullptr); + break; + } + case milvus::DataType::FLOAT: { + auto info = + ProcessFieldMetrics(statistic); + chunk_metrics = std::make_unique>( + info.min_, info.max_); + break; + } + case milvus::DataType::DOUBLE: { + auto info = + ProcessFieldMetrics(statistic); + chunk_metrics = std::make_unique>( + info.min_, info.max_); + break; + } + case milvus::DataType::VARCHAR: { + auto info = + ProcessFieldMetrics( + statistic); + chunk_metrics = std::make_unique( + std::string(info.min_), + std::string(info.max_), + nullptr, + nullptr); + break; + } + default: { + chunk_metrics = std::make_unique(); + break; + } + } + return chunk_metrics; +} + +std::unique_ptr +SkipIndexStatsBuilder::Build( + const std::vector>& batches, + int col_idx, + arrow::Type::type data_type) const { + auto none_ptr = std::make_unique(); + if (batches.empty()) { + return none_ptr; + } + switch (data_type) { + case arrow::Type::BOOL: { + metricsInfo info = + ProcessFieldMetrics(batches, + col_idx); + return LoadMetrics(info); + } + case arrow::Type::INT8: { + auto info = + 
ProcessFieldMetrics(batches, col_idx); + return LoadMetrics(info); + } + case arrow::Type::INT16: { + auto info = ProcessFieldMetrics( + batches, col_idx); + return LoadMetrics(info); + } + case arrow::Type::INT32: { + auto info = ProcessFieldMetrics( + batches, col_idx); + return LoadMetrics(info); + } + case arrow::Type::INT64: { + auto info = ProcessFieldMetrics( + batches, col_idx); + return LoadMetrics(info); + } + case arrow::Type::FLOAT: { + auto info = + ProcessFieldMetrics(batches, col_idx); + return LoadMetrics(info); + } + case arrow::Type::DOUBLE: { + auto info = ProcessFieldMetrics( + batches, col_idx); + return LoadMetrics(info); + } + case arrow::Type::STRING: { + const metricsInfo& info = + ProcessStringFieldMetrics(batches, col_idx); + return LoadMetrics(info); + } + } + return none_ptr; +} + +std::unique_ptr +SkipIndexStatsBuilder::Build(DataType data_type, const Chunk* chunk) const { + auto none_ptr = std::make_unique(); + if (chunk == nullptr || chunk->RowNums() == 0) { + return none_ptr; + } + if (data_type == DataType::VARCHAR) { + auto string_chunk = static_cast(chunk); + metricsInfo info = ProcessStringFieldMetrics(string_chunk); + return LoadMetrics(info); + } + auto fixed_chunk = static_cast(chunk); + auto span = fixed_chunk->Span(); + + const void* chunk_data = span.data(); + const bool* valid_data = span.valid_data(); + int64_t count = span.row_count(); + switch (data_type) { + case DataType::BOOL: { + const bool* typedData = static_cast(chunk_data); + auto info = ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + case DataType::INT8: { + const int8_t* typedData = static_cast(chunk_data); + auto info = + ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + case DataType::INT16: { + const int16_t* typedData = static_cast(chunk_data); + auto info = + ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + case DataType::INT32: { + const int32_t* 
typedData = static_cast(chunk_data); + auto info = + ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + case DataType::INT64: { + const int64_t* typedData = static_cast(chunk_data); + auto info = + ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + case DataType::FLOAT: { + const float* typedData = static_cast(chunk_data); + auto info = + ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + case DataType::DOUBLE: { + const double* typedData = static_cast(chunk_data); + auto info = + ProcessFieldMetrics(typedData, valid_data, count); + return LoadMetrics(info); + } + } + return none_ptr; +} +} // namespace milvus::index \ No newline at end of file diff --git a/internal/core/src/index/skipindex_stats/SkipIndexStats.h b/internal/core/src/index/skipindex_stats/SkipIndexStats.h new file mode 100644 index 0000000000..84a2eac9ef --- /dev/null +++ b/internal/core/src/index/skipindex_stats/SkipIndexStats.h @@ -0,0 +1,1122 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "nlohmann/json.hpp" +#include "ankerl/unordered_dense.h" +#include "arrow/type_fwd.h" +#include "arrow/array/array_primitive.h" +#include "parquet/statistics.h" +#include "common/BloomFilter.h" +#include "common/Chunk.h" +#include "common/Consts.h" +#include "common/Types.h" +#include "common/FieldDataInterface.h" +#include "index/Utils.h" +#include "index/skipindex_stats/utils.h" + +namespace milvus::index { + +using Metrics = std::variant; + +template +using MetricsDataType = + std::conditional_t, std::string_view, T>; + +const std::string NONE_FIELD_CHUNK_METRICS = "NONE"; +const std::string BOOLEAN_FIELD_CHUNK_METRICS = "BOOLEAN"; +const std::string FLOAT_FIELD_CHUNK_METRICS = "FLOAT"; +const std::string INT_FIELD_CHUNK_METRICS = "INT"; +const std::string STRING_FIELD_CHUNK_METRICS = "STRING"; + +enum class FieldChunkMetricsType { + NONE = 0, + BOOLEAN, + FLOAT, + INT, + STRING, +}; + +inline std::string +FieldChunkMetricsTypeToString(FieldChunkMetricsType type) { + switch (type) { + case FieldChunkMetricsType::BOOLEAN: + return BOOLEAN_FIELD_CHUNK_METRICS; + case FieldChunkMetricsType::FLOAT: + return FLOAT_FIELD_CHUNK_METRICS; + case FieldChunkMetricsType::INT: + return INT_FIELD_CHUNK_METRICS; + case FieldChunkMetricsType::STRING: + return STRING_FIELD_CHUNK_METRICS; + default: + return NONE_FIELD_CHUNK_METRICS; + } +} + +inline FieldChunkMetricsType +StringToFieldChunkMetricsType(std::string_view name) { + if (name == BOOLEAN_FIELD_CHUNK_METRICS) { + return FieldChunkMetricsType::BOOLEAN; + } + if (name == FLOAT_FIELD_CHUNK_METRICS) { + return FieldChunkMetricsType::FLOAT; + } + if (name == INT_FIELD_CHUNK_METRICS) { + return FieldChunkMetricsType::INT; + } + if (name == STRING_FIELD_CHUNK_METRICS) { + return FieldChunkMetricsType::STRING; + } + return FieldChunkMetricsType::NONE; +} + +template +inline bool +RangeShouldSkip(const T& value, + const T& lower_bound, + 
const T& upper_bound, + OpType op_type) { + bool should_skip = false; + switch (op_type) { + case OpType::Equal: { + should_skip = value > upper_bound || value < lower_bound; + break; + } + case OpType::LessThan: { + should_skip = value <= lower_bound; + break; + } + case OpType::LessEqual: { + should_skip = value < lower_bound; + break; + } + case OpType::GreaterThan: { + should_skip = value >= upper_bound; + break; + } + case OpType::GreaterEqual: { + should_skip = value > upper_bound; + break; + } + default: { + should_skip = false; + } + } + return should_skip; +} + +template +inline bool +RangeShouldSkip(const T& lower_val, + const T& upper_val, + const T& lower_bound, + const T& upper_bound, + bool lower_inclusive, + bool upper_inclusive) { + bool should_skip = false; + if (lower_inclusive && upper_inclusive) { + should_skip = (lower_val > upper_bound) || (upper_val < lower_bound); + } else if (lower_inclusive && !upper_inclusive) { + should_skip = (lower_val > upper_bound) || (upper_val <= lower_bound); + } else if (!lower_inclusive && upper_inclusive) { + should_skip = (lower_val >= upper_bound) || (upper_val < lower_bound); + } else { + should_skip = (lower_val >= upper_bound) || (upper_val <= lower_bound); + } + return should_skip; +} + +class FieldChunkMetrics { + public: + FieldChunkMetrics() = default; + virtual ~FieldChunkMetrics() = default; + + virtual std::unique_ptr + Clone() const = 0; + + virtual FieldChunkMetricsType + GetMetricsType() const = 0; + + virtual bool + CanSkipUnaryRange(OpType op_type, const Metrics& val) const = 0; + + virtual bool + CanSkipBinaryRange(const Metrics& lower_val, + const Metrics& upper_val, + bool lower_inclusive, + bool upper_inclusive) const { + return false; + } + + virtual bool + CanSkipIn(const std::vector& values) const { + return false; + } + + cachinglayer::ResourceUsage + CellByteSize() const { + return cell_size_; + } + + void + SetCellSize(cachinglayer::ResourceUsage cell_size) { + cell_size_ = cell_size; 
+ } + + virtual nlohmann::json + ToJson() const = 0; + + protected: + bool has_value_{false}; + cachinglayer::ResourceUsage cell_size_ = {0, 0}; +}; + +class NoneFieldChunkMetrics : public FieldChunkMetrics { + public: + NoneFieldChunkMetrics() = default; + ~NoneFieldChunkMetrics() = default; + + std::unique_ptr + Clone() const override { + return std::make_unique(); + } + + FieldChunkMetricsType + GetMetricsType() const override { + return FieldChunkMetricsType::NONE; + } + + bool + CanSkipUnaryRange(OpType op_type, const Metrics& val) const override { + return false; + } + + bool + CanSkipBinaryRange(const Metrics& lower_val, + const Metrics& upper_val, + bool lower_inclusive, + bool upper_inclusive) const override { + return false; + } + + bool + CanSkipIn(const std::vector& values) const override { + return false; + } + + nlohmann::json + ToJson() const override { + nlohmann::json j; + j["type"] = FieldChunkMetricsTypeToString(GetMetricsType()); + return j; + } +}; + +class BooleanFieldChunkMetrics : public FieldChunkMetrics { + public: + BooleanFieldChunkMetrics() = default; + BooleanFieldChunkMetrics(bool has_true, bool has_false) + : has_true_(has_true), has_false_(has_false) { + this->has_value_ = true; + } + + std::unique_ptr + Clone() const override { + if (!this->has_value_) { + return std::make_unique(); + } + return std::make_unique(has_true_, + has_false_); + } + + bool + CanSkipUnaryRange(OpType op_type, const Metrics& val) const override { + return false; + } + + bool + CanSkipBinaryRange(const Metrics& lower_val, + const Metrics& upper_val, + bool lower_inclusive, + bool upper_inclusive) const override { + return false; + } + + bool + CanSkipIn(const std::vector& values) const override { + if (!this->has_value_ || values.size() != 2) { + return false; + } + if (!std::holds_alternative(values[0]) || + !std::holds_alternative(values[1])) { + return false; + } + bool contains_true = std::get(values[0]); + bool contains_false = std::get(values[1]); + 
if (contains_true && has_true_ || contains_false && has_false_) { + return false; + } + return true; + } + + FieldChunkMetricsType + GetMetricsType() const override { + return FieldChunkMetricsType::BOOLEAN; + } + + nlohmann::json + ToJson() const override { + nlohmann::json j; + j["type"] = FieldChunkMetricsTypeToString(GetMetricsType()); + j["has_value"] = this->has_value_; + j["has_true"] = has_true_; + j["has_false"] = has_false_; + return j; + } + + private: + bool has_true_ = false; + bool has_false_ = false; +}; + +template +class FloatFieldChunkMetrics : public FieldChunkMetrics { + public: + FloatFieldChunkMetrics() = default; + FloatFieldChunkMetrics(T min, T max) : min_(min), max_(max) { + this->has_value_ = true; + } + + std::unique_ptr + Clone() const override { + if (!this->has_value_) { + return std::make_unique(); + } + return std::make_unique>(min_, max_); + } + + bool + CanSkipUnaryRange(OpType op_type, const Metrics& val) const override { + if (!this->has_value_) { + return false; + } + if (!std::holds_alternative(val)) { + return false; + } + const T& typed_val = std::get(val); + return RangeShouldSkip(typed_val, min_, max_, op_type); + } + + bool + CanSkipIn(const std::vector& values) const override { + if (!this->has_value_ || values.empty()) { + return false; + } + for (const auto& v : values) { + if (!std::holds_alternative(v)) { + return false; // Mixed types in IN list, cannot evaluate + } + } + T typed_min = std::get(values[0]); + T typed_max = std::get(values[0]); + for (const auto& v : values) { + const T& current_val = std::get(v); + if (current_val < typed_min) { + typed_min = current_val; + } + if (current_val > typed_max) { + typed_max = current_val; + } + } + return RangeShouldSkip(typed_min, typed_max, min_, max_, true, true); + } + + bool + CanSkipBinaryRange(const Metrics& lower_val, + const Metrics& upper_val, + bool lower_inclusive, + bool upper_inclusive) const override { + if (!std::holds_alternative(lower_val) || + 
!std::holds_alternative(upper_val)) { + return false; + } + if (!this->has_value_) { + return false; + } + const T& typed_lower = std::get(lower_val); + const T& typed_upper = std::get(upper_val); + return RangeShouldSkip(typed_lower, + typed_upper, + min_, + max_, + lower_inclusive, + upper_inclusive); + } + + FieldChunkMetricsType + GetMetricsType() const override { + return FieldChunkMetricsType::FLOAT; + } + + nlohmann::json + ToJson() const override { + nlohmann::json j; + j["type"] = FieldChunkMetricsTypeToString(GetMetricsType()); + if (this->has_value_) { + j["min"] = min_; + j["max"] = max_; + } + return j; + } + + private: + T min_; + T max_; +}; + +template +class IntFieldChunkMetrics : public FieldChunkMetrics { + public: + IntFieldChunkMetrics() = default; + IntFieldChunkMetrics(T min, T max, BloomFilterPtr bloom_filter) + : min_(min), max_(max), bloom_filter_(bloom_filter) { + this->has_value_ = true; + } + + std::unique_ptr + Clone() const override { + if (!this->has_value_) { + return std::make_unique(); + } + return std::make_unique( + min_, max_, bloom_filter_); + } + + FieldChunkMetricsType + GetMetricsType() const override { + return FieldChunkMetricsType::INT; + } + + bool + CanSkipUnaryRange(OpType op_type, const Metrics& val) const override { + if (!this->has_value_) { + return false; + } + if (!std::holds_alternative(val)) { + return false; + } + const T& typed_val = std::get(val); + if (op_type == OpType::Equal && bloom_filter_) { + return !bloom_filter_->Test( + reinterpret_cast(&typed_val), + sizeof(typed_val)); + } + return RangeShouldSkip(typed_val, min_, max_, op_type); + } + + bool + CanSkipIn(const std::vector& values) const override { + if (!this->has_value_ || values.empty()) { + return false; + } + for (const auto& v : values) { + if (!std::holds_alternative(v)) { + return false; + } + } + if (!bloom_filter_) { + T typed_min = std::get(values[0]); + T typed_max = std::get(values[0]); + for (const auto& v : values) { + const T& 
current_val = std::get(v); + if (current_val < typed_min) { + typed_min = current_val; + } + if (current_val > typed_max) { + typed_max = current_val; + } + } + return RangeShouldSkip( + typed_min, typed_max, min_, max_, true, true); + } + for (const auto& v : values) { + const T& current_val = std::get(v); + if (bloom_filter_->Test( + reinterpret_cast(¤t_val), + sizeof(current_val))) { + return false; + } + } + return true; + } + + bool + CanSkipBinaryRange(const Metrics& lower_val, + const Metrics& upper_val, + bool lower_inclusive, + bool upper_inclusive) const override { + if (!std::holds_alternative(lower_val) || + !std::holds_alternative(upper_val)) { + return false; + } + if (!this->has_value_) { + return false; + } + const T& typed_lower = std::get(lower_val); + const T& typed_upper = std::get(upper_val); + return RangeShouldSkip(typed_lower, + typed_upper, + min_, + max_, + lower_inclusive, + upper_inclusive); + } + + nlohmann::json + ToJson() const override { + nlohmann::json j; + j["type"] = FieldChunkMetricsTypeToString(GetMetricsType()); + + if (this->has_value_) { + j["min"] = min_; + j["max"] = max_; + if (bloom_filter_) { + auto bf_data = bloom_filter_->ToJson(); + j["bloom_filter"] = nlohmann::json::binary(bf_data); + } + } + + return j; + } + + private: + T min_; + T max_; + BloomFilterPtr bloom_filter_{nullptr}; +}; + +class StringFieldChunkMetrics : public FieldChunkMetrics { + public: + StringFieldChunkMetrics(std::string min, + std::string max, + BloomFilterPtr bloom_filter, + BloomFilterPtr ngram_bloom_filter) + : min_(min), + max_(max), + bloom_filter_(bloom_filter), + ngram_bloom_filter_(ngram_bloom_filter) { + this->has_value_ = true; + } + + std::unique_ptr + Clone() const override { + if (!this->has_value_) { + return std::make_unique(); + } + return std::make_unique( + min_, max_, bloom_filter_, ngram_bloom_filter_); + } + + FieldChunkMetricsType + GetMetricsType() const override { + return FieldChunkMetricsType::STRING; + } + + bool + 
CanSkipUnaryRange(OpType op_type, const Metrics& val) const override { + if (!this->has_value_) { + return false; + } + auto typed_val = ExtractStringView(val); + if (!typed_val.has_value()) { + return false; + } + auto value = *typed_val; + switch (op_type) { + case OpType::Equal: { + if (!bloom_filter_) { + return RangeShouldSkip(*typed_val, + std::string_view(min_), + std::string_view(max_), + op_type); + } + return !bloom_filter_->Test(value); + } + case OpType::LessThan: + case OpType::LessEqual: + case OpType::GreaterThan: + case OpType::GreaterEqual: { + return RangeShouldSkip(*typed_val, + std::string_view(min_), + std::string_view(max_), + op_type); + } + case OpType::InnerMatch: + case OpType::PrefixMatch: + case OpType::PostfixMatch: { + if (!ngram_bloom_filter_ || + typed_val->size() < DEFAULT_SKIPINDEX_MIN_NGRAM_LENGTH) { + return false; + } + + ankerl::unordered_dense::set ngrams; + ExtractNgrams( + ngrams, *typed_val, DEFAULT_SKIPINDEX_MIN_NGRAM_LENGTH); + for (const auto& ngram : ngrams) { + if (!ngram_bloom_filter_->Test(ngram)) { + return true; + } + } + return false; + } + default: + return false; + } + return false; + } + + bool + CanSkipIn(const std::vector& values) const override { + if (!this->has_value_ || values.empty()) { + return false; + } + std::vector string_values; + string_values.reserve(values.size()); + for (const auto& v : values) { + auto sv = ExtractStringView(v); + if (!sv.has_value()) { + return false; + } + string_values.push_back(*sv); + } + if (!bloom_filter_) { + std::string_view min, max; + for (auto v : string_values) { + if (min.empty() || v < min) { + min = v; + } + if (max.empty() || v > max) { + max = v; + } + } + return RangeShouldSkip(min, + max, + std::string_view(min_), + std::string_view(max_), + true, + true); + } + for (auto v : string_values) { + if (bloom_filter_->Test(v)) { + return false; + } + } + return true; + } + + bool + CanSkipBinaryRange(const Metrics& lower_val, + const Metrics& upper_val, + bool 
lower_inclusive, + bool upper_inclusive) const override { + if (!this->has_value_) { + return false; + } + auto typed_min = ExtractStringView(lower_val); + auto typed_max = ExtractStringView(upper_val); + if (!typed_min.has_value() || !typed_max.has_value()) { + return false; + } + + return RangeShouldSkip(*typed_min, + *typed_max, + std::string_view(min_), + std::string_view(max_), + lower_inclusive, + upper_inclusive); + } + + nlohmann::json + ToJson() const override { + nlohmann::json j; + j["type"] = FieldChunkMetricsTypeToString(GetMetricsType()); + + if (this->has_value_) { + j["min"] = min_; + j["max"] = max_; + if (bloom_filter_) { + auto bf_data = bloom_filter_->ToJson(); + j["bloom_filter"] = nlohmann::json::binary(bf_data); + } + if (ngram_bloom_filter_) { + auto ngram_bf_data = ngram_bloom_filter_->ToJson(); + j["ngram_bloom_filter"] = nlohmann::json::binary(ngram_bf_data); + } + } + + return j; + } + + static std::optional + ExtractStringView(const Metrics& val) { + if (std::holds_alternative(val)) { + return std::get(val); + } else if (std::holds_alternative(val)) { + return std::string_view(std::get(val)); + } + return std::nullopt; + } + + private: + std::string min_; + std::string max_; + BloomFilterPtr bloom_filter_{nullptr}; + BloomFilterPtr ngram_bloom_filter_{nullptr}; +}; + +template +inline std::unique_ptr +NewFieldMetrics(const nlohmann::json& data) { + std::unique_ptr none_metrics = + std::make_unique(); + if (!data.contains("type")) { + return none_metrics; + } + auto type = StringToFieldChunkMetricsType(data["type"].get()); + switch (type) { + case FieldChunkMetricsType::BOOLEAN: { + if (!data.contains("has_true") || !data.contains("has_false")) { + return none_metrics; + } + bool has_true = data["has_true"].get(); + bool has_false = data["has_false"].get(); + return std::make_unique(has_true, + has_false); + } + + case FieldChunkMetricsType::FLOAT: { + if (!data.contains("min") || !data.contains("max")) { + return none_metrics; + } + T 
min = data["min"].get(); + T max = data["max"].get(); + return std::make_unique>(min, max); + } + case FieldChunkMetricsType::INT: { + if (!data.contains("min") || !data.contains("max")) { + return none_metrics; + } + T min = data["min"].get(); + T max = data["max"].get(); + BloomFilterPtr bloom_filter = nullptr; + if (data.contains("bloom_filter")) { + bloom_filter = BloomFilterFromJson(data["bloom_filter"]); + } + return std::make_unique>( + min, max, bloom_filter); + } + + case FieldChunkMetricsType::STRING: { + if (!data.contains("min") || !data.contains("max")) { + return none_metrics; + } + std::string min = data["min"].get(); + std::string max = data["max"].get(); + BloomFilterPtr bloom_filter = nullptr; + if (data.contains("bloom_filter")) { + bloom_filter = BloomFilterFromJson(data["bloom_filter"]); + } + BloomFilterPtr ngram_filter = nullptr; + if (data.contains("ngram_bloom_filter")) { + ngram_filter = BloomFilterFromJson(data["ngram_bloom_filter"]); + } + return std::make_unique( + min, max, bloom_filter, ngram_filter); + } + default: + return none_metrics; + } + return none_metrics; +} + +class SkipIndexStatsBuilder { + public: + SkipIndexStatsBuilder() = default; + SkipIndexStatsBuilder(const Config& config) { + auto enable_bloom_filter = + GetValueFromConfig(config, "enable_bloom_filter"); + if (enable_bloom_filter.has_value()) { + enable_bloom_filter_ = *enable_bloom_filter; + } + } + + std::unique_ptr + Build(DataType data_type, + const std::shared_ptr& statistic) const; + + std::unique_ptr + Build(const std::vector>& batches, + int col_idx, + arrow::Type::type data_type) const; + + std::unique_ptr + Build(DataType data_type, const Chunk* chunk) const; + + private: + template + struct metricsInfo { + int64_t total_rows_ = 0; + int64_t null_count_ = 0; + + MetricsDataType min_; + MetricsDataType max_; + + bool contains_true_ = false; + bool contains_false_ = false; + + ankerl::unordered_dense::set> unique_values_; + ankerl::unordered_dense::set 
ngram_values_; + }; + + template + metricsInfo + ProcessFieldMetrics( + const std::shared_ptr& statistics) const { + auto typed_statistics = + std::dynamic_pointer_cast>( + statistics); + MetricsDataType min; + MetricsDataType max; + if constexpr (std::is_same_v) { + min = std::string_view(typed_statistics->min()); + max = std::string_view(typed_statistics->max()); + } else { + min = static_cast(typed_statistics->min()); + max = static_cast(typed_statistics->max()); + } + return {typed_statistics->num_values(), + typed_statistics->null_count(), + min, + max, + false, + false, + {}}; + } + + template + metricsInfo + ProcessFieldMetrics( + const std::vector>& batches, + int col_idx) const { + T min, max; + int64_t total_rows = 0; + int64_t null_count = 0; + bool contains_true = false; + bool contains_false = false; + ankerl::unordered_dense::set unique_values; + + bool has_first_valid = false; + for (const auto& batch : batches) { + auto arr = batch->column(col_idx); + auto array = std::static_pointer_cast(arr); + for (int64_t i = 0; i < array->length(); ++i) { + if (array->IsNull(i)) { + null_count++; + continue; + } + T value = array->Value(i); + if constexpr (std::is_same_v) { + if (value) { + contains_true = true; + } else { + contains_false = true; + } + continue; + } + if (!has_first_valid) { + min = value; + max = value; + has_first_valid = true; + } else { + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + } + if (!enable_bloom_filter_) { + continue; + } + if constexpr (std::is_integral_v) { + unique_values.insert(value); + } + } + total_rows += array->length(); + } + return {total_rows, + null_count, + min, + max, + contains_true, + contains_false, + std::move(unique_values)}; + } + + metricsInfo + ProcessStringFieldMetrics( + const std::vector>& batches, + int col_idx) const { + int64_t total_rows = 0; + int64_t null_count = 0; + std::string_view min; + std::string_view max; + ankerl::unordered_dense::set unique_values; + 
ankerl::unordered_dense::set ngram_values; + + bool has_first_valid = false; + for (const auto& batch : batches) { + auto arr = batch->column(col_idx); + auto array = std::static_pointer_cast(arr); + for (int64_t i = 0; i < array->length(); ++i) { + if (array->IsNull(i)) { + null_count++; + continue; + } + auto value = array->GetView(i); + if (!has_first_valid) { + min = value; + max = value; + has_first_valid = true; + } else { + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + } + if (!enable_bloom_filter_ || + unique_values.find(value) != unique_values.end()) { + continue; + } + unique_values.insert(value); + size_t length = value.length(); + ExtractNgrams( + ngram_values, value, DEFAULT_SKIPINDEX_MIN_NGRAM_LENGTH); + } + total_rows += array->length(); + } + return {total_rows, + null_count, + min, + max, + false, + false, + std::move(unique_values), + std::move(ngram_values)}; + } + + template + metricsInfo + ProcessFieldMetrics(const T* data, + const bool* valid_data, + int64_t count) const { + bool has_first_valid = false; + T min, max; + int64_t total_rows = count; + int64_t null_count = 0; + bool contains_true = false; + bool contains_false = false; + ankerl::unordered_dense::set unique_values; + + for (int64_t i = 0; i < count; i++) { + T value = data[i]; + if (valid_data != nullptr && !valid_data[i]) { + null_count++; + continue; + } + if constexpr (std::is_same_v) { + if (value) { + contains_true = true; + } else { + contains_false = true; + } + continue; + } + if (!has_first_valid) { + min = value; + max = value; + has_first_valid = true; + } else { + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + } + if (!enable_bloom_filter_) { + continue; + } + if constexpr (std::is_integral_v) { + unique_values.insert(value); + } + } + return {total_rows, + null_count, + min, + max, + contains_true, + contains_false, + std::move(unique_values)}; + } + + metricsInfo + ProcessStringFieldMetrics(const 
StringChunk* chunk) const { + // all captured by reference + bool has_first_valid = false; + int64_t total_rows = chunk->RowNums(); + int64_t null_count = 0; + std::string_view min; + std::string_view max; + ankerl::unordered_dense::set unique_values; + ankerl::unordered_dense::set ngram_values; + + for (int64_t i = 0; i < total_rows; ++i) { + bool is_valid = chunk->isValid(i); + if (!is_valid) { + null_count++; + continue; + } + auto value = chunk->operator[](i); + if (!has_first_valid) { + min = value; + max = value; + has_first_valid = true; + } else { + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + } + if (!enable_bloom_filter_ || + unique_values.find(value) != unique_values.end()) { + continue; + } + unique_values.insert(value); + size_t length = value.length(); + ExtractNgrams( + ngram_values, value, DEFAULT_SKIPINDEX_MIN_NGRAM_LENGTH); + } + return {total_rows, + null_count, + min, + max, + false, + false, + std::move(unique_values), + std::move(ngram_values)}; + } + + template + std::unique_ptr + LoadMetrics(const metricsInfo& info) const { + if (info.total_rows_ - info.null_count_ == 0) { + return std::make_unique(); + } + if constexpr (std::is_same_v) { + if (info.contains_true_ && info.contains_false_) { + return std::make_unique(); + } + return std::make_unique( + info.contains_true_, info.contains_false_); + } + T min, max; + if constexpr (std::is_same_v) { + min = std::string(info.min_); + max = std::string(info.max_); + } else { + min = info.min_; + max = info.max_; + } + if constexpr (std::is_floating_point_v) { + return std::make_unique>(min, max); + } + if (!enable_bloom_filter_) { + if constexpr (std::is_same_v) { + return std::make_unique( + min, max, nullptr, nullptr); + } + return std::make_unique>(min, max, nullptr); + } + BloomFilterPtr bloom_filter = + NewBloomFilterWithType(info.unique_values_.size(), + DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, + BFType::Blocked); + if constexpr (std::is_same_v) { + for 
(const auto& val : info.unique_values_) { + bloom_filter->Add(val); + } + if (info.ngram_values_.empty()) { + return std::make_unique( + min, max, std::move(bloom_filter), nullptr); + } + BloomFilterPtr ngram_bloom_filter = + NewBloomFilterWithType(info.ngram_values_.size(), + DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, + BFType::Blocked); + for (const auto& ngram : info.ngram_values_) { + ngram_bloom_filter->Add(std::string_view(ngram)); + } + return std::make_unique( + min, + max, + std::move(bloom_filter), + std::move(ngram_bloom_filter)); + } + + for (const auto& val : info.unique_values_) { + bloom_filter->Add(reinterpret_cast(&val), + sizeof(val)); + } + return std::make_unique>( + min, max, std::move(bloom_filter)); + } + + private: + bool enable_bloom_filter_ = false; +}; + +} // namespace milvus::index diff --git a/internal/core/src/index/skipindex_stats/SkipIndexStatsTest.cpp b/internal/core/src/index/skipindex_stats/SkipIndexStatsTest.cpp new file mode 100644 index 0000000000..ed01601069 --- /dev/null +++ b/internal/core/src/index/skipindex_stats/SkipIndexStatsTest.cpp @@ -0,0 +1,1097 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include "storage/Util.h" +#include "common/ChunkWriter.h" +#include "index/skipindex_stats/SkipIndexStats.h" + +using namespace milvus; +using namespace milvus::index; + +class SkipIndexStatsBuilderTest : public ::testing::Test { + protected: + void + SetUp() override { + auto config = milvus::Config(); + config["enable_bloom_filter"] = true; + builder_ = std::make_unique(config); + } + + std::unique_ptr builder_; +}; + +TEST_F(SkipIndexStatsBuilderTest, BuildFromArrowBatches) { + // BOOL + { + auto schema = arrow::schema({arrow::field("col", arrow::boolean())}); + arrow::BooleanBuilder builder; + ASSERT_TRUE(builder.Append(false).ok()); + ASSERT_TRUE(builder.Append(false).ok()); + ASSERT_TRUE(builder.Append(false).ok()); + ASSERT_TRUE(builder.Append(false).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, 4, {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::BOOL); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::BOOLEAN); + + std::vector query_true = {true, false}; + std::vector query_false = {false, true}; + EXPECT_TRUE(metrics->CanSkipIn(query_true)); + EXPECT_FALSE(metrics->CanSkipIn(query_false)); + } + + // INT8 + { + auto schema = arrow::schema({arrow::field("col", arrow::int8())}); + arrow::Int8Builder builder; + std::vector values = {-10, 0, 5, 10, 20}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT8); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::INT); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int8_t(5))); + 
EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::Equal, int8_t(100))); + } + + // INT16 + { + auto schema = arrow::schema({arrow::field("col", arrow::int16())}); + arrow::Int16Builder builder; + for (int16_t i = 0; i < 100; ++i) { + ASSERT_TRUE(builder.Append(i * 10).ok()); + } + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, 100, {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT16); + ASSERT_NE(metrics, nullptr); + EXPECT_TRUE( + metrics->CanSkipUnaryRange(OpType::GreaterThan, int16_t(1000))); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int16_t(50))); + } + + // INT32 + { + auto schema = arrow::schema({arrow::field("col", arrow::int32())}); + arrow::Int32Builder builder; + for (int32_t i = 0; i < 1000; ++i) { + ASSERT_TRUE(builder.Append(i).ok()); + } + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, 1000, {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT32); + ASSERT_NE(metrics, nullptr); + + std::vector in_values = {int32_t(2000), int32_t(3000)}; + EXPECT_TRUE(metrics->CanSkipIn(in_values)); + std::vector in_values2 = {int32_t(10), int32_t(20)}; + EXPECT_FALSE(metrics->CanSkipIn(in_values2)); + } + + // INT64 + { + auto schema = arrow::schema({arrow::field("col", arrow::int64())}); + arrow::Int64Builder builder; + std::vector values = {INT64_MIN / 2, 0, INT64_MAX / 2}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT64); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::INT); + 
EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::Equal, int64_t(114514))); + } + + // FLOAT + { + auto schema = arrow::schema({arrow::field("col", arrow::float32())}); + arrow::FloatBuilder builder; + std::vector values = {-1.5f, 0.0f, 1.5f, 3.14f}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::FLOAT); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::FLOAT); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::LessThan, -2.0f)); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::LessThan, 0.0f)); + } + + // DOUBLE + { + auto schema = arrow::schema({arrow::field("col", arrow::float64())}); + arrow::DoubleBuilder builder; + std::vector values = { + -3.141592653589793, 0.0, 2.718281828459045}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::DOUBLE); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::FLOAT); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, 0.0)); + } + + // STRING + { + auto schema = arrow::schema({arrow::field("col", arrow::utf8())}); + arrow::StringBuilder builder; + std::vector values = {"apple", "banana", "cherry", "date"}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::STRING); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), 
FieldChunkMetricsType::STRING); + EXPECT_FALSE( + metrics->CanSkipUnaryRange(OpType::Equal, std::string("banana"))); + EXPECT_TRUE( + metrics->CanSkipUnaryRange(OpType::Equal, std::string("zebra"))); + EXPECT_TRUE( + metrics->CanSkipUnaryRange(OpType::LessThan, std::string("aaa"))); + } + + // STRING with N-gram + { + auto schema = arrow::schema({arrow::field("col", arrow::utf8())}); + arrow::StringBuilder builder; + std::vector values = {"milvus_vector", "database_system"}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::STRING); + ASSERT_NE(metrics, nullptr); + + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::InnerMatch, + std::string("zzzzz"))); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::InnerMatch, + std::string("vector"))); + } + + // Contains null values + { + auto schema = arrow::schema({arrow::field("col", arrow::int32())}); + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(10).ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.Append(20).ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.Append(30).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, 5, {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT32); + ASSERT_NE(metrics, nullptr); + + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(10))); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(5))); + } + + // Multiple Batches + { + auto schema = arrow::schema({arrow::field("col", arrow::int32())}); + std::vector> batches; + + arrow::Int32Builder builder1; + for (int i = 0; i < 100; ++i) { + ASSERT_TRUE(builder1.Append(i).ok()); + } + std::shared_ptr array1; 
+ ASSERT_TRUE(builder1.Finish(&array1).ok()); + batches.push_back(arrow::RecordBatch::Make(schema, 100, {array1})); + + arrow::Int32Builder builder2; + for (int i = 100; i < 200; ++i) { + ASSERT_TRUE(builder2.Append(i).ok()); + } + std::shared_ptr array2; + ASSERT_TRUE(builder2.Finish(&array2).ok()); + batches.push_back(arrow::RecordBatch::Make(schema, 100, {array2})); + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT32); + ASSERT_NE(metrics, nullptr); + + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(50))); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(150))); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(300))); + } + + // Empty Batches + { + std::vector> empty_batches; + auto metrics = builder_->Build(empty_batches, 0, arrow::Type::INT32); + + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::NONE); + } +} + +TEST_F(SkipIndexStatsBuilderTest, BuildFromChunk) { + // BOOL + { + FixedVector data = {true, true, true, true}; + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::BOOL, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::BOOL, + false, + std::nullopt); + arrow::ArrayVector array_vec = 
read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::BOOL, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::BOOLEAN); + + std::vector query_true = {true, false}; + std::vector query_false = {false, true}; + EXPECT_FALSE(metrics->CanSkipIn(query_true)); + EXPECT_TRUE(metrics->CanSkipIn(query_false)); + } + + // INT8 + { + FixedVector data = {-50, -25, 0, 25, 50}; + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::INT8, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::INT8, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::INT8, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::INT); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::LessThan, int8_t(-51))); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int8_t(0))); + } + + // INT16 + { + FixedVector data; + for (int16_t i = 0; i < 100; ++i) { + data.push_back(i * 10); + } + auto field_data = 
milvus::storage::CreateFieldData( + storage::DataType::INT16, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::INT16, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::INT16, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::INT); + EXPECT_TRUE( + metrics->CanSkipUnaryRange(OpType::GreaterThan, int16_t(1000))); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int16_t(50))); + } + + // INT32 + { + FixedVector data; + for (int32_t i = 0; i < 1000; ++i) { + data.push_back(i); + } + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::INT32, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + 
ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::INT32, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::INT32, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::INT); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(500))); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::Equal, int32_t(2000))); + } + + // INT64 + { + FixedVector data = {-1145141919810, 0, 1145141919810}; + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::INT64, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::INT64, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::INT64, chunk.get()); + ASSERT_NE(metrics, 
nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::INT); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::Equal, int64_t(114514))); + } + + // FLOAT + { + FixedVector data = {-3.14f, -1.0f, 0.0f, 1.0f, 2.718f}; + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::FLOAT, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::FLOAT, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::FLOAT, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::FLOAT); + EXPECT_TRUE(metrics->CanSkipUnaryRange(OpType::GreaterThan, 3.0f)); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, 0.0f)); + } + + // DOUBLE + { + FixedVector data = {-3.141592653589793, 0.0, 2.718281828459045}; + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::DOUBLE, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto 
buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::DOUBLE, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::DOUBLE, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::FLOAT); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::Equal, 0.0)); + } + + // VARCHAR + { + FixedVector data = { + "apple", "banana", "cherry", "date", "elderberry"}; + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::VARCHAR, DataType::NONE); + field_data->FillFieldData(data.data(), data.size()); + + storage::InsertEventData event_data; + auto payload_reader = + std::make_shared(field_data); + event_data.payload_reader = payload_reader; + auto ser_data = event_data.Serialize(); + auto buffer = std::make_shared( + ser_data.data() + 2 * sizeof(milvus::Timestamp), + ser_data.size() - 2 * sizeof(milvus::Timestamp)); + + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_TRUE(reader_builder.Open(buffer).ok()); + std::unique_ptr arrow_reader; + ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok()); + + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok()); + + FieldMeta field_meta(FieldName("a"), + milvus::FieldId(1), + DataType::STRING, + false, + std::nullopt); + arrow::ArrayVector array_vec = read_single_column_batches(rb_reader); + auto 
chunk = create_chunk(field_meta, array_vec); + + auto metrics = builder_->Build(DataType::VARCHAR, chunk.get()); + ASSERT_NE(metrics, nullptr); + EXPECT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::STRING); + EXPECT_FALSE( + metrics->CanSkipUnaryRange(OpType::Equal, std::string("banana"))); + EXPECT_TRUE( + metrics->CanSkipUnaryRange(OpType::Equal, std::string("xyz"))); + EXPECT_TRUE( + metrics->CanSkipUnaryRange(OpType::LessThan, std::string("aaa"))); + EXPECT_FALSE(metrics->CanSkipUnaryRange(OpType::LessThan, + std::string("cherry"))); + } +} + +TEST_F(SkipIndexStatsBuilderTest, BuildFromArrowBatch_InQuery) { + // Test INT64 + { + auto schema = arrow::schema({arrow::field("col", arrow::int64())}); + arrow::Int64Builder builder; + std::vector values = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT64); + ASSERT_NE(metrics, nullptr); + + std::vector in_values1 = { + int64_t(50), int64_t(150), int64_t(200)}; + ASSERT_FALSE(metrics->CanSkipIn(in_values1)); + + std::vector in_values2 = {int64_t(5), int64_t(10)}; + ASSERT_FALSE(metrics->CanSkipIn(in_values2)); + + std::vector in_values3 = {int64_t(100), int64_t(110)}; + ASSERT_FALSE(metrics->CanSkipIn(in_values3)); + + std::vector in_values4 = {int64_t(2), int64_t(3), int64_t(4)}; + ASSERT_TRUE(metrics->CanSkipIn(in_values4)); + + std::vector in_values5 = { + int64_t(110), int64_t(120), int64_t(130)}; + ASSERT_TRUE(metrics->CanSkipIn(in_values5)); + + std::vector in_values6 = { + int64_t(15), int64_t(23), int64_t(55)}; + ASSERT_TRUE(metrics->CanSkipIn(in_values6)); + } + + // Test STRING + { + auto schema = arrow::schema({arrow::field("col", arrow::utf8())}); + arrow::StringBuilder builder; + std::vector values = {"apple", + 
"banana", + "cherry", + "date", + "elderberry", + "fig", + "grape", + "honeydew", + "kiwi", + "lemon"}; + ASSERT_TRUE(builder.AppendValues(values).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, values.size(), {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::STRING); + ASSERT_NE(metrics, nullptr); + + std::vector in_values1 = {std::string("banana"), + std::string("zebra")}; + ASSERT_FALSE(metrics->CanSkipIn(in_values1)); + + std::vector in_values2 = {std::string("aardvark"), + std::string("apple")}; + ASSERT_FALSE(metrics->CanSkipIn(in_values2)); + + std::vector in_values3 = {std::string("lemon"), + std::string("mango")}; + ASSERT_FALSE(metrics->CanSkipIn(in_values3)); + + std::vector in_values4 = {std::string("aaa"), + std::string("aardvark")}; + ASSERT_TRUE(metrics->CanSkipIn(in_values4)); + + std::vector in_values5 = { + std::string("mango"), std::string("orange"), std::string("zebra")}; + ASSERT_TRUE(metrics->CanSkipIn(in_values5)); + + std::vector in_values6 = {}; + ASSERT_FALSE(metrics->CanSkipIn(in_values6)); + } +} + +TEST_F(SkipIndexStatsBuilderTest, BuildFromArrowBatch_InQuery_Nullable) { + { + auto schema = arrow::schema({arrow::field("col", arrow::int64())}); + arrow::Int64Builder builder; + + // Data: [10, 20, NULL, NULL, 50], valid_data: 0b00010011 = 0x13 + ASSERT_TRUE(builder.Append(10).ok()); + ASSERT_TRUE(builder.Append(20).ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.Append(50).ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, 5, {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::INT64); + ASSERT_NE(metrics, nullptr); + + // Actual valid values: [10, 20, 50] + std::vector in_values1 = { + int64_t(20), int64_t(60), 
int64_t(70)}; + ASSERT_FALSE(metrics->CanSkipIn(in_values1)); + + std::vector in_values2 = {int64_t(1), int64_t(2), int64_t(50)}; + ASSERT_FALSE(metrics->CanSkipIn(in_values2)); + + std::vector in_values3 = {int64_t(1), int64_t(2), int64_t(3)}; + ASSERT_TRUE(metrics->CanSkipIn(in_values3)); + + std::vector in_values4 = { + int64_t(60), int64_t(70), int64_t(80)}; + ASSERT_TRUE(metrics->CanSkipIn(in_values4)); + + // Should skip: NULL values (30, 40) are not in valid set + std::vector in_values5 = {int64_t(30)}; + ASSERT_TRUE(metrics->CanSkipIn(in_values5)); + + std::vector in_values6 = {}; + ASSERT_FALSE(metrics->CanSkipIn(in_values6)); + } + + // Test nullable STRING + { + auto schema = arrow::schema({arrow::field("col", arrow::utf8())}); + arrow::StringBuilder builder; + + // Data: ["a", "b", NULL, NULL, "e"], valid_data: 0b00010011 = 0x13 + ASSERT_TRUE(builder.Append("a").ok()); + ASSERT_TRUE(builder.Append("b").ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.Append("e").ok()); + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto batch = arrow::RecordBatch::Make(schema, 5, {array}); + std::vector> batches = {batch}; + + auto metrics = builder_->Build(batches, 0, arrow::Type::STRING); + ASSERT_NE(metrics, nullptr); + + // Actual valid values: ["a", "b", "e"] + std::vector in_values1 = { + std::string("b"), std::string("x"), std::string("y")}; + ASSERT_FALSE(metrics->CanSkipIn(in_values1)); + + std::vector in_values2 = { + std::string("e"), std::string("x"), std::string("y")}; + ASSERT_FALSE(metrics->CanSkipIn(in_values2)); + + std::vector in_values3 = { + std::string("0"), std::string("1"), std::string("2")}; + ASSERT_TRUE(metrics->CanSkipIn(in_values3)); + + std::vector in_values4 = { + std::string("x"), std::string("y"), std::string("z")}; + ASSERT_TRUE(metrics->CanSkipIn(in_values4)); + + // Should skip: NULL values ("c", "d") are not in valid set + std::vector in_values5 
// Checks CanSkipIn() on metrics produced by the Build(DataType, chunk)
// overload, once for an INT64 column and once for a string column. Each
// section serializes field data as an insert event, reads it back through the
// parquet/arrow reader, converts the arrays into a chunk and builds metrics
// from that chunk.
TEST_F(SkipIndexStatsBuilderTest, BuildFromChunk_InQuery) {
    // Test INT64
    {
        FixedVector data = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
        auto field_data = milvus::storage::CreateFieldData(
            storage::DataType::INT64, DataType::NONE);
        field_data->FillFieldData(data.data(), data.size());

        storage::InsertEventData event_data;
        auto payload_reader =
            std::make_shared(field_data);
        event_data.payload_reader = payload_reader;
        auto ser_data = event_data.Serialize();
        // The reader is opened past the first 2 * sizeof(Timestamp) bytes of
        // the serialized event - presumably a pair of leading timestamps;
        // confirm against InsertEventData::Serialize.
        auto buffer = std::make_shared(
            ser_data.data() + 2 * sizeof(milvus::Timestamp),
            ser_data.size() - 2 * sizeof(milvus::Timestamp));

        parquet::arrow::FileReaderBuilder reader_builder;
        ASSERT_TRUE(reader_builder.Open(buffer).ok());
        std::unique_ptr arrow_reader;
        ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok());

        std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
        ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok());

        FieldMeta field_meta(FieldName("a"),
                             milvus::FieldId(1),
                             DataType::INT64,
                             false,
                             std::nullopt);
        arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
        auto chunk = create_chunk(field_meta, array_vec);

        auto metrics = builder_->Build(DataType::INT64, chunk.get());
        ASSERT_NE(metrics, nullptr);

        // 50 is one of the stored values, so this IN list must not be skipped.
        std::vector in_values1 = {
            int64_t(50), int64_t(150), int64_t(200)};
        ASSERT_FALSE(metrics->CanSkipIn(in_values1));

        // None of 2, 3, 4 is stored (the smallest stored value is 10).
        std::vector in_values2 = {int64_t(2), int64_t(3), int64_t(4)};
        ASSERT_TRUE(metrics->CanSkipIn(in_values2));
    }

    // Test STRING
    {
        FixedVector data = {
            "apple", "banana", "cherry", "date", "elderberry"};
        auto field_data = milvus::storage::CreateFieldData(
            storage::DataType::VARCHAR, DataType::NONE);
        field_data->FillFieldData(data.data(), data.size());

        storage::InsertEventData event_data;
        auto payload_reader =
            std::make_shared(field_data);
        event_data.payload_reader = payload_reader;
        auto ser_data = event_data.Serialize();
        // Same 2 * sizeof(Timestamp) offset as in the INT64 section above.
        auto buffer = std::make_shared(
            ser_data.data() + 2 * sizeof(milvus::Timestamp),
            ser_data.size() - 2 * sizeof(milvus::Timestamp));

        parquet::arrow::FileReaderBuilder reader_builder;
        ASSERT_TRUE(reader_builder.Open(buffer).ok());
        std::unique_ptr arrow_reader;
        ASSERT_TRUE(reader_builder.Build(&arrow_reader).ok());

        std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
        ASSERT_TRUE(arrow_reader->GetRecordBatchReader(&rb_reader).ok());

        // NOTE(review): the field data and the Build() call below use VARCHAR
        // while this FieldMeta is declared as DataType::STRING - confirm the
        // mix is intentional.
        FieldMeta field_meta(FieldName("a"),
                             milvus::FieldId(1),
                             DataType::STRING,
                             false,
                             std::nullopt);
        arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
        auto chunk = create_chunk(field_meta, array_vec);

        auto metrics = builder_->Build(DataType::VARCHAR, chunk.get());
        ASSERT_NE(metrics, nullptr);

        // "banana" is stored, so the list must not be skipped.
        std::vector in_values1 = {std::string("banana"),
                                  std::string("zebra")};
        ASSERT_FALSE(metrics->CanSkipIn(in_values1));

        // Neither value is stored; both compare below "apple".
        std::vector in_values2 = {std::string("aaa"),
                                  std::string("aardvark")};
        ASSERT_TRUE(metrics->CanSkipIn(in_values2));

        // Neither value is stored; both compare above "elderberry".
        std::vector in_values3 = {std::string("xyz"),
                                  std::string("zzz")};
        ASSERT_TRUE(metrics->CanSkipIn(in_values3));
    }
}

// Checks CanSkipIn() on metrics built by a separately constructed builder
// (builder_no_bf) instead of the fixture's builder_.
// NOTE(review): how the bloom filter is disabled is not visible in this
// block; presumably the builder's construction arguments control it - confirm.
TEST_F(SkipIndexStatsBuilderTest, InQuery_DisableBloomFilter) {
    auto builder_no_bf = std::make_unique();

    auto schema = arrow::schema({arrow::field("col", arrow::int64())});
    arrow::Int64Builder builder;

    // Stored values are 0, 10, 20, ..., 990.
    std::vector values;
    for (int64_t i = 0; i < 100; ++i) {
        values.push_back(i * 10);
    }
    ASSERT_TRUE(builder.AppendValues(values).ok());

    std::shared_ptr array;
    ASSERT_TRUE(builder.Finish(&array).ok());
    auto batch = arrow::RecordBatch::Make(schema, values.size(), {array});
    std::vector> batches = {batch};

    auto metrics = builder_no_bf->Build(batches, 0, arrow::Type::INT64);
    ASSERT_NE(metrics, nullptr);

    // 1, 2, 3 are not stored but lie inside [0, 990]; the test expects that
    // they cannot be skipped (presumably only a range check is available
    // here - confirm).
    std::vector in_values1 = {int64_t(1), int64_t(2), int64_t(3)};
    ASSERT_FALSE(metrics->CanSkipIn(in_values1));

    // Same situation: absent values that fall inside the stored range.
    std::vector in_values2 = {int64_t(15), int64_t(25)};
    ASSERT_FALSE(metrics->CanSkipIn(in_values2));

    // Values outside range should be skipped
    std::vector in_values3 = {int64_t(-100), int64_t(-50)};
    ASSERT_TRUE(metrics->CanSkipIn(in_values3));

    std::vector in_values4 = {int64_t(1000), int64_t(2000)};
    ASSERT_TRUE(metrics->CanSkipIn(in_values4));
}

// Checks CanSkipUnaryRange() for PrefixMatch / InnerMatch / PostfixMatch on a
// non-nullable string column holding Chinese, Japanese, French, English,
// German and emoji rows.
TEST_F(SkipIndexStatsBuilderTest, BuildFromArrowBatch_StringNgramMatch) {
    // Multi-language test data
    auto schema = arrow::schema({arrow::field("col", arrow::utf8())});
    arrow::StringBuilder builder;
    std::vector strings = {
        "张华考上了北京大学；李萍进了中等技术学校；我在百货公司当售货员：我们都"
        "有光明的前途",
        "張華は北京大学に入学し、李平は中等技術学校に入学し、私はデパートの販売"
        "員として働き、私たち全員に明るい未来が約束されていました。",
        "Zhang Hua a été admis à l'Université de Pékin ; Li Ping est entré "
        "dans une école secondaire technique ; j'ai travaillé comme vendeur "
        "dans un grand magasin : nous avions tous un brillant avenir.",
        "Zhang Hua was admitted to Peking University; Li Ping entered a "
        "secondary technical school; I worked as a salesperson in a department "
        "store: we all had bright futures.",
        "Zhang Hua wurde an der Peking-Universität aufgenommen, Li Ping "
        "besuchte eine technische Sekundarschule, ich arbeitete als Verkäufer "
        "in einem Kaufhaus: Wir alle hatten eine glänzende Zukunft.",
        "😀😁😂",  // Emoji (4-byte UTF-8)
        "Ñoño",    // Combining characters
        "👨‍👩‍👧‍👦"  // Family emoji (multiple codepoints)
    };
    ASSERT_TRUE(builder.AppendValues(strings).ok());

    std::shared_ptr array;
    ASSERT_TRUE(builder.Finish(&array).ok());
    auto batch = arrow::RecordBatch::Make(schema, strings.size(), {array});
    std::vector> batches = {batch};

    auto metrics = builder_->Build(batches, 0, arrow::Type::STRING);
    ASSERT_NE(metrics, nullptr);
    ASSERT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::STRING);

    // PrefixMatch tests - Chinese
    // The first pattern occurs in the middle of the Chinese row, not at its
    // start, yet the test expects that it cannot be skipped; the other two
    // patterns occur in no row.
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::PrefixMatch, std::string("李萍进了中等技术学校")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::PrefixMatch,
                                           std::string("烽火戏诸侯")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::PrefixMatch,
                                           std::string("测试中文分词效果")));

    // PrefixMatch tests - Japanese
    // The three non-skippable patterns are substrings of stored rows; the
    // last pattern occurs in no row.
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::PrefixMatch, std::string("私はデパートの販売員として働き")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::PrefixMatch, std::string("未来が約束されていました")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(OpType::PrefixMatch,
                                            std::string("北京大学")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(
        OpType::PrefixMatch,
        std::string("日本語の単語分割の効果をテストする")));

    // PrefixMatch tests - English
    ASSERT_TRUE(metrics->CanSkipUnaryRange(
        OpType::PrefixMatch, std::string("Ingenious Film Partners")));

    // InnerMatch tests
    // Patterns taken from inside stored rows must not be skipped; patterns
    // that appear in no row are expected to be skippable.
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::InnerMatch,
        std::string("Ping besuchte eine technische Sekundarschule")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::InnerMatch, std::string("張華は北京大学に入学し")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::InnerMatch,
        std::string(" der Peking-Universität aufgenommen")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::InnerMatch,
        std::string("Li Ping est entré dans une école secondaire technique ")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::InnerMatch,
                                           std::string("swimming, football")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("dog")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("xyz")));

    // PostfixMatch tests
    // Several non-skippable patterns below (e.g. "j'ai ") occur inside a row
    // rather than at its end; the test still expects that they cannot be
    // skipped.
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::PostfixMatch,
        std::string("Li Ping entered a secondary technical school;")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::PostfixMatch,
        std::string("nous avions tous un brillant avenir.")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("j'ai ")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(OpType::PostfixMatch,
                                            std::string("j'ai travaillé")));
    ASSERT_FALSE(metrics->CanSkipUnaryRange(
        OpType::PostfixMatch, std::string("Li Ping entered a secondary")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::PostfixMatch,
                                           std::string("後藤一里")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::PostfixMatch,
                                           std::string("ルフィ")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("zzz")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::PostfixMatch,
                                           std::string("Guillotine")));

    // Should handle emoji correctly
    // A single emoji taken from the "😀😁😂" row must not be skipped.
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("😀")));
    // Repeats the InnerMatch "xyz" assertion made earlier in this test.
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("xyz")));
}

// Same three match operators on a string column in which rows 2 and 3 are
// NULL, so only "apple", "application" and "candy" are valid values.
TEST_F(SkipIndexStatsBuilderTest,
       BuildFromArrowBatch_StringNgramMatchNullable) {
    auto schema = arrow::schema({arrow::field("col", arrow::utf8())});
    arrow::StringBuilder builder;

    // Test data: ["apple", "application", NULL, NULL, "candy"]
    // Valid data: 0b00010011 = 0x13 (bits 0, 1, 4 are valid)
    // NOTE(review): `strings` is not referenced again in this test; the rows
    // are appended one by one below, with NULLs where this vector lists
    // "abandon" and "band".
    std::vector strings = {
        "apple", "application", "abandon", "band", "candy"};
    ASSERT_TRUE(builder.Append("apple").ok());
    ASSERT_TRUE(builder.Append("application").ok());
    ASSERT_TRUE(builder.AppendNull().ok());
    ASSERT_TRUE(builder.AppendNull().ok());
    ASSERT_TRUE(builder.Append("candy").ok());

    std::shared_ptr array;
    ASSERT_TRUE(builder.Finish(&array).ok());
    auto batch = arrow::RecordBatch::Make(schema, 5, {array});
    std::vector> batches = {batch};

    auto metrics = builder_->Build(batches, 0, arrow::Type::STRING);
    ASSERT_NE(metrics, nullptr);
    ASSERT_EQ(metrics->GetMetricsType(), FieldChunkMetricsType::STRING);

    // PrefixMatch tests
    // Actual valid values: ["apple", "application", "candy"]
    // Every two-character pattern in this test is expected to be
    // non-skippable, including ones such as "ba" that match no valid row -
    // presumably because they are shorter than the n-gram length used by the
    // builder; confirm.
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("ap")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("app")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("ba")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("aba")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("can")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("xyz")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PrefixMatch, std::string("dog")));

    // InnerMatch tests
    // "and" is a substring of "candy"; "ana" occurs in no valid row.
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("ap")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("pp")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("ana")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("and")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("dog")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::InnerMatch, std::string("xyz")));

    // PostfixMatch tests
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("le")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("on")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("ana")));
    ASSERT_FALSE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("dy")));
    ASSERT_TRUE(metrics->CanSkipUnaryRange(OpType::PostfixMatch,
                                           std::string("milvus")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("zzz")));
    ASSERT_TRUE(
        metrics->CanSkipUnaryRange(OpType::PostfixMatch, std::string("xyz")));
}
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "unicode/brkiter.h" +#include "unicode/unistr.h" +#include "unicode/utypes.h" +#include "ankerl/unordered_dense.h" +#include "common/Types.h" +#include "arrow/api.h" +#include "log/Log.h" +#include +namespace milvus::index { + +inline bool +SupportsSkipIndex(arrow::Type::type type) { + switch (type) { + case arrow::Type::BOOL: + case arrow::Type::INT8: + case arrow::Type::INT16: + case arrow::Type::INT32: + case arrow::Type::INT64: + case arrow::Type::FLOAT: + case arrow::Type::DOUBLE: + case arrow::Type::STRING: + return true; + default: + return false; + } +} + +inline bool +SupportsSkipIndex(DataType type) { + switch (type) { + case DataType::BOOL: + case DataType::INT8: + case DataType::INT16: + case DataType::INT32: + case DataType::INT64: + case DataType::FLOAT: + case DataType::DOUBLE: + case DataType::VARCHAR: + case DataType::STRING: + case DataType::TIMESTAMPTZ: + return true; + default: + return false; + } +} + +inline void +ExtractNgrams(ankerl::unordered_dense::set& ngrams_set, + const std::string_view& text, + size_t n) { + if (n == 0 || text.size() < n) { + return; + } + UErrorCode status = U_ZERO_ERROR; + std::unique_ptr bi( + icu::BreakIterator::createCharacterInstance(icu::Locale::getUS(), + status)); + if 
(U_FAILURE(status)) { + LOG_WARN("Failed to create ICU BreakIterator: {}", u_errorName(status)); + return; + } + + icu::UnicodeString ustr = icu::UnicodeString::fromUTF8( + icu::StringPiece(text.data(), text.size())); + bi->setText(ustr); + + std::vector boundaries; + boundaries.reserve(ustr.length() + 1); + int32_t pos = bi->first(); + while (pos != icu::BreakIterator::DONE) { + boundaries.push_back(pos); + pos = bi->next(); + } + + size_t char_count = boundaries.size() > 0 ? boundaries.size() - 1 : 0; + if (char_count < n) { + return; + } + + for (size_t i = 0; i + n < boundaries.size(); ++i) { + int32_t start_pos = boundaries[i]; + int32_t end_pos = boundaries[i + n]; + int32_t length = end_pos - start_pos; + if (length <= 0) { + continue; + } + icu::UnicodeString ngram_ustr(ustr, start_pos, length); + std::string ngram_utf8; + ngram_ustr.toUTF8String(ngram_utf8); + if (!ngram_utf8.empty()) { + ngrams_set.insert(std::move(ngram_utf8)); + } + } +} + +} // namespace milvus::index \ No newline at end of file