milvus/internal/datacoord/compaction_policy_forcemerge.go
cai.zhang a16d04f5d1
feat: Support ttl field for entity level expiration (#46342)
issue: #46033

## Pull Request Summary: Entity-Level TTL Field Support

### Core Invariant and Design
This PR introduces **per-entity TTL (time-to-live) expiration** via a
dedicated TIMESTAMPTZ field as a fine-grained alternative to
collection-level TTL. The key invariant is **mutual exclusivity**:
collection-level TTL and an entity-level TTL field cannot coexist on the
same collection. Validation is enforced at the proxy layer during
collection creation/alteration (`validateTTL()` prevents both being set
simultaneously).
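
Below is a minimal sketch of that invariant, assuming illustrative property keys and a free-standing helper rather than the actual proxy-side `validateTTL()` code:

```go
package main

import "fmt"

// validateTTLProperties is a hypothetical illustration of the mutual-exclusivity
// rule: a collection may carry collection-level TTL or an entity-level TTL field,
// but not both. The property keys used here are assumptions.
func validateTTLProperties(props map[string]string) error {
	_, hasCollectionTTL := props["collection.ttl.seconds"]
	_, hasTTLField := props["ttl_field"]
	if hasCollectionTTL && hasTTLField {
		return fmt.Errorf("collection-level TTL and an entity-level TTL field cannot coexist")
	}
	return nil
}

func main() {
	err := validateTTLProperties(map[string]string{
		"collection.ttl.seconds": "3600",
		"ttl_field":              "expire_at",
	})
	fmt.Println(err) // rejected: both mechanisms set on the same collection
}
```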

### What Is Removed and Why
- **Global `EntityExpirationTTL` parameter** removed from config
(`configs/milvus.yaml`, `pkg/util/paramtable/component_param.go`). This
was the only mechanism for collection-level expiration. The removal is
safe because:
  - The collection-level TTL path (the `isEntityExpired(ts)` check) remains
    intact in the codebase for backward compatibility
  - The TTL field check (`isEntityExpiredByTTLField()`) is a secondary path
    invoked only when a TTL field is configured
  - Existing deployments using collection TTL can continue without
    modification
  
The global parameter was removed because a single collection-wide
expiration setting becomes redundant once per-entity TTL is available,
and the PR opts for one mechanism per collection rather than layering both.

### No Data Loss or Behavior Regression
**TTL filtering logic is additive and safe** (a sketch of the two expiration checks follows this list):
1. **Collection-level TTL unaffected**: The `isEntityExpired(ts)` check
still applies when no TTL field is configured; callers of
`EntityFilter.Filtered()` pass `-1` as the TTL expiration timestamp when
no field exists, causing `isEntityExpiredByTTLField()` to return false
immediately
2. **Null/invalid TTL values treated safely**: Rows with null TTL or TTL
≤ 0 are marked as "never expire" (using sentinel value `int64(^uint64(0)
>> 1)`) and are preserved across compactions; percentile calculations
only include positive TTL values
3. **Query-time filtering automatic**: TTL filtering is transparently
added to expression compilation via `AddTTLFieldFilterExpressions()`,
which appends `(ttl_field IS NULL OR ttl_field > current_time)` to the
filter pipeline. Entities with null TTL always pass the filter
4. **Compaction triggering granular**: Percentile-based expiration (20%,
40%, 60%, 80%, 100%) allows configurable compaction thresholds via
`SingleCompactionRatioThreshold`, preventing premature data deletion
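
A minimal sketch of how the two checks compose, assuming simplified types and field names (the real `EntityFilter` layout and `Filtered()` signature may differ); it follows the `-1` "no TTL field" convention and the max-int64 "never expire" sentinel from items 1 and 2:

```go
package main

import "fmt"

// neverExpire is the max-int64 sentinel stored for rows with null or
// non-positive TTL values, meaning the row never expires.
const neverExpire = int64(^uint64(0) >> 1)

// entityFilter is a simplified stand-in for the compaction-time entity filter.
type entityFilter struct {
	collectionTTL int64 // seconds; 0 means no collection-level TTL
	nowTs         int64 // current time, Unix seconds
}

// isEntityExpired is the collection-level path: an entity expires once its
// insert timestamp plus the collection TTL has passed.
func (f *entityFilter) isEntityExpired(insertTs int64) bool {
	return f.collectionTTL > 0 && insertTs+f.collectionTTL <= f.nowTs
}

// isEntityExpiredByTTLField is the secondary path: callers pass -1 when no TTL
// field is configured, and the sentinel marks rows that never expire.
func (f *entityFilter) isEntityExpiredByTTLField(ttlTs int64) bool {
	if ttlTs < 0 || ttlTs == neverExpire {
		return false
	}
	return ttlTs <= f.nowTs
}

// filtered drops an entity if either expiration path reports it as expired.
func (f *entityFilter) filtered(insertTs, ttlTs int64) bool {
	return f.isEntityExpired(insertTs) || f.isEntityExpiredByTTLField(ttlTs)
}

func main() {
	f := &entityFilter{collectionTTL: 0, nowTs: 1_700_000_000}
	fmt.Println(f.filtered(1_600_000_000, -1))            // false: no TTL field configured
	fmt.Println(f.filtered(1_600_000_000, neverExpire))   // false: null TTL never expires
	fmt.Println(f.filtered(1_600_000_000, 1_650_000_000)) // true: per-entity TTL has passed
}
```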

### Capability Added: Per-Entity Expiration with Data Distribution Awareness
Users can now set a collection property `ttl_field` that names a
TIMESTAMPTZ schema field. During data writes, TTL values are collected
per segment, and percentile quantiles (a 5-value array) are computed and
stored in segment metadata. At query time, the TTL field is filtered
automatically. At compaction time, the segment-level percentiles drive
expiration-based compaction decisions, enabling compaction of segments
in which a configurable fraction of the data has expired (e.g., compact
once 40% of rows are expired, as controlled by the threshold ratio).
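
A rough sketch of that bookkeeping, assuming TTL values are plain expiration timestamps and that the 5-value array holds the 20/40/60/80/100% quantiles; the function names and the threshold comparison are illustrative, not the actual datacoord implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// ttlPercentiles computes the 20/40/60/80/100% quantiles over the positive TTL
// expiration timestamps of a segment; null or non-positive TTLs are excluded,
// matching the "never expire" handling above. Illustrative only.
func ttlPercentiles(ttls []int64) [5]int64 {
	var positive []int64
	for _, t := range ttls {
		if t > 0 {
			positive = append(positive, t)
		}
	}
	var out [5]int64
	if len(positive) == 0 {
		return out
	}
	sort.Slice(positive, func(i, j int) bool { return positive[i] < positive[j] })
	n := len(positive)
	for i := 1; i <= 5; i++ {
		idx := n*i/5 - 1
		if idx < 0 {
			idx = 0
		}
		out[i-1] = positive[idx]
	}
	return out
}

// shouldCompactByExpiration reports whether at least `ratio` of a segment's rows
// have expired, judged from the stored percentile array (a hypothetical stand-in
// for the SingleCompactionRatioThreshold-driven decision).
func shouldCompactByExpiration(percentiles [5]int64, now int64, ratio float64) bool {
	fractions := []float64{0.2, 0.4, 0.6, 0.8, 1.0}
	for i, ts := range percentiles {
		if ts == 0 {
			continue // no TTL data recorded at this quantile
		}
		if fractions[i] >= ratio && ts <= now {
			return true // this fraction of rows has already expired
		}
	}
	return false
}

func main() {
	p := ttlPercentiles([]int64{100, 200, 300, 400, 500, 0, -1})
	fmt.Println(p)                                      // [100 200 300 400 500]
	fmt.Println(shouldCompactByExpiration(p, 250, 0.4)) // true: 40% of rows expired at t=250
}
```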

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2026-01-05 10:27:24 +08:00


package datacoord

import (
	"context"
	"fmt"

	"github.com/samber/lo"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/internal/datacoord/allocator"
	"github.com/milvus-io/milvus/internal/datacoord/session"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

const (
	// Fallback memory for pooling DataNode (returns 0 from GetMetrics)
	defaultPoolingDataNodeMemory = 32 * 1024 * 1024 * 1024 // 32GB
)

// CollectionTopology captures memory constraints for a collection
type CollectionTopology struct {
	CollectionID     int64
	NumReplicas      int
	IsStandaloneMode bool
	IsPooling        bool
	QueryNodeMemory  map[int64]uint64
	DataNodeMemory   map[int64]uint64
}

// CollectionTopologyQuerier queries collection topology including replicas and memory info
type CollectionTopologyQuerier interface {
	GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error)
}

type forceMergeCompactionPolicy struct {
	meta            *meta
	allocator       allocator.Allocator
	handler         Handler
	topologyQuerier CollectionTopologyQuerier
}
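
// newForceMergeCompactionPolicy builds a force merge compaction policy; the
// topology querier is injected later via SetTopologyQuerier.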
func newForceMergeCompactionPolicy(meta *meta, allocator allocator.Allocator, handler Handler) *forceMergeCompactionPolicy {
	return &forceMergeCompactionPolicy{
		meta:            meta,
		allocator:       allocator,
		handler:         handler,
		topologyQuerier: nil,
	}
}
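
// SetTopologyQuerier injects the CollectionTopologyQuerier used by
// triggerOneCollection to fetch replica and node memory information.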
func (policy *forceMergeCompactionPolicy) SetTopologyQuerier(querier CollectionTopologyQuerier) {
	policy.topologyQuerier = querier
}
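
// triggerOneCollection builds force merge compaction views for a single collection:
// it selects healthy, flushed, non-compacting, non-importing, non-L0 segments,
// groups them by partition and channel, and attaches the collection topology to each view.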
func (policy *forceMergeCompactionPolicy) triggerOneCollection(
	ctx context.Context,
	collectionID int64,
	targetSize int64,
) ([]CompactionView, int64, error) {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64("targetSize", targetSize))

	collection, err := policy.handler.GetCollection(ctx, collectionID)
	if err != nil {
		return nil, 0, err
	}

	triggerID, err := policy.allocator.AllocID(ctx)
	if err != nil {
		return nil, 0, err
	}

	collectionTTL, err := common.GetCollectionTTLFromMap(collection.Properties)
	if err != nil {
		log.Warn("failed to get collection ttl, use default", zap.Error(err))
		collectionTTL = 0
	}

	configMaxSize := getExpectedSegmentSize(policy.meta, collectionID, collection.Schema)

	segments := policy.meta.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
		return isSegmentHealthy(segment) &&
			isFlushed(segment) &&
			!segment.isCompacting &&
			!segment.GetIsImporting() &&
			segment.GetLevel() != datapb.SegmentLevel_L0
	}))
	if len(segments) == 0 {
		log.Info("no eligible segments for force merge")
		return nil, 0, nil
	}

	topology, err := policy.topologyQuerier.GetCollectionTopology(ctx, collectionID)
	if err != nil {
		return nil, 0, err
	}

	views := []CompactionView{}
	for label, groups := range groupByPartitionChannel(GetViewsByInfo(segments...)) {
		view := &ForceMergeSegmentView{
			label:         label,
			segments:      groups,
			triggerID:     triggerID,
			collectionTTL: collectionTTL,
			configMaxSize: float64(configMaxSize),
			topology:      topology,
		}
		views = append(views, view)
	}

	log.Info("force merge triggered", zap.Int("viewCount", len(views)))
	return views, triggerID, nil
}
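
// groupByPartitionChannel groups segment views by their partition/channel label,
// reusing one label pointer per distinct key so each logical group maps to a
// single entry in the result.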
func groupByPartitionChannel(segments []*SegmentView) map[*CompactionGroupLabel][]*SegmentView {
	result := make(map[*CompactionGroupLabel][]*SegmentView)
	for _, seg := range segments {
		label := seg.label
		key := label.Key()

		var foundLabel *CompactionGroupLabel
		for l := range result {
			if l.Key() == key {
				foundLabel = l
				break
			}
		}
		if foundLabel == nil {
			foundLabel = label
		}
		result[foundLabel] = append(result[foundLabel], seg)
	}
	return result
}
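
// metricsNodeMemoryQuerier implements CollectionTopologyQuerier using coordinator
// metrics: replica info from mixCoord, QueryNode memory from QueryCoord metrics,
// and DataNode memory from per-node GetMetrics calls.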
type metricsNodeMemoryQuerier struct {
	nodeManager session.NodeManager
	mixCoord    types.MixCoord
	session     sessionutil.SessionInterface
}
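
// newMetricsNodeMemoryQuerier wires together the DataNode client manager, the
// mixCoord client, and the session used to list QueryNode sessions.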
func newMetricsNodeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface) *metricsNodeMemoryQuerier {
	return &metricsNodeMemoryQuerier{
		nodeManager: nodeManager,
		mixCoord:    mixCoord,
		session:     session,
	}
}

var _ CollectionTopologyQuerier = (*metricsNodeMemoryQuerier)(nil)
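
// GetCollectionTopology collects the replica count and per-node memory for a
// collection, excluding QueryNodes embedded in streaming nodes and substituting
// a 32GB default for pooling DataNodes that report zero memory.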
func (q *metricsNodeMemoryQuerier) GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error) {
	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
	if q.mixCoord == nil {
		return nil, fmt.Errorf("mixCoord not available for topology query")
	}

	// 1. Get replica information
	replicasResp, err := q.mixCoord.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
		CollectionID: collectionID,
	})
	if err != nil {
		return nil, err
	}
	numReplicas := len(replicasResp.GetReplicas())

	// 2. Get QueryNode metrics for memory info
	req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
	if err != nil {
		return nil, err
	}

	// Get QueryNode sessions from etcd to filter out embedded nodes
	sessions, _, err := q.session.GetSessions(ctx, typeutil.QueryNodeRole)
	if err != nil {
		log.Warn("failed to get QueryNode sessions", zap.Error(err))
		return nil, err
	}

	// Build set of embedded QueryNode IDs to exclude
	embeddedNodeIDs := make(map[int64]struct{})
	for _, sess := range sessions {
		// Check if this is an embedded QueryNode in streaming node
		if labels := sess.ServerLabels; labels != nil {
			if labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" {
				embeddedNodeIDs[sess.ServerID] = struct{}{}
			}
		}
	}
	log.Info("excluding embedded QueryNode", zap.Int64s("nodeIDs", lo.Keys(embeddedNodeIDs)))

	rsp, err := q.mixCoord.GetQcMetrics(ctx, req)
	if err = merr.CheckRPCCall(rsp, err); err != nil {
		return nil, err
	}
	topology := &metricsinfo.QueryCoordTopology{}
	if err := metricsinfo.UnmarshalTopology(rsp.GetResponse(), topology); err != nil {
		return nil, err
	}

	// Build QueryNode memory map: nodeID → memory size (exclude embedded nodes)
	queryNodeMemory := make(map[int64]uint64)
	for _, node := range topology.Cluster.ConnectedNodes {
		if _, ok := embeddedNodeIDs[node.ID]; ok {
			continue
		}
		queryNodeMemory[node.ID] = node.HardwareInfos.Memory
	}

	// 3. Get DataNode memory info
	dataNodeMemory := make(map[int64]uint64)
	isPooling := false
	nodes := q.nodeManager.GetClientIDs()
	for _, nodeID := range nodes {
		cli, err := q.nodeManager.GetClient(nodeID)
		if err != nil {
			continue
		}
		resp, err := cli.GetMetrics(ctx, req)
		if err != nil {
			continue
		}
		var infos metricsinfo.DataNodeInfos
		if err := metricsinfo.UnmarshalComponentInfos(resp.GetResponse(), &infos); err != nil {
			continue
		}
		if infos.HardwareInfos.Memory > 0 {
			dataNodeMemory[nodeID] = infos.HardwareInfos.Memory
		} else {
			// Pooling DataNode returns 0 from GetMetrics
			// Use default fallback: 32GB
			isPooling = true
			log.Warn("DataNode returned 0 memory (pooling mode?), using default",
				zap.Int64("nodeID", nodeID),
				zap.Uint64("defaultMemory", defaultPoolingDataNodeMemory))
			dataNodeMemory[nodeID] = defaultPoolingDataNodeMemory
		}
	}

	isStandaloneMode := paramtable.GetRole() == typeutil.StandaloneRole

	log.Info("Collection topology",
		zap.Int64("collectionID", collectionID),
		zap.Int("numReplicas", numReplicas),
		zap.Any("querynodes", queryNodeMemory),
		zap.Any("datanodes", dataNodeMemory),
		zap.Bool("isStandaloneMode", isStandaloneMode),
		zap.Bool("isPooling", isPooling))

	return &CollectionTopology{
		CollectionID:     collectionID,
		NumReplicas:      numReplicas,
		QueryNodeMemory:  queryNodeMemory,
		DataNodeMemory:   dataNodeMemory,
		IsStandaloneMode: isStandaloneMode,
		IsPooling:        isPooling,
	}, nil
}