enhance: support TTL expiration with queries returning no results (#41960)

support TTL expiration with queries returning no results
issue:https://github.com/milvus-io/milvus/issues/41959
pr:https://github.com/milvus-io/milvus/pull/41720

---------

Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
This commit is contained in:
Xianhui Lin 2025-05-26 15:52:28 +08:00 committed by GitHub
parent 0daebdc3c9
commit 0574fc7b7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 693 additions and 528 deletions

View File

@ -176,6 +176,7 @@ class QueryContext : public Context {
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
milvus::Timestamp timestamp,
milvus::Timestamp collection_ttl = 0,
int32_t consistency_level = 0,
std::shared_ptr<QueryConfig> query_config =
std::make_shared<QueryConfig>(),
@ -187,6 +188,7 @@ class QueryContext : public Context {
segment_(segment),
active_count_(active_count),
query_timestamp_(timestamp),
collection_ttl_(collection_ttl),
query_config_(query_config),
executor_(executor),
consistency_level_(consistency_level) {
@ -222,6 +224,11 @@ class QueryContext : public Context {
return query_timestamp_;
}
milvus::Timestamp
get_collection_ttl() {
return collection_ttl_;
}
int64_t
get_active_count() {
return active_count_;
@ -290,7 +297,7 @@ class QueryContext : public Context {
int64_t active_count_;
// timestamp this query generate
milvus::Timestamp query_timestamp_;
milvus::Timestamp collection_ttl_;
// used for vector search
milvus::SearchInfo search_info_;
const query::PlaceholderGroup* placeholder_group_;

View File

@ -30,6 +30,7 @@ PhyMvccNode::PhyMvccNode(int32_t operator_id,
query_timestamp_ = query_context->get_query_timestamp();
active_count_ = query_context->get_active_count();
is_source_node_ = mvcc_node->sources().size() == 0;
collection_ttl_ = query_context->get_collection_ttl();
}
void
@ -60,7 +61,7 @@ PhyMvccNode::GetOutput() {
TargetBitmapView data(col_input->GetRawData(), col_input->size());
// need to expose null?
segment_->mask_with_timestamps(data, query_timestamp_);
segment_->mask_with_timestamps(data, query_timestamp_, collection_ttl_);
segment_->mask_with_delete(data, active_count_, query_timestamp_);
is_finished_ = true;

View File

@ -72,6 +72,7 @@ class PhyMvccNode : public Operator {
int64_t active_count_;
bool is_finished_{false};
bool is_source_node_{false};
milvus::Timestamp collection_ttl_;
};
} // namespace exec

View File

@ -34,11 +34,13 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
Timestamp timestamp,
const PlaceholderGroup& placeholder_group,
int32_t consystency_level)
int32_t consystency_level,
Timestamp collection_ttl)
: segment_(segment),
timestamp_(timestamp),
placeholder_group_(placeholder_group),
consystency_level_(consystency_level) {
consystency_level_(consystency_level),
collection_ttl_(collection_ttl_) {
}
SearchResult
@ -59,6 +61,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
private:
const segcore::SegmentInterface& segment_;
Timestamp timestamp_;
Timestamp collection_ttl_;
const PlaceholderGroup& placeholder_group_;
SearchResultOpt search_result_opt_;
@ -134,6 +137,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
segment,
active_count,
timestamp_,
collection_ttl_,
consystency_level_);
query_context->set_search_info(node.search_info_);
query_context->set_placeholder_group(placeholder_group_);
@ -189,6 +193,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
segment,
active_count,
timestamp_,
collection_ttl_,
consystency_level_);
// Do task execution

View File

@ -44,18 +44,22 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
Timestamp timestamp,
const PlaceholderGroup* placeholder_group,
int32_t consystency_level = 0)
int32_t consystency_level = 0,
Timestamp collection_ttl = 0)
: segment_(segment),
timestamp_(timestamp),
collection_ttl_(collection_ttl),
placeholder_group_(placeholder_group),
consystency_level_(consystency_level) {
}
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
Timestamp timestamp,
int32_t consystency_level = 0)
int32_t consystency_level = 0,
Timestamp collection_ttl = 0)
: segment_(segment),
timestamp_(timestamp),
collection_ttl_(collection_ttl),
consystency_level_(consystency_level) {
placeholder_group_ = nullptr;
}
@ -105,6 +109,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
private:
const segcore::SegmentInterface& segment_;
Timestamp timestamp_;
Timestamp collection_ttl_;
const PlaceholderGroup* placeholder_group_;
SearchResultOpt search_result_opt_;

View File

@ -1950,13 +1950,19 @@ ChunkedSegmentSealedImpl::get_active_count(Timestamp ts) const {
void
ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const {
Timestamp timestamp,
Timestamp collection_ttl) const {
// TODO change the
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto timestamps_data =
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
if (collection_ttl > 0) {
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
collection_ttl, timestamps_data, timestamps_data_size);
bitset_chunk |= ttl_mask;
}
AssertInfo(timestamps_data_size == get_row_count(),
fmt::format("Timestamp size not equal to row count: {}, {}",

View File

@ -333,7 +333,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const override;
Timestamp timestamp,
Timestamp collection_ttl) const override;
void
vector_search(SearchInfo& search_info,

View File

@ -862,7 +862,8 @@ SegmentGrowingImpl::get_active_count(Timestamp ts) const {
void
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const {
Timestamp timestamp,
Timestamp collection_ttl) const {
// DO NOTHING
}

View File

@ -268,7 +268,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const override;
Timestamp timestamp,
Timestamp ttl = 0) const override;
void
vector_search(SearchInfo& search_info,

View File

@ -80,12 +80,13 @@ SegmentInternalInterface::Search(
const query::Plan* plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
int32_t consistency_level) const {
int32_t consistency_level,
Timestamp collection_ttl) const {
std::shared_lock lck(mutex_);
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
check_search(plan);
query::ExecPlanNodeVisitor visitor(
*this, timestamp, placeholder_group, consistency_level);
*this, timestamp, placeholder_group, consistency_level, collection_ttl);
auto results = std::make_unique<SearchResult>();
*results = visitor.get_moved_result(*plan->plan_node_);
results->segment_ = (void*)this;
@ -98,11 +99,13 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level) const {
int32_t consistency_level,
Timestamp collection_ttl) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
auto results = std::make_unique<proto::segcore::RetrieveResults>();
query::ExecPlanNodeVisitor visitor(*this, timestamp, consistency_level);
query::ExecPlanNodeVisitor visitor(
*this, timestamp, consistency_level, collection_ttl);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
retrieve_results.segment_ = (void*)this;
results->set_has_more_result(retrieve_results.has_more_result);

View File

@ -66,7 +66,8 @@ class SegmentInterface {
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
int32_t consistency_level = 0) const = 0;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const = 0;
virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(tracer::TraceContext* trace_ctx,
@ -74,7 +75,8 @@ class SegmentInterface {
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level = 0) const = 0;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const = 0;
virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(tracer::TraceContext* trace_ctx,
@ -270,7 +272,8 @@ class SegmentInternalInterface : public SegmentInterface {
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
int32_t consistency_level = 0) const override;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const override;
void
FillPrimaryKeys(const query::Plan* plan,
@ -286,7 +289,8 @@ class SegmentInternalInterface : public SegmentInterface {
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level = 0) const override;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const override;
std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(tracer::TraceContext* trace_ctx,
@ -380,7 +384,8 @@ class SegmentInternalInterface : public SegmentInterface {
// bitset 1 means not hit. 0 means hit.
virtual void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const = 0;
Timestamp timestamp,
Timestamp collection_ttl) const = 0;
// count of chunks
virtual int64_t

View File

@ -1835,14 +1835,19 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {
void
SegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const {
Timestamp timestamp,
Timestamp collection_ttl) const {
// TODO change the
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto timestamps_data =
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
if (collection_ttl > 0) {
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
collection_ttl, timestamps_data, timestamps_data_size);
bitset_chunk |= ttl_mask;
}
AssertInfo(timestamps_data_size == get_row_count(),
fmt::format("Timestamp size not equal to row count: {}, {}",
timestamps_data_size,
@ -1862,6 +1867,8 @@ SegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
bitset_chunk.set();
return;
}
// Generate bitset for timestamp range and TTL in one pass
auto mask = TimestampIndex::GenerateBitset(
timestamp, range, timestamps_data, timestamps_data_size);
bitset_chunk |= mask;

View File

@ -326,7 +326,8 @@ class SegmentSealedImpl : public SegmentSealed {
void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const override;
Timestamp timestamp,
Timestamp collection_ttl) const override;
void
vector_search(SearchInfo& search_info,

View File

@ -11,8 +11,25 @@
#include "TimestampIndex.h"
using namespace std;
using namespace std::chrono;
namespace milvus::segcore {
const int logicalBits = 18;
const uint64_t logicalBitsMask = (1ULL << logicalBits) - 1;
std::pair<system_clock::time_point, uint64_t>
ParseTS(uint64_t ts) {
uint64_t logical = ts & logicalBitsMask;
uint64_t physical = ts >> logicalBits;
auto physicalTime = system_clock::from_time_t(physical / 1000);
auto ms = milliseconds(physical % 1000);
physicalTime += ms;
return make_pair(physicalTime, logical);
}
void
TimestampIndex::set_length_meta(std::vector<int64_t> lengths) {
lengths_ = std::move(lengths);
@ -84,6 +101,25 @@ TimestampIndex::GenerateBitset(Timestamp query_timestamp,
return bitset;
}
BitsetType
TimestampIndex::GenerateTTLBitset(Timestamp collection_ttl,
const Timestamp* timestamps,
int64_t size) {
BitsetType bitset;
bitset.reserve(size);
bitset.resize(size, false);
auto cur = system_clock::now();
for (int64_t i = 0; i < size; ++i) {
auto [physicalTime, logical] = ParseTS(timestamps[i]);
bitset[i] =
(duration_cast<milliseconds>(physicalTime.time_since_epoch())
.count() +
collection_ttl) >
duration_cast<milliseconds>(cur.time_since_epoch()).count();
}
return bitset;
}
std::vector<int64_t>
GenerateFakeSlices(const Timestamp* timestamps,
int64_t size,

View File

@ -40,6 +40,11 @@ class TimestampIndex {
const Timestamp* timestamps,
int64_t size);
static BitsetType
GenerateTTLBitset(Timestamp collection_ttl,
const Timestamp* timestamps,
int64_t size);
private:
// numSlice
std::vector<int64_t> lengths_;

View File

@ -112,7 +112,8 @@ AsyncSearch(CTraceContext c_trace,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp,
int32_t consistency_level) {
int32_t consistency_level,
uint64_t collection_ttl) {
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
auto plan = (milvus::query::Plan*)c_plan;
auto phg_ptr = reinterpret_cast<const milvus::query::PlaceholderGroup*>(
@ -121,8 +122,13 @@ AsyncSearch(CTraceContext c_trace,
auto future = milvus::futures::Future<milvus::SearchResult>::async(
milvus::futures::getGlobalCPUExecutor(),
milvus::futures::ExecutePriority::HIGH,
[c_trace, segment, plan, phg_ptr, timestamp, consistency_level](
milvus::futures::CancellationToken cancel_token) {
[c_trace,
segment,
plan,
phg_ptr,
timestamp,
consistency_level,
collection_ttl](milvus::futures::CancellationToken cancel_token) {
// save trace context into search_info
auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_;
trace_ctx.traceID = c_trace.traceID;
@ -132,8 +138,8 @@ AsyncSearch(CTraceContext c_trace,
auto span = milvus::tracer::StartSpan("SegCoreSearch", &trace_ctx);
milvus::tracer::SetRootSpan(span);
auto search_result =
segment->Search(plan, phg_ptr, timestamp, consistency_level);
auto search_result = segment->Search(
plan, phg_ptr, timestamp, consistency_level, collection_ttl);
if (!milvus::PositivelyRelated(
plan->plan_node_->search_info_.metric_type_)) {
for (auto& dis : search_result->distances_) {
@ -182,7 +188,8 @@ AsyncRetrieve(CTraceContext c_trace,
uint64_t timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level) {
int32_t consistency_level,
uint64_t collection_ttl) {
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
@ -195,7 +202,8 @@ AsyncRetrieve(CTraceContext c_trace,
timestamp,
limit_size,
ignore_non_pk,
consistency_level](milvus::futures::CancellationToken cancel_token) {
consistency_level,
collection_ttl](milvus::futures::CancellationToken cancel_token) {
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
@ -205,7 +213,8 @@ AsyncRetrieve(CTraceContext c_trace,
timestamp,
limit_size,
ignore_non_pk,
consistency_level);
consistency_level,
collection_ttl);
return CreateLeakedCRetrieveResultFromProto(
std::move(retrieve_result));

View File

@ -51,7 +51,8 @@ AsyncSearch(CTraceContext c_trace,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp,
int32_t consistency_level);
int32_t consistency_level,
uint64_t collection_ttl);
void
DeleteRetrieveResult(CRetrieveResult* retrieve_result);
@ -63,7 +64,8 @@ AsyncRetrieve(CTraceContext c_trace,
uint64_t timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level);
int32_t consistency_level,
uint64_t collection_ttl);
CFuture* // Future<CRetrieveResult>
AsyncRetrieveByOffsets(CTraceContext c_trace,

View File

@ -70,7 +70,7 @@ CRetrieve(CSegmentInterface c_segment,
uint64_t timestamp,
CRetrieveResult** result) {
auto future = AsyncRetrieve(
{}, c_segment, c_plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE, false, 0);
{}, c_segment, c_plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE, false, 0, 0);
auto futurePtr = static_cast<milvus::futures::IFuture*>(
static_cast<void*>(static_cast<CFuture*>(future)));

View File

@ -158,6 +158,7 @@ TEST_P(TaskTest, CallExprEmpty) {
100000,
MAX_TIMESTAMP,
0,
0,
std::make_shared<milvus::exec::QueryConfig>(
std::unordered_map<std::string, std::string>{}));
@ -195,6 +196,7 @@ TEST_P(TaskTest, UnaryExpr) {
100000,
MAX_TIMESTAMP,
0,
0,
std::make_shared<milvus::exec::QueryConfig>(
std::unordered_map<std::string, std::string>{}));
@ -240,6 +242,7 @@ TEST_P(TaskTest, LogicalExpr) {
100000,
MAX_TIMESTAMP,
0,
0,
std::make_shared<milvus::exec::QueryConfig>(
std::unordered_map<std::string, std::string>{}));

View File

@ -149,6 +149,8 @@ TEST(Sealed, without_predicate) {
sr = sealed_segment->Search(plan.get(), ph_group.get(), 0);
EXPECT_EQ(sr->get_total_result_count(), 0);
sr = sealed_segment->Search(plan.get(), ph_group.get(), timestamp, 0, 100);
EXPECT_EQ(sr->get_total_result_count(), 0);
}
TEST(Sealed, without_search_ef_less_than_limit) {
@ -1046,7 +1048,7 @@ TEST(Sealed, LoadScalarIndex) {
nothing_index.index = GenScalarIndexing<int32_t>(N, nothing_data.data());
segment->LoadIndex(nothing_index);
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp, 0, 100000);
auto json = SearchResultToJson(*sr);
std::cout << json.dump(1);
}

View File

@ -181,8 +181,8 @@ CSearch(CSegmentInterface c_segment,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp,
CSearchResult* result) {
auto future =
AsyncSearch({}, c_segment, c_plan, c_placeholder_group, timestamp, 0);
auto future = AsyncSearch(
{}, c_segment, c_plan, c_placeholder_group, timestamp, 0, 0);
auto futurePtr = static_cast<milvus::futures::IFuture*>(
static_cast<void*>(static_cast<CFuture*>(future)));

View File

@ -1239,7 +1239,7 @@ func GenSimpleRetrievePlan(collection *segcore.CCollection) (*segcore.RetrievePl
return nil, err
}
plan, err2 := segcore.NewRetrievePlan(collection, planBytes, timestamp, 100, 0)
plan, err2 := segcore.NewRetrievePlan(collection, planBytes, timestamp, 100, 0, 0)
return plan, err2
}

View File

@ -104,6 +104,7 @@ type collectionInfo struct {
partitionKeyIsolation bool
replicateID string
updateTimestamp uint64
collectionTTL uint64
}
type databaseInfo struct {
@ -475,6 +476,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
}, nil
}
_, dbOk := m.collInfo[database]
@ -493,6 +495,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
partitionKeyIsolation: isolation,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
collectionTTL: getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
}
log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),

View File

@ -481,7 +481,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.MvccTimestamp = t.request.GetGuaranteeTimestamp()
t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp()
}
t.CollectionTtl = collectionInfo.collectionTTL
deadline, ok := t.TraceCtx().Deadline()
if ok {
t.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0)
@ -491,7 +491,8 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
log.Debug("Query PreExecute done.",
zap.Uint64("guarantee_ts", guaranteeTs),
zap.Uint64("mvcc_ts", t.GetMvccTimestamp()),
zap.Uint64("timeout_ts", t.GetTimeoutTimestamp()))
zap.Uint64("timeout_ts", t.GetTimeoutTimestamp()),
zap.Uint64("collection_ttl", t.CollectionTtl))
return nil
}

View File

@ -264,13 +264,16 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
t.SearchRequest.Username = username
}
t.CollectionTtl = collectionInfo.collectionTTL
t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]()
log.Debug("search PreExecute done.",
zap.Uint64("guarantee_ts", guaranteeTs),
zap.Bool("use_default_consistency", useDefaultConsistency),
zap.Any("consistency level", consistencyLevel),
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()))
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()),
zap.Uint64("collection_ttl", t.CollectionTtl))
return nil
}

View File

@ -127,6 +127,12 @@ func constructCollectionSchema(
pk,
fVec,
},
Properties: []*commonpb.KeyValuePair{
{
Key: common.CollectionTTLConfigKey,
Value: "15",
},
},
}
}

View File

@ -2447,3 +2447,21 @@ func GetReplicateID(ctx context.Context, database, collectionName string) (strin
replicateID, _ := common.GetReplicateID(dbInfo.properties)
return replicateID, nil
}
func getCollectionTTL(pairs []*commonpb.KeyValuePair) uint64 {
properties := make(map[string]string)
for _, pair := range pairs {
properties[pair.Key] = pair.Value
}
v, ok := properties[common.CollectionTTLConfigKey]
if ok {
ttl, err := strconv.Atoi(v)
if err != nil {
return 0
}
return uint64(time.Duration(ttl) * time.Millisecond)
}
return 0
}

View File

@ -870,7 +870,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
defer func() {
node.manager.Collection.Unref(req.GetReq().GetCollectionID(), 1)
}()
// Send task to scheduler and wait until it finished.
task := tasks.NewQueryTask(queryCtx, collection, node.manager, req)
if err := node.scheduler.Add(task); err != nil {

View File

@ -66,6 +66,7 @@ func (t *QueryStreamTask) Execute() error {
t.req.Req.GetMvccTimestamp(),
t.req.Req.Base.GetMsgID(),
t.req.Req.GetConsistencyLevel(),
t.req.Req.GetCollectionTtl(),
)
if err != nil {
return err

View File

@ -107,6 +107,7 @@ func (t *QueryTask) Execute() error {
t.req.Req.GetMvccTimestamp(),
t.req.Req.Base.GetMsgID(),
t.req.Req.GetConsistencyLevel(),
t.req.Req.GetCollectionTtl(),
)
if err != nil {
return err

View File

@ -81,6 +81,7 @@ type SearchRequest struct {
searchFieldID int64
mvccTimestamp typeutil.Timestamp
consistencyLevel commonpb.ConsistencyLevel
collectionTTL typeutil.Timestamp
}
func NewSearchRequest(collection *CCollection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) {
@ -125,6 +126,7 @@ func NewSearchRequest(collection *CCollection, req *querypb.SearchRequest, place
searchFieldID: int64(fieldID),
mvccTimestamp: req.GetReq().GetMvccTimestamp(),
consistencyLevel: req.GetReq().GetConsistencyLevel(),
collectionTTL: req.GetReq().GetCollectionTtl(),
}, nil
}
@ -156,9 +158,10 @@ type RetrievePlan struct {
maxLimitSize int64
ignoreNonPk bool
consistencyLevel commonpb.ConsistencyLevel
collectionTTL typeutil.Timestamp
}
func NewRetrievePlan(col *CCollection, expr []byte, timestamp typeutil.Timestamp, msgID int64, consistencylevel commonpb.ConsistencyLevel) (*RetrievePlan, error) {
func NewRetrievePlan(col *CCollection, expr []byte, timestamp typeutil.Timestamp, msgID int64, consistencylevel commonpb.ConsistencyLevel, collectionTTL typeutil.Timestamp) (*RetrievePlan, error) {
if col.rawPointer() == nil {
return nil, errors.New("collection is released")
}
@ -174,6 +177,7 @@ func NewRetrievePlan(col *CCollection, expr []byte, timestamp typeutil.Timestamp
msgID: msgID,
maxLimitSize: maxLimitSize,
consistencyLevel: consistencylevel,
collectionTTL: collectionTTL,
}, nil
}

View File

@ -77,7 +77,7 @@ func (suite *PlanSuite) TestPlanCreateByExpr() {
func (suite *PlanSuite) TestQueryPlanCollectionReleased() {
suite.collection.Release()
_, err := segcore.NewRetrievePlan(suite.collection, nil, 0, 0, 0)
_, err := segcore.NewRetrievePlan(suite.collection, nil, 0, 0, 0, 0)
suite.Error(err)
}

View File

@ -197,6 +197,7 @@ func (suite *ReduceSuite) TestReduceAllFunc() {
[]byte(fmt.Sprintf("%d > 100", mock_segcore.RowIDField.ID)),
typeutil.MaxTimestamp,
0,
0,
0)
suite.Error(err)
suite.Nil(retrievePlan)
@ -215,6 +216,7 @@ func (suite *ReduceSuite) TestReduceAllFunc() {
expr,
typeutil.MaxTimestamp,
0,
0,
0)
suite.NotNil(retrievePlan)
suite.NoError(err)

View File

@ -121,6 +121,7 @@ func (s *cSegmentImpl) Search(ctx context.Context, searchReq *SearchRequest) (*S
searchReq.cPlaceholderGroup,
C.uint64_t(searchReq.mvccTimestamp),
C.int32_t(searchReq.consistencyLevel),
C.uint64_t(searchReq.collectionTTL),
))
},
cgo.WithName("search"),
@ -149,6 +150,7 @@ func (s *cSegmentImpl) Retrieve(ctx context.Context, plan *RetrievePlan) (*Retri
C.int64_t(plan.maxLimitSize),
C.bool(plan.ignoreNonPk),
C.int32_t(plan.consistencyLevel),
C.uint64_t(plan.collectionTTL),
))
},
cgo.WithName("retrieve"),

View File

@ -107,6 +107,7 @@ func assertEqualCount(
expr,
typeutil.MaxTimestamp,
100,
0,
0)
defer retrievePlan.Delete()

View File

@ -133,6 +133,7 @@ message SearchRequest {
bool is_recall_evaluation = 27;
bool is_iterator = 28;
string analyzer_name = 29;
uint64 collection_ttl = 30;
}
message SubSearchResults {
@ -199,6 +200,7 @@ message RetrieveRequest {
int32 reduce_type = 17;
common.ConsistencyLevel consistency_level = 18;
bool is_iterator = 19;
uint64 collection_ttl = 20;
}

File diff suppressed because it is too large Load Diff