fix: inverted index file not found (#29695)

issue: https://github.com/milvus-io/milvus/issues/29654

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2024-01-07 20:26:49 +08:00 committed by GitHub
parent 20fb847521
commit e9f3df3626
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 43 additions and 29 deletions

View File

@ -7,7 +7,7 @@ fi
CorePath=$1
formatThis() {
find "$1" | grep -E "(*\.cpp|*\.h|*\.cc)$" | grep -v "gen_tools/templates" | grep -v "\.pb\." | xargs clang-format-10 -i
find "$1" | grep -E "(*\.cpp|*\.h|*\.cc)$" | grep -v "gen_tools/templates" | grep -v "\.pb\." | grep -v "tantivy-binding.h" | xargs clang-format-10 -i
}
formatThis "${CorePath}/src"

View File

@ -71,8 +71,20 @@ BinarySet
InvertedIndexTantivy<T>::Upload(const Config& config) {
finish();
for (const auto& entry : std::filesystem::directory_iterator(path_)) {
disk_file_manager_->AddFile(entry.path());
boost::filesystem::path p(path_);
boost::filesystem::directory_iterator end_iter;
for (boost::filesystem::directory_iterator iter(p); iter != end_iter;
iter++) {
if (boost::filesystem::is_directory(*iter)) {
LOG_WARN("{} is a directory", iter->path().string());
} else {
LOG_INFO("trying to add index file: {}", iter->path().string());
AssertInfo(disk_file_manager_->AddFile(iter->path().string()),
"failed to add index file: {}",
iter->path().string());
LOG_INFO("index file: {} added", iter->path().string());
}
}
BinarySet ret;

View File

@ -27,8 +27,6 @@ void tantivy_free_index_writer(void *ptr);
void tantivy_finish_index(void *ptr);
void *tantivy_create_reader_for_index(void *ptr);
void tantivy_index_add_int8s(void *ptr, const int8_t *array, uintptr_t len);
void tantivy_index_add_int16s(void *ptr, const int16_t *array, uintptr_t len);

View File

@ -16,10 +16,6 @@ pub struct IndexWriterWrapper {
}
impl IndexWriterWrapper {
pub fn create_reader(&self) -> IndexReaderWrapper {
IndexReaderWrapper::new(&self.index, &self.field_name, self.field)
}
pub fn new(field_name: String, data_type: TantivyDataType, path: String) -> IndexWriterWrapper {
let field: Field;
let mut schema_builder = Schema::builder();
@ -101,8 +97,9 @@ impl IndexWriterWrapper {
.unwrap();
}
pub fn finish(&mut self) {
pub fn finish(mut self) {
self.index_writer.commit().unwrap();
block_on(self.index_writer.garbage_collect_files()).unwrap();
self.index_writer.wait_merging_threads().unwrap();
}
}

View File

@ -26,25 +26,16 @@ pub extern "C" fn tantivy_free_index_writer(ptr: *mut c_void) {
free_binding::<IndexWriterWrapper>(ptr);
}
// tantivy_finish_index will finish the index writer, and the index writer can't be used any more.
// After this was called, you should reset the pointer to null.
#[no_mangle]
pub extern "C" fn tantivy_finish_index(ptr: *mut c_void) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
(*real).finish();
Box::from_raw(real).finish()
}
}
// should be only used for test
#[no_mangle]
pub extern "C" fn tantivy_create_reader_for_index(ptr: *mut c_void) -> *mut c_void{
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let reader = (*real).create_reader();
create_binding(reader)
}
}
// -------------------------build--------------------
#[no_mangle]
pub extern "C" fn tantivy_index_add_int8s(ptr: *mut c_void, array: *const i8, len: usize) {

View File

@ -93,9 +93,11 @@ struct TantivyIndexWrapper {
writer_ = other.writer_;
reader_ = other.reader_;
finished_ = other.finished_;
path_ = other.path_;
other.writer_ = nullptr;
other.reader_ = nullptr;
other.finished_ = false;
other.path_ = "";
}
TantivyIndexWrapper&
@ -104,10 +106,12 @@ struct TantivyIndexWrapper {
free();
writer_ = other.writer_;
reader_ = other.reader_;
path_ = other.path_;
finished_ = other.finished_;
other.writer_ = nullptr;
other.reader_ = nullptr;
other.finished_ = false;
other.path_ = "";
}
return *this;
}
@ -116,11 +120,13 @@ struct TantivyIndexWrapper {
TantivyDataType data_type,
const char* path) {
writer_ = tantivy_create_index(field_name, data_type, path);
path_ = std::string(path);
}
explicit TantivyIndexWrapper(const char* path) {
assert(tantivy_index_exist(path));
reader_ = tantivy_load_index(path);
path_ = std::string(path);
}
~TantivyIndexWrapper() {
@ -130,6 +136,8 @@ struct TantivyIndexWrapper {
template <typename T>
void
add_data(const T* array, uintptr_t len) {
assert(!finished_);
if constexpr (std::is_same_v<T, bool>) {
tantivy_index_add_bools(writer_, array, len);
return;
@ -182,7 +190,9 @@ struct TantivyIndexWrapper {
finish() {
if (!finished_) {
tantivy_finish_index(writer_);
reader_ = tantivy_create_reader_for_index(writer_);
writer_ = nullptr;
reader_ = tantivy_load_index(path_.c_str());
finished_ = true;
}
}
@ -358,5 +368,6 @@ struct TantivyIndexWrapper {
bool finished_ = false;
IndexWriter writer_ = nullptr;
IndexReader reader_ = nullptr;
std::string path_;
};
} // namespace milvus::tantivy

View File

@ -316,7 +316,7 @@ TEST(CApiTest, SegmentTest) {
ASSERT_NE(status.error_code, Success);
DeleteCollection(collection);
DeleteSegment(segment);
free((char *)status.error_msg);
free((char*)status.error_msg);
}
TEST(CApiTest, CPlan) {
@ -1579,7 +1579,10 @@ TEST(CApiTest, ReduceRemoveDuplicates) {
}
void
testReduceSearchWithExpr(int N, int topK, int num_queries, bool filter_all = false) {
testReduceSearchWithExpr(int N,
int topK,
int num_queries,
bool filter_all = false) {
std::cerr << "testReduceSearchWithExpr(" << N << ", " << topK << ", "
<< num_queries << ")" << std::endl;
@ -1637,7 +1640,8 @@ testReduceSearchWithExpr(int N, int topK, int num_queries, bool filter_all = fal
search_params: "{\"nprobe\": 10}"
>
placeholder_tag: "$0">
output_field_ids: 100)") %topK %N;
output_field_ids: 100)") %
topK % N;
}
auto serialized_expr_plan = fmt.str();
auto blob = generate_query_data(num_queries);
@ -2305,7 +2309,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) {
generate_collection_schema(knowhere::metric::L2, DIM, false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
CSegmentInterface segment;
CSegmentInterface segment;
auto status = NewSegment(collection, Growing, -1, &segment);
ASSERT_EQ(status.error_code, Success);

View File

@ -459,7 +459,8 @@ GetIndexTypesV2() {
template <>
inline std::vector<std::string>
GetIndexTypesV2<std::string>() {
return std::vector<std::string>{milvus::index::INVERTED_INDEX_TYPE, "marisa"};
return std::vector<std::string>{milvus::index::INVERTED_INDEX_TYPE,
"marisa"};
}
} // namespace

View File

@ -25,11 +25,11 @@
#include "storage/ThreadPools.h"
using milvus::DataType;
using milvus::storage::FieldDataMeta;
using milvus::FieldDataPtr;
using milvus::FieldId;
using milvus::segcore::GeneratedData;
using milvus::storage::ChunkManagerPtr;
using milvus::storage::FieldDataMeta;
using milvus::storage::InsertData;
using milvus::storage::StorageConfig;