Restrict segment GC: skip dropped segments whose DML position is after the channel checkpoint (#21564)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-01-06 21:33:36 +08:00 committed by GitHub
parent e1f722045b
commit 42ab1b0a48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 2 deletions

View File

@ -202,9 +202,11 @@ func (gc *garbageCollector) clearEtcd() {
all := gc.meta.SelectSegments(func(si *SegmentInfo) bool { return true }) all := gc.meta.SelectSegments(func(si *SegmentInfo) bool { return true })
drops := make(map[int64]*SegmentInfo, 0) drops := make(map[int64]*SegmentInfo, 0)
compactTo := make(map[int64]*SegmentInfo) compactTo := make(map[int64]*SegmentInfo)
channels := typeutil.NewSet[string]()
for _, segment := range all { for _, segment := range all {
if segment.GetState() == commonpb.SegmentState_Dropped { if segment.GetState() == commonpb.SegmentState_Dropped {
drops[segment.GetID()] = segment drops[segment.GetID()] = segment
channels.Insert(segment.GetInsertChannel())
//continue //continue
// A(indexed), B(indexed) -> C(no indexed), D(no indexed) -> E(no indexed), A, B can not be GC // A(indexed), B(indexed) -> C(no indexed), D(no indexed) -> E(no indexed), A, B can not be GC
} }
@ -225,10 +227,24 @@ func (gc *garbageCollector) clearEtcd() {
indexedSet.Insert(segment.GetID()) indexedSet.Insert(segment.GetID())
} }
channelCPs := make(map[string]uint64)
for channel := range channels {
pos := gc.meta.GetChannelCheckpoint(channel)
channelCPs[channel] = pos.GetTimestamp()
}
for _, segment := range drops { for _, segment := range drops {
if !gc.isExpire(segment.GetDroppedAt()) { if !gc.isExpire(segment.GetDroppedAt()) {
continue continue
} }
// segment gc shall only happen when the channel cp is at or after the segment dml position.
if segment.GetDmlPosition().GetTimestamp() > channelCPs[segment.GetInsertChannel()] {
log.RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
zap.Uint64("channelCpTs", channelCPs[segment.GetInsertChannel()]),
)
continue
}
// For compact A, B -> C, don't GC A or B if C is not indexed, // For compact A, B -> C, don't GC A or B if C is not indexed,
// guarantee replacing A, B with C won't downgrade performance // guarantee replacing A, B with C won't downgrade performance
if to, ok := compactTo[segment.GetID()]; ok && !indexedSet.Contain(to.GetID()) { if to, ok := compactTo[segment.GetID()]; ok && !indexedSet.Contain(to.GetID()) {

View File

@ -30,6 +30,11 @@ import (
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -40,9 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks" catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/funcutil"
) )
@ -772,6 +775,11 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
m := &meta{ m := &meta{
catalog: catalog, catalog: catalog,
channelCPs: map[string]*internalpb.MsgPosition{
"dmlChannel": {
Timestamp: 1000,
},
},
segments: &SegmentsInfo{ segments: &SegmentsInfo{
map[UniqueID]*SegmentInfo{ map[UniqueID]*SegmentInfo{
segID: { segID: {
@ -878,6 +886,23 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
}, },
segmentIndexes: map[UniqueID]*model.SegmentIndex{}, segmentIndexes: map[UniqueID]*model.SegmentIndex{},
}, },
// dropped segment whose dml position (ts 1200) is after the channel cp (ts 1000): must not be GC'd
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 5,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &internalpb.MsgPosition{
Timestamp: 1200,
},
},
},
}, },
}, },
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
@ -982,6 +1007,8 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.NotNil(t, segD) assert.NotNil(t, segD)
segE := gc.meta.GetSegmentUnsafe(segID + 4) segE := gc.meta.GetSegmentUnsafe(segID + 4)
assert.NotNil(t, segE) assert.NotNil(t, segE)
segF := gc.meta.GetSegmentUnsafe(segID + 5)
assert.NotNil(t, segF)
err := gc.meta.AddSegmentIndex(&model.SegmentIndex{ err := gc.meta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: segID + 4, SegmentID: segID + 4,
@ -1013,10 +1040,15 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.Nil(t, segD) assert.Nil(t, segD)
segE = gc.meta.GetSegmentUnsafe(segID + 4) segE = gc.meta.GetSegmentUnsafe(segID + 4)
assert.NotNil(t, segE) assert.NotNil(t, segE)
segF = gc.meta.GetSegmentUnsafe(segID + 5)
assert.NotNil(t, segF)
gc.clearEtcd() gc.clearEtcd()
segA = gc.meta.GetSegmentUnsafe(segID) segA = gc.meta.GetSegmentUnsafe(segID)
assert.Nil(t, segA) assert.Nil(t, segA)
segB = gc.meta.GetSegmentUnsafe(segID + 1) segB = gc.meta.GetSegmentUnsafe(segID + 1)
assert.Nil(t, segB) assert.Nil(t, segB)
segF = gc.meta.GetSegmentUnsafe(segID + 5)
assert.NotNil(t, segF)
} }