fix: Use task timestamp to calculate TTL timestamp (#42920)

Related to #42918

Previously the `CollectionTtlTimestamp` could be overflowed when the
guarantee_ts==1, which means using `Eventually` consistency level.

This patch use task timestamp, allocated by scheduler, to generate ttl
timestamp ignore the potential very small timestamp being used.

Also add overflow check for ttl timestamp calculated.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-06-25 20:48:42 +08:00 committed by GitHub
parent 69872f45ad
commit 942055fa7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 20 additions and 7 deletions

View File

@ -493,9 +493,13 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.RetrieveRequest.IsIterator = queryParams.isIterator t.RetrieveRequest.IsIterator = queryParams.isIterator
if collectionInfo.collectionTTL != 0 { if collectionInfo.collectionTTL != 0 {
physicalTime, _ := tsoutil.ParseTS(guaranteeTs) physicalTime := tsoutil.PhysicalTime(t.GetBase().GetTimestamp())
expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL)) expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL))
t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0) t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0)
// preventing overflow, abort
if t.CollectionTtlTimestamps > t.GetBase().GetTimestamp() {
return merr.WrapErrServiceInternal(fmt.Sprintf("ttl timestamp overflow, base timestamp: %d, ttl duration %v", t.GetBase().GetTimestamp(), collectionInfo.collectionTTL))
}
} }
deadline, ok := t.TraceCtx().Deadline() deadline, ok := t.TraceCtx().Deadline()
if ok { if ok {

View File

@ -267,9 +267,13 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
} }
if collectionInfo.collectionTTL != 0 { if collectionInfo.collectionTTL != 0 {
physicalTime, _ := tsoutil.ParseTS(guaranteeTs) physicalTime := tsoutil.PhysicalTime(t.GetBase().GetTimestamp())
expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL)) expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL))
t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0) t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0)
// preventing overflow, abort
if t.CollectionTtlTimestamps > t.GetBase().GetTimestamp() {
return merr.WrapErrServiceInternal(fmt.Sprintf("ttl timestamp overflow, base timestamp: %d, ttl duration %v", t.GetBase().GetTimestamp(), collectionInfo.collectionTTL))
}
} }
t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]() t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]()

View File

@ -54,6 +54,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/testutils" "github.com/milvus-io/milvus/pkg/v2/util/testutils"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
@ -87,6 +88,7 @@ func TestSearchTask_PostExecute(t *testing.T) {
tr: timerecord.NewTimeRecorder("test-search"), tr: timerecord.NewTimeRecorder("test-search"),
} }
require.NoError(t, task.OnEnqueue()) require.NoError(t, task.OnEnqueue())
task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0))
return task return task
} }
t.Run("Test empty result", func(t *testing.T) { t.Run("Test empty result", func(t *testing.T) {
@ -705,6 +707,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
tr: timerecord.NewTimeRecorder("test-search"), tr: timerecord.NewTimeRecorder("test-search"),
} }
require.NoError(t, task.OnEnqueue()) require.NoError(t, task.OnEnqueue())
task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0))
return task return task
} }
@ -721,6 +724,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
tr: timerecord.NewTimeRecorder("test-search"), tr: timerecord.NewTimeRecorder("test-search"),
} }
require.NoError(t, task.OnEnqueue()) require.NoError(t, task.OnEnqueue())
task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0))
return task return task
} }
@ -754,6 +758,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
tr: timerecord.NewTimeRecorder("test-search"), tr: timerecord.NewTimeRecorder("test-search"),
} }
require.NoError(t, task.OnEnqueue()) require.NoError(t, task.OnEnqueue())
task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0))
return task return task
} }
@ -894,7 +899,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
_, cancel := context.WithTimeout(ctx, time.Second) _, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp)
enqueueTs := uint64(100000) enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0)
st.SetTs(enqueueTs) st.SetTs(enqueueTs)
assert.NoError(t, st.PreExecute(ctx)) assert.NoError(t, st.PreExecute(ctx))
assert.True(t, st.isIterator) assert.True(t, st.isIterator)
@ -923,7 +928,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
_, cancel := context.WithTimeout(ctx, time.Second) _, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp)
enqueueTs := uint64(100000) enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0)
st.SetTs(enqueueTs) st.SetTs(enqueueTs)
assert.Error(t, st.PreExecute(ctx)) assert.Error(t, st.PreExecute(ctx))
}) })
@ -954,7 +959,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
_, cancel := context.WithTimeout(ctx, time.Second) _, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp)
enqueueTs := uint64(100000) enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0)
st.SetTs(enqueueTs) st.SetTs(enqueueTs)
assert.NoError(t, st.PreExecute(ctx)) assert.NoError(t, st.PreExecute(ctx))
assert.NotNil(t, st.functionScore) assert.NotNil(t, st.functionScore)
@ -976,7 +981,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
_, cancel := context.WithTimeout(ctx, time.Second) _, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp)
enqueueTs := uint64(100000) enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0)
st.SetTs(enqueueTs) st.SetTs(enqueueTs)
assert.NoError(t, st.PreExecute(ctx)) assert.NoError(t, st.PreExecute(ctx))
assert.NotNil(t, st.functionScore) assert.NotNil(t, st.functionScore)
@ -998,7 +1003,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
_, cancel := context.WithTimeout(ctx, time.Second) _, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp)
enqueueTs := uint64(100000) enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0)
st.SetTs(enqueueTs) st.SetTs(enqueueTs)
assert.NoError(t, st.PreExecute(ctx)) assert.NoError(t, st.PreExecute(ctx))
}) })