write error (#2184)

* write error

Signed-off-by: groot <yihua.mo@zilliz.com>

* out of storage

Signed-off-by: groot <yihua.mo@zilliz.com>

* clang format

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix ut

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix #1955

Signed-off-by: groot <yihua.mo@zilliz.com>

* refine code

Signed-off-by: groot <yihua.mo@zilliz.com>

* compact threshold

Signed-off-by: groot <yihua.mo@zilliz.com>

* changelog

Signed-off-by: groot <yihua.mo@zilliz.com>

* search by id for hnsw/pq/annoy

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix python test

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 2020-05-06 03:52:37 -05:00 committed by GitHub
parent 82ab21aec3
commit ac8e9ff020
28 changed files with 256 additions and 95 deletions

View File

@ -9,6 +9,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1776 Error out when index SQ8H run in CPU mode
- \#1929 Skip MySQL meta schema field width check
- \#1946 Fix load index file CPU2GPU fail during searching
- \#1955 Switch create_index operation to background once client breaks connection
- \#1997 Index file missed after compact
- \#2073 Fix CheckDBConfigBackendUrl error message
- \#2076 CheckMetricConfigAddress error message

View File

@ -105,7 +105,7 @@ class DB {
Flush() = 0;
virtual Status
Compact(const std::string& collection_id) = 0;
Compact(const std::string& collection_id, double threshold = 0.0) = 0;
virtual Status
GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
@ -136,7 +136,8 @@ class DB {
Size(uint64_t& result) = 0;
virtual Status
CreateIndex(const std::string& collection_id, const CollectionIndex& index) = 0;
CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
const CollectionIndex& index) = 0;
virtual Status
DescribeIndex(const std::string& collection_id, CollectionIndex& index) = 0;
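The two signature changes above drive the rest of this patch: Compact() gains an optional delete-rate threshold, and CreateIndex() now receives the request context so a long index build can notice a dropped client. A hedged call-site sketch (the db and ctx handles and the "demo" collection name are placeholders, not from the patch):

// Placeholders: `db` comes from DBFactory::Build(...), `ctx` is the per-request context.
milvus::engine::CollectionIndex index;
auto status = db->CreateIndex(ctx, "demo", index);  // context enables broken-connection checks
status = db->Compact("demo", 0.1);  // skip segments with fewer than 10% deleted vectors
status = db->Compact("demo");       // threshold defaults to 0.0: never skip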

View File

@ -852,7 +852,7 @@ DBImpl::Flush() {
}
Status
DBImpl::Compact(const std::string& collection_id) {
DBImpl::Compact(const std::string& collection_id, double threshold) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
@ -917,7 +917,7 @@ DBImpl::Compact(const std::string& collection_id) {
meta::SegmentsSchema files_to_update;
if (deleted_docs_size != 0) {
compact_status = CompactFile(collection_id, file, files_to_update);
compact_status = CompactFile(collection_id, threshold, file, files_to_update);
if (!compact_status.ok()) {
LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
@ -948,16 +948,35 @@ DBImpl::Compact(const std::string& collection_id) {
}
Status
DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& file,
DBImpl::CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
meta::SegmentsSchema& files_to_update) {
LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
// no need to compact if the deleted vectors are too few (less than the threshold)
if (file.row_count_ > 0 && threshold > 0.0) {
segment::SegmentReader segment_reader_to_merge(segment_dir_to_merge);
segment::DeletedDocsPtr deleted_docs_ptr;
auto status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr);
if (status.ok()) {
auto delete_items = deleted_docs_ptr->GetDeletedDocs();
double delete_rate = (double)delete_items.size() / (double)file.row_count_;
if (delete_rate < threshold) {
LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for"
<< segment_dir_to_merge;
return Status::OK();
}
}
}
// Create new collection file
meta::SegmentSchema compacted_file;
compacted_file.collection_id_ = collection_id;
// compacted_file.date_ = date;
compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE; // TODO: use NEW_MERGE for now
Status status = meta_ptr_->CreateCollectionFile(compacted_file);
auto status = meta_ptr_->CreateCollectionFile(compacted_file);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message();
@ -970,9 +989,6 @@ DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema&
utils::GetParentPath(compacted_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
LOG_ENGINE_DEBUG_ << "Compacting begin...";
segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);
@ -986,6 +1002,7 @@ DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema&
if (mark_status.ok()) {
LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete";
}
return status;
}
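The early-exit added to CompactFile() boils down to a plain ratio test. A self-contained sketch of just that rule (hypothetical helper; the real code first loads the deleted-docs list from the segment):

#include <cstddef>

// Hypothetical helper mirroring the skip rule above: compact only when the
// deleted fraction reaches the threshold. An empty segment or a threshold
// of 0.0 disables the check, so compaction always proceeds.
bool ShouldCompact(std::size_t deleted_count, std::size_t row_count, double threshold) {
    if (row_count == 0 || threshold <= 0.0) {
        return true;
    }
    double delete_rate = static_cast<double>(deleted_count) / static_cast<double>(row_count);
    return delete_rate >= threshold;
}

// Example: 500 deletes in a 10,000-row segment gives a rate of 0.05,
// below a 0.1 threshold, so the segment is left alone.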
@ -1266,7 +1283,8 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
}
Status
DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& index) {
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
const CollectionIndex& index) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
@ -1304,7 +1322,7 @@ DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& ind
// step 4: wait and build index
status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
status = WaitCollectionIndexRecursively(collection_id, index);
status = WaitCollectionIndexRecursively(context, collection_id, index);
return status;
}
@ -1845,7 +1863,6 @@ DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_ho
collection_file.collection_id_ = collection_id;
collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
Status status = meta_ptr_->CreateCollectionFile(collection_file);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
return status;
@ -2224,7 +2241,7 @@ DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<
}
if (partition_name_array.empty()) {
return Status(PARTITION_NOT_FOUND, "Cannot find the specified partitions");
return Status(DB_PARTITION_NOT_FOUND, "The specified partition does not exist");
}
return Status::OK();
@ -2291,7 +2308,8 @@ DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const
}
Status
DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& context,
const std::string& collection_id, const CollectionIndex& index) {
// for IDMAP type, only wait all NEW file converted to RAW file
// for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
std::vector<int> file_types;
@ -2313,17 +2331,30 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C
meta::FilesHolder files_holder;
auto status = GetFilesToBuildIndex(collection_id, file_types, files_holder);
int times = 1;
uint64_t repeat = 0;
while (!files_holder.HoldFiles().empty()) {
LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index "
<< times;
if (!utils::IsRawIndexType(index.engine_type_)) {
status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) {
LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index "
<< times;
if (!utils::IsRawIndexType(index.engine_type_)) {
status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
}
}
index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL));
GetFilesToBuildIndex(collection_id, file_types, files_holder);
++times;
index_req_swn_.Wait_For(std::chrono::seconds(1));
// client broke the connection; no need to block, check every 1 second
if (context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
break;  // just break, not return; continue to update partition files to to_index
}
// check to_index files every 5 seconds
repeat++;
if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) {
GetFilesToBuildIndex(collection_id, file_types, files_holder);
++times;
}
}
}
@ -2331,7 +2362,7 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C
std::vector<meta::CollectionSchema> partition_array;
auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = WaitCollectionIndexRecursively(schema.collection_id_, index);
status = WaitCollectionIndexRecursively(context, schema.collection_id_, index);
fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
status = Status(DB_ERROR, ""));
if (!status.ok()) {
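The loop rewrite above trades one long sleep for many short ones: the thread now wakes every second so a dropped client is noticed quickly, but it only re-issues the to_index scan every WAIT_BUILD_INDEX_INTERVAL wakes. A runnable sketch of that control flow (stub state stands in for the meta calls and signal wait; WAIT_BUILD_INDEX_INTERVAL is assumed to be 5):

#include <chrono>
#include <cstdint>
#include <thread>

static int pending_files = 12;                        // stub for the files_holder contents
static bool HasPendingIndexFiles() { return pending_files > 0; }
static bool ClientConnectionBroken() { return false; }
static void RefreshPendingIndexFiles() { pending_files -= 4; }
constexpr std::uint64_t kWaitBuildIndexInterval = 5;  // assumed value of WAIT_BUILD_INDEX_INTERVAL

int main() {
    std::uint64_t repeat = 0;
    while (HasPendingIndexFiles()) {
        std::this_thread::sleep_for(std::chrono::seconds(1));  // short wait: cheap cancel check
        if (ClientConnectionBroken()) {
            break;  // client is gone; the build carries on in the background
        }
        if (++repeat % kWaitBuildIndexInterval == 0) {
            RefreshPendingIndexFiles();  // re-scan to_index files only every few wakes
        }
    }
}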

View File

@ -115,7 +115,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
Flush() override;
Status
Compact(const std::string& collection_id) override;
Compact(const std::string& collection_id, double threshold = 0.0) override;
Status
GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
@ -128,7 +128,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
// Merge(const std::set<std::string>& collection_ids) override;
Status
CreateIndex(const std::string& collection_id, const CollectionIndex& index) override;
CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
const CollectionIndex& index) override;
Status
DescribeIndex(const std::string& collection_id, CollectionIndex& index) override;
@ -244,7 +245,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
BackgroundBuildIndex();
Status
CompactFile(const std::string& collection_id, const meta::SegmentSchema& file,
CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
meta::SegmentsSchema& files_to_update);
/*
@ -270,7 +271,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index);
Status
WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index);
WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
const CollectionIndex& index);
Status
DropCollectionIndexRecursively(const std::string& collection_id);

View File

@ -13,6 +13,7 @@
#include <fiu-local.h>
#include <unistd.h>
#include <boost/filesystem.hpp>
#include <chrono>
#include <mutex>
@ -318,6 +319,20 @@ GetIndexName(int32_t index_type) {
return index_type_name[index_type];
}
void
SendExitSignal() {
LOG_SERVER_INFO_ << "Send SIGUSR2 signal to exit";
pid_t pid = getpid();
kill(pid, SIGUSR2);
}
void
ExitOnWriteError(Status& status) {
if (status.code() == SERVER_WRITE_ERROR) {
utils::SendExitSignal();
}
}
} // namespace utils
} // namespace engine
} // namespace milvus
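SendExitSignal() simply raises SIGUSR2 against the server's own pid; a handler installed elsewhere (not part of this diff) is assumed to turn that into a graceful shutdown. A self-contained sketch of that assumed pairing:

#include <signal.h>
#include <unistd.h>
#include <cstdio>

// Assumed counterpart to SendExitSignal(): a SIGUSR2 handler that only sets
// a flag (the async-signal-safe way) and lets the main loop exit cleanly.
static volatile sig_atomic_t g_should_exit = 0;

static void OnExitSignal(int) {
    g_should_exit = 1;
}

int main() {
    signal(SIGUSR2, OnExitSignal);
    kill(getpid(), SIGUSR2);  // exactly what SendExitSignal() does
    while (!g_should_exit) {
        // serve requests ...
    }
    std::printf("exiting after SIGUSR2\n");
    return 0;
}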

View File

@ -73,6 +73,12 @@ ParseMetaUri(const std::string& uri, MetaUriInfo& info);
std::string
GetIndexName(int32_t index_type);
void
SendExitSignal();
void
ExitOnWriteError(Status& status);
} // namespace utils
} // namespace engine
} // namespace milvus

View File

@ -359,7 +359,11 @@ ExecutionEngineImpl::Serialize() {
utils::GetParentPath(location_, segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(segment_dir);
segment_writer_ptr->SetVectorIndex(index_);
segment_writer_ptr->WriteVectorIndex(location_);
auto status = segment_writer_ptr->WriteVectorIndex(location_);
if (!status.ok()) {
return status;
}
// here we reset index size by file size,
// since some index type(such as SQ8) data size become smaller after serialized

View File

@ -75,6 +75,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
} catch (std::exception& ex) {
// typical errors: out of disk space or permission denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << error_msg;
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
fiu_do_on("XBuildIndexTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory"));
@ -88,8 +89,11 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
LOG_ENGINE_ERROR_ << s.message();
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
build_index_job->GetStatus() = s;
build_index_job->BuildIndexDone(file_->id_);
}

View File

@ -24,6 +24,7 @@
#include "SegmentReader.h"
#include "Vectors.h"
#include "codecs/default/DefaultCodec.h"
#include "db/Utils.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
@ -119,6 +120,8 @@ SegmentWriter::WriteVectors() {
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
@ -133,6 +136,8 @@ SegmentWriter::WriteAttrs() {
} catch (std::exception& e) {
std::string err_msg = "Failed to write attributes: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
@ -140,6 +145,10 @@ SegmentWriter::WriteAttrs() {
Status
SegmentWriter::WriteVectorIndex(const std::string& location) {
if (location.empty()) {
return Status(SERVER_WRITE_ERROR, "Invalid parameter of WriteVectorIndex");
}
codec::DefaultCodec default_codec;
try {
fs_ptr_->operation_ptr_->CreateDirectory();
@ -147,6 +156,8 @@ SegmentWriter::WriteVectorIndex(const std::string& location) {
} catch (std::exception& e) {
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
@ -177,6 +188,8 @@ SegmentWriter::WriteBloomFilter() {
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
@ -192,6 +205,8 @@ SegmentWriter::WriteDeletedDocs() {
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
@ -206,6 +221,8 @@ SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) {
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
@ -220,6 +237,8 @@ SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) {
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
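Every SegmentWriter write path now ends in the same guard: catch the exception, log it, raise the exit signal, and return SERVER_WRITE_ERROR. A generic sketch of that pattern with stand-in Status and logging types (the real code uses milvus::Status and LOG_ENGINE_ERROR_):

#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

// Stand-ins for milvus::Status and engine::utils::SendExitSignal().
struct Status {
    int code = 0;  // 0 == OK; nonzero mimics SERVER_WRITE_ERROR
    std::string message;
};
static void SendExitSignalStub() { std::cerr << "would raise SIGUSR2\n"; }

// The recurring guard: any exception thrown by a disk write becomes a
// SERVER_WRITE_ERROR status and also triggers the process exit signal.
static Status GuardedWrite(const std::string& what, const std::function<void()>& write_op) {
    try {
        write_op();
    } catch (const std::exception& e) {
        std::string err_msg = "Failed to write " + what + ": " + e.what();
        std::cerr << err_msg << "\n";
        SendExitSignalStub();
        return Status{1, err_msg};
    }
    return Status{};
}

int main() {
    Status s = GuardedWrite("vectors", [] { throw std::runtime_error("disk full"); });
    std::cout << "code=" << s.code << " msg=" << s.message << "\n";
}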

View File

@ -197,7 +197,7 @@ DBWrapper::StartService() {
db_ = engine::DBFactory::Build(opt);
} catch (std::exception& ex) {
std::cerr << "Error: failed to open database: " << ex.what()
<< ". Possible reason: Meta Tables schema is damaged "
<< ". Possible reason: out of storage, meta schema is damaged "
<< "or created by in-compatible Milvus version." << std::endl;
kill(0, SIGUSR1);
}

View File

@ -0,0 +1,30 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
namespace milvus {
namespace server {
class ConnectionContext {
public:
virtual ~ConnectionContext() {
}
virtual bool
IsConnectionBroken() const = 0;
};
using ConnectionContextPtr = std::shared_ptr<ConnectionContext>;
} // namespace server
} // namespace milvus
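ConnectionContext is deliberately minimal, so unit tests or non-gRPC transports can satisfy it in a few lines. A hypothetical always-alive stub (not part of the patch) and how it would plug into the Context setter added below:

#include <memory>
#include "server/context/ConnectionContext.h"

namespace milvus {
namespace server {

// Hypothetical stub: reports the client as always connected.
class AlwaysConnectedContext : public ConnectionContext {
 public:
    bool
    IsConnectionBroken() const override {
        return false;
    }
};

}  // namespace server
}  // namespace milvus

// Usage with the SetConnectionContext() added to server::Context in this commit:
//   milvus::server::ConnectionContextPtr cc =
//       std::make_shared<milvus::server::AlwaysConnectedContext>();
//   context->SetConnectionContext(cc);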

View File

@ -40,6 +40,20 @@ Context::Follower(const std::string& operation_name) const {
return new_context;
}
void
Context::SetConnectionContext(ConnectionContextPtr& context) {
context_ = context;
}
bool
Context::IsConnectionBroken() const {
if (context_ == nullptr) {
return false;
}
return context_->IsConnectionBroken();
}
/////////////////////////////////////////////////////////////////////////////////////////////////
ContextChild::ContextChild(const ContextPtr& context, const std::string& operation_name) {
if (context) {

View File

@ -15,6 +15,9 @@
#include <string>
#include <unordered_map>
#include <grpcpp/server_context.h>
#include "server/context/ConnectionContext.h"
#include "tracing/TraceContext.h"
namespace milvus {
@ -41,9 +44,16 @@ class Context {
const std::shared_ptr<tracing::TraceContext>&
GetTraceContext() const;
void
SetConnectionContext(ConnectionContextPtr& context);
bool
IsConnectionBroken() const;
private:
std::string request_id_;
std::shared_ptr<tracing::TraceContext> trace_context_;
ConnectionContextPtr context_;
};
using ContextPtr = std::shared_ptr<milvus::server::Context>;

View File

@ -256,8 +256,9 @@ RequestHandler::Flush(const std::shared_ptr<Context>& context, const std::vector
}
Status
RequestHandler::Compact(const std::shared_ptr<Context>& context, const std::string& collection_name) {
BaseRequestPtr request_ptr = CompactRequest::Create(context, collection_name);
RequestHandler::Compact(const std::shared_ptr<Context>& context, const std::string& collection_name,
double compact_threshold) {
BaseRequestPtr request_ptr = CompactRequest::Create(context, collection_name, compact_threshold);
RequestScheduler::ExecRequest(request_ptr);
return request_ptr->status();

View File

@ -115,7 +115,7 @@ class RequestHandler {
Flush(const std::shared_ptr<Context>& context, const std::vector<std::string>& collection_names);
Status
Compact(const std::shared_ptr<Context>& context, const std::string& collection_name);
Compact(const std::shared_ptr<Context>& context, const std::string& collection_name, double compact_threshold);
/*******************************************New Interface*********************************************/

View File

@ -27,13 +27,16 @@ namespace milvus {
namespace server {
CompactRequest::CompactRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name)
: BaseRequest(context, BaseRequest::kCompact), collection_name_(collection_name) {
const std::string& collection_name, double compact_threshold)
: BaseRequest(context, BaseRequest::kCompact),
collection_name_(collection_name),
compact_threshold_(compact_threshold) {
}
BaseRequestPtr
CompactRequest::Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name) {
return std::shared_ptr<BaseRequest>(new CompactRequest(context, collection_name));
CompactRequest::Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
double compact_threshold) {
return std::shared_ptr<BaseRequest>(new CompactRequest(context, collection_name, compact_threshold));
}
Status
@ -67,7 +70,7 @@ CompactRequest::OnExecute() {
rc.RecordSection("check validation");
// step 2: check collection existence
status = DBWrapper::DB()->Compact(collection_name_);
status = DBWrapper::DB()->Compact(collection_name_, compact_threshold_);
if (!status.ok()) {
return status;
}

View File

@ -28,16 +28,19 @@ namespace server {
class CompactRequest : public BaseRequest {
public:
static BaseRequestPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name);
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
double compact_threshold);
protected:
CompactRequest(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name);
CompactRequest(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
double compact_threshold);
Status
OnExecute() override;
private:
const std::string collection_name_;
double compact_threshold_ = 0.0;
};
} // namespace server

View File

@ -116,7 +116,7 @@ CreateIndexRequest::OnExecute() {
engine::CollectionIndex index;
index.engine_type_ = adapter_index_type;
index.extra_params_ = json_params_;
status = DBWrapper::DB()->CreateIndex(collection_name_, index);
status = DBWrapper::DB()->CreateIndex(context_, collection_name_, index);
fiu_do_on("CreateIndexRequest.OnExecute.create_index_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {

View File

@ -104,21 +104,9 @@ SearchByIDRequest::OnExecute() {
return status;
}
// step 6: check collection's index type supports search by id
if (collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_IDMAP &&
collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_BIN_IDMAP &&
collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_IVFFLAT &&
collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_BIN_IVFFLAT &&
collection_schema.engine_type_ != (int32_t)engine::EngineType::FAISS_IVFSQ8) {
std::string err_msg = "Index type " + std::to_string(collection_schema.engine_type_) +
" does not support SearchByID operation";
LOG_SERVER_ERROR_ << err_msg;
return Status(SERVER_UNSUPPORTED_ERROR, err_msg);
}
rc.RecordSection("check validation");
// step 7: search vectors
// step 6: search vectors
engine::ResultIds result_ids;
engine::ResultDistances result_distances;
@ -145,7 +133,7 @@ SearchByIDRequest::OnExecute() {
return Status::OK(); // empty collection
}
// step 8: construct result array
// step 7: construct result array
milvus::server::ContextChild tracer(context_, "Constructing result");
result_.row_num_ = id_array_.size();
result_.distance_list_.swap(result_distances);

View File

@ -21,6 +21,7 @@
#include "context/HybridSearchContext.h"
#include "query/BinaryQuery.h"
#include "server/context/ConnectionContext.h"
#include "tracing/TextMapCarrier.h"
#include "tracing/TracerUtil.h"
#include "utils/Log.h"
@ -129,6 +130,24 @@ ConstructResults(const TopKQueryResult& result, ::milvus::grpc::TopKQueryResult*
result.distance_list_.size() * sizeof(float));
}
class GrpcConnectionContext : public milvus::server::ConnectionContext {
public:
explicit GrpcConnectionContext(::grpc::ServerContext* context) : context_(context) {
}
bool
IsConnectionBroken() const override {
if (context_ == nullptr) {
return true;
}
return context_->IsCancelled();
}
private:
::grpc::ServerContext* context_ = nullptr;
};
} // namespace
namespace {
@ -265,11 +284,18 @@ std::shared_ptr<Context>
GrpcRequestHandler::GetContext(::grpc::ServerContext* server_context) {
std::lock_guard<std::mutex> lock(context_map_mutex_);
auto request_id = get_request_id(server_context);
if (context_map_.find(request_id) == context_map_.end()) {
auto iter = context_map_.find(request_id);
if (iter == context_map_.end()) {
LOG_SERVER_ERROR_ << "GetContext: request_id " << request_id << " not found in context_map_";
return nullptr;
}
return context_map_[request_id];
if (iter->second != nullptr) {
ConnectionContextPtr connection_context = std::make_shared<GrpcConnectionContext>(server_context);
iter->second->SetConnectionContext(connection_context);
}
return iter->second;
}
void
@ -810,7 +836,8 @@ GrpcRequestHandler::Compact(::grpc::ServerContext* context, const ::milvus::grpc
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__);
Status status = request_handler_.Compact(GetContext(context), request->collection_name());
double compact_threshold = 0.1; // compact trigger threshold: deleted vector count / segment row count
Status status = request_handler_.Compact(GetContext(context), request->collection_name(), compact_threshold);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
SET_RESPONSE(response, status, context);
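With the threshold pinned at 0.1, the arithmetic in CompactFile() means a 10,000-row segment is compacted only once at least 1,000 of its vectors are deleted; at 999 deletions the delete rate is 0.0999, below the threshold, and the segment is skipped.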

View File

@ -305,7 +305,8 @@ WebRequestHandler::Compact(const nlohmann::json& json, std::string& result_str)
auto name = collection_name.get<std::string>();
auto status = request_handler_.Compact(context_ptr_, name);
double compact_threshold = 0.1; // compact trigger threshold: deleted vector count / segment row count
auto status = request_handler_.Compact(context_ptr_, name, compact_threshold);
if (status.ok()) {
nlohmann::json result;

View File

@ -95,7 +95,8 @@ constexpr ErrorCode DB_INCOMPATIB_META = ToDbErrorCode(6);
constexpr ErrorCode DB_INVALID_META_URI = ToDbErrorCode(7);
constexpr ErrorCode DB_EMPTY_COLLECTION = ToDbErrorCode(8);
constexpr ErrorCode DB_BLOOM_FILTER_ERROR = ToDbErrorCode(9);
constexpr ErrorCode PARTITION_NOT_FOUND = ToDbErrorCode(10);
constexpr ErrorCode DB_PARTITION_NOT_FOUND = ToDbErrorCode(10);
constexpr ErrorCode DB_OUT_OF_STORAGE = ToDbErrorCode(11);
// knowhere error code
constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1);

View File

@ -314,7 +314,7 @@ TEST_F(DBTest, SEARCH_TEST) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IDMAP;
index.extra_params_ = {{"nlist", 16384}};
// db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish
// db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
//
// {
// std::vector<std::string> tags;
@ -326,7 +326,7 @@ TEST_F(DBTest, SEARCH_TEST) {
//
// index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
// index.extra_params_ = {{"nlist", 16384}};
// db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish
// db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
//
// {
// std::vector<std::string> tags;
@ -338,7 +338,7 @@ TEST_F(DBTest, SEARCH_TEST) {
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
index.extra_params_ = {{"nlist", 16384}};
db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish
db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
{
std::vector<std::string> tags;
@ -350,7 +350,7 @@ TEST_F(DBTest, SEARCH_TEST) {
#ifdef MILVUS_GPU_VERSION
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H;
db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish
db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
{
std::vector<std::string> tags;
@ -400,7 +400,7 @@ TEST_F(DBTest, SEARCH_TEST) {
#ifdef MILVUS_GPU_VERSION
// test FAISS_IVFSQ8H optimizer
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H;
db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish
db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
std::vector<std::string> partition_tag;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_dists;
@ -452,7 +452,7 @@ TEST_F(DBTest, PRELOAD_TEST) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IDMAP;
db_->CreateIndex(COLLECTION_NAME, index); // wait until build index finish
db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
int64_t prev_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
stat = db_->PreloadCollection(COLLECTION_NAME);
@ -551,7 +551,7 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
ASSERT_FALSE(stat.ok());
milvus::engine::CollectionIndex index;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_FALSE(stat.ok());
stat = db_->DescribeIndex(collection_info.collection_id_, index);
@ -713,28 +713,28 @@ TEST_F(DBTest, INDEX_TEST) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
index.metric_type_ = (int)milvus::engine::MetricType::IP;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
fiu_init(0);
FIU_ENABLE_FIU("SqliteMetaImpl.DescribeCollectionIndex.throw_exception");
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_FALSE(stat.ok());
fiu_disable("SqliteMetaImpl.DescribeCollectionIndex.throw_exception");
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_PQ;
FIU_ENABLE_FIU("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index");
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index");
#ifdef MILVUS_GPU_VERSION
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
#endif
@ -815,17 +815,17 @@ TEST_F(DBTest, PARTITION_TEST) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.metric_type_ = (int)milvus::engine::MetricType::L2;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
fiu_init(0);
FIU_ENABLE_FIU("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition");
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition");
FIU_ENABLE_FIU("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg");
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg");
@ -967,7 +967,7 @@ TEST_F(DBTest2, DELETE_TEST) {
milvus::engine::IDNumbers vector_ids;
stat = db_->InsertVectors(COLLECTION_NAME, "", xb);
milvus::engine::CollectionIndex index;
stat = db_->CreateIndex(COLLECTION_NAME, index);
stat = db_->CreateIndex(dummy_context_, COLLECTION_NAME, index);
// create partition, drop collection will drop partition recursively
stat = db_->CreatePartition(COLLECTION_NAME, "part0", "0");
@ -1341,7 +1341,7 @@ TEST_F(DBTest2, SEARCH_WITH_DIFFERENT_INDEX) {
milvus::engine::CollectionIndex index;
// index.metric_type_ = (int)milvus::engine::MetricType::IP;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
stat = db_->PreloadCollection(collection_info.collection_id_);
@ -1366,7 +1366,7 @@ result_distances);
db_->DropIndex(collection_info.collection_id_);
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
stat = db_->PreloadCollection(collection_info.collection_id_);

View File

@ -362,7 +362,7 @@ TEST_F(MySqlDBTest, PARTITION_TEST) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.metric_type_ = (int)milvus::engine::MetricType::L2;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
uint64_t row_count = 0;

View File

@ -329,7 +329,7 @@ TEST_F(DeleteTest, delete_before_create_index) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
index.extra_params_ = {{"nlist", 100}};
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
uint64_t row_count;
@ -399,7 +399,7 @@ TEST_F(DeleteTest, delete_with_index) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
index.extra_params_ = {{"nlist", 100}};
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
// std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk
@ -485,7 +485,7 @@ TEST_F(DeleteTest, delete_multiple_times_with_index) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.extra_params_ = {{"nlist", 1}};
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
int topk = 10, nprobe = 10;
@ -594,7 +594,7 @@ TEST_F(DeleteTest, delete_add_create_index) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.extra_params_ = {{"nlist", 100}};
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
std::vector<milvus::engine::IDNumber> ids_to_delete;
@ -610,7 +610,7 @@ TEST_F(DeleteTest, delete_add_create_index) {
// stat = db_->Flush();
// ASSERT_TRUE(stat.ok());
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
uint64_t row_count;
@ -665,7 +665,7 @@ TEST_F(DeleteTest, delete_add_auto_flush) {
// ASSERT_TRUE(stat.ok());
// milvus::engine::CollectionIndex index;
// index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
// stat = db_->CreateIndex(collection_info.collection_id_, index);
// stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
// ASSERT_TRUE(stat.ok());
std::vector<milvus::engine::IDNumber> ids_to_delete;
@ -682,7 +682,7 @@ TEST_F(DeleteTest, delete_add_auto_flush) {
std::this_thread::sleep_for(std::chrono::seconds(2));
// stat = db_->Flush();
// ASSERT_TRUE(stat.ok());
// stat = db_->CreateIndex(collection_info.collection_id_, index);
// stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
// ASSERT_TRUE(stat.ok());
uint64_t row_count;
@ -814,7 +814,7 @@ TEST_F(CompactTest, compact_with_index) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
stat = db_->Flush();

View File

@ -214,7 +214,7 @@ TEST_F(SearchByIdTest, WITH_INDEX_TEST) {
milvus::engine::CollectionIndex index;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
index.extra_params_ = {{"nlist", 10}};
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
const int64_t topk = 10, nprobe = 10;
@ -398,7 +398,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
milvus::engine::CollectionIndex index;
index.extra_params_ = {{"nlist", 10}};
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8;
stat = db_->CreateIndex(collection_info.collection_id_, index);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
const int64_t topk = 10, nprobe = 10;

View File

@ -17,7 +17,7 @@
#include "grpc-gen/gen-milvus/milvus.grpc.pb.h"
#define MILVUS_SDK_VERSION "0.8.0";
#define MILVUS_SDK_VERSION "0.9.0";
namespace milvus {

View File

@ -142,7 +142,7 @@ class TestCompactBase:
logging.getLogger().info(info["partitions"])
size_after = info["partitions"][0]["segments"][0]["data_size"]
logging.getLogger().info(size_after)
assert(size_before > size_after)
assert(size_before >= size_after)
@pytest.mark.timeout(COMPACT_TIMEOUT)
def test_add_vectors_delete_all_and_compact(self, connect, collection):
@ -280,7 +280,7 @@ class TestCompactBase:
status, info = connect.collection_info(collection)
assert status.OK()
size_after = info["partitions"][0]["segments"][0]["data_size"]
assert(size_before > size_after)
assert(size_before >= size_after)
status = connect.compact(collection)
assert status.OK()
# get collection info after compact twice
@ -521,7 +521,7 @@ class TestCompactJAC:
logging.getLogger().info(info["partitions"])
size_after = info["partitions"][0]["segments"][0]["data_size"]
logging.getLogger().info(size_after)
assert(size_before > size_after)
assert(size_before >= size_after)
@pytest.mark.timeout(COMPACT_TIMEOUT)
def test_add_vectors_delete_all_and_compact(self, connect, jac_collection):
@ -608,7 +608,7 @@ class TestCompactJAC:
status, info = connect.collection_info(jac_collection)
assert status.OK()
size_after = info["partitions"][0]["segments"][0]["data_size"]
assert(size_before > size_after)
assert(size_before >= size_after)
status = connect.compact(jac_collection)
assert status.OK()
# get collection info after compact twice
@ -802,7 +802,7 @@ class TestCompactIP:
logging.getLogger().info(info["partitions"])
size_after = info["partitions"][0]["segments"][0]["data_size"]
logging.getLogger().info(size_after)
assert(size_before > size_after)
assert(size_before >= size_after)
@pytest.mark.timeout(COMPACT_TIMEOUT)
def test_add_vectors_delete_all_and_compact(self, connect, ip_collection):
@ -891,7 +891,7 @@ class TestCompactIP:
status, info = connect.collection_info(ip_collection)
assert status.OK()
size_after = info["partitions"][0]["segments"][0]["data_size"]
assert(size_before > size_after)
assert(size_before >= size_after)
status = connect.compact(ip_collection)
assert status.OK()
status = connect.flush([ip_collection])