diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index ddee622669..44e3e7217a 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -104,13 +104,18 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date //dates partly delete files of the table but currently we don't support ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id; - mem_mgr_->EraseMemVector(table_id); //not allow insert - meta_ptr_->DeleteTable(table_id); //soft delete table + if (dates.empty()) { + mem_mgr_->EraseMemVector(table_id); //not allow insert + meta_ptr_->DeleteTable(table_id); //soft delete table + + //scheduler will determine when to delete table files + TaskScheduler& scheduler = TaskScheduler::GetInstance(); + DeleteContextPtr context = std::make_shared(table_id, meta_ptr_); + scheduler.Schedule(context); + } else { + meta_ptr_->DropPartitionsByDates(table_id, dates); + } - //scheduler will determine when to delete table files - TaskScheduler& scheduler = TaskScheduler::GetInstance(); - DeleteContextPtr context = std::make_shared(table_id, meta_ptr_); - scheduler.Schedule(context); return Status::OK(); } diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 5225f2a97e..1e8706e98b 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -290,6 +290,15 @@ ClientTest::Test(const std::string& address, const std::string& port) { DoSearch(conn, search_record_array, "Search after build index finish"); } + { + Range rg; + rg.start_value = CurrentTmDate(-2); + rg.end_value = CurrentTmDate(-3); + + Status stat = conn->DeleteByRange(rg, TABLE_NAME); + std::cout << "DeleteByRange function call status: " << stat.ToString() << std::endl; + } + {//delete table Status stat = conn->DropTable(TABLE_NAME); std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl; diff --git 
a/cpp/src/sdk/grpc/ClientProxy.cpp b/cpp/src/sdk/grpc/ClientProxy.cpp index 1107da6eff..2e902d8249 100644 --- a/cpp/src/sdk/grpc/ClientProxy.cpp +++ b/cpp/src/sdk/grpc/ClientProxy.cpp @@ -330,10 +330,18 @@ ClientProxy::ServerStatus() const { Status ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) { - + try { + ::milvus::grpc::DeleteByRangeParam delete_by_range_param; + delete_by_range_param.set_table_name(table_name); + delete_by_range_param.mutable_range()->set_start_value(range.start_value); + delete_by_range_param.mutable_range()->set_end_value(range.end_value); + return client_ptr_->DeleteByRange(delete_by_range_param); + } catch (std::exception &ex) { + return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what())); + } } -Status + Status ClientProxy::PreloadTable(const std::string &table_name) const { try { ::milvus::grpc::TableName grpc_table_name; @@ -341,13 +349,12 @@ ClientProxy::PreloadTable(const std::string &table_name) const { Status status = client_ptr_->PreloadTable(grpc_table_name); return status; } catch (std::exception &ex) { - return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what())); + return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what())); } } IndexParam ClientProxy::DescribeIndex(const std::string &table_name) const { - } Status diff --git a/cpp/src/sdk/grpc/GrpcClient.cpp b/cpp/src/sdk/grpc/GrpcClient.cpp index 00894ea529..210c3fdf61 100644 --- a/cpp/src/sdk/grpc/GrpcClient.cpp +++ b/cpp/src/sdk/grpc/GrpcClient.cpp @@ -264,6 +264,24 @@ GrpcClient::PreloadTable(milvus::grpc::TableName &table_name) { return Status::OK(); } +Status +GrpcClient::DeleteByRange(grpc::DeleteByRangeParam &delete_by_range_param) { + ClientContext context; + ::milvus::grpc::Status response; + ::grpc::Status grpc_status = stub_->DeleteByRange(&context, delete_by_range_param, &response); + + if (!grpc_status.ok()) { + std::cerr << 
"DeleteByRange gRPC failed!" << std::endl; + return Status(StatusCode::RPCFailed, grpc_status.error_message()); + } + + if (response.error_code() != grpc::SUCCESS) { + std::cerr << response.reason() << std::endl; + return Status(StatusCode::ServerFailed, response.reason()); + } + return Status::OK(); +} + Status GrpcClient::Disconnect() { stub_.release(); diff --git a/cpp/src/sdk/interface/ConnectionImpl.cpp b/cpp/src/sdk/interface/ConnectionImpl.cpp index b496d1c104..f0875638fd 100644 --- a/cpp/src/sdk/interface/ConnectionImpl.cpp +++ b/cpp/src/sdk/interface/ConnectionImpl.cpp @@ -117,7 +117,7 @@ ConnectionImpl::ServerStatus() const { Status ConnectionImpl::DeleteByRange(Range &range, const std::string &table_name) { - + return client_proxy_->DeleteByRange(range, table_name); } Status diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index 584023e4fc..cb705aed80 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -168,7 +168,12 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context, GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, const ::milvus::grpc::DeleteByRangeParam *request, ::milvus::grpc::Status *response) { - + BaseTaskPtr task_ptr = DeleteByRangeTask::Create(*request); + ::milvus::grpc::Status grpc_status; + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); + response->set_error_code(grpc_status.error_code()); + response->set_reason(grpc_status.reason()); + return ::grpc::Status::OK; } ::grpc::Status diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index 8934045579..20f74ca3c8 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -721,6 +721,73 @@ CmdTask::OnExecute() { return SERVER_SUCCESS; } 
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) + : GrpcBaseTask(DDL_DML_TASK_GROUP), + delete_by_range_param_(delete_by_range_param){ +} + +BaseTaskPtr +DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) { + return BaseTaskPtr(new DeleteByRangeTask(delete_by_range_param)); +} + +ServerError +DeleteByRangeTask::OnExecute() { //validate the table, convert the requested range to db dates, then delete those dates + try { + TimeRecorder rc("DeleteByRangeTask"); + + //step 1: check arguments + std::string table_name = delete_by_range_param_.table_name(); + ServerError res = ValidationUtil::ValidateTableName(table_name); + if (res != SERVER_SUCCESS) { + return SetError(res, "Invalid table name: " + table_name); + } + + //step 2: check table existence + engine::meta::TableSchema table_info; + table_info.table_id_ = table_name; + engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); + if (!stat.ok()) { + if (stat.IsNotFound()) { + return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name + " not exists"); + } else { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } + } + + rc.ElapseFromBegin("check validation"); + + //step 3: check date range, and convert to db dates + engine::meta::DatesT dates; + ServerError error_code = SERVER_SUCCESS; + std::string error_msg; + + std::vector<::milvus::grpc::Range> range_array; + range_array.emplace_back(delete_by_range_param_.range()); + ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg); + if (error_code != SERVER_SUCCESS) { + return SetError(error_code, error_msg); + } + if (dates.empty()) { return SetError(SERVER_UNEXPECTED_ERROR, "Time range resolves to no dates"); } //an empty date list would make DeleteTable drop the whole table +#ifdef MILVUS_ENABLE_PROFILING + ProfilerStart((std::string("/tmp/delete_by_range_") + GetCurrTimeStr() + ".profiling").c_str()); +#endif + engine::Status status = 
DBWrapper::DB()->DeleteTable(table_name, dates); +#ifdef MILVUS_ENABLE_PROFILING + ProfilerStop(); +#endif + if (!status.ok()) { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + status.ToString()); + } + + } catch (std::exception &ex) { + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); + } + + return SERVER_SUCCESS; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// PreloadTableTask::PreloadTableTask(const std::string &table_name) : GrpcBaseTask(DDL_DML_TASK_GROUP), diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index 8b36d2efbd..ad8a196494 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -9,6 +9,7 @@ #include "db/meta/MetaConsts.h" #include "db/Factories.h" #include "cache/CpuCacheMgr.h" +#include "utils/CommonUtil.h" #include #include @@ -26,6 +27,8 @@ namespace { static constexpr int64_t TABLE_DIM = 256; static constexpr int64_t VECTOR_COUNT = 250000; static constexpr int64_t INSERT_LOOP = 10000; + static constexpr int64_t SECONDS_EACH_HOUR = 3600; + static constexpr int64_t DAY_SECONDS = 24 * 60 * 60; engine::meta::TableSchema BuildTableSchema() { engine::meta::TableSchema table_info; @@ -45,6 +48,52 @@ namespace { } } + std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tt = tt + 24*SECONDS_EACH_HOUR*offset_day; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; + } + + void + ConvertTimeRangeToDBDates(const std::string &start_value, + const std::string &end_value, + std::vector &dates) { + dates.clear(); + + time_t tt_start, tt_end; + tm tm_start, tm_end; + if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(start_value, tt_start, tm_start)) { + return; + } + + if (!zilliz::milvus::server::CommonUtil::TimeStrToTime(end_value, tt_end, tm_end)) { + return; + } + + long days = (tt_end 
> tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / + DAY_SECONDS; + if (days == 0) { + return; + } + + for (long i = 0; i < days; i++) { + time_t tt_day = tt_start + DAY_SECONDS * i; + tm tm_day; + zilliz::milvus::server::CommonUtil::ConvertTime(tt_day, tm_day); + + long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + + tm_day.tm_mday;//according to db logic + dates.push_back(date); + } + } + } TEST_F(DBTest, CONFIG_TEST) { @@ -307,8 +356,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { }; TEST_F(DBTest2, DELETE_TEST) { - - engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); @@ -343,4 +390,46 @@ TEST_F(DBTest2, DELETE_TEST) { db_->HasTable(TABLE_NAME, has_table); ASSERT_FALSE(has_table); -}; \ No newline at end of file +}; + +TEST_F(DBTest2, DELETE_BY_RANGE_TEST) { //insert vectors, then pass the dates between three and two days ago to DeleteTable and check its status + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + + bool has_table = false; + db_->HasTable(TABLE_NAME, has_table); + ASSERT_TRUE(has_table); + + engine::IDNumbers vector_ids; + + uint64_t size; + db_->Size(size); + + int64_t nb = INSERT_LOOP; + std::vector xb; + BuildVectors(nb, xb); + + int loop = 20; + for (auto i=0; i<loop; ++i) { + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + engine::meta::DatesT dates; + std::string start_value = CurrentTmDate(-3); + std::string end_value = CurrentTmDate(-2); + ConvertTimeRangeToDBDates(start_value, end_value, dates); + + stat = db_->DeleteTable(TABLE_NAME, dates); + ASSERT_STATS(stat); +} \ No newline at end of file