From b1bbc56b549fc095bf13a6f02f56445f62700484 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 25 Jun 2025 20:56:42 +0800 Subject: [PATCH] fix: [2.5] Use task timestamp to calculate TTL timestamp (#42944) Cherry-pick from master pr: #42920 Related to #42918 Previously the `CollectionTtlTimestamp` could be overflowed when the guarantee_ts==1, which means using `Eventually` consistency level. This patch use task timestamp, allocated by scheduler, to generate ttl timestamp ignore the potential very small timestamp being used. Also add overflow check for ttl timestamp calculated. --------- Signed-off-by: Congqi Xia --- internal/proxy/task_query.go | 6 +++++- internal/proxy/task_search.go | 6 +++++- internal/proxy/task_search_test.go | 8 ++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 77fada477e..de320eb320 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -482,9 +482,13 @@ func (t *queryTask) PreExecute(ctx context.Context) error { t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp() } if collectionInfo.collectionTTL != 0 { - physicalTime, _ := tsoutil.ParseTS(guaranteeTs) + physicalTime := tsoutil.PhysicalTime(t.GetBase().GetTimestamp()) expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL)) t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0) + // preventing overflow, abort + if t.CollectionTtlTimestamps > t.GetBase().GetTimestamp() { + return merr.WrapErrServiceInternal(fmt.Sprintf("ttl timestamp overflow, base timestamp: %d, ttl duration %v", t.GetBase().GetTimestamp(), collectionInfo.collectionTTL)) + } } deadline, ok := t.TraceCtx().Deadline() if ok { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index fc544a700a..5bf1bbe4f2 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -265,9 +265,13 @@ func (t *searchTask) PreExecute(ctx context.Context) error { } if collectionInfo.collectionTTL != 0 { - physicalTime, _ := tsoutil.ParseTS(guaranteeTs) + physicalTime := tsoutil.PhysicalTime(t.GetBase().GetTimestamp()) expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL)) t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0) + // preventing overflow, abort + if t.CollectionTtlTimestamps > t.GetBase().GetTimestamp() { + return merr.WrapErrServiceInternal(fmt.Sprintf("ttl timestamp overflow, base timestamp: %d, ttl duration %v", t.GetBase().GetTimestamp(), collectionInfo.collectionTTL)) + } } t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]() diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 1f4d2d2536..9ff8d073ec 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -49,6 +49,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/metric" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -86,6 +87,7 @@ func TestSearchTask_PostExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } t.Run("Test empty result", func(t *testing.T) { @@ -336,6 +338,7 @@ func TestSearchTask_PreExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } @@ -352,6 +355,7 @@ func TestSearchTask_PreExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } @@ -464,7 +468,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.NoError(t, st.PreExecute(ctx)) assert.True(t, st.isIterator) @@ -493,7 +497,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.Error(t, st.PreExecute(ctx)) })