mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: support TTL expiration with queries returning no results (#42086)
support TTL expiration with queries returning no results issue:https://github.com/milvus-io/milvus/issues/41959 Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
This commit is contained in:
parent
bd31a5db7a
commit
6a0e182e13
@ -176,6 +176,7 @@ class QueryContext : public Context {
|
||||
const milvus::segcore::SegmentInternalInterface* segment,
|
||||
int64_t active_count,
|
||||
milvus::Timestamp timestamp,
|
||||
milvus::Timestamp collection_ttl = 0,
|
||||
int32_t consistency_level = 0,
|
||||
std::shared_ptr<QueryConfig> query_config =
|
||||
std::make_shared<QueryConfig>(),
|
||||
@ -187,6 +188,7 @@ class QueryContext : public Context {
|
||||
segment_(segment),
|
||||
active_count_(active_count),
|
||||
query_timestamp_(timestamp),
|
||||
collection_ttl_timestamp_(collection_ttl),
|
||||
query_config_(query_config),
|
||||
executor_(executor),
|
||||
consistency_level_(consistency_level) {
|
||||
@ -222,6 +224,11 @@ class QueryContext : public Context {
|
||||
return query_timestamp_;
|
||||
}
|
||||
|
||||
milvus::Timestamp
|
||||
get_collection_ttl() {
|
||||
return collection_ttl_timestamp_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
get_active_count() {
|
||||
return active_count_;
|
||||
@ -290,7 +297,7 @@ class QueryContext : public Context {
|
||||
int64_t active_count_;
|
||||
// timestamp this query generate
|
||||
milvus::Timestamp query_timestamp_;
|
||||
|
||||
milvus::Timestamp collection_ttl_timestamp_;
|
||||
// used for vector search
|
||||
milvus::SearchInfo search_info_;
|
||||
const query::PlaceholderGroup* placeholder_group_;
|
||||
|
||||
@ -33,6 +33,7 @@ PhyMvccNode::PhyMvccNode(int32_t operator_id,
|
||||
query_timestamp_ = query_context->get_query_timestamp();
|
||||
active_count_ = query_context->get_active_count();
|
||||
is_source_node_ = mvcc_node->sources().size() == 0;
|
||||
collection_ttl_timestamp_ = query_context->get_collection_ttl();
|
||||
}
|
||||
|
||||
void
|
||||
@ -63,7 +64,8 @@ PhyMvccNode::GetOutput() {
|
||||
|
||||
TargetBitmapView data(col_input->GetRawData(), col_input->size());
|
||||
// need to expose null?
|
||||
segment_->mask_with_timestamps(data, query_timestamp_);
|
||||
segment_->mask_with_timestamps(
|
||||
data, query_timestamp_, collection_ttl_timestamp_);
|
||||
segment_->mask_with_delete(data, active_count_, query_timestamp_);
|
||||
is_finished_ = true;
|
||||
|
||||
|
||||
@ -72,6 +72,7 @@ class PhyMvccNode : public Operator {
|
||||
int64_t active_count_;
|
||||
bool is_finished_{false};
|
||||
bool is_source_node_{false};
|
||||
milvus::Timestamp collection_ttl_timestamp_;
|
||||
};
|
||||
|
||||
} // namespace exec
|
||||
|
||||
@ -34,11 +34,13 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
|
||||
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
|
||||
Timestamp timestamp,
|
||||
const PlaceholderGroup& placeholder_group,
|
||||
int32_t consystency_level)
|
||||
int32_t consystency_level,
|
||||
Timestamp collection_ttl)
|
||||
: segment_(segment),
|
||||
timestamp_(timestamp),
|
||||
placeholder_group_(placeholder_group),
|
||||
consystency_level_(consystency_level) {
|
||||
consystency_level_(consystency_level),
|
||||
collection_ttl_timestamp_(collection_ttl_timestamp_) {
|
||||
}
|
||||
|
||||
SearchResult
|
||||
@ -59,6 +61,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
|
||||
private:
|
||||
const segcore::SegmentInterface& segment_;
|
||||
Timestamp timestamp_;
|
||||
Timestamp collection_ttl_timestamp_;
|
||||
const PlaceholderGroup& placeholder_group_;
|
||||
|
||||
SearchResultOpt search_result_opt_;
|
||||
@ -134,6 +137,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
|
||||
segment,
|
||||
active_count,
|
||||
timestamp_,
|
||||
collection_ttl_timestamp_,
|
||||
consystency_level_);
|
||||
query_context->set_search_info(node.search_info_);
|
||||
query_context->set_placeholder_group(placeholder_group_);
|
||||
@ -189,6 +193,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
|
||||
segment,
|
||||
active_count,
|
||||
timestamp_,
|
||||
collection_ttl_timestamp_,
|
||||
consystency_level_);
|
||||
|
||||
// Do task execution
|
||||
|
||||
@ -47,18 +47,22 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
|
||||
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
|
||||
Timestamp timestamp,
|
||||
const PlaceholderGroup* placeholder_group,
|
||||
int32_t consystency_level = 0)
|
||||
int32_t consystency_level = 0,
|
||||
Timestamp collection_ttl = 0)
|
||||
: segment_(segment),
|
||||
timestamp_(timestamp),
|
||||
collection_ttl_timestamp_(collection_ttl),
|
||||
placeholder_group_(placeholder_group),
|
||||
consystency_level_(consystency_level) {
|
||||
}
|
||||
|
||||
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
|
||||
Timestamp timestamp,
|
||||
int32_t consystency_level = 0)
|
||||
int32_t consystency_level = 0,
|
||||
Timestamp collection_ttl = 0)
|
||||
: segment_(segment),
|
||||
timestamp_(timestamp),
|
||||
collection_ttl_timestamp_(collection_ttl),
|
||||
consystency_level_(consystency_level) {
|
||||
placeholder_group_ = nullptr;
|
||||
}
|
||||
@ -108,6 +112,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
|
||||
private:
|
||||
const segcore::SegmentInterface& segment_;
|
||||
Timestamp timestamp_;
|
||||
Timestamp collection_ttl_timestamp_;
|
||||
const PlaceholderGroup* placeholder_group_;
|
||||
|
||||
SearchResultOpt search_result_opt_;
|
||||
|
||||
@ -1610,13 +1610,27 @@ ChunkedSegmentSealedImpl::get_active_count(Timestamp ts) const {
|
||||
|
||||
void
|
||||
ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
|
||||
Timestamp timestamp) const {
|
||||
Timestamp timestamp,
|
||||
Timestamp collection_ttl) const {
|
||||
// TODO change the
|
||||
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
|
||||
"num chunk not equal to 1 for sealed segment");
|
||||
auto timestamps_data =
|
||||
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
|
||||
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
|
||||
if (collection_ttl > 0) {
|
||||
auto range =
|
||||
insert_record_.timestamp_index_.get_active_range(collection_ttl);
|
||||
if (range.first == range.second &&
|
||||
range.first == timestamps_data_size) {
|
||||
bitset_chunk.set();
|
||||
return;
|
||||
} else {
|
||||
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
|
||||
timestamps_data, timestamps_data_size, collection_ttl, range);
|
||||
bitset_chunk |= ttl_mask;
|
||||
}
|
||||
}
|
||||
|
||||
AssertInfo(timestamps_data_size == get_row_count(),
|
||||
fmt::format("Timestamp size not equal to row count: {}, {}",
|
||||
|
||||
@ -343,7 +343,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
|
||||
|
||||
void
|
||||
mask_with_timestamps(BitsetTypeView& bitset_chunk,
|
||||
Timestamp timestamp) const override;
|
||||
Timestamp timestamp,
|
||||
Timestamp collection_ttl) const override;
|
||||
|
||||
void
|
||||
vector_search(SearchInfo& search_info,
|
||||
|
||||
@ -1074,8 +1074,22 @@ SegmentGrowingImpl::get_active_count(Timestamp ts) const {
|
||||
|
||||
void
|
||||
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
|
||||
Timestamp timestamp) const {
|
||||
// DO NOTHING
|
||||
Timestamp timestamp,
|
||||
Timestamp collection_ttl) const {
|
||||
if (collection_ttl > 0) {
|
||||
auto& timestamps = get_timestamps();
|
||||
auto size = bitset_chunk.size();
|
||||
if (timestamps[size - 1] <= collection_ttl) {
|
||||
bitset_chunk.set();
|
||||
return;
|
||||
}
|
||||
auto pilot = upper_bound(timestamps, 0, size, collection_ttl);
|
||||
BitsetType bitset;
|
||||
bitset.reserve(size);
|
||||
bitset.resize(pilot, true);
|
||||
bitset.resize(size, false);
|
||||
bitset_chunk |= bitset;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -307,7 +307,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
|
||||
void
|
||||
mask_with_timestamps(BitsetTypeView& bitset_chunk,
|
||||
Timestamp timestamp) const override;
|
||||
Timestamp timestamp,
|
||||
Timestamp ttl = 0) const override;
|
||||
|
||||
void
|
||||
vector_search(SearchInfo& search_info,
|
||||
|
||||
@ -84,12 +84,13 @@ SegmentInternalInterface::Search(
|
||||
const query::Plan* plan,
|
||||
const query::PlaceholderGroup* placeholder_group,
|
||||
Timestamp timestamp,
|
||||
int32_t consistency_level) const {
|
||||
int32_t consistency_level,
|
||||
Timestamp collection_ttl) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
|
||||
check_search(plan);
|
||||
query::ExecPlanNodeVisitor visitor(
|
||||
*this, timestamp, placeholder_group, consistency_level);
|
||||
*this, timestamp, placeholder_group, consistency_level, collection_ttl);
|
||||
auto results = std::make_unique<SearchResult>();
|
||||
*results = visitor.get_moved_result(*plan->plan_node_);
|
||||
results->segment_ = (void*)this;
|
||||
@ -102,11 +103,13 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
|
||||
Timestamp timestamp,
|
||||
int64_t limit_size,
|
||||
bool ignore_non_pk,
|
||||
int32_t consistency_level) const {
|
||||
int32_t consistency_level,
|
||||
Timestamp collection_ttl) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
|
||||
auto results = std::make_unique<proto::segcore::RetrieveResults>();
|
||||
query::ExecPlanNodeVisitor visitor(*this, timestamp, consistency_level);
|
||||
query::ExecPlanNodeVisitor visitor(
|
||||
*this, timestamp, consistency_level, collection_ttl);
|
||||
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
|
||||
retrieve_results.segment_ = (void*)this;
|
||||
results->set_has_more_result(retrieve_results.has_more_result);
|
||||
|
||||
@ -66,7 +66,8 @@ class SegmentInterface {
|
||||
Search(const query::Plan* Plan,
|
||||
const query::PlaceholderGroup* placeholder_group,
|
||||
Timestamp timestamp,
|
||||
int32_t consistency_level = 0) const = 0;
|
||||
int32_t consistency_level = 0,
|
||||
Timestamp collection_ttl = 0) const = 0;
|
||||
|
||||
virtual std::unique_ptr<proto::segcore::RetrieveResults>
|
||||
Retrieve(tracer::TraceContext* trace_ctx,
|
||||
@ -74,7 +75,8 @@ class SegmentInterface {
|
||||
Timestamp timestamp,
|
||||
int64_t limit_size,
|
||||
bool ignore_non_pk,
|
||||
int32_t consistency_level = 0) const = 0;
|
||||
int32_t consistency_level = 0,
|
||||
Timestamp collection_ttl = 0) const = 0;
|
||||
|
||||
virtual std::unique_ptr<proto::segcore::RetrieveResults>
|
||||
Retrieve(tracer::TraceContext* trace_ctx,
|
||||
@ -266,7 +268,8 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
Search(const query::Plan* Plan,
|
||||
const query::PlaceholderGroup* placeholder_group,
|
||||
Timestamp timestamp,
|
||||
int32_t consistency_level = 0) const override;
|
||||
int32_t consistency_level = 0,
|
||||
Timestamp collection_ttl = 0) const override;
|
||||
|
||||
void
|
||||
FillPrimaryKeys(const query::Plan* plan,
|
||||
@ -282,7 +285,8 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
Timestamp timestamp,
|
||||
int64_t limit_size,
|
||||
bool ignore_non_pk,
|
||||
int32_t consistency_level = 0) const override;
|
||||
int32_t consistency_level = 0,
|
||||
Timestamp collection_ttl = 0) const override;
|
||||
|
||||
std::unique_ptr<proto::segcore::RetrieveResults>
|
||||
Retrieve(tracer::TraceContext* trace_ctx,
|
||||
@ -367,7 +371,8 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
// bitset 1 means not hit. 0 means hit.
|
||||
virtual void
|
||||
mask_with_timestamps(BitsetTypeView& bitset_chunk,
|
||||
Timestamp timestamp) const = 0;
|
||||
Timestamp timestamp,
|
||||
Timestamp collection_ttl) const = 0;
|
||||
|
||||
// count of chunks
|
||||
virtual int64_t
|
||||
|
||||
@ -84,6 +84,25 @@ TimestampIndex::GenerateBitset(Timestamp query_timestamp,
|
||||
return bitset;
|
||||
}
|
||||
|
||||
BitsetType
|
||||
TimestampIndex::GenerateTTLBitset(const Timestamp* timestamps,
|
||||
int64_t size,
|
||||
Timestamp expire_ts,
|
||||
std::pair<int64_t, int64_t> active_range) {
|
||||
auto beg = active_range.first;
|
||||
auto end = active_range.second;
|
||||
|
||||
BitsetType bitset;
|
||||
bitset.reserve(size);
|
||||
bitset.resize(beg, true);
|
||||
bitset.resize(size, false);
|
||||
|
||||
for (int64_t i = beg; i < end; ++i) {
|
||||
bitset[i] = timestamps[i] <= expire_ts;
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
|
||||
std::vector<int64_t>
|
||||
GenerateFakeSlices(const Timestamp* timestamps,
|
||||
int64_t size,
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
#include <utility>
|
||||
|
||||
#include "common/Schema.h"
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
class TimestampIndex {
|
||||
@ -40,6 +39,12 @@ class TimestampIndex {
|
||||
const Timestamp* timestamps,
|
||||
int64_t size);
|
||||
|
||||
static BitsetType
|
||||
GenerateTTLBitset(const Timestamp* timestamps,
|
||||
int64_t size,
|
||||
Timestamp expire_ts,
|
||||
std::pair<int64_t, int64_t> active_range);
|
||||
|
||||
private:
|
||||
// numSlice
|
||||
std::vector<int64_t> lengths_;
|
||||
@ -56,5 +61,4 @@ std::vector<int64_t>
|
||||
GenerateFakeSlices(const Timestamp* timestamps,
|
||||
int64_t size,
|
||||
int min_slice_length = 1);
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -104,7 +104,8 @@ AsyncSearch(CTraceContext c_trace,
|
||||
CSearchPlan c_plan,
|
||||
CPlaceholderGroup c_placeholder_group,
|
||||
uint64_t timestamp,
|
||||
int32_t consistency_level) {
|
||||
int32_t consistency_level,
|
||||
uint64_t collection_ttl) {
|
||||
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
|
||||
auto plan = (milvus::query::Plan*)c_plan;
|
||||
auto phg_ptr = reinterpret_cast<const milvus::query::PlaceholderGroup*>(
|
||||
@ -113,8 +114,13 @@ AsyncSearch(CTraceContext c_trace,
|
||||
auto future = milvus::futures::Future<milvus::SearchResult>::async(
|
||||
milvus::futures::getGlobalCPUExecutor(),
|
||||
milvus::futures::ExecutePriority::HIGH,
|
||||
[c_trace, segment, plan, phg_ptr, timestamp, consistency_level](
|
||||
milvus::futures::CancellationToken cancel_token) {
|
||||
[c_trace,
|
||||
segment,
|
||||
plan,
|
||||
phg_ptr,
|
||||
timestamp,
|
||||
consistency_level,
|
||||
collection_ttl](milvus::futures::CancellationToken cancel_token) {
|
||||
// save trace context into search_info
|
||||
auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_;
|
||||
trace_ctx.traceID = c_trace.traceID;
|
||||
@ -126,8 +132,8 @@ AsyncSearch(CTraceContext c_trace,
|
||||
|
||||
segment->LazyCheckSchema(plan->schema_);
|
||||
|
||||
auto search_result =
|
||||
segment->Search(plan, phg_ptr, timestamp, consistency_level);
|
||||
auto search_result = segment->Search(
|
||||
plan, phg_ptr, timestamp, consistency_level, collection_ttl);
|
||||
if (!milvus::PositivelyRelated(
|
||||
plan->plan_node_->search_info_.metric_type_)) {
|
||||
for (auto& dis : search_result->distances_) {
|
||||
@ -176,10 +182,10 @@ AsyncRetrieve(CTraceContext c_trace,
|
||||
uint64_t timestamp,
|
||||
int64_t limit_size,
|
||||
bool ignore_non_pk,
|
||||
int32_t consistency_level) {
|
||||
int32_t consistency_level,
|
||||
uint64_t collection_ttl) {
|
||||
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
|
||||
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
|
||||
|
||||
auto future = milvus::futures::Future<CRetrieveResult>::async(
|
||||
milvus::futures::getGlobalCPUExecutor(),
|
||||
milvus::futures::ExecutePriority::HIGH,
|
||||
@ -189,7 +195,8 @@ AsyncRetrieve(CTraceContext c_trace,
|
||||
timestamp,
|
||||
limit_size,
|
||||
ignore_non_pk,
|
||||
consistency_level](milvus::futures::CancellationToken cancel_token) {
|
||||
consistency_level,
|
||||
collection_ttl](milvus::futures::CancellationToken cancel_token) {
|
||||
auto trace_ctx = milvus::tracer::TraceContext{
|
||||
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
|
||||
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
|
||||
@ -201,7 +208,8 @@ AsyncRetrieve(CTraceContext c_trace,
|
||||
timestamp,
|
||||
limit_size,
|
||||
ignore_non_pk,
|
||||
consistency_level);
|
||||
consistency_level,
|
||||
collection_ttl);
|
||||
|
||||
return CreateLeakedCRetrieveResultFromProto(
|
||||
std::move(retrieve_result));
|
||||
|
||||
@ -51,7 +51,8 @@ AsyncSearch(CTraceContext c_trace,
|
||||
CSearchPlan c_plan,
|
||||
CPlaceholderGroup c_placeholder_group,
|
||||
uint64_t timestamp,
|
||||
int32_t consistency_level);
|
||||
int32_t consistency_level,
|
||||
uint64_t collection_ttl);
|
||||
|
||||
void
|
||||
DeleteRetrieveResult(CRetrieveResult* retrieve_result);
|
||||
@ -63,7 +64,8 @@ AsyncRetrieve(CTraceContext c_trace,
|
||||
uint64_t timestamp,
|
||||
int64_t limit_size,
|
||||
bool ignore_non_pk,
|
||||
int32_t consistency_level);
|
||||
int32_t consistency_level,
|
||||
uint64_t collection_ttl);
|
||||
|
||||
CFuture* // Future<CRetrieveResult>
|
||||
AsyncRetrieveByOffsets(CTraceContext c_trace,
|
||||
|
||||
@ -72,7 +72,7 @@ CRetrieve(CSegmentInterface c_segment,
|
||||
uint64_t timestamp,
|
||||
CRetrieveResult** result) {
|
||||
auto future = AsyncRetrieve(
|
||||
{}, c_segment, c_plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE, false, 0);
|
||||
{}, c_segment, c_plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE, false, 0, 0);
|
||||
auto futurePtr = static_cast<milvus::futures::IFuture*>(
|
||||
static_cast<void*>(static_cast<CFuture*>(future)));
|
||||
|
||||
|
||||
@ -143,6 +143,7 @@ TEST_P(TaskTest, CallExprEmpty) {
|
||||
100000,
|
||||
MAX_TIMESTAMP,
|
||||
0,
|
||||
0,
|
||||
std::make_shared<milvus::exec::QueryConfig>(
|
||||
std::unordered_map<std::string, std::string>{}));
|
||||
|
||||
@ -181,6 +182,7 @@ TEST_P(TaskTest, UnaryExpr) {
|
||||
100000,
|
||||
MAX_TIMESTAMP,
|
||||
0,
|
||||
0,
|
||||
std::make_shared<milvus::exec::QueryConfig>(
|
||||
std::unordered_map<std::string, std::string>{}));
|
||||
|
||||
@ -228,6 +230,7 @@ TEST_P(TaskTest, LogicalExpr) {
|
||||
100000,
|
||||
MAX_TIMESTAMP,
|
||||
0,
|
||||
0,
|
||||
std::make_shared<milvus::exec::QueryConfig>(
|
||||
std::unordered_map<std::string, std::string>{}));
|
||||
|
||||
|
||||
@ -148,6 +148,8 @@ TEST(Sealed, without_predicate) {
|
||||
|
||||
sr = sealed_segment->Search(plan.get(), ph_group.get(), 0);
|
||||
EXPECT_EQ(sr->get_total_result_count(), 0);
|
||||
sr = sealed_segment->Search(plan.get(), ph_group.get(), timestamp, 0, 100);
|
||||
EXPECT_EQ(sr->get_total_result_count(), 0);
|
||||
}
|
||||
|
||||
TEST(Sealed, without_search_ef_less_than_limit) {
|
||||
@ -982,7 +984,7 @@ TEST(Sealed, LoadScalarIndex) {
|
||||
nothing_index.cache_index = CreateTestCacheIndex("test", std::move(temp2));
|
||||
segment->LoadIndex(nothing_index);
|
||||
|
||||
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
|
||||
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp, 0, 100000);
|
||||
auto json = SearchResultToJson(*sr);
|
||||
std::cout << json.dump(1);
|
||||
}
|
||||
|
||||
@ -186,8 +186,8 @@ CSearch(CSegmentInterface c_segment,
|
||||
CPlaceholderGroup c_placeholder_group,
|
||||
uint64_t timestamp,
|
||||
CSearchResult* result) {
|
||||
auto future =
|
||||
AsyncSearch({}, c_segment, c_plan, c_placeholder_group, timestamp, 0);
|
||||
auto future = AsyncSearch(
|
||||
{}, c_segment, c_plan, c_placeholder_group, timestamp, 0, 0);
|
||||
auto futurePtr = static_cast<milvus::futures::IFuture*>(
|
||||
static_cast<void*>(static_cast<CFuture*>(future)));
|
||||
|
||||
|
||||
@ -1240,7 +1240,7 @@ func GenSimpleRetrievePlan(collection *segcore.CCollection) (*segcore.RetrievePl
|
||||
return nil, err
|
||||
}
|
||||
|
||||
plan, err2 := segcore.NewRetrievePlan(collection, planBytes, timestamp, 100, 0)
|
||||
plan, err2 := segcore.NewRetrievePlan(collection, planBytes, timestamp, 100, 0, 0)
|
||||
return plan, err2
|
||||
}
|
||||
|
||||
|
||||
@ -104,6 +104,7 @@ type collectionInfo struct {
|
||||
partitionKeyIsolation bool
|
||||
replicateID string
|
||||
updateTimestamp uint64
|
||||
collectionTTL uint64
|
||||
}
|
||||
|
||||
type databaseInfo struct {
|
||||
@ -474,6 +475,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
|
||||
consistencyLevel: collection.ConsistencyLevel,
|
||||
partitionKeyIsolation: isolation,
|
||||
updateTimestamp: collection.UpdateTimestamp,
|
||||
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
|
||||
}, nil
|
||||
}
|
||||
_, dbOk := m.collInfo[database]
|
||||
@ -492,6 +494,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
|
||||
partitionKeyIsolation: isolation,
|
||||
replicateID: replicateID,
|
||||
updateTimestamp: collection.UpdateTimestamp,
|
||||
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
|
||||
|
||||
@ -492,6 +492,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
|
||||
}
|
||||
t.RetrieveRequest.IsIterator = queryParams.isIterator
|
||||
|
||||
if collectionInfo.collectionTTL != 0 {
|
||||
physicalTime, _ := tsoutil.ParseTS(guaranteeTs)
|
||||
expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL))
|
||||
t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0)
|
||||
}
|
||||
deadline, ok := t.TraceCtx().Deadline()
|
||||
if ok {
|
||||
t.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0)
|
||||
@ -501,7 +506,8 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
|
||||
log.Debug("Query PreExecute done.",
|
||||
zap.Uint64("guarantee_ts", guaranteeTs),
|
||||
zap.Uint64("mvcc_ts", t.GetMvccTimestamp()),
|
||||
zap.Uint64("timeout_ts", t.GetTimeoutTimestamp()))
|
||||
zap.Uint64("timeout_ts", t.GetTimeoutTimestamp()),
|
||||
zap.Uint64("collection_ttl_timestamps", t.CollectionTtlTimestamps))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -270,13 +270,20 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
||||
t.SearchRequest.Username = username
|
||||
}
|
||||
|
||||
if collectionInfo.collectionTTL != 0 {
|
||||
physicalTime, _ := tsoutil.ParseTS(guaranteeTs)
|
||||
expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL))
|
||||
t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0)
|
||||
}
|
||||
|
||||
t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]()
|
||||
|
||||
log.Debug("search PreExecute done.",
|
||||
zap.Uint64("guarantee_ts", guaranteeTs),
|
||||
zap.Bool("use_default_consistency", useDefaultConsistency),
|
||||
zap.Any("consistency level", consistencyLevel),
|
||||
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()))
|
||||
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()),
|
||||
zap.Uint64("collection_ttl_timestamps", t.CollectionTtlTimestamps))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -129,6 +129,12 @@ func constructCollectionSchema(
|
||||
pk,
|
||||
fVec,
|
||||
},
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.CollectionTTLConfigKey,
|
||||
Value: "15",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2507,3 +2507,21 @@ func IsBM25FunctionOutputField(field *schemapb.FieldSchema, collSchema *schemapb
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func getCollectionTTL(pairs []*commonpb.KeyValuePair) uint64 {
|
||||
properties := make(map[string]string)
|
||||
for _, pair := range pairs {
|
||||
properties[pair.Key] = pair.Value
|
||||
}
|
||||
|
||||
v, ok := properties[common.CollectionTTLConfigKey]
|
||||
if ok {
|
||||
ttl, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return uint64(time.Duration(ttl) * time.Second)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
@ -883,7 +883,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
||||
defer func() {
|
||||
node.manager.Collection.Unref(req.GetReq().GetCollectionID(), 1)
|
||||
}()
|
||||
|
||||
// Send task to scheduler and wait until it finished.
|
||||
task := tasks.NewQueryTask(queryCtx, collection, node.manager, req)
|
||||
if err := node.scheduler.Add(task); err != nil {
|
||||
|
||||
@ -66,6 +66,7 @@ func (t *QueryStreamTask) Execute() error {
|
||||
t.req.Req.GetMvccTimestamp(),
|
||||
t.req.Req.Base.GetMsgID(),
|
||||
t.req.Req.GetConsistencyLevel(),
|
||||
t.req.Req.GetCollectionTtlTimestamps(),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -107,6 +107,7 @@ func (t *QueryTask) Execute() error {
|
||||
t.req.Req.GetMvccTimestamp(),
|
||||
t.req.Req.Base.GetMsgID(),
|
||||
t.req.Req.GetConsistencyLevel(),
|
||||
t.req.Req.GetCollectionTtlTimestamps(),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -81,6 +81,7 @@ type SearchRequest struct {
|
||||
searchFieldID int64
|
||||
mvccTimestamp typeutil.Timestamp
|
||||
consistencyLevel commonpb.ConsistencyLevel
|
||||
collectionTTL typeutil.Timestamp
|
||||
}
|
||||
|
||||
func NewSearchRequest(collection *CCollection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) {
|
||||
@ -125,6 +126,7 @@ func NewSearchRequest(collection *CCollection, req *querypb.SearchRequest, place
|
||||
searchFieldID: int64(fieldID),
|
||||
mvccTimestamp: req.GetReq().GetMvccTimestamp(),
|
||||
consistencyLevel: req.GetReq().GetConsistencyLevel(),
|
||||
collectionTTL: req.GetReq().GetCollectionTtlTimestamps(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -156,9 +158,10 @@ type RetrievePlan struct {
|
||||
maxLimitSize int64
|
||||
ignoreNonPk bool
|
||||
consistencyLevel commonpb.ConsistencyLevel
|
||||
collectionTTL typeutil.Timestamp
|
||||
}
|
||||
|
||||
func NewRetrievePlan(col *CCollection, expr []byte, timestamp typeutil.Timestamp, msgID int64, consistencylevel commonpb.ConsistencyLevel) (*RetrievePlan, error) {
|
||||
func NewRetrievePlan(col *CCollection, expr []byte, timestamp typeutil.Timestamp, msgID int64, consistencylevel commonpb.ConsistencyLevel, collectionTTL typeutil.Timestamp) (*RetrievePlan, error) {
|
||||
if col.rawPointer() == nil {
|
||||
return nil, errors.New("collection is released")
|
||||
}
|
||||
@ -174,6 +177,7 @@ func NewRetrievePlan(col *CCollection, expr []byte, timestamp typeutil.Timestamp
|
||||
msgID: msgID,
|
||||
maxLimitSize: maxLimitSize,
|
||||
consistencyLevel: consistencylevel,
|
||||
collectionTTL: collectionTTL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@ -77,7 +77,7 @@ func (suite *PlanSuite) TestPlanCreateByExpr() {
|
||||
|
||||
func (suite *PlanSuite) TestQueryPlanCollectionReleased() {
|
||||
suite.collection.Release()
|
||||
_, err := segcore.NewRetrievePlan(suite.collection, nil, 0, 0, 0)
|
||||
_, err := segcore.NewRetrievePlan(suite.collection, nil, 0, 0, 0, 0)
|
||||
suite.Error(err)
|
||||
}
|
||||
|
||||
|
||||
@ -197,6 +197,7 @@ func (suite *ReduceSuite) TestReduceAllFunc() {
|
||||
[]byte(fmt.Sprintf("%d > 100", mock_segcore.RowIDField.ID)),
|
||||
typeutil.MaxTimestamp,
|
||||
0,
|
||||
0,
|
||||
0)
|
||||
suite.Error(err)
|
||||
suite.Nil(retrievePlan)
|
||||
@ -215,6 +216,7 @@ func (suite *ReduceSuite) TestReduceAllFunc() {
|
||||
expr,
|
||||
typeutil.MaxTimestamp,
|
||||
0,
|
||||
0,
|
||||
0)
|
||||
suite.NotNil(retrievePlan)
|
||||
suite.NoError(err)
|
||||
|
||||
@ -116,6 +116,7 @@ func (s *cSegmentImpl) Search(ctx context.Context, searchReq *SearchRequest) (*S
|
||||
searchReq.cPlaceholderGroup,
|
||||
C.uint64_t(searchReq.mvccTimestamp),
|
||||
C.int32_t(searchReq.consistencyLevel),
|
||||
C.uint64_t(searchReq.collectionTTL),
|
||||
))
|
||||
},
|
||||
cgo.WithName("search"),
|
||||
@ -144,6 +145,7 @@ func (s *cSegmentImpl) Retrieve(ctx context.Context, plan *RetrievePlan) (*Retri
|
||||
C.int64_t(plan.maxLimitSize),
|
||||
C.bool(plan.ignoreNonPk),
|
||||
C.int32_t(plan.consistencyLevel),
|
||||
C.uint64_t(plan.collectionTTL),
|
||||
))
|
||||
},
|
||||
cgo.WithName("retrieve"),
|
||||
|
||||
@ -107,6 +107,7 @@ func assertEqualCount(
|
||||
expr,
|
||||
typeutil.MaxTimestamp,
|
||||
100,
|
||||
0,
|
||||
0)
|
||||
defer retrievePlan.Delete()
|
||||
|
||||
|
||||
@ -132,6 +132,7 @@ message SearchRequest {
|
||||
bool is_recall_evaluation = 27;
|
||||
bool is_iterator = 28;
|
||||
string analyzer_name = 29;
|
||||
uint64 collection_ttl_timestamps = 30;
|
||||
}
|
||||
|
||||
message SubSearchResults {
|
||||
@ -198,6 +199,7 @@ message RetrieveRequest {
|
||||
int32 reduce_type = 17;
|
||||
common.ConsistencyLevel consistency_level = 18;
|
||||
bool is_iterator = 19;
|
||||
uint64 collection_ttl_timestamps = 20;
|
||||
}
|
||||
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user