milvus/internal/datanode/compactor/compactor_common.go
cai.zhang a16d04f5d1
feat: Support ttl field for entity level expiration (#46342)
issue: #46033

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Pull Request Summary: Entity-Level TTL Field Support

### Core Invariant and Design
This PR introduces **per-entity TTL (time-to-live) expiration** via a
dedicated TIMESTAMPTZ field as a fine-grained alternative to
collection-level TTL. The key invariant is **mutual exclusivity**:
collection-level TTL and entity-level TTL field cannot coexist on the
same collection. Validation is enforced at the proxy layer during
collection creation/alteration (`validateTTL()` prevents both being set
simultaneously).
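
As a rough illustration of that proxy-side invariant, the sketch below checks a property map for both settings at once; the property keys, helper name, and error text are assumptions for illustration, not the actual `validateTTL()` code.

```go
// Minimal sketch of the mutual-exclusivity check; keys and names are assumed.
package main

import "fmt"

const (
	collectionTTLKey = "collection.ttl.seconds" // assumed collection-level TTL property key
	ttlFieldKey      = "ttl_field"              // entity-level TTL field property named in this PR
)

// validateTTLProperties rejects a collection that configures both
// collection-level TTL and an entity-level TTL field.
func validateTTLProperties(props map[string]string) error {
	_, hasCollectionTTL := props[collectionTTLKey]
	_, hasTTLField := props[ttlFieldKey]
	if hasCollectionTTL && hasTTLField {
		return fmt.Errorf("collection-level TTL and entity-level TTL field cannot coexist")
	}
	return nil
}

func main() {
	props := map[string]string{collectionTTLKey: "3600", ttlFieldKey: "expire_at"}
	fmt.Println(validateTTLProperties(props)) // prints the mutual-exclusivity error
}
```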

### What Is Removed and Why
- **Global `EntityExpirationTTL` parameter** removed from config
  (`configs/milvus.yaml`, `pkg/util/paramtable/component_param.go`). This
  parameter was the cluster-wide default for entity expiration. The
  removal is safe because:
  - The collection-level TTL path (the `isEntityExpired(ts)` check)
    remains intact in the codebase for backward compatibility
  - The TTL field check (`isEntityExpiredByTTLField()`) is a secondary
    path invoked only when a TTL field is configured
  - Existing deployments using collection TTL can continue without
    modification

The global parameter was removed because entity-level TTL provides
finer-grained control than a cluster-wide setting, and the PR opts for
one expiration mechanism per collection rather than layering both.
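
To make the two paths concrete, here is a hedged sketch of how the secondary TTL-field check behaves when no TTL field is configured; the function name comes from the PR, but the signature and the millisecond unit are assumptions.

```go
// Sketch only: signature and time unit are assumed, not the PR's exact code.
package main

import (
	"fmt"
	"time"
)

// isEntityExpiredByTTLField treats a non-positive expiration (e.g. the -1
// passed when no TTL field exists) as "not expired", so the secondary path
// is a no-op unless the collection defines a TTL field.
func isEntityExpiredByTTLField(ttlExpireMs int64, now time.Time) bool {
	if ttlExpireMs <= 0 {
		return false
	}
	return time.UnixMilli(ttlExpireMs).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(isEntityExpiredByTTLField(-1, now))                              // false: no TTL field configured
	fmt.Println(isEntityExpiredByTTLField(now.Add(-time.Hour).UnixMilli(), now)) // true: already expired
	fmt.Println(isEntityExpiredByTTLField(now.Add(time.Hour).UnixMilli(), now))  // false: not yet expired
}
```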

### No Data Loss or Behavior Regression
**TTL filtering logic is additive and safe:**
1. **Collection-level TTL unaffected**: The `isEntityExpired(ts)` check
still applies when no TTL field is configured; callers of
`EntityFilter.Filtered()` pass `-1` as the TTL expiration timestamp when
no field exists, causing `isEntityExpiredByTTLField()` to return false
immediately
2. **Null/invalid TTL values treated safely**: Rows with a null TTL or a TTL
≤ 0 are marked as "never expire" (using the sentinel value `int64(^uint64(0)
>> 1)`, i.e. `math.MaxInt64`) and are preserved across compactions; percentile
calculations only include positive TTL values (see the sketch after this list)
3. **Query-time filtering automatic**: TTL filtering is transparently
added to expression compilation via `AddTTLFieldFilterExpressions()`,
which appends `(ttl_field IS NULL OR ttl_field > current_time)` to the
filter pipeline. Entities with null TTL always pass the filter
4. **Compaction triggering granular**: Percentile-based expiration (20%,
40%, 60%, 80%, 100%) allows configurable compaction thresholds via
`SingleCompactionRatioThreshold`, preventing premature data deletion
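
The sketch below illustrates points 2 and 4: null or non-positive TTL values map to a max-int64 "never expire" sentinel and are excluded from the 20/40/60/80/100% quantile array. The helper names are hypothetical, not the PR's code.

```go
// Illustrative sketch (not the PR's exact code): map null/non-positive TTL
// values to a "never expire" sentinel and compute 20/40/60/80/100% quantiles
// over the positive TTL values collected for a segment.
package main

import (
	"fmt"
	"sort"
)

const neverExpire = int64(^uint64(0) >> 1) // math.MaxInt64 sentinel

// ttlPercentiles returns the 5-value quantile array stored in segment
// metadata; nil and non-positive TTLs are excluded from the calculation.
func ttlPercentiles(ttls []*int64) [5]int64 {
	positive := make([]int64, 0, len(ttls))
	for _, t := range ttls {
		if t == nil || *t <= 0 {
			continue // treated as neverExpire, not part of the quantiles
		}
		positive = append(positive, *t)
	}
	var out [5]int64
	if len(positive) == 0 {
		for i := range out {
			out[i] = neverExpire
		}
		return out
	}
	sort.Slice(positive, func(i, j int) bool { return positive[i] < positive[j] })
	for i, q := range []float64{0.2, 0.4, 0.6, 0.8, 1.0} {
		idx := int(q*float64(len(positive))) - 1
		if idx < 0 {
			idx = 0
		}
		out[i] = positive[idx]
	}
	return out
}

func main() {
	ttl := func(v int64) *int64 { return &v }
	fmt.Println(ttlPercentiles([]*int64{ttl(10), nil, ttl(30), ttl(-1), ttl(20), ttl(40), ttl(50)}))
}
```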

### Capability Added: Per-Entity Expiration with Data Distribution Awareness
Users can now set a collection property `ttl_field` naming a TIMESTAMPTZ
schema field. During data writes, TTL values are collected per
segment and percentile quantiles (5-value array) are computed and stored
in segment metadata. At query time, the TTL field is automatically
filtered. At compaction time, segment-level percentiles drive
expiration-based compaction decisions, enabling intelligent compaction
of segments where a configurable fraction of data has expired (e.g.,
compact when 40% of rows are expired, controlled by threshold ratio).
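
A hedged sketch of that compaction decision, assuming the stored percentiles are expiration times in Unix milliseconds; only `SingleCompactionRatioThreshold` is named in this PR, and the helper itself is hypothetical.

```go
// Hypothetical sketch of the expiration-based compaction decision; the real
// trigger lives in the compaction policy, not here.
package main

import (
	"fmt"
	"time"
)

// shouldCompactByTTL reports whether at least ratioThreshold of a segment's
// rows have expired, using the stored 20/40/60/80/100% TTL percentiles.
func shouldCompactByTTL(percentiles [5]int64, ratioThreshold float64, now time.Time) bool {
	ratios := []float64{0.2, 0.4, 0.6, 0.8, 1.0}
	expiredRatio := 0.0
	for i, p := range percentiles {
		if time.UnixMilli(p).Before(now) {
			expiredRatio = ratios[i] // the i-th quantile expired => at least ratios[i] of rows expired
		}
	}
	return expiredRatio >= ratioThreshold
}

func main() {
	now := time.Now()
	// The 20% and 40% quantiles are in the past, the rest in the future:
	percentiles := [5]int64{
		now.Add(-2 * time.Hour).UnixMilli(),
		now.Add(-1 * time.Hour).UnixMilli(),
		now.Add(1 * time.Hour).UnixMilli(),
		now.Add(2 * time.Hour).UnixMilli(),
		now.Add(3 * time.Hour).UnixMilli(),
	}
	fmt.Println(shouldCompactByTTL(percentiles, 0.4, now)) // true: >= 40% of rows expired
}
```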
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2026-01-05 10:27:24 +08:00


// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
	"context"
	sio "io"
	"strconv"
	"time"

	"go.opentelemetry.io/otel"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/flushcommon/io"
	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

const compactionBatchSize = 100

type EntityFilter struct {
	deletedPkTs map[interface{}]typeutil.Timestamp // pk2ts
	ttl         int64                              // nanoseconds
	currentTime time.Time

	expiredCount int
	deletedCount int
}

func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64, currTime time.Time) *EntityFilter {
	if deletedPkTs == nil {
		deletedPkTs = make(map[interface{}]typeutil.Timestamp)
	}
	return &EntityFilter{
		deletedPkTs: deletedPkTs,
		ttl:         ttl,
		currentTime: currTime,
	}
}

func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool {
	if filter.isEntityDeleted(pk, ts) {
		filter.deletedCount++
		return true
	}

	// Filtering expired entity
	if filter.isEntityExpired(ts) {
		filter.expiredCount++
		return true
	}
	return false
}

func (filter *EntityFilter) GetExpiredCount() int {
	return filter.expiredCount
}

func (filter *EntityFilter) GetDeletedCount() int {
	return filter.deletedCount
}

func (filter *EntityFilter) GetDeltalogDeleteCount() int {
	return len(filter.deletedPkTs)
}

func (filter *EntityFilter) GetMissingDeleteCount() int {
	diff := filter.GetDeltalogDeleteCount() - filter.GetDeletedCount()
	if diff <= 0 {
		diff = 0
	}
	return diff
}

func (filter *EntityFilter) isEntityDeleted(pk interface{}, pkTs typeutil.Timestamp) bool {
	if deleteTs, ok := filter.deletedPkTs[pk]; ok {
		// insert task and delete task have the same ts when upsert;
		// here it should be < instead of <=
		// to avoid the upserted data being deleted after compaction
		if pkTs < deleteTs {
			return true
		}
	}
	return false
}

func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool {
	// entity expiration is not enabled if duration <= 0
	if filter.ttl <= 0 {
		return false
	}
	entityTime, _ := tsoutil.ParseTS(entityTs)

	// this dur can represent 292 million years before or after 1970, enough for milvus
	// ttl calculation
	dur := filter.currentTime.UnixMilli() - entityTime.UnixMilli()

	// filter.ttl is nanoseconds
	return filter.ttl/int64(time.Millisecond) <= dur
}

func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
	pk2Ts := make(map[interface{}]typeutil.Timestamp)

	log := log.Ctx(ctx)
	if len(paths) == 0 {
		log.Debug("compact with no deltalogs, skip merge deltalogs")
		return pk2Ts, nil
	}

	blobs := make([]*storage.Blob, 0)
	binaries, err := io.Download(ctx, paths)
	if err != nil {
		log.Warn("compact wrong, fail to download deltalogs",
			zap.Strings("path", paths),
			zap.Error(err))
		return nil, err
	}

	for i := range binaries {
		blobs = append(blobs, &storage.Blob{Value: binaries[i]})
	}

	reader, err := storage.CreateDeltalogReader(blobs)
	if err != nil {
		log.Error("malformed delta file", zap.Error(err))
		return nil, err
	}
	defer reader.Close()

	for {
		dl, err := reader.NextValue()
		if err != nil {
			if err == sio.EOF {
				break
			}
			log.Error("compact wrong, fail to read deltalogs", zap.Error(err))
			return nil, err
		}

		// keep only the latest delete timestamp for each primary key
		if ts, ok := pk2Ts[(*dl).Pk.GetValue()]; ok && ts > (*dl).Ts {
			continue
		}
		pk2Ts[(*dl).Pk.GetValue()] = (*dl).Ts
	}

	log.Info("compact mergeDeltalogs end", zap.Int("delete entries counts", len(pk2Ts)))

	return pk2Ts, nil
}

// TODO: remove, used in test only
func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite")
	defer span.End()

	blobs, tr, err := writer.SerializeYield()
	startID, _, err := allocator.Alloc(uint32(len(blobs)))
	if err != nil {
		return nil, nil, err
	}

	kvs = make(map[string][]byte)
	fieldBinlogs = make(map[int64]*datapb.FieldBinlog)
	for i := range blobs {
		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
		fID, _ := strconv.ParseInt(blobs[i].GetKey(), 10, 64)
		key, _ := binlog.BuildLogPath(storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i))

		kvs[key] = blobs[i].GetValue()
		fieldBinlogs[fID] = &datapb.FieldBinlog{
			FieldID: fID,
			Binlogs: []*datapb.Binlog{
				{
					LogSize:       int64(len(blobs[i].GetValue())),
					MemorySize:    blobs[i].GetMemorySize(),
					LogPath:       key,
					EntriesNum:    blobs[i].RowNum,
					TimestampFrom: tr.GetMinTimestamp(),
					TimestampTo:   tr.GetMaxTimestamp(),
				},
			},
		}
	}

	return
}

func mergeFieldBinlogs(base, paths map[typeutil.UniqueID]*datapb.FieldBinlog) {
	for fID, fpath := range paths {
		if _, ok := base[fID]; !ok {
			base[fID] = &datapb.FieldBinlog{FieldID: fID, Binlogs: make([]*datapb.Binlog, 0)}
		}
		base[fID].Binlogs = append(base[fID].Binlogs, fpath.GetBinlogs()...)
	}
}

// getTTLFieldID returns the field ID of the TIMESTAMPTZ field named by the
// collection's TTL-field property, or -1 if no TTL field is configured.
func getTTLFieldID(schema *schemapb.CollectionSchema) int64 {
	ttlFieldName := ""
	for _, pair := range schema.GetProperties() {
		if pair.GetKey() == common.CollectionTTLFieldKey {
			ttlFieldName = pair.GetValue()
			break
		}
	}
	if ttlFieldName == "" {
		return -1
	}

	for _, field := range schema.GetFields() {
		if field.GetName() == ttlFieldName && field.GetDataType() == schemapb.DataType_Timestamptz {
			return field.GetFieldID()
		}
	}
	return -1
}