mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Add compaction retention config (#11793)
Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
parent
8b60aa8c4e
commit
b8c5239784
@ -173,6 +173,9 @@ dataCoord:
|
|||||||
sealProportion: 0.75 # It's the minimum proportion for a segment which can be sealed
|
sealProportion: 0.75 # It's the minimum proportion for a segment which can be sealed
|
||||||
assignmentExpiration: 2000 # ms
|
assignmentExpiration: 2000 # ms
|
||||||
|
|
||||||
|
compaction:
|
||||||
|
retentionDuration: 432000 # 5 days in seconds
|
||||||
|
|
||||||
dataNode:
|
dataNode:
|
||||||
port: 21124
|
port: 21124
|
||||||
|
|
||||||
|
|||||||
@ -19,7 +19,6 @@ const (
|
|||||||
singleCompactionRatioThreshold = 0.2
|
singleCompactionRatioThreshold = 0.2
|
||||||
singleCompactionDeltaLogMaxSize = 10 * 1024 * 1024 //10MiB
|
singleCompactionDeltaLogMaxSize = 10 * 1024 * 1024 //10MiB
|
||||||
globalCompactionInterval = 60 * time.Second
|
globalCompactionInterval = 60 * time.Second
|
||||||
timetravelRange = 5 * 24 * time.Hour
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type timetravel struct {
|
type timetravel struct {
|
||||||
|
|||||||
@ -79,6 +79,8 @@ type ParamTable struct {
|
|||||||
|
|
||||||
EnableCompaction bool
|
EnableCompaction bool
|
||||||
EnableGarbageCollection bool
|
EnableGarbageCollection bool
|
||||||
|
|
||||||
|
CompactionRetentionDuration int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Params is a package scoped variable of type ParamTable.
|
// Params is a package scoped variable of type ParamTable.
|
||||||
@ -126,6 +128,8 @@ func (p *ParamTable) Init() {
|
|||||||
p.initMinioUseSSL()
|
p.initMinioUseSSL()
|
||||||
p.initMinioBucketName()
|
p.initMinioBucketName()
|
||||||
p.initMinioRootPath()
|
p.initMinioRootPath()
|
||||||
|
|
||||||
|
p.initCompactionRetentionDuration()
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitOnce ensures param table is a singleton
|
// InitOnce ensures param table is a singleton
|
||||||
@ -344,5 +348,9 @@ func (p *ParamTable) initMinioRootPath() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
Params.MinioRootPath = rootPath
|
p.MinioRootPath = rootPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ParamTable) initCompactionRetentionDuration() {
|
||||||
|
p.CompactionRetentionDuration = p.ParseInt64WithDefault("dataCoord.compaction.retentionDuration", 432000)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -123,7 +123,7 @@ func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetr
|
|||||||
}
|
}
|
||||||
|
|
||||||
pts, _ := tsoutil.ParseTS(ts)
|
pts, _ := tsoutil.ParseTS(ts)
|
||||||
ttpts := pts.Add(-timetravelRange)
|
ttpts := pts.Add(-time.Duration(Params.CompactionRetentionDuration) * time.Second)
|
||||||
tt := tsoutil.ComposeTS(ttpts.UnixNano()/int64(time.Millisecond), 0)
|
tt := tsoutil.ComposeTS(ttpts.UnixNano()/int64(time.Millisecond), 0)
|
||||||
return &timetravel{tt}, nil
|
return &timetravel{tt}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
// Licensed to the LF AI & Data foundation under one
|
// licensed to the lf ai & data foundation under one
|
||||||
// or more contributor license agreements. See the NOTICE file
|
// or more contributor license agreements. See the NOTICE file
|
||||||
// distributed with this work for additional information
|
// distributed with this work for additional information
|
||||||
// regarding copyright ownership. The ASF licenses this file
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
@ -17,11 +17,14 @@
|
|||||||
package datacoord
|
package datacoord
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -107,3 +110,47 @@ func TestVerifyResponse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_getTimetravelReverseTime(t *testing.T) {
|
||||||
|
Params.Init()
|
||||||
|
Params.CompactionRetentionDuration = 43200 // 5 days
|
||||||
|
|
||||||
|
tFixed := time.Date(2021, 11, 15, 0, 0, 0, 0, time.Local)
|
||||||
|
tBefore := tFixed.Add(-time.Duration(Params.CompactionRetentionDuration) * time.Second)
|
||||||
|
|
||||||
|
type args struct {
|
||||||
|
allocator allocator
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want *timetravel
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"test get timetravel",
|
||||||
|
args{&fixedTSOAllocator{fixedTime: tFixed}},
|
||||||
|
&timetravel{tsoutil.ComposeTS(tBefore.UnixNano()/int64(time.Millisecond), 0)},
|
||||||
|
false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := getTimetravelReverseTime(context.TODO(), tt.args.allocator)
|
||||||
|
assert.Equal(t, tt.wantErr, err != nil)
|
||||||
|
assert.EqualValues(t, tt.want, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fixedTSOAllocator struct {
|
||||||
|
fixedTime time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fixedTSOAllocator) allocTimestamp(_ context.Context) (Timestamp, error) {
|
||||||
|
return tsoutil.ComposeTS(f.fixedTime.UnixNano()/int64(time.Millisecond), 0), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fixedTSOAllocator) allocID(_ context.Context) (UniqueID, error) {
|
||||||
|
panic("not implemented") // TODO: Implement
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user