feat: Support ttl field for entity level expiration (#46342)

issue: #46033

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Pull Request Summary: Entity-Level TTL Field Support

### Core Invariant and Design
This PR introduces **per-entity TTL (time-to-live) expiration** via a
dedicated TIMESTAMPTZ field as a fine-grained alternative to
collection-level TTL. The key invariant is **mutual exclusivity**:
collection-level TTL and entity-level TTL field cannot coexist on the
same collection. Validation is enforced at the proxy layer during
collection creation/alteration (`validateTTL()` prevents both being set
simultaneously).
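
To make the invariant concrete, here is a minimal, standalone Go sketch of the mutual-exclusivity rule. `checkTTLExclusivity` is an illustrative helper rather than the actual `validateTTL()` (the real check appears in the proxy diff near the end of this page), and it assumes the property keys `collection.ttl.seconds` and `ttl_field`.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	collectionTTLKey      = "collection.ttl.seconds" // collection-level TTL property (assumed key)
	collectionTTLFieldKey = "ttl_field"              // entity-level TTL field property
)

// checkTTLExclusivity rejects collections that set both a collection-level TTL
// and an entity-level TTL field; only one mechanism is allowed per collection.
func checkTTLExclusivity(props map[string]string) error {
	_, hasCollTTL := props[collectionTTLKey]
	_, hasTTLField := props[collectionTTLFieldKey]
	if hasCollTTL && hasTTLField {
		return errors.New("collection TTL and ttl_field are mutually exclusive")
	}
	return nil
}

func main() {
	err := checkTTLExclusivity(map[string]string{
		collectionTTLKey:      "86400",
		collectionTTLFieldKey: "ttl",
	})
	fmt.Println(err) // collection TTL and ttl_field are mutually exclusive
}
```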

### What Is Removed and Why
- **Global `EntityExpirationTTL` parameter** removed from config (`configs/milvus.yaml`, `pkg/util/paramtable/component_param.go`). This served as the cluster-wide default for collection-level expiration. The removal is safe because:
- The collection-level TTL path (`isEntityExpired(ts)` check) remains
intact in the codebase for backward compatibility
- TTL field check (`isEntityExpiredByTTLField()`) is a secondary path invoked only when a TTL field is configured (see the sketch after this list)
- Existing deployments using collection TTL can continue without
modification
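
For orientation, the dual-path expiry check outlined in this list reduces to roughly the sketch below; the function and parameter names are simplified stand-ins for `EntityFilterImpl.Filtered()` shown later in the diff.

```go
package main

import "fmt"

// filtered is a simplified sketch of EntityFilterImpl.Filtered (see diff below):
// it returns true when the row should be dropped during compaction.
func filtered(deleted, collTTLExpired bool, expirationTimeMicros, nowMicros int64) bool {
	if deleted { // 1. tombstoned by a delete record
		return true
	}
	if collTTLExpired { // 2. legacy collection-level TTL path (unchanged)
		return true
	}
	// 3. entity-level TTL field path: callers pass -1 when no TTL field exists
	if expirationTimeMicros < 0 {
		return false
	}
	return nowMicros >= expirationTimeMicros
}

func main() {
	fmt.Println(filtered(false, false, -1, 1_700_000_000_000_000))    // false: no TTL field configured
	fmt.Println(filtered(false, false, 1_000, 1_700_000_000_000_000)) // true: TTL already passed
}
```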
  
The global parameter was removed because TTL is now configured per collection, either as a collection-level TTL property or as an entity-level TTL field, making a cluster-wide default redundant; the PR chooses one mechanism per collection rather than layering both.

### No Data Loss or Behavior Regression
**TTL filtering logic is additive and safe:**
1. **Collection-level TTL unaffected**: The `isEntityExpired(ts)` check
still applies when no TTL field is configured; callers of
`EntityFilter.Filtered()` pass `-1` as the TTL expiration timestamp when
no field exists, causing `isEntityExpiredByTTLField()` to return false
immediately
2. **Null/invalid TTL values treated safely**: Rows with null TTL or TTL ≤ 0 are marked as "never expire" (using the sentinel value `int64(^uint64(0) >> 1)`) and are preserved across compactions; percentile calculations only include positive TTL values (see the sketch after this list)
3. **Query-time filtering automatic**: TTL filtering is transparently
added to expression compilation via `AddTTLFieldFilterExpressions()`,
which appends `(ttl_field IS NULL OR ttl_field > current_time)` to the
filter pipeline. Entities with null TTL always pass the filter
4. **Compaction triggering granular**: Percentile-based expiration (20%,
40%, 60%, 80%, 100%) allows configurable compaction thresholds via
`SingleCompactionRatioThreshold`, preventing premature data deletion
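
A minimal Go sketch of the null handling and ratio-to-bucket mapping described in items 2 and 4. `normalizeTTL` and `quantileIndex` are illustrative names, but the sentinel value and the 20% bucket step mirror the PR's `getExpirQuantilesIndexByRatio()` (shown in the datacoord diff below).

```go
package main

import "fmt"

// neverExpire is the sentinel from item 2: int64(^uint64(0) >> 1) == math.MaxInt64.
const neverExpire = int64(^uint64(0) >> 1)

// normalizeTTL maps null or non-positive TTL values to "never expire";
// only positive values are counted toward the percentile statistics.
func normalizeTTL(ttlMicros int64, valid bool) (value int64, countable bool) {
	if !valid || ttlMicros <= 0 {
		return neverExpire, false
	}
	return ttlMicros, true
}

// quantileIndex maps a configured expired-data ratio onto one of the five
// 20%/40%/60%/80%/100% buckets, rounding down.
func quantileIndex(ratio float64, n int) int {
	if n <= 0 {
		return 0
	}
	idx := int((ratio+0.01)/0.2) - 1 // +0.01 guards against float rounding
	if idx < 0 {
		idx = 0
	}
	if idx >= n {
		idx = n - 1
	}
	return idx
}

func main() {
	v, counted := normalizeTTL(0, true)
	fmt.Println(v, counted) // 9223372036854775807 false -> preserved, excluded from stats
	// ratio 0.4 selects the 40% bucket (index 1), ratio 1.0 the 100% bucket (index 4)
	fmt.Println(quantileIndex(0.4, 5), quantileIndex(1.0, 5)) // 1 4
}
```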

### Capability Added: Per-Entity Expiration with Data Distribution Awareness
Users can now specify a TIMESTAMPTZ collection property `ttl_field`
naming a schema field. During data writes, TTL values are collected per
segment and percentile quantiles (5-value array) are computed and stored
in segment metadata. At query time, the TTL field is automatically
filtered. At compaction time, segment-level percentiles drive
expiration-based compaction decisions, enabling intelligent compaction
of segments where a configurable fraction of data has expired (e.g.,
compact when 40% of rows are expired, controlled by threshold ratio).
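
As a rough illustration of the statistics described above, the sketch below derives the five-value quantile array from a segment's positive TTL values. It is not the PR's exact implementation (the real computation runs inside sort/mix compaction and is persisted as `ExpirQuantiles`), only the shape of the calculation.

```go
package main

import (
	"fmt"
	"sort"
)

// computeExpirQuantiles returns the expiration timestamps (microseconds) at the
// 20%, 40%, 60%, 80% and 100% positions of a segment's positive TTL values.
// It returns nil when no positive TTL exists, i.e. the segment never expires.
func computeExpirQuantiles(ttlMicros []int64) []int64 {
	vals := make([]int64, 0, len(ttlMicros))
	for _, v := range ttlMicros {
		if v > 0 { // null / non-positive TTLs are excluded, matching item 2 above
			vals = append(vals, v)
		}
	}
	if len(vals) == 0 {
		return nil
	}
	sort.Slice(vals, func(i, j int) bool { return vals[i] < vals[j] })

	quantiles := make([]int64, 0, 5)
	for _, p := range []float64{0.2, 0.4, 0.6, 0.8, 1.0} {
		idx := int(float64(len(vals))*p) - 1
		if idx < 0 {
			idx = 0
		}
		quantiles = append(quantiles, vals[idx])
	}
	return quantiles
}

func main() {
	// 10 rows whose TTLs expire at t = 1..10 (toy microsecond values)
	ttls := []int64{3, 1, 7, 5, 9, 2, 8, 4, 10, 6}
	fmt.Println(computeExpirQuantiles(ttls)) // [2 4 6 8 10]
}
```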
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2026-01-05 10:27:24 +08:00 committed by GitHub
parent c10cf53b4b
commit a16d04f5d1
47 changed files with 3258 additions and 1818 deletions

View File

@ -916,7 +916,6 @@ internaltls:
common:
defaultPartitionName: _default # Name of the default partition when a collection is created
defaultIndexName: _default_idx # Name of the index when it is created with name unspecified
entityExpiration: -1 # Entity expiration in seconds, CAUTION -1 means never expire
indexSliceSize: 16 # Index slice size in MB
threadCoreCoefficient:
highPriority: 10 # This parameter specify how many times the number of threads is the number of cores in high priority pool

View File

@ -40,7 +40,7 @@ Milvus already supports the `TIMESTAMPTZ` data type. Entity TTL information will
Entity-level TTL is implemented by allowing users to explicitly add a `TIMESTAMPTZ` column in the schema and mark it in collection properties:
```text
"collection.ttl.field": "ttl"
"ttl_field": "ttl"
```
Here, `ttl` is the name of the column that stores TTL information. This mechanism is **mutually exclusive** with collection-level TTL.
@ -49,11 +49,10 @@ Here, `ttl` is the name of the column that stores TTL information. This mechanis
### Terminology and Conventions
* **TTL column / TTL field** : A field of type `TIMESTAMPTZ` declared in the schema and marked with `is_ttl = true`.
* **ExpireAt** : The value stored in the TTL field, representing the absolute expiration timestamp of an entity (UTC by default if no timezone is specified).
* **Collection-level TTL** : The existing mechanism where retention duration is defined at the collection level (e.g., retain 30 days).
* **insert_ts / mvcc_ts** : Existing Milvus write or MVCC timestamps, used as fallback when needed.
* **expirationTimeByPercentile** : A time point corresponding to a certain percentile of expired data within a segment, used to quickly determine whether compaction should be triggered.
* **expirQuantiles** : A time point corresponding to a certain percentile of expired data within a segment, used to quickly determine whether compaction should be triggered.
Example:
@ -100,7 +99,7 @@ If historical data should also have expiration times, users must perform an **up
#### 4.1 SegmentInfo Metadata Extension
A new field `expirationTimeByPercentile` is added to the segment metadata:
A new field `expirQuantiles` is added to the segment metadata:
```proto
message SegmentInfo {
@ -173,35 +172,35 @@ message SegmentInfo {
// and we could keep the possiblity that manifest stores out side of collection/partition/segment path
string manifest_path = 32;
// expirationTimeByPercentile records the expiration timestamps of the segment
// expirQuantiles records the expiration timestamps of the segment
// at the 20%, 40%, 60%, 80%, and 100% data distribution levels
repeated int64 expirationTimeByPercentile = 33;
repeated int64 expirQuantiles = 33;
}
```
Meaning:
* `expirationTimeByPercentile`: The expiration timestamps corresponding to the 20%, 40%, 60%, 80%, and 100% percentiles of data within the segment.
* `expirQuantiles`: The expiration timestamps corresponding to the 20%, 40%, 60%, 80%, and 100% percentiles of data within the segment.
---
#### 4.2 Metadata Writing
* Statistics are collected **only during compaction** .
* `expirationTimeByPercentile` is computed during sort or mix compaction tasks.
* `expirQuantiles` is computed during sort or mix compaction tasks.
* For streaming segments, sort compaction is required as the first step, making this approach sufficient.
---
#### 4.3 Compaction Trigger Strategy
* Based on a configured expired-data ratio, select the corresponding percentile from `expirationTimeByPercentile` (rounded down).
* Based on a configured expired-data ratio, select the corresponding percentile from `expirQuantiles` (rounded down).
* Compare the selected expiration time with the current time.
* If the expiration condition is met, trigger a compaction task.
Special cases:
* If `expirationTimeByPercentile` is `NULL`, the segment is treated as non-expiring.
* If `expirQuantiles` is `NULL`, the segment is treated as non-expiring.
* For old segments without a TTL field, expiration logic is skipped.
* Subsequent upsert operations will trigger the corresponding L0 compaction.
@ -228,7 +227,7 @@ schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("ttl", DataType.TIMESTAMPTZ, nullable=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)
prop = {"collection.ttl.field": "ttl"}
prop = {"ttl_field": "ttl"}
client.create_collection(
collection_name,
schema=schema,

View File

@ -8,7 +8,7 @@ import (
)
type EntityFilter interface {
Filtered(pk any, ts typeutil.Timestamp) bool
Filtered(pk any, ts typeutil.Timestamp, expirationTimeMicros int64) bool
GetExpiredCount() int
GetDeletedCount() int
@ -40,7 +40,7 @@ func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64,
}
}
func (filter *EntityFilterImpl) Filtered(pk any, ts typeutil.Timestamp) bool {
func (filter *EntityFilterImpl) Filtered(pk any, ts typeutil.Timestamp, expirationTimeMicros int64) bool {
if filter.isEntityDeleted(pk, ts) {
filter.deletedCount++
return true
@ -51,6 +51,11 @@ func (filter *EntityFilterImpl) Filtered(pk any, ts typeutil.Timestamp) bool {
filter.expiredCount++
return true
}
if filter.isEntityExpiredByTTLField(expirationTimeMicros) {
filter.expiredCount++
return true
}
return false
}
@ -100,3 +105,13 @@ func (filter *EntityFilterImpl) isEntityExpired(entityTs typeutil.Timestamp) boo
// filter.ttl is nanoseconds
return filter.ttl/int64(time.Millisecond) <= dur
}
func (filter *EntityFilterImpl) isEntityExpiredByTTLField(expirationTimeMicros int64) bool {
// entity expire is not enabled if expirationTimeMicros < 0
if expirationTimeMicros < 0 {
return false
}
// entityExpireTs is microseconds
return filter.currentTime.UnixMicro() >= expirationTimeMicros
}

View File

@ -69,7 +69,7 @@ func (s *EntityFilterSuite) TestEntityFilterByTTL() {
filter := newEntityFilter(nil, test.collTTL, test.nowTime)
entityTs := tsoutil.ComposeTSByTime(test.entityTime, 0)
got := filter.Filtered("mockpk", entityTs)
got := filter.Filtered("mockpk", entityTs, -1)
s.Equal(test.expect, got)
if got {

View File

@ -83,6 +83,8 @@ const int64_t DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE = 8192;
const int64_t DEFAULT_DELETE_DUMP_BATCH_SIZE = 10000;
constexpr const char* COLLECTION_TTL_FIELD_KEY = "ttl_field";
constexpr const char* RADIUS = knowhere::meta::RADIUS;
constexpr const char* RANGE_FILTER = knowhere::meta::RANGE_FILTER;
@ -136,3 +138,5 @@ const std::string ELEMENT_TYPE_KEY_FOR_ARROW = "elementType";
const float EPSILON = 0.0000000119;
const std::string NAMESPACE_FIELD_NAME = "$namespace_id";
const std::string MMAP_ENABLED_KEY = "mmap.enabled";
const int64_t LOGICAL_BITS = 18;

View File

@ -86,6 +86,26 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
std::tie(schema->has_mmap_setting_, schema->mmap_enabled_) =
GetBoolFromRepeatedKVs(schema_proto.properties(), MMAP_ENABLED_KEY);
std::optional<std::string> ttl_field_name;
for (const auto& property : schema_proto.properties()) {
if (property.key() == COLLECTION_TTL_FIELD_KEY) {
ttl_field_name = property.value();
break;
}
}
if (ttl_field_name.has_value()) {
bool found = false;
for (const milvus::proto::schema::FieldSchema& child :
schema_proto.fields()) {
if (child.name() == ttl_field_name.value()) {
schema->set_ttl_field_id(FieldId(child.fieldid()));
found = true;
break;
}
}
AssertInfo(found, "ttl field name not found in schema fields");
}
AssertInfo(schema->get_primary_field_id().has_value(),
"primary key should be specified");

View File

@ -235,6 +235,11 @@ class Schema {
this->primary_field_id_opt_ = field_id;
}
void
set_ttl_field_id(FieldId field_id) {
this->ttl_field_id_opt_ = field_id;
}
void
set_dynamic_field_id(FieldId field_id) {
this->dynamic_field_id_opt_ = field_id;
@ -327,6 +332,11 @@ class Schema {
return dynamic_field_id_opt_;
}
std::optional<FieldId>
get_ttl_field_id() const {
return ttl_field_id_opt_;
}
const ArrowSchemaPtr
ConvertToArrowSchema() const;
@ -428,6 +438,7 @@ class Schema {
std::optional<FieldId> primary_field_id_opt_;
std::optional<FieldId> dynamic_field_id_opt_;
std::optional<FieldId> namespace_field_id_opt_;
std::optional<FieldId> ttl_field_id_opt_;
// field partial load list
// work as hint now

View File

@ -39,6 +39,7 @@
#include "exec/expression/TimestamptzArithCompareExpr.h"
#include "expr/ITypeExpr.h"
#include "monitor/Monitor.h"
#include "segcore/Utils.h"
#include <memory>
namespace milvus {
@ -62,6 +63,48 @@ ExprSet::Eval(int32_t begin,
}
}
// Create TTL field filtering expression if schema has TTL field configured
// Returns a single OR expression: ttl_field is null OR ttl_field > physical_us
// This means: keep entities with null TTL (never expire) OR entities with TTL > current time (not expired)
inline expr::TypedExprPtr
CreateTTLFieldFilterExpression(QueryContext* query_context) {
auto segment = query_context->get_segment();
auto& schema = segment->get_schema();
if (!schema.get_ttl_field_id().has_value()) {
return nullptr;
}
auto ttl_field_id = schema.get_ttl_field_id().value();
auto& ttl_field_meta = schema[ttl_field_id];
// Convert query_timestamp to physical microseconds for TTL comparison
auto query_timestamp = query_context->get_query_timestamp();
int64_t physical_us =
static_cast<int64_t>(
milvus::segcore::TimestampToPhysicalMs(query_timestamp)) *
1000;
expr::ColumnInfo ttl_column_info(ttl_field_id,
ttl_field_meta.get_data_type(),
{},
ttl_field_meta.is_nullable());
auto ttl_is_null_expr = std::make_shared<expr::NullExpr>(
ttl_column_info, proto::plan::NullExpr_NullOp_IsNull);
proto::plan::GenericValue ttl_threshold;
ttl_threshold.set_int64_val(physical_us);
auto ttl_greater_expr = std::make_shared<expr::UnaryRangeFilterExpr>(
ttl_column_info, proto::plan::OpType::GreaterThan, ttl_threshold);
auto ttl_or_expr = std::make_shared<expr::LogicalBinaryExpr>(
expr::LogicalBinaryExpr::OpType::Or,
ttl_is_null_expr,
ttl_greater_expr);
return ttl_or_expr;
}
std::vector<ExprPtr>
CompileExpressions(const std::vector<expr::TypedExprPtr>& sources,
ExecContext* context,
@ -70,8 +113,19 @@ CompileExpressions(const std::vector<expr::TypedExprPtr>& sources,
std::vector<std::shared_ptr<Expr>> exprs;
exprs.reserve(sources.size());
for (auto& source : sources) {
exprs.emplace_back(CompileExpression(source,
// Create TTL filter expression if schema has TTL field
auto ttl_expr =
CreateTTLFieldFilterExpression(context->get_query_context());
// Merge TTL expression with the first source expression if TTL exists
for (size_t i = 0; i < sources.size(); ++i) {
expr::TypedExprPtr expr_to_compile = sources[i];
if (i == 0 && ttl_expr != nullptr) {
// Merge TTL expression with the first expression using AND
expr_to_compile = std::make_shared<expr::LogicalBinaryExpr>(
expr::LogicalBinaryExpr::OpType::And, sources[i], ttl_expr);
}
exprs.emplace_back(CompileExpression(expr_to_compile,
context->get_query_context(),
flatten_candidate,
enable_constant_folding));

View File

@ -41,6 +41,7 @@
#include "query/ExecPlanNodeVisitor.h"
#include "query/SearchOnSealed.h"
#include "segcore/SegcoreConfig.h"
#include "segcore/SegmentInterface.h"
#include "segcore/SegmentSealed.h"
#include "segcore/ChunkedSegmentSealedImpl.h"
#include "storage/RemoteChunkManagerSingleton.h"
@ -491,3 +492,264 @@ TEST_P(TestChunkSegment, TestPkRange) {
EXPECT_EQ(0, bitset_sorted_view.count());
}
}
TEST(TestTTLFieldFilter, TestMaskWithTTLField) {
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
auto pk_fid = schema->AddDebugField("pk", DataType::INT64, false);
schema->AddField(FieldName("ts"),
TimestampFieldID,
DataType::INT64,
false,
std::nullopt);
auto ttl_fid =
schema->AddDebugField("ttl_field", DataType::TIMESTAMPTZ, false);
schema->set_primary_field_id(pk_fid);
schema->set_ttl_field_id(ttl_fid);
auto segment = CreateSealedSegment(
schema, nullptr, -1, SegcoreConfig::default_config(), true);
int test_data_count = 100;
auto cm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
std::vector<int64_t> pk_data(test_data_count);
std::iota(pk_data.begin(), pk_data.end(), 0);
uint64_t base_ts = 1000000000ULL << 18;
std::vector<int64_t> ts_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
ts_data[i] = static_cast<int64_t>(base_ts + i);
}
uint64_t base_physical_us = (base_ts >> 18) * 1000;
std::vector<int64_t> ttl_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
if (i < test_data_count / 2) {
ttl_data[i] = static_cast<int64_t>(base_physical_us - 10);
} else {
ttl_data[i] = static_cast<int64_t>(base_physical_us + 10);
}
}
{
auto field_data =
std::make_shared<FieldData<int64_t>>(DataType::INT64, false);
field_data->FillFieldData(pk_data.data(), test_data_count);
std::vector<FieldDataPtr> field_datas = {field_data};
auto load_info = PrepareSingleFieldInsertBinlog(kCollectionID,
kPartitionID,
kSegmentID,
pk_fid.get(),
field_datas,
cm);
segment->LoadFieldData(load_info);
}
{
auto field_data =
std::make_shared<FieldData<int64_t>>(DataType::INT64, false);
field_data->FillFieldData(ts_data.data(), test_data_count);
std::vector<FieldDataPtr> field_datas = {field_data};
auto load_info = PrepareSingleFieldInsertBinlog(kCollectionID,
kPartitionID,
kSegmentID,
TimestampFieldID.get(),
field_datas,
cm);
segment->LoadFieldData(load_info);
}
{
auto field_data =
std::make_shared<FieldData<int64_t>>(DataType::TIMESTAMPTZ, false);
field_data->FillFieldData(ttl_data.data(), test_data_count);
std::vector<FieldDataPtr> field_datas = {field_data};
auto load_info = PrepareSingleFieldInsertBinlog(kCollectionID,
kPartitionID,
kSegmentID,
ttl_fid.get(),
field_datas,
cm);
segment->LoadFieldData(load_info);
}
// Test TTL field filtering using CompileExpressions pathway
Timestamp query_ts = base_ts + test_data_count;
int64_t active_count = segment->get_active_count(query_ts);
// Create an expression list with AlwaysTrueExpr - CompileExpressions will
// automatically add TTL field filtering expression
auto always_true_expr = std::make_shared<expr::AlwaysTrueExpr>();
auto plan = std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID,
always_true_expr);
// Execute query expression - this will trigger CompileExpressions which
// automatically adds TTL field filtering expression
BitsetType bitset =
query::ExecuteQueryExpr(plan, segment.get(), active_count, query_ts);
// Note: ExecuteQueryExpr already flips the bitset, so bitset[i] = true
// means the row matches (not expired), bitset[i] = false means expired
BitsetTypeView bitset_view(bitset);
// Verify results:
// After ExecuteQueryExpr, bitset[i] = true means row matches (not expired)
// bitset[i] = false means row is filtered out (expired)
// - i < test_data_count / 2: expired (TTL < current time), should be expired (bitset_view[i] = false)
// - i >= test_data_count / 2: not expired, should NOT be expired (bitset_view[i] = true)
int expired_count = 0;
for (int i = 0; i < test_data_count; i++) {
if (!bitset_view[i]) {
expired_count++;
}
}
EXPECT_EQ(expired_count, test_data_count / 2);
for (int i = 0; i < test_data_count / 2; i++) {
EXPECT_FALSE(bitset_view[i]) << "Row " << i << " should be expired";
}
for (int i = test_data_count / 2; i < test_data_count; i++) {
EXPECT_TRUE(bitset_view[i]) << "Row " << i << " should not be expired";
}
}
TEST(TestTTLFieldFilter, TestMaskWithNullableTTLField) {
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
auto pk_fid = schema->AddDebugField("pk", DataType::INT64, false);
schema->AddField(FieldName("ts"),
TimestampFieldID,
DataType::INT64,
false,
std::nullopt);
auto ttl_fid =
schema->AddDebugField("ttl_field", DataType::TIMESTAMPTZ, true);
schema->set_primary_field_id(pk_fid);
schema->set_ttl_field_id(ttl_fid);
auto segment = CreateSealedSegment(
schema, nullptr, -1, SegcoreConfig::default_config(), true);
int test_data_count = 100;
auto cm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
std::vector<int64_t> pk_data(test_data_count);
std::iota(pk_data.begin(), pk_data.end(), 0);
uint64_t base_ts = 1000000000ULL << 18;
std::vector<int64_t> ts_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
ts_data[i] = static_cast<int64_t>(base_ts + i);
}
uint64_t base_physical_us = (base_ts >> 18) * 1000;
std::vector<int64_t> ttl_data(test_data_count);
// Arrow validity bitmap uses 1 bit per value (LSB-first within each byte).
std::vector<uint8_t> valid_data((test_data_count + 7) / 8, 0);
for (int i = 0; i < test_data_count; i++) {
const size_t byte_idx = static_cast<size_t>(i) >> 3;
const uint8_t bit_mask = static_cast<uint8_t>(1u << (i & 7));
if (i % 4 == 0) {
ttl_data[i] = 0;
valid_data[byte_idx] &= static_cast<uint8_t>(~bit_mask);
} else if (i % 4 == 1) {
ttl_data[i] = static_cast<int64_t>(base_physical_us - 10);
valid_data[byte_idx] |= bit_mask;
} else {
ttl_data[i] = static_cast<int64_t>(base_physical_us + 10);
valid_data[byte_idx] |= bit_mask;
}
}
{
auto field_data =
std::make_shared<FieldData<int64_t>>(DataType::INT64, false);
field_data->FillFieldData(pk_data.data(), test_data_count);
std::vector<FieldDataPtr> field_datas = {field_data};
auto load_info = PrepareSingleFieldInsertBinlog(kCollectionID,
kPartitionID,
kSegmentID,
pk_fid.get(),
field_datas,
cm);
segment->LoadFieldData(load_info);
}
{
auto field_data =
std::make_shared<FieldData<int64_t>>(DataType::INT64, false);
field_data->FillFieldData(ts_data.data(), test_data_count);
std::vector<FieldDataPtr> field_datas = {field_data};
auto load_info = PrepareSingleFieldInsertBinlog(kCollectionID,
kPartitionID,
kSegmentID,
TimestampFieldID.get(),
field_datas,
cm);
segment->LoadFieldData(load_info);
}
{
auto field_data =
std::make_shared<FieldData<int64_t>>(DataType::TIMESTAMPTZ, true);
field_data->FillFieldData(
ttl_data.data(), valid_data.data(), test_data_count, 0);
std::vector<FieldDataPtr> field_datas = {field_data};
auto load_info = PrepareSingleFieldInsertBinlog(kCollectionID,
kPartitionID,
kSegmentID,
ttl_fid.get(),
field_datas,
cm);
segment->LoadFieldData(load_info);
}
// Test TTL field filtering using CompileExpressions pathway
Timestamp query_ts = base_ts + test_data_count;
int64_t active_count = segment->get_active_count(query_ts);
// Create an expression list with AlwaysTrueExpr - CompileExpressions will
// automatically add TTL field filtering expression
auto always_true_expr = std::make_shared<expr::AlwaysTrueExpr>();
auto plan = std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID,
always_true_expr);
// Execute query expression - this will trigger CompileExpressions which
// automatically adds TTL field filtering expression
BitsetType bitset =
query::ExecuteQueryExpr(plan, segment.get(), active_count, query_ts);
// Note: ExecuteQueryExpr already flips the bitset, so bitset[i] = true
// means the row matches (not expired), bitset[i] = false means expired
BitsetTypeView bitset_view(bitset);
// Verify results:
// After ExecuteQueryExpr, bitset[i] = true means row matches (not expired)
// bitset[i] = false means row is filtered out (expired)
// - i % 4 == 0: null, should NOT be expired (bitset_view[i] = true)
// - i % 4 == 1: expired (TTL < current time), should be expired (bitset_view[i] = false)
// - i % 4 == 2 or 3: not expired, should NOT be expired (bitset_view[i] = true)
int expired_count = 0;
for (int i = 0; i < test_data_count; i++) {
if (i % 4 == 0) {
// Null value should not be expired (should match)
EXPECT_TRUE(bitset_view[i])
<< "Row " << i << " (null) should not be expired";
} else if (i % 4 == 1) {
// Should be expired (should be filtered out)
EXPECT_FALSE(bitset_view[i]) << "Row " << i << " should be expired";
expired_count++;
} else {
// Should not be expired (should match)
EXPECT_TRUE(bitset_view[i])
<< "Row " << i << " should not be expired";
}
}
EXPECT_EQ(expired_count, test_data_count / 4);
}

View File

@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include <numeric>
#include "common/Types.h"
#include "common/IndexMeta.h"
@ -24,6 +25,7 @@
#include "test_utils/DataGen.h"
#include "test_utils/storage_test_utils.h"
#include "test_utils/GenExprProto.h"
#include "query/ExecPlanNodeVisitor.h"
using namespace milvus::segcore;
using namespace milvus;
@ -971,3 +973,252 @@ TEST(GrowingTest, SearchVectorArray) {
auto sr_parsed = SearchResultToJson(*sr);
std::cout << sr_parsed.dump(1) << std::endl;
}
TEST(Growing, TestMaskWithTTLField) {
auto schema = std::make_shared<Schema>();
auto pk_fid = schema->AddDebugField("pk", DataType::INT64, false);
auto ttl_fid =
schema->AddDebugField("ttl_field", DataType::TIMESTAMPTZ, false);
schema->set_primary_field_id(pk_fid);
schema->set_ttl_field_id(ttl_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
int64_t test_data_count = 100;
uint64_t base_ts = 1000000000ULL << 18;
std::vector<Timestamp> ts_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
ts_data[i] = base_ts + i;
}
std::vector<idx_t> row_ids(test_data_count);
std::iota(row_ids.begin(), row_ids.end(), 0);
std::vector<int64_t> pk_data(test_data_count);
std::iota(pk_data.begin(), pk_data.end(), 0);
uint64_t base_physical_us = (base_ts >> 18) * 1000;
std::vector<int64_t> ttl_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
if (i < test_data_count / 2) {
ttl_data[i] = static_cast<int64_t>(base_physical_us - 10);
} else {
ttl_data[i] = static_cast<int64_t>(base_physical_us + 10);
}
}
auto insert_record_proto = std::make_unique<InsertRecordProto>();
insert_record_proto->set_num_rows(test_data_count);
{
auto field_data = insert_record_proto->add_fields_data();
field_data->set_field_id(pk_fid.get());
field_data->set_type(proto::schema::DataType::Int64);
auto* scalars = field_data->mutable_scalars();
auto* data = scalars->mutable_long_data();
for (auto v : pk_data) {
data->add_data(v);
}
}
{
auto field_data = insert_record_proto->add_fields_data();
field_data->set_field_id(ttl_fid.get());
field_data->set_type(proto::schema::DataType::Timestamptz);
auto* scalars = field_data->mutable_scalars();
auto* data = scalars->mutable_timestamptz_data();
for (auto v : ttl_data) {
data->add_data(v);
}
}
auto offset = segment->PreInsert(test_data_count);
segment->Insert(offset,
test_data_count,
row_ids.data(),
ts_data.data(),
insert_record_proto.get());
// Test TTL field filtering using CompileExpressions pathway
Timestamp query_ts = base_ts + test_data_count;
int64_t active_count = segment->get_active_count(query_ts);
// Create an expression list with AlwaysTrueExpr - CompileExpressions will
// automatically add TTL field filtering expression
auto always_true_expr = std::make_shared<expr::AlwaysTrueExpr>();
auto plan = std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID,
always_true_expr);
// Execute query expression - this will trigger CompileExpressions which
// automatically adds TTL field filtering expression
BitsetType bitset =
query::ExecuteQueryExpr(plan, segment.get(), active_count, query_ts);
// Note: ExecuteQueryExpr already flips the bitset, so bitset[i] = true
// means the row matches (not expired), bitset[i] = false means expired
BitsetTypeView bitset_view(bitset);
// Verify results:
// After ExecuteQueryExpr, bitset[i] = true means row matches (not expired)
// bitset[i] = false means row is filtered out (expired)
// - i < test_data_count / 2: expired (TTL < current time), should be expired (bitset_view[i] = false)
// - i >= test_data_count / 2: not expired, should NOT be expired (bitset_view[i] = true)
int expired_count = 0;
for (int i = 0; i < test_data_count; i++) {
if (!bitset_view[i]) {
expired_count++;
}
}
EXPECT_EQ(expired_count, test_data_count / 2);
for (int i = 0; i < test_data_count / 2; i++) {
EXPECT_FALSE(bitset_view[i]) << "Row " << i << " should be expired";
}
for (int i = test_data_count / 2; i < test_data_count; i++) {
EXPECT_TRUE(bitset_view[i]) << "Row " << i << " should not be expired";
}
}
// Test TTL field filtering with nullable field for Growing segment
TEST(Growing, TestMaskWithNullableTTLField) {
// Create schema with nullable TTL field
auto schema = std::make_shared<Schema>();
auto pk_fid = schema->AddDebugField("pk", DataType::INT64, false);
auto ttl_fid = schema->AddDebugField(
"ttl_field", DataType::TIMESTAMPTZ, true); // nullable
schema->set_primary_field_id(pk_fid);
schema->set_ttl_field_id(ttl_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
int64_t test_data_count = 100;
// Generate timestamp data
uint64_t base_ts = 1000000000ULL << 18;
std::vector<Timestamp> ts_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
ts_data[i] = base_ts + i;
}
// Generate row IDs
std::vector<idx_t> row_ids(test_data_count);
std::iota(row_ids.begin(), row_ids.end(), 0);
// Generate PK data
std::vector<int64_t> pk_data(test_data_count);
std::iota(pk_data.begin(), pk_data.end(), 0);
// Generate TTL data with some nulls
uint64_t base_physical_us = (base_ts >> 18) * 1000;
std::vector<int64_t> ttl_data(test_data_count);
std::vector<bool> valid_data(test_data_count);
for (int i = 0; i < test_data_count; i++) {
if (i % 4 == 0) {
// Null value - should not expire
ttl_data[i] = 0;
valid_data[i] = false;
} else if (i % 4 == 1) {
// Expired
ttl_data[i] = static_cast<int64_t>(base_physical_us - 10);
valid_data[i] = true;
} else {
// Not expired
ttl_data[i] = static_cast<int64_t>(base_physical_us + 10);
valid_data[i] = true;
}
}
// Create insert record proto
auto insert_record_proto = std::make_unique<InsertRecordProto>();
insert_record_proto->set_num_rows(test_data_count);
// Add PK field data
{
auto field_data = insert_record_proto->add_fields_data();
field_data->set_field_id(pk_fid.get());
field_data->set_type(proto::schema::DataType::Int64);
auto* scalars = field_data->mutable_scalars();
auto* data = scalars->mutable_long_data();
for (auto v : pk_data) {
data->add_data(v);
}
}
// Add nullable TTL field data
{
auto field_data = insert_record_proto->add_fields_data();
field_data->set_field_id(ttl_fid.get());
field_data->set_type(proto::schema::DataType::Timestamptz);
auto* scalars = field_data->mutable_scalars();
auto* data = scalars->mutable_timestamptz_data();
for (auto v : ttl_data) {
data->add_data(v);
}
// Add valid_data for nullable field
// Note: valid_data[i] = false means null, valid_data[i] = true means non-null
for (size_t i = 0; i < valid_data.size(); ++i) {
field_data->add_valid_data(valid_data[i]);
}
// Verify valid_data was added correctly
ASSERT_EQ(field_data->valid_data_size(), test_data_count)
<< "valid_data size mismatch: expected " << test_data_count
<< ", got " << field_data->valid_data_size();
}
// Insert data
auto offset = segment->PreInsert(test_data_count);
segment->Insert(offset,
test_data_count,
row_ids.data(),
ts_data.data(),
insert_record_proto.get());
// Test TTL field filtering using CompileExpressions pathway
Timestamp query_ts = base_ts + test_data_count;
int64_t active_count = segment->get_active_count(query_ts);
// Create an expression list with AlwaysTrueExpr - CompileExpressions will
// automatically add TTL field filtering expression
auto always_true_expr = std::make_shared<expr::AlwaysTrueExpr>();
auto plan = std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID,
always_true_expr);
// Execute query expression - this will trigger CompileExpressions which
// automatically adds TTL field filtering expression
BitsetType bitset =
query::ExecuteQueryExpr(plan, segment.get(), active_count, query_ts);
// Note: ExecuteQueryExpr already flips the bitset, so bitset[i] = true
// means the row matches (not expired), bitset[i] = false means expired
BitsetTypeView bitset_view(bitset);
// Verify results:
// After ExecuteQueryExpr, bitset[i] = true means row matches (not expired)
// bitset[i] = false means row is filtered out (expired)
// - i % 4 == 0: null, should NOT be expired (bitset_view[i] = true)
// - i % 4 == 1: expired (TTL < current time), should be expired (bitset_view[i] = false)
// - i % 4 == 2 or 3: not expired, should NOT be expired (bitset_view[i] = true)
int expired_count = 0;
for (int i = 0; i < test_data_count; i++) {
if (i % 4 == 0) {
// Null value should not be expired (should match)
EXPECT_TRUE(bitset_view[i])
<< "Row " << i << " (null) should not be expired";
} else if (i % 4 == 1) {
// Should be expired (should be filtered out)
EXPECT_FALSE(bitset_view[i]) << "Row " << i << " should be expired";
expired_count++;
} else {
// Should not be expired (should match)
EXPECT_TRUE(bitset_view[i])
<< "Row " << i << " should not be expired";
}
}
EXPECT_EQ(expired_count, test_data_count / 4);
}

View File

@ -23,6 +23,7 @@
#include "cachinglayer/Utils.h"
#include "segcore/ConcurrentVector.h"
#include "segcore/Types.h"
#include "common/Consts.h"
namespace milvus::segcore {
@ -176,4 +177,18 @@ void
LoadIndexData(milvus::tracer::TraceContext& ctx,
milvus::segcore::LoadIndexInfo* load_index_info);
/**
* Convert Milvus timestamp to physical time in milliseconds.
* Milvus timestamp format: physical time in the high bits, logical counter in
* the lower LOGICAL_BITS bits. Shifting by LOGICAL_BITS extracts the physical
* time component in milliseconds.
*
* @param timestamp Milvus timestamp value
* @return Physical time in millisecond
*/
inline uint64_t
TimestampToPhysicalMs(Timestamp timestamp) {
return timestamp >> LOGICAL_BITS;
}
} // namespace milvus::segcore

View File

@ -148,7 +148,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
continue
}
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties, paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties)
if err != nil {
log.Warn("get collection ttl failed, skip to handle compaction")
return make([]CompactionView, 0), 0, err

View File

@ -3,7 +3,6 @@ package datacoord
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
@ -80,7 +79,7 @@ func (policy *forceMergeCompactionPolicy) triggerOneCollection(
return nil, 0, err
}
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties, paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties)
if err != nil {
log.Warn("failed to get collection ttl, use default", zap.Error(err))
collectionTTL = 0

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// singleCompactionPolicy is to compact one segment with too many delta logs
@ -112,7 +111,7 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
return nil
}
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties, paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties)
if err != nil {
log.Warn("failed to apply triggerSegmentSortCompaction, get collection ttl failed")
return nil
@ -228,7 +227,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
return nil, nil, 0, nil
}
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties, paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties)
if err != nil {
log.Warn("failed to apply singleCompactionPolicy, get collection ttl failed")
return nil, nil, 0, err

View File

@ -239,7 +239,7 @@ func isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
}
func getCompactTime(ts Timestamp, coll *collectionInfo) (*compactTime, error) {
collectionTTL, err := common.GetCollectionTTLFromMap(coll.Properties, paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
collectionTTL, err := common.GetCollectionTTLFromMap(coll.Properties)
if err != nil {
return nil, err
}
@ -694,6 +694,39 @@ func (t *compactionTrigger) ShouldCompactExpiry(fromTs uint64, compactTime *comp
return false
}
func getExpirQuantilesIndexByRatio(ratio float64, percentilesLen int) int {
// expirQuantiles is [20%, 40%, 60%, 80%, 100%] (len = 5).
// We map ratio to the nearest lower 20% bucket:
// 0~0.39 -> 20%, 0.4~0.59 -> 40%, 0.6~0.79 -> 60%, 0.8~0.99 -> 80%, >=1.0 -> 100%
if percentilesLen <= 0 {
return 0
}
step := 0.2
idx := int((ratio+0.01)/step) - 1 // add 0.01 to avoid rounding error
if idx < 0 {
idx = 0
}
if idx >= percentilesLen {
idx = percentilesLen - 1
}
return idx
}
func (t *compactionTrigger) ShouldCompactExpiryWithTTLField(compactTime *compactTime, segment *SegmentInfo) bool {
percentiles := segment.GetExpirQuantiles()
if len(percentiles) == 0 {
return false
}
ratio := Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat()
index := getExpirQuantilesIndexByRatio(ratio, len(percentiles))
expirationTime := percentiles[index]
// If current time (startTime) is greater than the expiration time at this percentile, trigger compaction
startTs := tsoutil.PhysicalTime(compactTime.startTime)
return startTs.UnixMicro() >= expirationTime && expirationTime > 0
}
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
// no longer restricted binlog numbers because this is now related to field numbers
log := log.Ctx(context.TODO())
@ -738,6 +771,14 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
return true
}
if t.ShouldCompactExpiryWithTTLField(compactTime, segment) {
log.Info("ttl field is expired, trigger compaction", zap.Int64("segmentID", segment.ID),
zap.Int64("collectionID", segment.CollectionID),
zap.Int64("partitionID", segment.PartitionID),
zap.String("channel", segment.InsertChannel))
return true
}
return false
}

View File

@ -3077,3 +3077,63 @@ func Test_compactionTrigger_generatePlansByTime(t *testing.T) {
})
}
}
func Test_compactionTrigger_ShouldCompactExpiryWithTTLField(t *testing.T) {
trigger := &compactionTrigger{}
ts := time.Now()
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
ExpirQuantiles: []int64{ts.UnixMicro(), ts.Add(time.Hour).UnixMicro(), ts.Add(2 * time.Hour).UnixMicro(), ts.Add(3 * time.Hour).UnixMicro(), ts.Add(4 * time.Hour).UnixMicro()},
},
}
startTime := tsoutil.ComposeTSByTime(ts.Add(time.Minute), 0)
ct := &compactTime{startTime: startTime, collectionTTL: 0}
shouldCompact := trigger.ShouldCompactExpiryWithTTLField(ct, segment)
assert.True(t, shouldCompact)
startTime = tsoutil.ComposeTSByTime(ts.Add(-1*time.Hour), 0)
ct = &compactTime{startTime: startTime, collectionTTL: 0}
shouldCompact = trigger.ShouldCompactExpiryWithTTLField(ct, segment)
assert.False(t, shouldCompact)
origin := Params.DataCoordCfg.SingleCompactionRatioThreshold.GetValue()
defer Params.Save(Params.DataCoordCfg.SingleCompactionRatioThreshold.Key, origin)
Params.Save(Params.DataCoordCfg.SingleCompactionRatioThreshold.Key, "0.1")
startTime = tsoutil.ComposeTSByTime(ts.Add(time.Minute), 0)
ct = &compactTime{startTime: startTime, collectionTTL: 0}
shouldCompact = trigger.ShouldCompactExpiryWithTTLField(ct, segment)
assert.True(t, shouldCompact)
Params.Save(Params.DataCoordCfg.SingleCompactionRatioThreshold.Key, "5")
startTime = tsoutil.ComposeTSByTime(ts.Add(time.Minute), 0)
ct = &compactTime{startTime: startTime, collectionTTL: 0}
shouldCompact = trigger.ShouldCompactExpiryWithTTLField(ct, segment)
assert.False(t, shouldCompact)
segment2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
},
}
startTime = tsoutil.ComposeTSByTime(ts.Add(time.Minute), 0)
ct = &compactTime{startTime: startTime, collectionTTL: 0}
shouldCompact = trigger.ShouldCompactExpiryWithTTLField(ct, segment2)
assert.False(t, shouldCompact)
}

View File

@ -877,7 +877,7 @@ func createSortCompactionTask(ctx context.Context,
return nil, err
}
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties, paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties)
if err != nil {
log.Warn("failed to apply triggerSegmentSortCompaction, get collection ttl failed")
return nil, err

View File

@ -1731,6 +1731,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
IsInvisible: true,
StorageVersion: seg.GetStorageVersion(),
ManifestPath: seg.GetManifest(),
ExpirQuantiles: seg.GetExpirQuantiles(),
}
segment := NewSegmentInfo(segmentInfo)
compactToSegInfos = append(compactToSegInfos, segment)
@ -1839,8 +1840,9 @@ func (m *meta) completeMixCompactionMutation(
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
IsSorted: compactToSegment.GetIsSorted(),
ManifestPath: compactToSegment.GetManifest(),
IsSorted: compactToSegment.GetIsSorted(),
ManifestPath: compactToSegment.GetManifest(),
ExpirQuantiles: compactToSegment.GetExpirQuantiles(),
})
if compactToSegmentInfo.GetNumOfRows() == 0 {
@ -1857,6 +1859,7 @@ func (m *meta) completeMixCompactionMutation(
zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
zap.Int64("segment size", compactToSegmentInfo.getSegmentSize()),
zap.Int64s("expirQuantiles", compactToSegmentInfo.GetExpirQuantiles()),
)
compactToSegments = append(compactToSegments, compactToSegmentInfo)
}
@ -2334,6 +2337,7 @@ func (m *meta) completeSortCompactionMutation(
CompactionFrom: []int64{compactFromSegID},
IsSorted: true,
ManifestPath: resultSegment.GetManifest(),
ExpirQuantiles: resultSegment.GetExpirQuantiles(),
}
segment := NewSegmentInfo(segmentInfo)
@ -2355,7 +2359,8 @@ func (m *meta) completeSortCompactionMutation(
log.Info("meta update: prepare for complete stats mutation - complete",
zap.Int64("num rows", segment.GetNumOfRows()),
zap.Int64("segment size", segment.getSegmentSize()))
zap.Int64("segment size", segment.getSegmentSize()),
zap.Int64s("expirQuantiles", segment.GetExpirQuantiles()))
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
return nil, nil, err

View File

@ -90,6 +90,8 @@ type clusteringCompactionTask struct {
clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema
ttlFieldID int64
memoryLimit int64
bufferSize int64
@ -241,6 +243,7 @@ func (t *clusteringCompactionTask) init() error {
}
t.primaryKeyField = pkField
t.ttlFieldID = getTTLFieldID(t.plan.GetSchema())
t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
t.currentTime = time.Now()
t.memoryLimit = t.getMemoryLimit()
@ -345,7 +348,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
t.partitionID, t.collectionID, t.plan.Channel, 100,
storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig),
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI),
)
if err != nil {
return err
}
@ -369,7 +373,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
t.partitionID, t.collectionID, t.plan.Channel, 100,
storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig),
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI),
)
if err != nil {
return err
}
@ -430,7 +435,8 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff
t.partitionID, t.collectionID, t.plan.Channel, 100,
storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig),
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI),
)
if err != nil {
return err
}
@ -642,6 +648,8 @@ func (t *clusteringCompactionTask) mappingSegment(
}
defer rr.Close()
hasTTLField := t.ttlFieldID >= common.StartOfUserFieldID
offset := int64(-1)
for {
r, err := rr.Next()
@ -662,15 +670,22 @@ func (t *clusteringCompactionTask) mappingSegment(
for _, v := range vs {
offset++
if entityFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) {
continue
}
row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
if !ok {
log.Warn("convert interface to map wrong")
return errors.New("unexpected error")
}
expireTs := int64(-1)
if hasTTLField {
if val, exists := row[t.ttlFieldID]; exists {
if v, ok := val.(int64); ok {
expireTs = v
}
}
}
if entityFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp), expireTs) {
continue
}
clusteringKey := row[t.clusteringKeyField.FieldID]
var clusterBuffer *ClusterBuffer
@ -905,6 +920,9 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
requiredFields := typeutil.NewSet[int64]()
requiredFields.Insert(0, 1, t.primaryKeyField.GetFieldID(), t.clusteringKeyField.GetFieldID())
if t.ttlFieldID >= common.StartOfUserFieldID {
requiredFields.Insert(t.ttlFieldID)
}
selectedFields := lo.Filter(t.plan.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
return requiredFields.Contain(field.GetFieldID())
})
@ -977,6 +995,7 @@ func (t *clusteringCompactionTask) iterAndGetScalarAnalyzeResult(pkIter *storage
remained int64 = 0
analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
)
hasTTLField := t.ttlFieldID >= common.StartOfUserFieldID
for {
v, err := pkIter.NextValue()
if err != nil {
@ -989,16 +1008,26 @@ func (t *clusteringCompactionTask) iterAndGetScalarAnalyzeResult(pkIter *storage
}
}
// Filtering expired entity
if expiredFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp)) {
continue
}
// rowValue := vIter.GetData().(*iterators.InsertRow).GetValue()
row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
if !ok {
return nil, 0, errors.New("unexpected error")
}
expireTs := int64(-1)
if hasTTLField {
if val, exists := row[t.ttlFieldID]; exists {
if v, ok := val.(int64); ok {
expireTs = v
}
}
}
// Filtering expired entity
if expiredFilter.Filtered((*v).PK.GetValue(), uint64((*v).Timestamp), expireTs) {
continue
}
key := row[t.clusteringKeyField.GetFieldID()]
if _, exist := analyzeResult[key]; exist {
analyzeResult[key] = analyzeResult[key] + 1

View File

@ -58,7 +58,6 @@ func (s *ClusteringCompactionTaskStorageV2Suite) SetupTest() {
}
func (s *ClusteringCompactionTaskStorageV2Suite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset("common.storage.enableV2")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "insert_log")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "delta_log")

View File

@ -83,7 +83,6 @@ func (s *ClusteringCompactionTaskSuite) setupTest() {
s.task = NewClusteringCompactionTask(context.Background(), s.mockBinlogIO, nil, compaction.GenParams())
paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
@ -117,7 +116,6 @@ func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
}
func (s *ClusteringCompactionTaskSuite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset(paramtable.Get().CommonCfg.StorageType.Key)
}

View File

@ -25,10 +25,12 @@ import (
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
@ -212,3 +214,22 @@ func mergeFieldBinlogs(base, paths map[typeutil.UniqueID]*datapb.FieldBinlog) {
base[fID].Binlogs = append(base[fID].Binlogs, fpath.GetBinlogs()...)
}
}
func getTTLFieldID(schema *schemapb.CollectionSchema) int64 {
ttlFieldName := ""
for _, pair := range schema.GetProperties() {
if pair.GetKey() == common.CollectionTTLFieldKey {
ttlFieldName = pair.GetValue()
break
}
}
if ttlFieldName == "" {
return -1
}
for _, field := range schema.GetFields() {
if field.GetName() == ttlFieldName && field.GetDataType() == schemapb.DataType_Timestamptz {
return field.GetFieldID()
}
}
return -1
}

View File

@ -45,7 +45,8 @@ func mergeSortMultipleSegments(ctx context.Context,
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
writer, err := NewMultiSegmentWriter(ctx, binlogIO, compAlloc, plan.GetMaxSize(), plan.GetSchema(), compactionParams, maxRows, partitionID, collectionID, plan.GetChannel(), 4096,
storage.WithStorageConfig(compactionParams.StorageConfig),
storage.WithUseLoonFFI(compactionParams.UseLoonFFI))
storage.WithUseLoonFFI(compactionParams.UseLoonFFI),
)
if err != nil {
return nil, err
}
@ -56,6 +57,9 @@ func mergeSortMultipleSegments(ctx context.Context,
return nil, err
}
ttlFieldID := getTTLFieldID(plan.GetSchema())
hasTTLField := ttlFieldID >= common.StartOfUserFieldID
segmentReaders := make([]storage.RecordReader, len(binlogs))
segmentFilters := make([]compaction.EntityFilter, len(binlogs))
for i, s := range binlogs {
@ -108,13 +112,27 @@ func mergeSortMultipleSegments(ctx context.Context,
predicate = func(r storage.Record, ri, i int) bool {
pk := r.Column(pkField.FieldID).(*array.Int64).Value(i)
ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
return !segmentFilters[ri].Filtered(pk, uint64(ts))
expireTs := int64(-1)
if hasTTLField {
col := r.Column(ttlFieldID).(*array.Int64)
if col.IsValid(i) {
expireTs = col.Value(i)
}
}
return !segmentFilters[ri].Filtered(pk, uint64(ts), expireTs)
}
case schemapb.DataType_VarChar:
predicate = func(r storage.Record, ri, i int) bool {
pk := r.Column(pkField.FieldID).(*array.String).Value(i)
ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
return !segmentFilters[ri].Filtered(pk, uint64(ts))
expireTs := int64(-1)
if hasTTLField {
col := r.Column(ttlFieldID).(*array.Int64)
if col.IsValid(i) {
expireTs = col.Value(i)
}
}
return !segmentFilters[ri].Filtered(pk, uint64(ts), expireTs)
}
default:
log.Warn("compaction only support int64 and varchar pk field")

View File

@ -66,6 +66,8 @@ type mixCompactionTask struct {
compactionParams compaction.Params
sortByFieldIDs []int64
ttlFieldID int64
}
var _ Compactor = (*mixCompactionTask)(nil)
@ -109,7 +111,7 @@ func (t *mixCompactionTask) preCompact() error {
t.partitionID = t.plan.GetSegmentBinlogs()[0].GetPartitionID()
t.targetSize = t.plan.GetMaxSize()
t.bm25FieldIDs = GetBM25FieldIDs(t.plan.GetSchema())
t.ttlFieldID = getTTLFieldID(t.plan.GetSchema())
currSize := int64(0)
for _, segmentBinlog := range t.plan.GetSegmentBinlogs() {
for i, fieldBinlog := range segmentBinlog.GetFieldBinlogs() {
@ -150,7 +152,12 @@ func (t *mixCompactionTask) mergeSplit(
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096, storage.WithStorageConfig(t.compactionParams.StorageConfig), storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI))
mWriter, err := NewMultiSegmentWriter(ctx,
t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(),
t.compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096,
storage.WithStorageConfig(t.compactionParams.StorageConfig),
storage.WithUseLoonFFI(t.compactionParams.UseLoonFFI),
)
if err != nil {
return nil, err
}
@ -243,6 +250,8 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
defer reader.Close()
hasTTLField := t.ttlFieldID >= common.StartOfUserFieldID
for {
var r storage.Record
r, err = reader.Next()
@ -257,12 +266,18 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
var (
pkArray = r.Column(pkField.FieldID)
tsArray = r.Column(common.TimeStampField).(*array.Int64)
pkArray = r.Column(pkField.FieldID)
tsArray = r.Column(common.TimeStampField).(*array.Int64)
ttlArr *array.Int64
sliceStart = -1
rb *storage.RecordBuilder
)
if hasTTLField {
ttlArr = r.Column(t.ttlFieldID).(*array.Int64)
}
for i := range r.Len() {
// Filtering deleted entities
var pk any
@ -275,7 +290,13 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
panic("invalid data type")
}
ts := typeutil.Timestamp(tsArray.Value(i))
if entityFilter.Filtered(pk, ts) {
expireTs := int64(-1)
if hasTTLField {
if ttlArr.IsValid(i) {
expireTs = ttlArr.Value(i)
}
}
if entityFilter.Filtered(pk, ts, expireTs) {
if rb == nil {
rb = storage.NewRecordBuilder(t.plan.GetSchema())
}
@ -329,6 +350,9 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
defer span.End()
compactStart := time.Now()
log.Info("compact start", zap.Any("compactionParams", t.compactionParams),
zap.Any("plan", t.plan))
if err := t.preCompact(); err != nil {
log.Warn("compact wrong, failed to preCompact", zap.Error(err))
return nil, err

View File

@ -64,7 +64,6 @@ func (s *MixCompactionTaskStorageV2Suite) SetupTest() {
}
func (s *MixCompactionTaskStorageV2Suite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset("common.storageType")
paramtable.Get().Reset("common.storage.enableV2")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "insert_log")

View File

@ -70,7 +70,6 @@ func (s *MixCompactionTaskStorageV1Suite) setupTest() {
s.meta = genTestCollectionMeta()
paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
params, err := compaction.GenerateJSONParams()
if err != nil {
panic(err)
@ -138,7 +137,6 @@ func (s *MixCompactionTaskStorageV1Suite) SetupSubTest() {
}
func (s *MixCompactionTaskStorageV1Suite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset("common.storageType")
paramtable.Get().Reset("common.storage.enablev2")
}

View File

@ -120,7 +120,7 @@ func (w *MultiSegmentWriter) closeWriter() error {
return err
}
fieldBinlogs, statsLog, bm25Logs, manifest := w.writer.GetLogs()
fieldBinlogs, statsLog, bm25Logs, manifest, expirQuantiles := w.writer.GetLogs()
result := &datapb.CompactionSegment{
SegmentID: w.currentSegmentID,
@ -131,6 +131,7 @@ func (w *MultiSegmentWriter) closeWriter() error {
Bm25Logs: lo.Values(bm25Logs),
StorageVersion: w.storageVersion,
Manifest: manifest,
ExpirQuantiles: expirQuantiles,
}
w.res = append(w.res, result)

View File

@ -67,6 +67,8 @@ type sortCompactionTask struct {
manifest string
useLoonFFI bool
ttlFieldID int64
done chan struct{}
tr *timerecord.TimeRecorder
@ -138,6 +140,7 @@ func (t *sortCompactionTask) preCompact() error {
t.segmentStorageVersion = segment.GetStorageVersion()
t.manifest = segment.GetManifest()
t.useLoonFFI = t.compactionParams.UseLoonFFI
t.ttlFieldID = getTTLFieldID(t.plan.GetSchema())
log.Ctx(t.ctx).Info("preCompaction analyze",
zap.Int64("planID", t.GetPlanID()),
@ -195,6 +198,7 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio
log.Warn("load deletePKs failed", zap.Error(err))
return nil, err
}
hasTTLField := t.ttlFieldID >= common.StartOfUserFieldID
entityFilter := compaction.NewEntityFilter(deletePKs, t.plan.GetCollectionTtl(), t.currentTime)
var predicate func(r storage.Record, ri, i int) bool
@ -203,13 +207,27 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio
predicate = func(r storage.Record, ri, i int) bool {
pk := r.Column(pkField.FieldID).(*array.Int64).Value(i)
ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
return !entityFilter.Filtered(pk, uint64(ts))
expireTs := int64(-1)
if hasTTLField {
col := r.Column(t.ttlFieldID).(*array.Int64)
if col.IsValid(i) {
expireTs = col.Value(i)
}
}
return !entityFilter.Filtered(pk, uint64(ts), expireTs)
}
case schemapb.DataType_VarChar:
predicate = func(r storage.Record, ri, i int) bool {
pk := r.Column(pkField.FieldID).(*array.String).Value(i)
ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
return !entityFilter.Filtered(pk, uint64(ts))
expireTs := int64(-1)
if hasTTLField {
col := r.Column(t.ttlFieldID).(*array.Int64)
if col.IsValid(i) {
expireTs = col.Value(i)
}
}
return !entityFilter.Filtered(pk, uint64(ts), expireTs)
}
default:
log.Warn("sort task only support int64 and varchar pk field")
@ -247,7 +265,7 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio
return nil, err
}
binlogs, stats, bm25stats, manifest := srw.GetLogs()
binlogs, stats, bm25stats, manifest, expirQuantiles := srw.GetLogs()
insertLogs := storage.SortFieldBinlogs(binlogs)
if err := binlog.CompressFieldBinlogs(insertLogs); err != nil {
return nil, err
@ -284,6 +302,7 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio
IsSorted: true,
StorageVersion: t.storageVersion,
Manifest: manifest,
ExpirQuantiles: expirQuantiles,
},
}
planResult := &datapb.CompactionPlanResult{

View File
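The two predicates above repeat the same per-row lookup: read the nullable TTL column only when a TTL field is configured, otherwise fall back to the -1 sentinel that EntityFilter treats as "no entity-level expiration". A minimal sketch of that pattern as a shared helper (readTTLValue is a hypothetical name; package qualifiers follow the file above):

func readTTLValue(r storage.Record, ttlFieldID int64, i int) int64 {
    // No TTL field configured for this collection: return the sentinel so
    // the TTL-field check inside EntityFilter is skipped.
    if ttlFieldID < common.StartOfUserFieldID {
        return -1
    }
    col := r.Column(ttlFieldID).(*array.Int64)
    // A null TTL value means the row never expires at the entity level.
    if !col.IsValid(i) {
        return -1
    }
    return col.Value(i)
}

// usage inside either predicate:
//   expireTs := readTTLValue(r, t.ttlFieldID, i)
//   return !entityFilter.Filtered(pk, uint64(ts), expireTs)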

@ -229,13 +229,13 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
predicate = func(r storage.Record, ri, i int) bool {
pk := r.Column(pkField.FieldID).(*array.Int64).Value(i)
ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
return !entityFilter.Filtered(pk, uint64(ts))
return !entityFilter.Filtered(pk, uint64(ts), -1)
}
case schemapb.DataType_VarChar:
predicate = func(r storage.Record, ri, i int) bool {
pk := r.Column(pkField.FieldID).(*array.String).Value(i)
ts := r.Column(common.TimeStampField).(*array.Int64).Value(i)
return !entityFilter.Filtered(pk, uint64(ts))
return !entityFilter.Filtered(pk, uint64(ts), -1)
}
default:
log.Warn("sort task only support int64 and varchar pk field")
@ -263,7 +263,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
return nil, err
}
binlogs, stats, bm25stats, _ := srw.GetLogs()
binlogs, stats, bm25stats, _, _ := srw.GetLogs()
insertLogs := storage.SortFieldBinlogs(binlogs)
if err := binlog.CompressFieldBinlogs(insertLogs); err != nil {
return nil, err

View File

@ -1815,6 +1815,12 @@ func (h *HandlersV2) createCollection(ctx context.Context, c *gin.Context, anyRe
Value: fmt.Sprintf("%v", httpReq.Params["ttlSeconds"]),
})
}
if _, ok := httpReq.Params["ttlField"]; ok {
req.Properties = append(req.Properties, &commonpb.KeyValuePair{
Key: common.CollectionTTLFieldKey,
Value: fmt.Sprintf("%v", httpReq.Params["ttlField"]),
})
}
if _, ok := httpReq.Params["partitionKeyIsolation"]; ok {
req.Properties = append(req.Properties, &commonpb.KeyValuePair{
Key: common.PartitionKeyIsolationKey,

View File
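For orientation, the handler above only forwards a request parameter as a collection property; a sketch of the pair it appends when the request body carries "params": {"ttlField": "expire_at"} ("expire_at" is an assumed TIMESTAMPTZ field name):

req.Properties = append(req.Properties, &commonpb.KeyValuePair{
    Key:   common.CollectionTTLFieldKey, // "ttl_field"
    Value: "expire_at",
})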

@ -354,6 +354,54 @@ func (t *createCollectionTask) validateClusteringKey(ctx context.Context) error
return nil
}
func validateCollectionTTL(props []*commonpb.KeyValuePair) (bool, error) {
for _, pair := range props {
if pair.Key == common.CollectionTTLConfigKey {
_, err := strconv.Atoi(pair.Value)
if err != nil {
return true, merr.WrapErrParameterInvalidMsg("collection TTL is not a valid positive integer")
}
return true, nil
}
}
return false, nil
}
func validateTTLField(props []*commonpb.KeyValuePair, fields []*schemapb.FieldSchema) (bool, error) {
for _, pair := range props {
if pair.Key == common.CollectionTTLFieldKey {
fieldName := pair.Value
for _, field := range fields {
if field.Name == fieldName {
if field.DataType != schemapb.DataType_Timestamptz {
return true, merr.WrapErrParameterInvalidMsg("ttl field must be timestamptz, field name = %s", fieldName)
}
return true, nil
}
}
return true, merr.WrapErrParameterInvalidMsg("ttl field name %s not found in schema", fieldName)
}
}
return false, nil
}
func (t *createCollectionTask) validateTTL() error {
hasCollectionTTL, err := validateCollectionTTL(t.GetProperties())
if err != nil {
return err
}
hasTTLField, err := validateTTLField(t.GetProperties(), t.schema.Fields)
if err != nil {
return err
}
if hasCollectionTTL && hasTTLField {
return merr.WrapErrParameterInvalidMsg("collection TTL and ttl field cannot be set at the same time")
}
return nil
}
func (t *createCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_CreateCollection
t.Base.SourceID = paramtable.GetNodeID()
@ -434,7 +482,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
}
// Validate collection ttl
_, err = common.GetCollectionTTL(t.GetProperties(), -1)
_, err = common.GetCollectionTTL(t.GetProperties())
if err != nil {
return merr.WrapErrParameterInvalidMsg("collection ttl property value not valid, parse error: %s", err.Error())
}
@ -476,6 +524,10 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err
}
if err := t.validateTTL(); err != nil {
return err
}
t.CreateCollectionRequest.Schema, err = proto.Marshal(t.schema)
if err != nil {
return err
@ -1217,6 +1269,24 @@ func hasMmapProp(props ...*commonpb.KeyValuePair) bool {
return false
}
func hasTTLProp(props ...*commonpb.KeyValuePair) bool {
for _, p := range props {
if p.GetKey() == common.CollectionTTLConfigKey {
return true
}
}
return false
}
func hasTTLFieldProp(props ...*commonpb.KeyValuePair) bool {
for _, p := range props {
if p.GetKey() == common.CollectionTTLFieldKey {
return true
}
}
return false
}
func hasLazyLoadProp(props ...*commonpb.KeyValuePair) bool {
for _, p := range props {
if p.GetKey() == common.LazyLoadEnableKey {
@ -1288,7 +1358,7 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
}
}
_, err = common.GetCollectionTTL(t.GetProperties(), -1)
_, err = common.GetCollectionTTL(t.GetProperties())
if err != nil {
return merr.WrapErrParameterInvalidMsg("collection ttl properties value not valid, parse error: %s", err.Error())
}
@ -1308,6 +1378,24 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
if exist && !timestamptz.IsTimezoneValid(userDefinedTimezone) {
return merr.WrapErrParameterInvalidMsg("unknown or invalid IANA Time Zone ID: %s", userDefinedTimezone)
}
hasTTL, err := validateCollectionTTL(t.GetProperties())
if err != nil {
return err
}
hasTTLField, err := validateTTLField(t.GetProperties(), collSchema.GetFields())
if err != nil {
return err
}
if hasTTL && hasTTLField {
return merr.WrapErrParameterInvalidMsg("collection TTL and ttl field cannot be set at the same time")
}
if hasTTL && hasTTLFieldProp(collSchema.GetProperties()...) {
return merr.WrapErrParameterInvalidMsg("ttl field is already exists, cannot be set collection TTL")
}
if hasTTLField && hasTTLProp(collSchema.GetProperties()...) {
return merr.WrapErrParameterInvalidMsg("collection TTL is already set, cannot be set ttl field")
}
} else if len(t.GetDeleteKeys()) > 0 {
key := hasPropInDeletekeys(t.DeleteKeys)
if key != "" {

View File
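A test-style fragment illustrating the mutual-exclusion rule enforced by validateTTL above, using the two helpers introduced in this file (property values and the "expire_at" field name are illustrative):

props := []*commonpb.KeyValuePair{
    {Key: common.CollectionTTLConfigKey, Value: "3600"},     // collection-level TTL
    {Key: common.CollectionTTLFieldKey, Value: "expire_at"}, // entity-level TTL field
}
fields := []*schemapb.FieldSchema{
    {Name: "expire_at", DataType: schemapb.DataType_Timestamptz},
}
hasCollectionTTL, _ := validateCollectionTTL(props) // true, "3600" parses as an integer
hasTTLField, _ := validateTTLField(props, fields)   // true, the named field is TIMESTAMPTZ
if hasCollectionTTL && hasTTLField {
    // validateTTL() rejects this combination with ErrParameterInvalid;
    // dropping either property makes the request valid.
}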

@ -73,6 +73,7 @@ const (
testFloat16VecField = "f16vec"
testBFloat16VecField = "bf16vec"
testStructArrayField = "structArray"
testTTLField = "ttl"
testGeometryField = "geometry"
testVecDim = 128
testMaxVarCharLength = 100
@ -236,6 +237,55 @@ func constructCollectionSchemaByDataType(collectionName string, fieldName2DataTy
}
}
func constructCollectionSchemaWithTTLField(collectionName string, ttlField string) *schemapb.CollectionSchema {
pk := &schemapb.FieldSchema{
FieldID: 100,
Name: int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
TypeParams: nil,
IndexParams: nil,
AutoID: true,
}
tz := &schemapb.FieldSchema{
FieldID: 101,
Name: ttlField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_Timestamptz,
TypeParams: nil,
IndexParams: nil,
AutoID: false,
}
fVec := &schemapb.FieldSchema{
FieldID: 102,
Name: floatVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
},
IndexParams: nil,
AutoID: false,
}
return &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
pk,
tz,
fVec,
},
Properties: []*commonpb.KeyValuePair{},
}
}
func constructCollectionSchemaWithAllType(
boolField, int32Field, int64Field, floatField, doubleField string,
floatVecField, binaryVecField, float16VecField, bfloat16VecField, structArrayField string,
@ -1518,6 +1568,45 @@ func TestCreateCollectionTask(t *testing.T) {
err = task.PreExecute(ctx)
assert.Error(t, err)
ttlFieldSchema := constructCollectionSchemaWithTTLField(collectionName+"_ttl", testTTLField)
// Test invalid ttl field - ttl field not found
ttlFieldSchemaBytes, err := proto.Marshal(ttlFieldSchema)
assert.NoError(t, err)
task.CreateCollectionRequest.Properties = make([]*commonpb.KeyValuePair, 0)
task.CreateCollectionRequest.Schema = ttlFieldSchemaBytes
task.CreateCollectionRequest.Properties = append(task.CreateCollectionRequest.Properties, &commonpb.KeyValuePair{
Key: common.CollectionTTLFieldKey,
Value: "invalid",
})
err = task.PreExecute(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrParameterInvalid)
// Test invalid ttl field - exist with collection ttl
task.CreateCollectionRequest.Properties = make([]*commonpb.KeyValuePair, 0)
task.CreateCollectionRequest.Properties = append(task.CreateCollectionRequest.Properties, &commonpb.KeyValuePair{
Key: common.CollectionTTLFieldKey,
Value: testTTLField,
})
task.CreateCollectionRequest.Properties = append(task.CreateCollectionRequest.Properties, &commonpb.KeyValuePair{
Key: common.CollectionTTLConfigKey,
Value: "100",
})
err = task.PreExecute(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrParameterInvalid)
// Test invalid ttl field - ttl field is not timestamptz
task.CreateCollectionRequest.Properties = make([]*commonpb.KeyValuePair, 0)
task.CreateCollectionRequest.Properties = append(task.CreateCollectionRequest.Properties, &commonpb.KeyValuePair{
Key: common.CollectionTTLFieldKey,
Value: "pk",
})
err = task.PreExecute(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrParameterInvalid)
// Restore original schema for remaining tests
task.CreateCollectionRequest = reqBackup
})
@ -4528,6 +4617,132 @@ func TestAlterCollectionCheckLoaded(t *testing.T) {
assert.Equal(t, merr.Code(merr.ErrCollectionLoaded), merr.Code(err))
}
func TestAlterCollectionTaskValidateTTLAndTTLField(t *testing.T) {
qc := NewMixCoordMock()
ctx := context.Background()
cache := globalMetaCache
defer func() { globalMetaCache = cache }()
err := InitMetaCache(ctx, qc)
assert.NoError(t, err)
createCollectionWithProps := func(colName string, props []*commonpb.KeyValuePair) {
schema := &schemapb.CollectionSchema{
Name: colName,
Description: "ttl alter test",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
{FieldID: 101, Name: "ttl", DataType: schemapb.DataType_Timestamptz, Nullable: true},
{FieldID: 102, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "8"}}},
},
Properties: props,
}
bs, err := proto.Marshal(schema)
assert.NoError(t, err)
_, err = qc.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
DbName: "",
CollectionName: colName,
Schema: bs,
ShardsNum: 1,
Properties: props,
})
assert.NoError(t, err)
}
t.Run("mutual exclusion: existing ttl.seconds, alter ttl.field should fail", func(t *testing.T) {
col := "alter_ttl_seconds_then_field_" + funcutil.GenRandomStr()
createCollectionWithProps(col, []*commonpb.KeyValuePair{{Key: common.CollectionTTLConfigKey, Value: "3600"}})
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: col,
Properties: []*commonpb.KeyValuePair{{Key: common.CollectionTTLFieldKey, Value: "ttl"}},
},
ctx: ctx,
mixCoord: qc,
}
err := task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("mutual exclusion: existing ttl.field, alter ttl.seconds should fail", func(t *testing.T) {
col := "alter_ttl_field_then_seconds_" + funcutil.GenRandomStr()
createCollectionWithProps(col, []*commonpb.KeyValuePair{{Key: common.CollectionTTLFieldKey, Value: "ttl"}})
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: col,
Properties: []*commonpb.KeyValuePair{{Key: common.CollectionTTLConfigKey, Value: "10"}},
},
ctx: ctx,
mixCoord: qc,
}
err := task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("ttl.field must exist in schema", func(t *testing.T) {
col := "alter_ttl_field_invalid_" + funcutil.GenRandomStr()
createCollectionWithProps(col, nil)
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: col,
Properties: []*commonpb.KeyValuePair{{Key: common.CollectionTTLFieldKey, Value: "not_exist"}},
},
ctx: ctx,
mixCoord: qc,
}
assert.Error(t, task.PreExecute(ctx))
})
t.Run("ttl.field must be timestamptz", func(t *testing.T) {
col := "alter_ttl_field_invalid_type_" + funcutil.GenRandomStr()
createCollectionWithProps(col, nil)
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: col,
Properties: []*commonpb.KeyValuePair{{Key: common.CollectionTTLFieldKey, Value: "pk"}},
},
ctx: ctx,
mixCoord: qc,
}
assert.Error(t, task.PreExecute(ctx))
})
t.Run("delete ttl.seconds should pass", func(t *testing.T) {
col := "alter_delete_ttl_seconds_" + funcutil.GenRandomStr()
createCollectionWithProps(col, []*commonpb.KeyValuePair{{Key: common.CollectionTTLConfigKey, Value: "3600"}})
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: col,
DeleteKeys: []string{common.CollectionTTLConfigKey},
},
ctx: ctx,
mixCoord: qc,
}
assert.NoError(t, task.PreExecute(ctx))
})
t.Run("delete ttl.field should pass", func(t *testing.T) {
col := "alter_delete_ttl_field_" + funcutil.GenRandomStr()
createCollectionWithProps(col, []*commonpb.KeyValuePair{{Key: common.CollectionTTLFieldKey, Value: "ttl"}})
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: col,
DeleteKeys: []string{common.CollectionTTLFieldKey},
},
ctx: ctx,
mixCoord: qc,
}
assert.NoError(t, task.PreExecute(ctx))
})
}
func TestTaskPartitionKeyIsolation(t *testing.T) {
qc := NewMixCoordMock()
ctx := context.Background()

View File

@ -2677,11 +2677,9 @@ func GetBM25FunctionOutputFields(collSchema *schemapb.CollectionSchema) []string
// or return global ttl if collection's ttl is not specified
// this is a helper util wrapping common.GetCollectionTTL without returning error
func getCollectionTTL(pairs []*commonpb.KeyValuePair) uint64 {
defaultTTL := paramtable.Get().CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second)
ttl, err := common.GetCollectionTTL(pairs, defaultTTL)
ttl, err := common.GetCollectionTTL(pairs)
if err != nil {
log.Error("failed to get collection ttl, use default ttl", zap.Error(err))
ttl = defaultTTL
}
if ttl < 0 {
return 0

View File

@ -90,6 +90,7 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r
udpates := &messagespb.AlterCollectionMessageUpdates{}
// Apply the properties to override the existing properties.
oldProperties := common.CloneKeyValuePairs(coll.Properties).ToMap()
newProperties := common.CloneKeyValuePairs(coll.Properties).ToMap()
for _, prop := range req.GetProperties() {
switch prop.GetKey() {
@ -110,6 +111,7 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r
for _, deleteKey := range req.GetDeleteKeys() {
delete(newProperties, deleteKey)
}
// Check if the properties are changed.
newPropsKeyValuePairs := common.NewKeyValuePairs(newProperties)
if !newPropsKeyValuePairs.Equal(coll.Properties) {
@ -117,6 +119,46 @@ func (c *Core) broadcastAlterCollectionForAlterCollection(ctx context.Context, r
header.UpdateMask.Paths = append(header.UpdateMask.Paths, message.FieldMaskCollectionProperties)
}
// If TTL field is changed through properties, also broadcast an updated schema snapshot and mark it as schema change,
// so QueryNode can refresh runtime schema properties without requiring release/load.
ttlOld, okOld := oldProperties[common.CollectionTTLFieldKey]
ttlNew, okNew := newProperties[common.CollectionTTLFieldKey]
needTTLFieldSchemaRefresh := (okOld != okNew) || (okOld && okNew && ttlOld != ttlNew)
if needTTLFieldSchemaRefresh {
// validate ttl field name exists in schema fields when setting it
if okNew {
found := false
for _, f := range coll.Fields {
if f.Name == ttlNew {
found = true
break
}
}
if !found {
return merr.WrapErrParameterInvalidMsg("ttl field name %s not found in schema", ttlNew)
}
}
// Ensure schema update mask exists so QueryNode pipeline treats this as a schema update event.
if !funcutil.SliceContain(header.UpdateMask.Paths, message.FieldMaskCollectionSchema) {
header.UpdateMask.Paths = append(header.UpdateMask.Paths, message.FieldMaskCollectionSchema)
}
// Build schema snapshot with updated properties (schema version should NOT be changed for properties-only alter).
schema := &schemapb.CollectionSchema{
Name: coll.Name,
Description: coll.Description,
AutoID: coll.AutoID,
Fields: model.MarshalFieldModels(coll.Fields),
StructArrayFields: model.MarshalStructArrayFieldModels(coll.StructArrayFields),
Functions: model.MarshalFunctionModels(coll.Functions),
EnableDynamicField: coll.EnableDynamicField,
Properties: newPropsKeyValuePairs,
Version: coll.SchemaVersion,
}
udpates.Schema = schema
}
// if there's no change, return nil directly to promise idempotent.
if len(header.UpdateMask.Paths) == 0 {
return errIgnoredAlterCollection

View File
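The schema-refresh decision above reduces to a small predicate over the old and new property maps; restated as a hypothetical helper (ttlFieldChanged is not part of the change, outcomes shown as comments):

// ttlFieldChanged reports whether the ttl_field property was added, removed, or renamed,
// i.e. whether the branch above broadcasts an updated schema snapshot to QueryNode.
func ttlFieldChanged(oldProps, newProps map[string]string) bool {
    ttlOld, okOld := oldProps[common.CollectionTTLFieldKey]
    ttlNew, okNew := newProps[common.CollectionTTLFieldKey]
    return (okOld != okNew) || (okOld && okNew && ttlOld != ttlNew)
}

// ttlFieldChanged({}, {"ttl_field": "expire_at"})         -> true  (field set for the first time)
// ttlFieldChanged({"ttl_field": "a"}, {"ttl_field": "b"}) -> true  (field renamed)
// ttlFieldChanged({"ttl_field": "a"}, {"ttl_field": "a"}) -> false (no schema broadcast needed)
// ttlFieldChanged({"ttl_field": "a"}, {})                 -> true  (field removed via DeleteKeys)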

@ -198,6 +198,50 @@ func TestDDLCallbacksAlterCollectionPropertiesForDynamicField(t *testing.T) {
require.ErrorIs(t, merr.CheckRPCCall(resp, err), merr.ErrParameterInvalid)
}
func TestDDLCallbacksAlterCollectionProperties_TTLFieldShouldBroadcastSchema(t *testing.T) {
core := initStreamingSystemAndCore(t)
ctx := context.Background()
dbName := "testDB" + funcutil.RandomString(10)
collectionName := "testCollectionTTLField" + funcutil.RandomString(10)
// Create collection with a ttl field.
resp, err := core.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{
DbName: dbName,
})
require.NoError(t, merr.CheckRPCCall(resp, err))
testSchema := &schemapb.CollectionSchema{
Name: collectionName,
Description: "description",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{Name: "field1", DataType: schemapb.DataType_Int64},
{Name: "ttl", DataType: schemapb.DataType_Timestamptz, Nullable: true},
},
}
schemaBytes, err := proto.Marshal(testSchema)
require.NoError(t, err)
resp, err = core.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Properties: []*commonpb.KeyValuePair{{Key: common.CollectionReplicaNumber, Value: "1"}},
Schema: schemaBytes,
ConsistencyLevel: commonpb.ConsistencyLevel_Bounded,
})
require.NoError(t, merr.CheckRPCCall(resp, err))
assertSchemaVersion(t, ctx, core, dbName, collectionName, 0)
// Alter properties to set ttl field should succeed and should NOT change schema version in meta.
resp, err = core.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Properties: []*commonpb.KeyValuePair{{Key: common.CollectionTTLFieldKey, Value: "ttl"}},
})
require.NoError(t, merr.CheckRPCCall(resp, err))
assertSchemaVersion(t, ctx, core, dbName, collectionName, 0)
}
func createCollectionForTest(t *testing.T, ctx context.Context, core *Core, dbName string, collectionName string) {
resp, err := core.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{
DbName: dbName,

View File

@ -42,6 +42,7 @@ type BinlogRecordWriter interface {
statsLog *datapb.FieldBinlog,
bm25StatsLog map[FieldID]*datapb.FieldBinlog,
manifest string,
expirQuantiles []int64,
)
GetRowNum() int64
FlushChunk() error
@ -77,6 +78,9 @@ type packedBinlogRecordWriterBase struct {
statsLog *datapb.FieldBinlog
bm25StatsLog map[FieldID]*datapb.FieldBinlog
manifest string
ttlFieldID int64
ttlFieldValues []int64
}
func (pw *packedBinlogRecordWriterBase) getColumnStatsFromRecord(r Record, allFields []*schemapb.FieldSchema) map[int64]storagecommon.ColumnStats {
@ -95,6 +99,10 @@ func (pw *packedBinlogRecordWriterBase) GetWrittenUncompressed() uint64 {
return pw.writtenUncompressed
}
func (pw *packedBinlogRecordWriterBase) GetExpirQuantiles() []int64 {
return calculateExpirQuantiles(pw.ttlFieldID, pw.rowNum, pw.ttlFieldValues)
}
func (pw *packedBinlogRecordWriterBase) writeStats() error {
// Write PK stats
pkStatsMap, err := pw.pkCollector.Digest(
@ -138,8 +146,9 @@ func (pw *packedBinlogRecordWriterBase) GetLogs() (
statsLog *datapb.FieldBinlog,
bm25StatsLog map[FieldID]*datapb.FieldBinlog,
manifest string,
expirQuantiles []int64,
) {
return pw.fieldBinlogs, pw.statsLog, pw.bm25StatsLog, pw.manifest
return pw.fieldBinlogs, pw.statsLog, pw.bm25StatsLog, pw.manifest, pw.GetExpirQuantiles()
}
func (pw *packedBinlogRecordWriterBase) GetRowNum() int64 {
@ -183,6 +192,28 @@ func (pw *PackedBinlogRecordWriter) Write(r Record) error {
}
}
if pw.ttlFieldID >= common.StartOfUserFieldID {
ttlColumn := r.Column(pw.ttlFieldID)
// Defensive check to prevent panic
if ttlColumn == nil {
return merr.WrapErrServiceInternal("ttl field not found")
}
ttlArray, ok := ttlColumn.(*array.Int64)
if !ok {
return merr.WrapErrServiceInternal("ttl field is not int64")
}
for i := 0; i < rows; i++ {
if ttlArray.IsNull(i) {
continue
}
ttlValue := ttlArray.Value(i)
if ttlValue <= 0 {
continue
}
pw.ttlFieldValues = append(pw.ttlFieldValues, ttlValue)
}
}
// Collect statistics
if err := pw.pkCollector.Collect(r); err != nil {
return err
@ -291,6 +322,8 @@ func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID,
storagePluginContext: storagePluginContext,
tsFrom: typeutil.MaxTimestamp,
tsTo: 0,
ttlFieldID: getTTLFieldID(schema),
ttlFieldValues: make([]int64, 0),
},
}
@ -440,6 +473,8 @@ func newPackedManifestRecordWriter(collectionID, partitionID, segmentID UniqueID
storagePluginContext: storagePluginContext,
tsFrom: typeutil.MaxTimestamp,
tsTo: 0,
ttlFieldID: getTTLFieldID(schema),
ttlFieldValues: make([]int64, 0),
},
}

View File

@ -408,7 +408,8 @@ func NewBinlogRecordWriter(ctx context.Context, collectionID, partitionID, segme
blobsWriter, allocator, maxRowNum,
rwOptions.bufferSize, rwOptions.multiPartUploadSize, rwOptions.columnGroups,
rwOptions.storageConfig,
pluginContext)
pluginContext,
)
} else {
return newPackedBinlogRecordWriter(collectionID, partitionID, segmentID, schema,
blobsWriter, allocator, maxRowNum,

View File

@ -161,7 +161,7 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() {
rowNum := w.GetRowNum()
s.Equal(rowNum, int64(rows))
fieldBinlogs, statsLog, bm25StatsLog, _ := w.GetLogs()
fieldBinlogs, statsLog, bm25StatsLog, _, _ := w.GetLogs()
s.Equal(len(fieldBinlogs), len(columnGroups))
for _, columnGroup := range fieldBinlogs {
s.Equal(len(columnGroup.Binlogs), 1)
@ -240,7 +240,7 @@ func (s *PackedBinlogRecordSuite) TestGenerateBM25Stats() {
s.NoError(err)
err = w.Close()
s.NoError(err)
fieldBinlogs, statsLog, bm25StatsLog, _ := w.GetLogs()
fieldBinlogs, statsLog, bm25StatsLog, _, _ := w.GetLogs()
s.Equal(len(fieldBinlogs), len(columnGroups))
s.Equal(statsLog.Binlogs[0].EntriesNum, int64(1))

View File

@ -509,6 +509,9 @@ type CompositeBinlogRecordWriter struct {
flushedUncompressed uint64
options []StreamWriterOption
ttlFieldID int64
ttlFieldValues []int64
}
var _ BinlogRecordWriter = (*CompositeBinlogRecordWriter)(nil)
@ -531,6 +534,29 @@ func (c *CompositeBinlogRecordWriter) Write(r Record) error {
}
}
// not system column
if c.ttlFieldID >= common.StartOfUserFieldID {
// Defensive check to prevent panic
ttlColumn := r.Column(c.ttlFieldID)
if ttlColumn == nil {
return merr.WrapErrServiceInternal("ttl field not found")
}
ttlArray, ok := ttlColumn.(*array.Int64)
if !ok {
return merr.WrapErrServiceInternal("ttl field is not int64")
}
for i := 0; i < rows; i++ {
if ttlArray.IsNull(i) {
continue
}
ttlValue := ttlArray.Value(i)
if ttlValue <= 0 {
continue
}
c.ttlFieldValues = append(c.ttlFieldValues, ttlValue)
}
}
// Collect statistics
if err := c.pkCollector.Collect(r); err != nil {
return err
@ -693,13 +719,18 @@ func (c *CompositeBinlogRecordWriter) writeStats() error {
return nil
}
func (c *CompositeBinlogRecordWriter) GetExpirQuantiles() []int64 {
return calculateExpirQuantiles(c.ttlFieldID, c.rowNum, c.ttlFieldValues)
}
func (c *CompositeBinlogRecordWriter) GetLogs() (
fieldBinlogs map[FieldID]*datapb.FieldBinlog,
statsLog *datapb.FieldBinlog,
bm25StatsLog map[FieldID]*datapb.FieldBinlog,
manifest string,
expirQuantiles []int64,
) {
return c.fieldBinlogs, c.statsLog, c.bm25StatsLog, ""
return c.fieldBinlogs, c.statsLog, c.bm25StatsLog, "", c.GetExpirQuantiles()
}
func (c *CompositeBinlogRecordWriter) GetRowNum() int64 {
@ -711,18 +742,20 @@ func newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID UniqueI
options ...StreamWriterOption,
) (*CompositeBinlogRecordWriter, error) {
writer := &CompositeBinlogRecordWriter{
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
schema: schema,
BlobsWriter: blobsWriter,
allocator: allocator,
chunkSize: chunkSize,
rootPath: rootPath,
maxRowNum: maxRowNum,
options: options,
tsFrom: math.MaxUint64,
tsTo: 0,
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
schema: schema,
BlobsWriter: blobsWriter,
allocator: allocator,
chunkSize: chunkSize,
rootPath: rootPath,
maxRowNum: maxRowNum,
options: options,
tsFrom: math.MaxUint64,
tsTo: 0,
ttlFieldID: getTTLFieldID(schema),
ttlFieldValues: make([]int64, 0),
}
// Create stats collectors

View File

@ -177,12 +177,69 @@ func TestBinlogSerializeWriter(t *testing.T) {
err = writer.Close()
assert.NoError(t, err)
logs, _, _, _ := writer.GetLogs()
logs, _, _, _, _ := writer.GetLogs()
assert.Equal(t, 18, len(logs))
assert.Equal(t, 5, len(logs[0].Binlogs))
})
}
func TestCompositeBinlogRecordWriter_TTLFieldCollection(t *testing.T) {
ttlFieldID := FieldID(100)
w := &CompositeBinlogRecordWriter{
// avoid initWriters() side-effects in Write()
rw: &MockRecordWriter{
writefn: func(Record) error { return nil },
closefn: func() error { return nil },
},
pkCollector: &PkStatsCollector{pkstats: nil},
bm25Collector: &Bm25StatsCollector{bm25Stats: map[int64]*BM25Stats{}},
chunkSize: 1<<63 - 1,
ttlFieldID: ttlFieldID,
}
// Build a record with timestamp + nullable ttl field:
// ttl values: [10, -1, 0, null, 20] -> only [10, 20] should be collected.
rows := 5
tsBuilder := array.NewInt64Builder(memory.DefaultAllocator)
tsBuilder.AppendValues([]int64{1, 2, 3, 4, 5}, nil)
tsArr := tsBuilder.NewArray()
tsBuilder.Release()
defer tsArr.Release()
ttlBuilder := array.NewInt64Builder(memory.DefaultAllocator)
ttlBuilder.AppendValues([]int64{10, -1, 0, 30, 20}, []bool{true, true, true, false, true})
ttlArr := ttlBuilder.NewArray()
ttlBuilder.Release()
defer ttlArr.Release()
ar := array.NewRecord(
arrow.NewSchema(
[]arrow.Field{
{Name: "ts", Type: arrow.PrimitiveTypes.Int64},
{Name: "ttl", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
},
nil,
),
[]arrow.Array{tsArr, ttlArr},
int64(rows),
)
r := NewSimpleArrowRecord(ar, map[FieldID]int{
common.TimeStampField: 0,
ttlFieldID: 1,
})
defer r.Release()
err := w.Write(r)
assert.NoError(t, err)
assert.Equal(t, int64(rows), w.rowNum)
assert.ElementsMatch(t, []int64{10, 20}, w.ttlFieldValues)
neverExpire := int64(^uint64(0) >> 1)
got := w.GetExpirQuantiles()
assert.Equal(t, []int64{10, 20, neverExpire, neverExpire, neverExpire}, got)
}
func TestBinlogValueWriter(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewBinlogDeserializeReader(generateTestSchema(), func() ([]*Blob, error) {

View File

@ -23,6 +23,7 @@ import (
"fmt"
"io"
"io/fs"
"math"
"os"
"sort"
"strconv"
@ -1766,3 +1767,66 @@ func VectorArrayToArrowType(elementType schemapb.DataType, dim int) (arrow.DataT
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("unsupported element type in VectorArray: %s", elementType.String()))
}
}
func getTTLFieldID(schema *schemapb.CollectionSchema) int64 {
ttlFieldName := ""
for _, pair := range schema.GetProperties() {
if pair.GetKey() == common.CollectionTTLFieldKey {
ttlFieldName = pair.GetValue()
break
}
}
if ttlFieldName == "" {
return -1
}
for _, field := range schema.GetFields() {
if field.GetName() == ttlFieldName && field.GetDataType() == schemapb.DataType_Timestamptz {
return field.GetFieldID()
}
}
return -1
}
// calculateExpirQuantiles computes TTL values at 20%, 40%, 60%, 80%, 100% percentiles.
// Returns nil if ttlFieldID is not enabled or no rows exist.
// Precondition: ttlFieldValues must contain only positive values (>0); the caller is responsible
// for filtering out null/zero TTL values which represent "never expire" rows.
func calculateExpirQuantiles(ttlFieldID int64, rowNum int64, ttlFieldValues []int64) []int64 {
// If ttl field is not enabled for this writer, do not emit percentile info.
if ttlFieldID <= 1 {
return nil
}
// If segment is empty, do not emit percentile info.
if rowNum <= 0 {
return nil
}
sort.Slice(ttlFieldValues, func(i, j int) bool {
return ttlFieldValues[i] < ttlFieldValues[j]
})
// Calculate percentile indices for 20%, 40%, 60%, 80%, 100%
percentiles := []float64{0.2, 0.4, 0.6, 0.8, 1.0}
result := make([]int64, len(percentiles))
// Treat rows with null/<=0 ttl as "never expire".
const neverExpire = int64(^uint64(0) >> 1)
for i, p := range percentiles {
// Calculate index: for n elements, p percentile means ceil(n * p) elements
// e.g., for n=5: 20%->idx 0, 40%->idx 1, 60%->idx 2, 80%->idx 3, 100%->idx 4
idx := int(math.Ceil(p*float64(rowNum))) - 1
if idx < 0 {
idx = 0
}
// If idx exceeds collected ttl values, it falls into "never expire" region.
if idx >= len(ttlFieldValues) {
result[i] = neverExpire
continue
}
result[i] = ttlFieldValues[idx]
}
return result
}

View File
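A worked example of the percentile math in calculateExpirQuantiles above, for a segment where most rows never expire (101 is an assumed TTL field ID, TTL values are illustrative Timestamptz ticks):

// rowNum = 10, but only 4 rows carry a positive TTL; the other 6 are null or <= 0,
// so they were never appended to ttlFieldValues and occupy the "never expire" region.
ttlFieldValues := []int64{40, 10, 30, 20} // sorted in place to [10 20 30 40]
q := calculateExpirQuantiles(101, 10, ttlFieldValues)
// percentile indices for rowNum=10: ceil(10*p)-1 -> 1, 3, 5, 7, 9
// idx 1 -> 20, idx 3 -> 40, idx 5/7/9 >= len(ttlFieldValues) -> never-expire sentinel
// q == []int64{20, 40, math.MaxInt64, math.MaxInt64, math.MaxInt64}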

@ -189,6 +189,7 @@ const (
CollectionTTLConfigKey = "collection.ttl.seconds"
CollectionAutoCompactionKey = "collection.autocompaction.enabled"
CollectionDescription = "collection.description"
CollectionTTLFieldKey = "ttl_field"
// Deprecated: will be removed in the 3.0 after implementing ack sync up semantic.
CollectionOnTruncatingKey = "collection.on.truncating" // when collection is on truncating, forbid the compaction of current collection.
@ -605,23 +606,23 @@ func GetStringValue(kvs []*commonpb.KeyValuePair, key string) (result string, ex
return kv.GetValue(), true
}
func GetCollectionTTL(kvs []*commonpb.KeyValuePair, defaultValue time.Duration) (time.Duration, error) {
func GetCollectionTTL(kvs []*commonpb.KeyValuePair) (time.Duration, error) {
value, parseErr, exist := GetInt64Value(kvs, CollectionTTLConfigKey)
if parseErr != nil {
return 0, parseErr
}
if !exist {
return defaultValue, nil
return -1, nil
}
return time.Duration(value) * time.Second, nil
}
func GetCollectionTTLFromMap(kvs map[string]string, defaultValue time.Duration) (time.Duration, error) {
func GetCollectionTTLFromMap(kvs map[string]string) (time.Duration, error) {
value, exist := kvs[CollectionTTLConfigKey]
if !exist {
return defaultValue, nil
return -1, nil
}
ttlSeconds, err := strconv.ParseInt(value, 10, 64)

View File
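With the global default removed, callers now detect "not configured" through the -1 return instead of a fallback argument; a minimal sketch of the new calling convention (assumes the common and commonpb imports of the callers above):

ttl, err := common.GetCollectionTTL([]*commonpb.KeyValuePair{
    {Key: common.CollectionTTLConfigKey, Value: "3600"},
})
// ttl == time.Hour, err == nil

ttl, err = common.GetCollectionTTL(nil)
// ttl == -1, err == nil: no collection-level TTL configured; the proxy's
// getCollectionTTL() maps any negative value to 0, i.e. no expiration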

@ -1,7 +1,6 @@
package common
import (
"math/rand"
"strings"
"testing"
"time"
@ -267,13 +266,13 @@ func TestGetCollectionTTL(t *testing.T) {
for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) {
result, err := GetCollectionTTL([]*commonpb.KeyValuePair{{Key: CollectionTTLConfigKey, Value: tc.value}}, 0)
result, err := GetCollectionTTL([]*commonpb.KeyValuePair{{Key: CollectionTTLConfigKey, Value: tc.value}})
if tc.expectErr {
assert.Error(t, err)
} else {
assert.EqualValues(t, tc.expect, result)
}
result, err = GetCollectionTTLFromMap(map[string]string{CollectionTTLConfigKey: tc.value}, 0)
result, err = GetCollectionTTLFromMap(map[string]string{CollectionTTLConfigKey: tc.value})
if tc.expectErr {
assert.Error(t, err)
} else {
@ -283,12 +282,11 @@ func TestGetCollectionTTL(t *testing.T) {
}
t.Run("not_config", func(t *testing.T) {
randValue := rand.Intn(100)
result, err := GetCollectionTTL([]*commonpb.KeyValuePair{}, time.Duration(randValue)*time.Second)
result, err := GetCollectionTTL([]*commonpb.KeyValuePair{})
assert.NoError(t, err)
assert.EqualValues(t, time.Duration(randValue)*time.Second, result)
result, err = GetCollectionTTLFromMap(map[string]string{}, time.Duration(randValue)*time.Second)
assert.EqualValues(t, -1, result)
result, err = GetCollectionTTLFromMap(map[string]string{})
assert.NoError(t, err)
assert.EqualValues(t, time.Duration(randValue)*time.Second, result)
assert.EqualValues(t, -1, result)
})
}

View File

@ -430,6 +430,10 @@ message SegmentInfo {
// we could keep the fullpath since one segment shall only have one active manifest
// and we could keep the possiblity that manifest stores out side of collection/partition/segment path
string manifest_path = 32;
// expirQuantiles records the expiration timestamptz values of the segment
// at the 20%, 40%, 60%, 80%, and 100% data distribution levels
repeated int64 expirQuantiles = 33;
}
message SegmentStartPosition {
@ -688,6 +692,9 @@ message CompactionSegmentBinlogs {
bool is_sorted = 9;
int64 storage_version = 10;
string manifest = 11;
// expirQuantiles records the expiration timestamptz values of the segment
// at the 20%, 40%, 60%, 80%, and 100% data distribution levels
repeated int64 expirQuantiles = 12;
}
message CompactionPlan {
@ -732,6 +739,7 @@ message CompactionSegment {
int64 storage_version = 10;
map<int64, data.TextIndexStats> text_stats_logs = 11;
string manifest = 12;
repeated int64 expirQuantiles = 13;
}
message CompactionPlanResult {
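The quantile array attached to SegmentInfo and CompactionSegment above is meant to be read against the current timestamp: if the k-th entry has passed, at least k*20% of the segment's rows are expired. A hypothetical reader sketch (expiredRatio is illustrative, not the datacoord implementation):

// expiredRatio returns the fraction of a segment's rows known to be expired at `now`,
// based on the 20/40/60/80/100% expiration quantiles recorded for the segment.
func expiredRatio(expirQuantiles []int64, now int64) float64 {
    ratio := 0.0
    for i, q := range expirQuantiles {
        if q <= now {
            ratio = float64(i+1) * 0.2
        }
    }
    return ratio
}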

File diff suppressed because it is too large

View File

@ -222,7 +222,6 @@ type commonConfig struct {
DefaultPartitionName ParamItem `refreshable:"false"`
DefaultIndexName ParamItem `refreshable:"true"`
EntityExpirationTTL ParamItem `refreshable:"true"`
IndexSliceSize ParamItem `refreshable:"false"`
HighPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
@ -510,23 +509,6 @@ It is recommended to change this parameter before starting Milvus for the first
}
p.DefaultIndexName.Init(base.mgr)
p.EntityExpirationTTL = ParamItem{
Key: "common.entityExpiration",
Version: "2.1.0",
DefaultValue: "-1",
Formatter: func(value string) string {
ttl := getAsInt(value)
if ttl < 0 {
return "-1"
}
return strconv.Itoa(ttl)
},
Doc: "Entity expiration in seconds, CAUTION -1 means never expire",
Export: true,
}
p.EntityExpirationTTL.Init(base.mgr)
p.SimdType = ParamItem{
Key: "common.simdType",
Version: "2.1.0",

View File

@ -45,13 +45,6 @@ func TestComponentParam(t *testing.T) {
assert.NotEqual(t, Params.DefaultIndexName.GetValue(), "")
t.Logf("default index name = %s", Params.DefaultIndexName.GetValue())
assert.Equal(t, Params.EntityExpirationTTL.GetAsInt64(), int64(-1))
t.Logf("default entity expiration = %d", Params.EntityExpirationTTL.GetAsInt64())
// test the case coommo
params.Save("common.entityExpiration", "50")
assert.Equal(t, Params.EntityExpirationTTL.GetAsInt(), 50)
assert.NotEqual(t, Params.SimdType.GetValue(), "")
t.Logf("knowhere simd type = %s", Params.SimdType.GetValue())