diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 44bcc93afc..fd8fa493d6 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -493,9 +493,13 @@ func (t *queryTask) PreExecute(ctx context.Context) error { t.RetrieveRequest.IsIterator = queryParams.isIterator if collectionInfo.collectionTTL != 0 { - physicalTime, _ := tsoutil.ParseTS(guaranteeTs) + physicalTime := tsoutil.PhysicalTime(t.GetBase().GetTimestamp()) expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL)) t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0) + // preventing overflow, abort + if t.CollectionTtlTimestamps > t.GetBase().GetTimestamp() { + return merr.WrapErrServiceInternal(fmt.Sprintf("ttl timestamp overflow, base timestamp: %d, ttl duration %v", t.GetBase().GetTimestamp(), collectionInfo.collectionTTL)) + } } deadline, ok := t.TraceCtx().Deadline() if ok { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 6368a26ac3..2e6b9a52e8 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -267,9 +267,13 @@ func (t *searchTask) PreExecute(ctx context.Context) error { } if collectionInfo.collectionTTL != 0 { - physicalTime, _ := tsoutil.ParseTS(guaranteeTs) + physicalTime := tsoutil.PhysicalTime(t.GetBase().GetTimestamp()) expireTime := physicalTime.Add(-time.Duration(collectionInfo.collectionTTL)) t.CollectionTtlTimestamps = tsoutil.ComposeTSByTime(expireTime, 0) + // preventing overflow, abort + if t.CollectionTtlTimestamps > t.GetBase().GetTimestamp() { + return merr.WrapErrServiceInternal(fmt.Sprintf("ttl timestamp overflow, base timestamp: %d, ttl duration %v", t.GetBase().GetTimestamp(), collectionInfo.collectionTTL)) + } } t.resultBuf = typeutil.NewConcurrentSet[*internalpb.SearchResults]() diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 1b7a2690c9..3b694e4d15 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -54,6 +54,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/testutils" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -87,6 +88,7 @@ func TestSearchTask_PostExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } t.Run("Test empty result", func(t *testing.T) { @@ -705,6 +707,7 @@ func TestSearchTask_PreExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } @@ -721,6 +724,7 @@ func TestSearchTask_PreExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } @@ -754,6 +758,7 @@ func TestSearchTask_PreExecute(t *testing.T) { tr: timerecord.NewTimeRecorder("test-search"), } require.NoError(t, task.OnEnqueue()) + task.SetTs(tsoutil.ComposeTSByTime(time.Now(), 0)) return task } @@ -894,7 +899,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.NoError(t, st.PreExecute(ctx)) assert.True(t, st.isIterator) @@ -923,7 +928,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.Error(t, st.PreExecute(ctx)) }) @@ -954,7 +959,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.NoError(t, st.PreExecute(ctx)) assert.NotNil(t, st.functionScore) @@ -976,7 +981,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.NoError(t, st.PreExecute(ctx)) assert.NotNil(t, st.functionScore) @@ -998,7 +1003,7 @@ func TestSearchTask_PreExecute(t *testing.T) { _, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.Equal(t, typeutil.ZeroTimestamp, st.TimeoutTimestamp) - enqueueTs := uint64(100000) + enqueueTs := tsoutil.ComposeTSByTime(time.Now(), 0) st.SetTs(enqueueTs) assert.NoError(t, st.PreExecute(ctx)) })