From ac8e9ff020a6b429715cb15566a349a00fc36b34 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 6 May 2020 03:52:37 -0500 Subject: [PATCH] write error (#2184) * write error Signed-off-by: groot * out of storage Signed-off-by: groot * clang format Signed-off-by: groot * fix ut Signed-off-by: groot * fix #1955 Signed-off-by: groot * refine code Signed-off-by: groot * compact threashold Signed-off-by: groot * changelog Signed-off-by: groot * search by id for hnsw/pq/annoy Signed-off-by: groot * fix python test Signed-off-by: yhmo --- CHANGELOG.md | 1 + core/src/db/DB.h | 5 +- core/src/db/DBImpl.cpp | 73 +++++++++++++------ core/src/db/DBImpl.h | 10 ++- core/src/db/Utils.cpp | 15 ++++ core/src/db/Utils.h | 6 ++ core/src/db/engine/ExecutionEngineImpl.cpp | 6 +- core/src/scheduler/task/BuildIndexTask.cpp | 4 + core/src/segment/SegmentWriter.cpp | 19 +++++ core/src/server/DBWrapper.cpp | 2 +- core/src/server/context/ConnectionContext.h | 30 ++++++++ core/src/server/context/Context.cpp | 14 ++++ core/src/server/context/Context.h | 10 +++ core/src/server/delivery/RequestHandler.cpp | 5 +- core/src/server/delivery/RequestHandler.h | 2 +- .../delivery/request/CompactRequest.cpp | 13 ++-- .../server/delivery/request/CompactRequest.h | 7 +- .../delivery/request/CreateIndexRequest.cpp | 2 +- .../delivery/request/SearchByIDRequest.cpp | 16 +--- .../server/grpc_impl/GrpcRequestHandler.cpp | 33 ++++++++- .../web_impl/handler/WebRequestHandler.cpp | 3 +- core/src/utils/Error.h | 3 +- core/unittest/db/test_db.cpp | 36 ++++----- core/unittest/db/test_db_mysql.cpp | 2 +- core/unittest/db/test_delete.cpp | 16 ++-- core/unittest/db/test_search_by_id.cpp | 4 +- sdk/grpc/ClientProxy.cpp | 2 +- tests/milvus_python_test/test_compact.py | 12 +-- 28 files changed, 256 insertions(+), 95 deletions(-) create mode 100644 core/src/server/context/ConnectionContext.h diff --git a/CHANGELOG.md b/CHANGELOG.md index 8428c12153..0df3e9ca62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Please mark all change in change log and use the issue from GitHub - \#1776 Error out when index SQ8H run in CPU mode - \#1929 Skip MySQL meta schema field width check - \#1946 Fix load index file CPU2GPU fail during searching +- \#1955 Switch create_index operation to background once client break connection - \#1997 Index file missed after compact - \#2073 Fix CheckDBConfigBackendUrl error message - \#2076 CheckMetricConfigAddress error message diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 790f276f17..94057ccb5d 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -105,7 +105,7 @@ class DB { Flush() = 0; virtual Status - Compact(const std::string& collection_id) = 0; + Compact(const std::string& collection_id, double threshold = 0.0) = 0; virtual Status GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array, @@ -136,7 +136,8 @@ class DB { Size(uint64_t& result) = 0; virtual Status - CreateIndex(const std::string& collection_id, const CollectionIndex& index) = 0; + CreateIndex(const std::shared_ptr& context, const std::string& collection_id, + const CollectionIndex& index) = 0; virtual Status DescribeIndex(const std::string& collection_id, CollectionIndex& index) = 0; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 46053ce155..4595b809ea 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -852,7 +852,7 @@ DBImpl::Flush() { } Status -DBImpl::Compact(const std::string& collection_id) { +DBImpl::Compact(const std::string& collection_id, double threshold) { if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -917,7 +917,7 @@ DBImpl::Compact(const std::string& collection_id) { meta::SegmentsSchema files_to_update; if (deleted_docs_size != 0) { - compact_status = CompactFile(collection_id, file, files_to_update); + compact_status = CompactFile(collection_id, threshold, file, files_to_update); if (!compact_status.ok()) { LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": " @@ -948,16 +948,35 @@ DBImpl::Compact(const std::string& collection_id) { } Status -DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& file, +DBImpl::CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file, meta::SegmentsSchema& files_to_update) { LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id; + std::string segment_dir_to_merge; + utils::GetParentPath(file.location_, segment_dir_to_merge); + + // no need to compact if deleted vectors are too few(less than threashold) + if (file.row_count_ > 0 && threshold > 0.0) { + segment::SegmentReader segment_reader_to_merge(segment_dir_to_merge); + segment::DeletedDocsPtr deleted_docs_ptr; + auto status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr); + if (status.ok()) { + auto delete_items = deleted_docs_ptr->GetDeletedDocs(); + double delete_rate = (double)delete_items.size() / (double)file.row_count_; + if (delete_rate < threshold) { + LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for" + << segment_dir_to_merge; + return Status::OK(); + } + } + } + // Create new collection file meta::SegmentSchema compacted_file; compacted_file.collection_id_ = collection_id; // compacted_file.date_ = date; compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE; // TODO: use NEW_MERGE for now - Status status = meta_ptr_->CreateCollectionFile(compacted_file); + auto status = meta_ptr_->CreateCollectionFile(compacted_file); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message(); @@ -970,9 +989,6 @@ DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& utils::GetParentPath(compacted_file.location_, new_segment_dir); auto segment_writer_ptr = std::make_shared(new_segment_dir); - std::string segment_dir_to_merge; - utils::GetParentPath(file.location_, segment_dir_to_merge); - LOG_ENGINE_DEBUG_ << "Compacting begin..."; segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_); @@ -986,6 +1002,7 @@ DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& if (mark_status.ok()) { LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete"; } + return status; } @@ -1266,7 +1283,8 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& } Status -DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& index) { +DBImpl::CreateIndex(const std::shared_ptr& context, const std::string& collection_id, + const CollectionIndex& index) { if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -1304,7 +1322,7 @@ DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& ind // step 4: wait and build index status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id); - status = WaitCollectionIndexRecursively(collection_id, index); + status = WaitCollectionIndexRecursively(context, collection_id, index); return status; } @@ -1845,7 +1863,6 @@ DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_ho collection_file.collection_id_ = collection_id; collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE; Status status = meta_ptr_->CreateCollectionFile(collection_file); - if (!status.ok()) { LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString(); return status; @@ -2224,7 +2241,7 @@ DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector< } if (partition_name_array.empty()) { - return Status(PARTITION_NOT_FOUND, "Cannot find the specified partitions"); + return Status(DB_PARTITION_NOT_FOUND, "The specified partiton does not exist"); } return Status::OK(); @@ -2291,7 +2308,8 @@ DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const } Status -DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) { +DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr& context, + const std::string& collection_id, const CollectionIndex& index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector file_types; @@ -2313,17 +2331,30 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C meta::FilesHolder files_holder; auto status = GetFilesToBuildIndex(collection_id, file_types, files_holder); int times = 1; - + uint64_t repeat = 0; while (!files_holder.HoldFiles().empty()) { - LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index " - << times; - if (!utils::IsRawIndexType(index.engine_type_)) { - status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id); + if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) { + LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index " + << times; + if (!utils::IsRawIndexType(index.engine_type_)) { + status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id); + } } - index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL)); - GetFilesToBuildIndex(collection_id, file_types, files_holder); - ++times; + index_req_swn_.Wait_For(std::chrono::seconds(1)); + + // client break the connection, no need to block, check every 1 second + if (context->IsConnectionBroken()) { + LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background"; + break; // just break, not return, continue to update partitions files to to_index + } + + // check to_index files every 5 seconds + repeat++; + if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) { + GetFilesToBuildIndex(collection_id, file_types, files_holder); + ++times; + } } } @@ -2331,7 +2362,7 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C std::vector partition_array; auto status = meta_ptr_->ShowPartitions(collection_id, partition_array); for (auto& schema : partition_array) { - status = WaitCollectionIndexRecursively(schema.collection_id_, index); + status = WaitCollectionIndexRecursively(context, schema.collection_id_, index); fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition", status = Status(DB_ERROR, "")); if (!status.ok()) { diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index fddd7b72c5..4e3b86fc8c 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -115,7 +115,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi Flush() override; Status - Compact(const std::string& collection_id) override; + Compact(const std::string& collection_id, double threshold = 0.0) override; Status GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array, @@ -128,7 +128,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi // Merge(const std::set& collection_ids) override; Status - CreateIndex(const std::string& collection_id, const CollectionIndex& index) override; + CreateIndex(const std::shared_ptr& context, const std::string& collection_id, + const CollectionIndex& index) override; Status DescribeIndex(const std::string& collection_id, CollectionIndex& index) override; @@ -244,7 +245,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi BackgroundBuildIndex(); Status - CompactFile(const std::string& collection_id, const meta::SegmentSchema& file, + CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file, meta::SegmentsSchema& files_to_update); /* @@ -270,7 +271,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index); Status - WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index); + WaitCollectionIndexRecursively(const std::shared_ptr& context, const std::string& collection_id, + const CollectionIndex& index); Status DropCollectionIndexRecursively(const std::string& collection_id); diff --git a/core/src/db/Utils.cpp b/core/src/db/Utils.cpp index 5ad66f58d7..10bcdcd743 100644 --- a/core/src/db/Utils.cpp +++ b/core/src/db/Utils.cpp @@ -13,6 +13,7 @@ #include +#include #include #include #include @@ -318,6 +319,20 @@ GetIndexName(int32_t index_type) { return index_type_name[index_type]; } +void +SendExitSignal() { + LOG_SERVER_INFO_ << "Send SIGUSR2 signal to exit"; + pid_t pid = getpid(); + kill(pid, SIGUSR2); +} + +void +ExitOnWriteError(Status& status) { + if (status.code() == SERVER_WRITE_ERROR) { + utils::SendExitSignal(); + } +} + } // namespace utils } // namespace engine } // namespace milvus diff --git a/core/src/db/Utils.h b/core/src/db/Utils.h index b40d852be0..b701ad41eb 100644 --- a/core/src/db/Utils.h +++ b/core/src/db/Utils.h @@ -73,6 +73,12 @@ ParseMetaUri(const std::string& uri, MetaUriInfo& info); std::string GetIndexName(int32_t index_type); +void +SendExitSignal(); + +void +ExitOnWriteError(Status& status); + } // namespace utils } // namespace engine } // namespace milvus diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index f9e34397ac..ecf8f007e6 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -359,7 +359,11 @@ ExecutionEngineImpl::Serialize() { utils::GetParentPath(location_, segment_dir); auto segment_writer_ptr = std::make_shared(segment_dir); segment_writer_ptr->SetVectorIndex(index_); - segment_writer_ptr->WriteVectorIndex(location_); + auto status = segment_writer_ptr->WriteVectorIndex(location_); + + if (!status.ok()) { + return status; + } // here we reset index size by file size, // since some index type(such as SQ8) data size become smaller after serialized diff --git a/core/src/scheduler/task/BuildIndexTask.cpp b/core/src/scheduler/task/BuildIndexTask.cpp index fc8dca9d3e..491785b31b 100644 --- a/core/src/scheduler/task/BuildIndexTask.cpp +++ b/core/src/scheduler/task/BuildIndexTask.cpp @@ -75,6 +75,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { } catch (std::exception& ex) { // typical error: out of disk space or permition denied error_msg = "Failed to load to_index file: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << error_msg; stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } fiu_do_on("XBuildIndexTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory")); @@ -88,8 +89,11 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { s = Status(SERVER_UNEXPECTED_ERROR, error_msg); } + LOG_ENGINE_ERROR_ << s.message(); + if (auto job = job_.lock()) { auto build_index_job = std::static_pointer_cast(job); + build_index_job->GetStatus() = s; build_index_job->BuildIndexDone(file_->id_); } diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index 889b33f48a..91416d0ebe 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -24,6 +24,7 @@ #include "SegmentReader.h" #include "Vectors.h" #include "codecs/default/DefaultCodec.h" +#include "db/Utils.h" #include "storage/disk/DiskIOReader.h" #include "storage/disk/DiskIOWriter.h" #include "storage/disk/DiskOperation.h" @@ -119,6 +120,8 @@ SegmentWriter::WriteVectors() { } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); @@ -133,6 +136,8 @@ SegmentWriter::WriteAttrs() { } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); @@ -140,6 +145,10 @@ SegmentWriter::WriteAttrs() { Status SegmentWriter::WriteVectorIndex(const std::string& location) { + if (location.empty()) { + return Status(SERVER_WRITE_ERROR, "Invalid parameter of WriteVectorIndex"); + } + codec::DefaultCodec default_codec; try { fs_ptr_->operation_ptr_->CreateDirectory(); @@ -147,6 +156,8 @@ SegmentWriter::WriteVectorIndex(const std::string& location) { } catch (std::exception& e) { std::string err_msg = "Failed to write vector index: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); @@ -177,6 +188,8 @@ SegmentWriter::WriteBloomFilter() { } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); @@ -192,6 +205,8 @@ SegmentWriter::WriteDeletedDocs() { } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); @@ -206,6 +221,8 @@ SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); @@ -220,6 +237,8 @@ SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { } catch (std::exception& e) { std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); return Status(SERVER_WRITE_ERROR, err_msg); } return Status::OK(); diff --git a/core/src/server/DBWrapper.cpp b/core/src/server/DBWrapper.cpp index 26672f4d0b..1ec00fb17e 100644 --- a/core/src/server/DBWrapper.cpp +++ b/core/src/server/DBWrapper.cpp @@ -197,7 +197,7 @@ DBWrapper::StartService() { db_ = engine::DBFactory::Build(opt); } catch (std::exception& ex) { std::cerr << "Error: failed to open database: " << ex.what() - << ". Possible reason: Meta Tables schema is damaged " + << ". Possible reason: out of storage, meta schema is damaged " << "or created by in-compatible Milvus version." << std::endl; kill(0, SIGUSR1); } diff --git a/core/src/server/context/ConnectionContext.h b/core/src/server/context/ConnectionContext.h new file mode 100644 index 0000000000..f1f2248c7d --- /dev/null +++ b/core/src/server/context/ConnectionContext.h @@ -0,0 +1,30 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include + +namespace milvus { +namespace server { + +class ConnectionContext { + public: + virtual ~ConnectionContext() { + } + virtual bool + IsConnectionBroken() const = 0; +}; + +using ConnectionContextPtr = std::shared_ptr; + +} // namespace server +} // namespace milvus diff --git a/core/src/server/context/Context.cpp b/core/src/server/context/Context.cpp index 19cd09414a..a95e7beb9a 100644 --- a/core/src/server/context/Context.cpp +++ b/core/src/server/context/Context.cpp @@ -40,6 +40,20 @@ Context::Follower(const std::string& operation_name) const { return new_context; } +void +Context::SetConnectionContext(ConnectionContextPtr& context) { + context_ = context; +} + +bool +Context::IsConnectionBroken() const { + if (context_ == nullptr) { + return false; + } + + return context_->IsConnectionBroken(); +} + ///////////////////////////////////////////////////////////////////////////////////////////////// ContextChild::ContextChild(const ContextPtr& context, const std::string& operation_name) { if (context) { diff --git a/core/src/server/context/Context.h b/core/src/server/context/Context.h index 6d06c2d4d2..94ea83ea59 100644 --- a/core/src/server/context/Context.h +++ b/core/src/server/context/Context.h @@ -15,6 +15,9 @@ #include #include +#include + +#include "server/context/ConnectionContext.h" #include "tracing/TraceContext.h" namespace milvus { @@ -41,9 +44,16 @@ class Context { const std::shared_ptr& GetTraceContext() const; + void + SetConnectionContext(ConnectionContextPtr& context); + + bool + IsConnectionBroken() const; + private: std::string request_id_; std::shared_ptr trace_context_; + ConnectionContextPtr context_; }; using ContextPtr = std::shared_ptr; diff --git a/core/src/server/delivery/RequestHandler.cpp b/core/src/server/delivery/RequestHandler.cpp index 9eb978c4a4..ac3d234128 100644 --- a/core/src/server/delivery/RequestHandler.cpp +++ b/core/src/server/delivery/RequestHandler.cpp @@ -256,8 +256,9 @@ RequestHandler::Flush(const std::shared_ptr& context, const std::vector } Status -RequestHandler::Compact(const std::shared_ptr& context, const std::string& collection_name) { - BaseRequestPtr request_ptr = CompactRequest::Create(context, collection_name); +RequestHandler::Compact(const std::shared_ptr& context, const std::string& collection_name, + double compact_threshold) { + BaseRequestPtr request_ptr = CompactRequest::Create(context, collection_name, compact_threshold); RequestScheduler::ExecRequest(request_ptr); return request_ptr->status(); diff --git a/core/src/server/delivery/RequestHandler.h b/core/src/server/delivery/RequestHandler.h index 7fe414cf18..8024189dc5 100644 --- a/core/src/server/delivery/RequestHandler.h +++ b/core/src/server/delivery/RequestHandler.h @@ -115,7 +115,7 @@ class RequestHandler { Flush(const std::shared_ptr& context, const std::vector& collection_names); Status - Compact(const std::shared_ptr& context, const std::string& collection_name); + Compact(const std::shared_ptr& context, const std::string& collection_name, double compact_threshold); /*******************************************New Interface*********************************************/ diff --git a/core/src/server/delivery/request/CompactRequest.cpp b/core/src/server/delivery/request/CompactRequest.cpp index e6e29be6f5..6ee7115e17 100644 --- a/core/src/server/delivery/request/CompactRequest.cpp +++ b/core/src/server/delivery/request/CompactRequest.cpp @@ -27,13 +27,16 @@ namespace milvus { namespace server { CompactRequest::CompactRequest(const std::shared_ptr& context, - const std::string& collection_name) - : BaseRequest(context, BaseRequest::kCompact), collection_name_(collection_name) { + const std::string& collection_name, double compact_threshold) + : BaseRequest(context, BaseRequest::kCompact), + collection_name_(collection_name), + compact_threshold_(compact_threshold) { } BaseRequestPtr -CompactRequest::Create(const std::shared_ptr& context, const std::string& collection_name) { - return std::shared_ptr(new CompactRequest(context, collection_name)); +CompactRequest::Create(const std::shared_ptr& context, const std::string& collection_name, + double compact_threshold) { + return std::shared_ptr(new CompactRequest(context, collection_name, compact_threshold)); } Status @@ -67,7 +70,7 @@ CompactRequest::OnExecute() { rc.RecordSection("check validation"); // step 2: check collection existence - status = DBWrapper::DB()->Compact(collection_name_); + status = DBWrapper::DB()->Compact(collection_name_, compact_threshold_); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/CompactRequest.h b/core/src/server/delivery/request/CompactRequest.h index c8f3fd3341..e2a1e998fa 100644 --- a/core/src/server/delivery/request/CompactRequest.h +++ b/core/src/server/delivery/request/CompactRequest.h @@ -28,16 +28,19 @@ namespace server { class CompactRequest : public BaseRequest { public: static BaseRequestPtr - Create(const std::shared_ptr& context, const std::string& collection_name); + Create(const std::shared_ptr& context, const std::string& collection_name, + double compact_threshold); protected: - CompactRequest(const std::shared_ptr& context, const std::string& collection_name); + CompactRequest(const std::shared_ptr& context, const std::string& collection_name, + double compact_threshold); Status OnExecute() override; private: const std::string collection_name_; + double compact_threshold_ = 0.0; }; } // namespace server diff --git a/core/src/server/delivery/request/CreateIndexRequest.cpp b/core/src/server/delivery/request/CreateIndexRequest.cpp index 930bc6495b..dc198f6472 100644 --- a/core/src/server/delivery/request/CreateIndexRequest.cpp +++ b/core/src/server/delivery/request/CreateIndexRequest.cpp @@ -116,7 +116,7 @@ CreateIndexRequest::OnExecute() { engine::CollectionIndex index; index.engine_type_ = adapter_index_type; index.extra_params_ = json_params_; - status = DBWrapper::DB()->CreateIndex(collection_name_, index); + status = DBWrapper::DB()->CreateIndex(context_, collection_name_, index); fiu_do_on("CreateIndexRequest.OnExecute.create_index_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { diff --git a/core/src/server/delivery/request/SearchByIDRequest.cpp b/core/src/server/delivery/request/SearchByIDRequest.cpp index 5984d49f7a..471bfa77ed 100644 --- a/core/src/server/delivery/request/SearchByIDRequest.cpp +++ b/core/src/server/delivery/request/SearchByIDRequest.cpp @@ -104,21 +104,9 @@ SearchByIDRequest::OnExecute() { return status; } - // step 6: check collection's index type supports search by id - if (collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_IDMAP && - collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_BIN_IDMAP && - collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_IVFFLAT && - collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_BIN_IVFFLAT && - collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_IVFSQ8) { - std::string err_msg = "Index type " + std::to_string(collection_schema.engine_type_) + - " does not support SearchByID operation"; - LOG_SERVER_ERROR_ << err_msg; - return Status(SERVER_UNSUPPORTED_ERROR, err_msg); - } - rc.RecordSection("check validation"); - // step 7: search vectors + // step 6: search vectors engine::ResultIds result_ids; engine::ResultDistances result_distances; @@ -145,7 +133,7 @@ SearchByIDRequest::OnExecute() { return Status::OK(); // empty collection } - // step 8: construct result array + // step 7: construct result array milvus::server::ContextChild tracer(context_, "Constructing result"); result_.row_num_ = id_array_.size(); result_.distance_list_.swap(result_distances); diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp index 01fa3c45b7..46b1bfc1e5 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -21,6 +21,7 @@ #include "context/HybridSearchContext.h" #include "query/BinaryQuery.h" +#include "server/context/ConnectionContext.h" #include "tracing/TextMapCarrier.h" #include "tracing/TracerUtil.h" #include "utils/Log.h" @@ -129,6 +130,24 @@ ConstructResults(const TopKQueryResult& result, ::milvus::grpc::TopKQueryResult* result.distance_list_.size() * sizeof(float)); } +class GrpcConnectionContext : public milvus::server::ConnectionContext { + public: + explicit GrpcConnectionContext(::grpc::ServerContext* context) : context_(context) { + } + + bool + IsConnectionBroken() const override { + if (context_ == nullptr) { + return true; + } + + return context_->IsCancelled(); + } + + private: + ::grpc::ServerContext* context_ = nullptr; +}; + } // namespace namespace { @@ -265,11 +284,18 @@ std::shared_ptr GrpcRequestHandler::GetContext(::grpc::ServerContext* server_context) { std::lock_guard lock(context_map_mutex_); auto request_id = get_request_id(server_context); - if (context_map_.find(request_id) == context_map_.end()) { + + auto iter = context_map_.find(request_id); + if (iter == context_map_.end()) { LOG_SERVER_ERROR_ << "GetContext: request_id " << request_id << " not found in context_map_"; return nullptr; } - return context_map_[request_id]; + + if (iter->second != nullptr) { + ConnectionContextPtr connection_context = std::make_shared(server_context); + iter->second->SetConnectionContext(connection_context); + } + return iter->second; } void @@ -810,7 +836,8 @@ GrpcRequestHandler::Compact(::grpc::ServerContext* context, const ::milvus::grpc CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__); - Status status = request_handler_.Compact(GetContext(context), request->collection_name()); + double compact_threshold = 0.1; // compact trigger threshold: delete_counts/segment_counts + Status status = request_handler_.Compact(GetContext(context), request->collection_name(), compact_threshold); LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__); SET_RESPONSE(response, status, context); diff --git a/core/src/server/web_impl/handler/WebRequestHandler.cpp b/core/src/server/web_impl/handler/WebRequestHandler.cpp index f9bff6128f..8571e66188 100644 --- a/core/src/server/web_impl/handler/WebRequestHandler.cpp +++ b/core/src/server/web_impl/handler/WebRequestHandler.cpp @@ -305,7 +305,8 @@ WebRequestHandler::Compact(const nlohmann::json& json, std::string& result_str) auto name = collection_name.get(); - auto status = request_handler_.Compact(context_ptr_, name); + double compact_threshold = 0.1; // compact trigger threshold: delete_counts/segment_counts + auto status = request_handler_.Compact(context_ptr_, name, compact_threshold); if (status.ok()) { nlohmann::json result; diff --git a/core/src/utils/Error.h b/core/src/utils/Error.h index c96cb4944c..d24ff8b373 100644 --- a/core/src/utils/Error.h +++ b/core/src/utils/Error.h @@ -95,7 +95,8 @@ constexpr ErrorCode DB_INCOMPATIB_META = ToDbErrorCode(6); constexpr ErrorCode DB_INVALID_META_URI = ToDbErrorCode(7); constexpr ErrorCode DB_EMPTY_COLLECTION = ToDbErrorCode(8); constexpr ErrorCode DB_BLOOM_FILTER_ERROR = ToDbErrorCode(9); -constexpr ErrorCode PARTITION_NOT_FOUND = ToDbErrorCode(10); +constexpr ErrorCode DB_PARTITION_NOT_FOUND = ToDbErrorCode(10); +constexpr ErrorCode DB_OUT_OF_STORAGE = ToDbErrorCode(11); // knowhere error code constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1); diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 9b8f7927a6..c55508c211 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -314,7 +314,7 @@ TEST_F(DBTest, SEARCH_TEST) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IDMAP; index.extra_params_ = {{"nlist", 16384}}; -// db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish +// db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish // // { // std::vector tags; @@ -326,7 +326,7 @@ TEST_F(DBTest, SEARCH_TEST) { // // index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; // index.extra_params_ = {{"nlist", 16384}}; -// db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish +// db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish // // { // std::vector tags; @@ -338,7 +338,7 @@ TEST_F(DBTest, SEARCH_TEST) { index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; index.extra_params_ = {{"nlist", 16384}}; - db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish + db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish { std::vector tags; @@ -350,7 +350,7 @@ TEST_F(DBTest, SEARCH_TEST) { #ifdef MILVUS_GPU_VERSION index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; - db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish + db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish { std::vector tags; @@ -400,7 +400,7 @@ TEST_F(DBTest, SEARCH_TEST) { #ifdef MILVUS_GPU_VERSION // test FAISS_IVFSQ8H optimizer index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; - db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish + db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish std::vector partition_tag; milvus::engine::ResultIds result_ids; milvus::engine::ResultDistances result_dists; @@ -452,7 +452,7 @@ TEST_F(DBTest, PRELOAD_TEST) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IDMAP; - db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish + db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish int64_t prev_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage(); stat = db_->PreloadCollection(COLLECTION_NAME); @@ -551,7 +551,7 @@ TEST_F(DBTest, SHUTDOWN_TEST) { ASSERT_FALSE(stat.ok()); milvus::engine::CollectionIndex index; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_FALSE(stat.ok()); stat = db_->DescribeIndex(collection_info.collection_id_, index); @@ -713,28 +713,28 @@ TEST_F(DBTest, INDEX_TEST) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; index.metric_type_ = (int)milvus::engine::MetricType::IP; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); fiu_init(0); FIU_ENABLE_FIU("SqliteMetaImpl.DescribeCollectionIndex.throw_exception"); - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_FALSE(stat.ok()); fiu_disable("SqliteMetaImpl.DescribeCollectionIndex.throw_exception"); index.engine_type_ = (int)milvus::engine::EngineType::FAISS_PQ; FIU_ENABLE_FIU("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index"); - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_FALSE(stat.ok()); fiu_disable("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index"); #ifdef MILVUS_GPU_VERSION index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); #endif @@ -815,17 +815,17 @@ TEST_F(DBTest, PARTITION_TEST) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; index.metric_type_ = (int)milvus::engine::MetricType::L2; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); fiu_init(0); FIU_ENABLE_FIU("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition"); - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_FALSE(stat.ok()); fiu_disable("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition"); FIU_ENABLE_FIU("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg"); - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_FALSE(stat.ok()); fiu_disable("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg"); @@ -967,7 +967,7 @@ TEST_F(DBTest2, DELETE_TEST) { milvus::engine::IDNumbers vector_ids; stat = db_->InsertVectors(COLLECTION_NAME, "", xb); milvus::engine::CollectionIndex index; - stat = db_->CreateIndex(COLLECTION_NAME, index); + stat = db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // create partition, drop collection will drop partition recursively stat = db_->CreatePartition(COLLECTION_NAME, "part0", "0"); @@ -1341,7 +1341,7 @@ TEST_F(DBTest2, SEARCH_WITH_DIFFERENT_INDEX) { milvus::engine::CollectionIndex index; // index.metric_type_ = (int)milvus::engine::MetricType::IP; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); stat = db_->PreloadCollection(collection_info.collection_id_); @@ -1366,7 +1366,7 @@ result_distances); db_->DropIndex(collection_info.collection_id_); index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); stat = db_->PreloadCollection(collection_info.collection_id_); diff --git a/core/unittest/db/test_db_mysql.cpp b/core/unittest/db/test_db_mysql.cpp index 1139314f73..cb9b8fd54a 100644 --- a/core/unittest/db/test_db_mysql.cpp +++ b/core/unittest/db/test_db_mysql.cpp @@ -362,7 +362,7 @@ TEST_F(MySqlDBTest, PARTITION_TEST) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; index.metric_type_ = (int)milvus::engine::MetricType::L2; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); uint64_t row_count = 0; diff --git a/core/unittest/db/test_delete.cpp b/core/unittest/db/test_delete.cpp index 194d55913e..47f91202ee 100644 --- a/core/unittest/db/test_delete.cpp +++ b/core/unittest/db/test_delete.cpp @@ -329,7 +329,7 @@ TEST_F(DeleteTest, delete_before_create_index) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; index.extra_params_ = {{"nlist", 100}}; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); uint64_t row_count; @@ -399,7 +399,7 @@ TEST_F(DeleteTest, delete_with_index) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; index.extra_params_ = {{"nlist", 100}}; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); // std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk @@ -485,7 +485,7 @@ TEST_F(DeleteTest, delete_multiple_times_with_index) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; index.extra_params_ = {{"nlist", 1}}; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); int topk = 10, nprobe = 10; @@ -594,7 +594,7 @@ TEST_F(DeleteTest, delete_add_create_index) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; index.extra_params_ = {{"nlist", 100}}; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); std::vector ids_to_delete; @@ -610,7 +610,7 @@ TEST_F(DeleteTest, delete_add_create_index) { // stat = db_->Flush(); // ASSERT_TRUE(stat.ok()); - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); uint64_t row_count; @@ -665,7 +665,7 @@ TEST_F(DeleteTest, delete_add_auto_flush) { // ASSERT_TRUE(stat.ok()); // milvus::engine::CollectionIndex index; // index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; - // stat = db_->CreateIndex(collection_info.collection_id_, index); + // stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); // ASSERT_TRUE(stat.ok()); std::vector ids_to_delete; @@ -682,7 +682,7 @@ TEST_F(DeleteTest, delete_add_auto_flush) { std::this_thread::sleep_for(std::chrono::seconds(2)); // stat = db_->Flush(); // ASSERT_TRUE(stat.ok()); - // stat = db_->CreateIndex(collection_info.collection_id_, index); + // stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); // ASSERT_TRUE(stat.ok()); uint64_t row_count; @@ -814,7 +814,7 @@ TEST_F(CompactTest, compact_with_index) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); stat = db_->Flush(); diff --git a/core/unittest/db/test_search_by_id.cpp b/core/unittest/db/test_search_by_id.cpp index c5ed9283a4..812d80268b 100644 --- a/core/unittest/db/test_search_by_id.cpp +++ b/core/unittest/db/test_search_by_id.cpp @@ -214,7 +214,7 @@ TEST_F(SearchByIdTest, WITH_INDEX_TEST) { milvus::engine::CollectionIndex index; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; index.extra_params_ = {{"nlist", 10}}; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); const int64_t topk = 10, nprobe = 10; @@ -398,7 +398,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) { milvus::engine::CollectionIndex index; index.extra_params_ = {{"nlist", 10}}; index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; - stat = db_->CreateIndex(collection_info.collection_id_, index); + stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); const int64_t topk = 10, nprobe = 10; diff --git a/sdk/grpc/ClientProxy.cpp b/sdk/grpc/ClientProxy.cpp index dd18699289..8d449929e1 100644 --- a/sdk/grpc/ClientProxy.cpp +++ b/sdk/grpc/ClientProxy.cpp @@ -17,7 +17,7 @@ #include "grpc-gen/gen-milvus/milvus.grpc.pb.h" -#define MILVUS_SDK_VERSION "0.8.0"; +#define MILVUS_SDK_VERSION "0.9.0"; namespace milvus { diff --git a/tests/milvus_python_test/test_compact.py b/tests/milvus_python_test/test_compact.py index fa9ac3d25f..db73a04a5a 100644 --- a/tests/milvus_python_test/test_compact.py +++ b/tests/milvus_python_test/test_compact.py @@ -142,7 +142,7 @@ class TestCompactBase: logging.getLogger().info(info["partitions"]) size_after = info["partitions"][0]["segments"][0]["data_size"] logging.getLogger().info(size_after) - assert(size_before > size_after) + assert(size_before >= size_after) @pytest.mark.timeout(COMPACT_TIMEOUT) def test_add_vectors_delete_all_and_compact(self, connect, collection): @@ -280,7 +280,7 @@ class TestCompactBase: status, info = connect.collection_info(collection) assert status.OK() size_after = info["partitions"][0]["segments"][0]["data_size"] - assert(size_before > size_after) + assert(size_before >= size_after) status = connect.compact(collection) assert status.OK() # get collection info after compact twice @@ -521,7 +521,7 @@ class TestCompactJAC: logging.getLogger().info(info["partitions"]) size_after = info["partitions"][0]["segments"][0]["data_size"] logging.getLogger().info(size_after) - assert(size_before > size_after) + assert(size_before >= size_after) @pytest.mark.timeout(COMPACT_TIMEOUT) def test_add_vectors_delete_all_and_compact(self, connect, jac_collection): @@ -608,7 +608,7 @@ class TestCompactJAC: status, info = connect.collection_info(jac_collection) assert status.OK() size_after = info["partitions"][0]["segments"][0]["data_size"] - assert(size_before > size_after) + assert(size_before >= size_after) status = connect.compact(jac_collection) assert status.OK() # get collection info after compact twice @@ -802,7 +802,7 @@ class TestCompactIP: logging.getLogger().info(info["partitions"]) size_after = info["partitions"][0]["segments"][0]["data_size"] logging.getLogger().info(size_after) - assert(size_before > size_after) + assert(size_before >= size_after) @pytest.mark.timeout(COMPACT_TIMEOUT) def test_add_vectors_delete_all_and_compact(self, connect, ip_collection): @@ -891,7 +891,7 @@ class TestCompactIP: status, info = connect.collection_info(ip_collection) assert status.OK() size_after = info["partitions"][0]["segments"][0]["data_size"] - assert(size_before > size_after) + assert(size_before >= size_after) status = connect.compact(ip_collection) assert status.OK() status = connect.flush([ip_collection])