diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 479f0215d7..f59e4cae90 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -173,6 +173,9 @@ dataCoord: sealProportion: 0.75 # It's the minimum proportion for a segment which can be sealed assignmentExpiration: 2000 # ms + compaction: + retentionDuration: 432000 # 5 days in seconds + dataNode: port: 21124 diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index bdd291736a..9bd40588e7 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -19,7 +19,6 @@ const ( singleCompactionRatioThreshold = 0.2 singleCompactionDeltaLogMaxSize = 10 * 1024 * 1024 //10MiB globalCompactionInterval = 60 * time.Second - timetravelRange = 5 * 24 * time.Hour ) type timetravel struct { diff --git a/internal/datacoord/param_table.go b/internal/datacoord/param_table.go index a355a5615b..a6dd7b922f 100644 --- a/internal/datacoord/param_table.go +++ b/internal/datacoord/param_table.go @@ -79,6 +79,8 @@ type ParamTable struct { EnableCompaction bool EnableGarbageCollection bool + + CompactionRetentionDuration int64 } // Params is a package scoped variable of type ParamTable. @@ -126,6 +128,8 @@ func (p *ParamTable) Init() { p.initMinioUseSSL() p.initMinioBucketName() p.initMinioRootPath() + + p.initCompactionRetentionDuration() } // InitOnce ensures param table is a singleton @@ -344,5 +348,9 @@ func (p *ParamTable) initMinioRootPath() { if err != nil { panic(err) } - Params.MinioRootPath = rootPath + p.MinioRootPath = rootPath +} + +func (p *ParamTable) initCompactionRetentionDuration() { + p.CompactionRetentionDuration = p.ParseInt64WithDefault("dataCoord.compaction.retentionDuration", 432000) } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 29a63c229e..b492c07164 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -123,7 +123,7 @@ func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetr } pts, _ := tsoutil.ParseTS(ts) - ttpts := pts.Add(-timetravelRange) + ttpts := pts.Add(-time.Duration(Params.CompactionRetentionDuration) * time.Second) tt := tsoutil.ComposeTS(ttpts.UnixNano()/int64(time.Millisecond), 0) return &timetravel{tt}, nil } diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index eeba1e8b10..69a015b845 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -1,4 +1,4 @@ -// Licensed to the LF AI & Data foundation under one +// licensed to the lf ai & data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -17,11 +17,14 @@ package datacoord import ( + "context" "errors" "testing" + "time" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/stretchr/testify/assert" ) @@ -107,3 +110,47 @@ func TestVerifyResponse(t *testing.T) { } } } + +func Test_getTimetravelReverseTime(t *testing.T) { + Params.Init() + Params.CompactionRetentionDuration = 43200 // 5 days + + tFixed := time.Date(2021, 11, 15, 0, 0, 0, 0, time.Local) + tBefore := tFixed.Add(-time.Duration(Params.CompactionRetentionDuration) * time.Second) + + type args struct { + allocator allocator + } + tests := []struct { + name string + args args + want *timetravel + wantErr bool + }{ + { + "test get timetravel", + args{&fixedTSOAllocator{fixedTime: tFixed}}, + &timetravel{tsoutil.ComposeTS(tBefore.UnixNano()/int64(time.Millisecond), 0)}, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getTimetravelReverseTime(context.TODO(), tt.args.allocator) + assert.Equal(t, tt.wantErr, err != nil) + assert.EqualValues(t, tt.want, got) + }) + } +} + +type fixedTSOAllocator struct { + fixedTime time.Time +} + +func (f *fixedTSOAllocator) allocTimestamp(_ context.Context) (Timestamp, error) { + return tsoutil.ComposeTS(f.fixedTime.UnixNano()/int64(time.Millisecond), 0), nil +} + +func (f *fixedTSOAllocator) allocID(_ context.Context) (UniqueID, error) { + panic("not implemented") // TODO: Implement +}