enhance: optimize forwarding level0 deletions by respecting partition (#28456)

- Cache the level 0 deletions after loading level 0 segments
- Divide the level 0 deletions by partition
related: #27349
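
The gist of the change, as a minimal self-contained sketch (toy types stand in for storage.DeleteData, storage.PrimaryKey and common.InvalidPartitionID; this is illustrative only, not the Milvus code in the diff below):

package main

import "fmt"

// deleteData is a toy stand-in for storage.DeleteData.
type deleteData struct {
    Pks []int64
    Tss []uint64
}

// allPartitions plays the role of common.InvalidPartitionID: deletions that
// apply to every partition of the collection.
const allPartitions = int64(-1)

func main() {
    // The delegator caches the L0 delete records once, keyed by partition ID.
    cache := map[int64]*deleteData{
        10:            {Pks: []int64{1}, Tss: []uint64{100}},
        allPartitions: {Pks: []int64{2}, Tss: []uint64{101}},
    }

    // Forwarding deletions to a segment of partition 10 now only touches its
    // own bucket plus the all-partitions bucket, instead of replaying every
    // L0 delete record of the shard.
    for _, pid := range []int64{10, allPartitions} {
        if dd, ok := cache[pid]; ok {
            fmt.Println("forward", dd.Pks, dd.Tss)
        }
    }
}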

---------

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-11-21 18:24:22 +08:00 committed by GitHub
parent bfccfcd0ca
commit cc952e0486
8 changed files with 244 additions and 90 deletions


@ -840,6 +840,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
CollectionID: segment.CollectionID,
InsertChannel: segment.InsertChannel,
NumOfRows: rowCount,
Level: segment.GetLevel(),
})
}


@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -92,11 +93,13 @@ type shardDelegator struct {
lifetime lifetime.Lifetime[lifetime.State]
distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
// L0 delete buffer
level0Mut sync.RWMutex
level0Deletions map[int64]*storage.DeleteData // partitionID -> deletions
// stream delete buffer
deleteMut sync.Mutex
deleteBuffer deletebuffer.DeleteBuffer[*deletebuffer.Item]
// dispatcherClient msgdispatcher.Client
@ -654,21 +657,22 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
log.Info("Init delta cache", zap.Int64("maxSegmentCacheBuffer", maxSegmentDeleteBuffer), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))
sd := &shardDelegator{
collectionID: collectionID,
replicaID: replicaID,
vchannelName: channel,
version: version,
collection: collection,
segmentManager: manager.Segment,
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(),
deleteBuffer: deletebuffer.NewDoubleCacheDeleteBuffer[*deletebuffer.Item](startTs, maxSegmentDeleteBuffer),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
latestTsafe: atomic.NewUint64(startTs),
loader: loader,
factory: factory,
collectionID: collectionID,
replicaID: replicaID,
vchannelName: channel,
version: version,
collection: collection,
segmentManager: manager.Segment,
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(),
level0Deletions: make(map[int64]*storage.DeleteData),
deleteBuffer: deletebuffer.NewDoubleCacheDeleteBuffer[*deletebuffer.Item](startTs, maxSegmentDeleteBuffer),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
latestTsafe: atomic.NewUint64(startTs),
loader: loader,
factory: factory,
}
m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)


@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -322,23 +323,27 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
return err
}
deletedPks, deletedTss := sd.segmentManager.GetL0DeleteRecords()
if len(deletedPks) > 0 {
log.Info("forwarding L0 delete records...", zap.Int("deleteNum", len(deletedPks)))
for _, segment := range loaded {
err = segment.Delete(deletedPks, deletedTss)
if err != nil {
log.Warn("failed to forward L0 deletions to growing segment",
zap.Int64("segmentID", segment.ID()),
zap.Error(err),
)
for _, segment := range loaded {
log := log.With(
zap.Int64("segmentID", segment.ID()),
)
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition())
if len(deletedPks) == 0 {
continue
}
// clear loaded growing segments
for _, segment := range loaded {
segment.Release()
}
return err
log.Info("forwarding L0 delete records...", zap.Int("deletionCount", len(deletedPks)))
err = segment.Delete(deletedPks, deletedTss)
if err != nil {
log.Warn("failed to forward L0 deletions to growing segment",
zap.Error(err),
)
// clear loaded growing segments
for _, segment := range loaded {
segment.Release()
}
return err
}
}
@ -431,9 +436,8 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
}
log.Debug("work loads segments done")
// load index and L0 segment need no stream delete and distribution change
if req.GetLoadScope() == querypb.LoadScope_Index ||
req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
// load index segment need no stream delete and distribution change
if req.GetLoadScope() == querypb.LoadScope_Index {
return nil
}
@ -445,16 +449,104 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
Version: req.GetVersion(),
}
})
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
if err != nil {
log.Warn("load stream delete failed", zap.Error(err))
return err
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
sd.GenerateLevel0DeletionCache()
} else {
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
if err != nil {
log.Warn("load stream delete failed", zap.Error(err))
return err
}
}
// alter distribution
sd.distribution.AddDistributions(entries...)
return nil
}
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) {
sd.level0Mut.RLock()
deleteData, ok1 := sd.level0Deletions[partitionID]
allPartitionsDeleteData, ok2 := sd.level0Deletions[common.InvalidPartitionID]
sd.level0Mut.RUnlock()
// the partition-specific deletions may need to be merged with the all-partitions deletions below,
// which does not require the map lock, so release the mutex as early as possible.
if ok1 && ok2 {
pks := make([]storage.PrimaryKey, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
tss := make([]storage.Timestamp, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
i := 0
j := 0
for i < int(deleteData.RowCount) || j < int(allPartitionsDeleteData.RowCount) {
if i == int(deleteData.RowCount) {
pks = append(pks, allPartitionsDeleteData.Pks[j])
tss = append(tss, allPartitionsDeleteData.Tss[j])
j++
} else if j == int(allPartitionsDeleteData.RowCount) {
pks = append(pks, deleteData.Pks[i])
tss = append(tss, deleteData.Tss[i])
i++
} else if deleteData.Tss[i] < allPartitionsDeleteData.Tss[j] {
pks = append(pks, deleteData.Pks[i])
tss = append(tss, deleteData.Tss[i])
i++
} else {
pks = append(pks, allPartitionsDeleteData.Pks[j])
tss = append(tss, allPartitionsDeleteData.Tss[j])
j++
}
}
return pks, tss
} else if ok1 {
return deleteData.Pks, deleteData.Tss
} else if ok2 {
return allPartitionsDeleteData.Pks, allPartitionsDeleteData.Tss
}
return nil, nil
}
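The merge above releases the map lock before walking the two buckets and relies on both being ordered by timestamp (GenerateLevel0DeletionCache below sorts them). A self-contained illustration of the same two-pointer merge on plain slices, for reference only (toy key/timestamp types):

package main

import "fmt"

// mergeByTs merges two timestamp-ordered (pk, ts) lists into one
// timestamp-ordered list, mirroring the walk in GetLevel0Deletions.
func mergeByTs(aPks []int64, aTss []uint64, bPks []int64, bTss []uint64) ([]int64, []uint64) {
    pks := make([]int64, 0, len(aPks)+len(bPks))
    tss := make([]uint64, 0, len(aTss)+len(bTss))
    i, j := 0, 0
    for i < len(aPks) || j < len(bPks) {
        // take from b when a is exhausted, or when b's next timestamp is not larger
        takeB := i == len(aPks) || (j < len(bPks) && bTss[j] <= aTss[i])
        if takeB {
            pks = append(pks, bPks[j])
            tss = append(tss, bTss[j])
            j++
        } else {
            pks = append(pks, aPks[i])
            tss = append(tss, aTss[i])
            i++
        }
    }
    return pks, tss
}

func main() {
    pks, tss := mergeByTs([]int64{1, 3}, []uint64{100, 102}, []int64{2}, []uint64{101})
    fmt.Println(pks, tss) // [1 2 3] [100 101 102]
}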
func (sd *shardDelegator) GenerateLevel0DeletionCache() {
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0))
deletions := make(map[int64]*storage.DeleteData)
for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
pks, tss := segment.DeleteRecords()
deleteData, ok := deletions[segment.Partition()]
if !ok {
deleteData = storage.NewDeleteData(pks, tss)
} else {
deleteData.AppendBatch(pks, tss)
}
deletions[segment.Partition()] = deleteData
}
type DeletePair struct {
Pk storage.PrimaryKey
Ts storage.Timestamp
}
for _, deleteData := range deletions {
pairs := make([]DeletePair, deleteData.RowCount)
for i := range deleteData.Pks {
pairs[i] = DeletePair{deleteData.Pks[i], deleteData.Tss[i]}
}
sort.Slice(pairs, func(i, j int) bool {
return pairs[i].Ts < pairs[j].Ts
})
for i := range pairs {
deleteData.Pks[i], deleteData.Tss[i] = pairs[i].Pk, pairs[i].Ts
}
}
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
sd.level0Deletions = deletions
}
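GenerateLevel0DeletionCache keeps every partition bucket sorted by timestamp, which is what lets GetLevel0Deletions merge two buckets with a single linear walk. The same pair sort could also be expressed without the temporary DeletePair slice by sorting the two parallel slices in place via sort.Interface; a minimal sketch with toy element types (an alternative, not the code in this PR):

package main

import (
    "fmt"
    "sort"
)

// byTs sorts two parallel pk/ts slices in place, keyed by timestamp.
type byTs struct {
    pks []int64
    tss []uint64
}

func (s byTs) Len() int           { return len(s.tss) }
func (s byTs) Less(i, j int) bool { return s.tss[i] < s.tss[j] }
func (s byTs) Swap(i, j int) {
    s.pks[i], s.pks[j] = s.pks[j], s.pks[i]
    s.tss[i], s.tss[j] = s.tss[j], s.tss[i]
}

func main() {
    pks := []int64{3, 1, 2}
    tss := []uint64{102, 100, 101}
    sort.Sort(byTs{pks, tss})
    fmt.Println(pks, tss) // [1 2 3] [100 101 102]
}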
func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
candidates []*pkoracle.BloomFilterSet,
infos []*querypb.SegmentLoadInfo,
@ -469,8 +561,6 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
return candidate.ID(), candidate
})
level0DeletePks, level0DeleteTss := sd.segmentManager.GetL0DeleteRecords()
sd.deleteMut.Lock()
defer sd.deleteMut.Unlock()
// apply buffered delete for new segments
@ -490,10 +580,11 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
position = deltaPositions[0]
}
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition())
deleteData := &storage.DeleteData{}
for i, pk := range level0DeletePks {
for i, pk := range deletedPks {
if candidate.MayPkExist(pk) {
deleteData.Append(pk, level0DeleteTss[i])
deleteData.Append(pk, deletedTss[i])
}
}
@ -570,8 +661,6 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
sd.pkOracle.Register(candidate, targetNodeID)
}
log.Info("load delete done")
// alter distribution
sd.distribution.AddDistributions(entries...)
return nil
}
@ -658,6 +747,17 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
log := sd.getLogger(ctx)
targetNodeID := req.GetNodeID()
level0Segments := typeutil.NewSet(lo.Map(sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0)), func(segment segments.Segment, _ int) int64 {
return segment.ID()
})...)
hasLevel0 := false
for _, segmentID := range req.GetSegmentIDs() {
hasLevel0 = level0Segments.Contain(segmentID)
if hasLevel0 {
break
}
}
// add common log fields
log = log.With(
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
@ -724,6 +824,10 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
return err
}
if hasLevel0 {
sd.GenerateLevel0DeletionCache()
}
return nil
}


@ -502,12 +502,12 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
})
s.Run("load_segments_with_l0_delete_failed", func() {
s.T().Skip("skip this test for now")
defer func() {
s.workerManager.ExpectedCalls = nil
s.loader.ExpectedCalls = nil
}()
mockMgr := segments.NewMockSegmentManager(s.T())
delegator, err := NewShardDelegator(
context.Background(),
s.collectionID,
@ -515,10 +515,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
s.vchannelName,
s.version,
s.workerManager,
&segments.Manager{
Collection: s.manager.Collection,
Segment: mockMgr,
},
s.manager,
s.tsafeManager,
s.loader,
&msgstream.MockMqFactory{
@ -529,9 +526,15 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
s.NoError(err)
growing0 := segments.NewMockSegment(s.T())
growing0.EXPECT().ID().Return(1)
growing0.EXPECT().Partition().Return(10)
growing0.EXPECT().Type().Return(segments.SegmentTypeGrowing)
growing0.EXPECT().Release()
growing1 := segments.NewMockSegment(s.T())
growing1.EXPECT().ID().Return(2)
growing0.EXPECT().Release()
growing1.EXPECT().Partition().Return(10)
growing1.EXPECT().Type().Return(segments.SegmentTypeGrowing)
growing1.EXPECT().Release()
mockErr := merr.WrapErrServiceInternal("mock")
@ -548,11 +551,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
mock.Anything,
).Return([]segments.Segment{growing0, growing1}, nil)
mockMgr.EXPECT().GetL0DeleteRecords().Return(
[]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
[]uint64{100},
)
err = delegator.LoadGrowing(context.Background(), []*querypb.SegmentLoadInfo{{}, {}}, 100)
s.ErrorIs(err, mockErr)
})
@ -901,6 +899,51 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
s.Equal(int64(5), s.delegator.GetTargetVersion())
}
func (s *DelegatorDataSuite) TestLevel0Deletions() {
delegator := s.delegator.(*shardDelegator)
partitionID := int64(10)
partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
delegator.level0Deletions[partitionID] = partitionDeleteData
pks, _ := delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
s.Empty(pks)
delegator.level0Deletions[common.InvalidPartitionID] = allPartitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.Len(pks, 2)
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))
delete(delegator.level0Deletions, partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
// exchange the order
delegator.level0Deletions = make(map[int64]*storage.DeleteData)
partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
delegator.level0Deletions[partitionID] = partitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
s.Empty(pks)
delegator.level0Deletions[common.InvalidPartitionID] = allPartitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.Len(pks, 2)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
s.True(pks[1].EQ(partitionDeleteData.Pks[0]))
delete(delegator.level0Deletions, partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
}
func TestDelegatorDataSuite(t *testing.T) {
suite.Run(t, new(DelegatorDataSuite))
}


@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -74,6 +73,12 @@ func WithID(id int64) SegmentFilter {
}
}
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
return func(segment Segment) bool {
return segment.Level() == level
}
}
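The new WithLevel filter composes with the existing filter-based getters; the delegator hunks above use it to pick out the L0 segments of the shard:

// fetch only the L0 segments managed for this shard
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0))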
type SegmentAction func(segment Segment) bool
func IncreaseVersion(version int64) SegmentAction {
@ -129,7 +134,6 @@ type SegmentManager interface {
GetSealed(segmentID UniqueID) Segment
GetGrowing(segmentID UniqueID) Segment
GetL0DeleteRecords() ([]storage.PrimaryKey, []uint64)
Empty() bool
// Remove removes the given segment,
@ -292,8 +296,6 @@ func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, err
mgr.mu.RLock()
defer mgr.mu.RUnlock()
filters = append(filters, WithSkipEmpty())
ret := make([]Segment, 0)
var err error
defer func() {
@ -315,7 +317,7 @@ func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, err
}
for _, segment := range mgr.sealedSegments {
if filter(segment, filters...) {
if segment.Level() != datapb.SegmentLevel_L0 && filter(segment, filters...) {
err = segment.RLock()
if err != nil {
return nil, err
@ -330,8 +332,6 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter)
mgr.mu.RLock()
defer mgr.mu.RUnlock()
filters = append(filters, WithSkipEmpty())
lockedSegments := make([]Segment, 0, len(segments))
var err error
defer func() {
@ -414,25 +414,6 @@ func (mgr *segmentManager) GetGrowing(segmentID UniqueID) Segment {
return nil
}
func (mgr *segmentManager) GetL0DeleteRecords() ([]storage.PrimaryKey, []uint64) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
pks := make([]storage.PrimaryKey, 0)
tss := make([]uint64, 0)
for _, segment := range mgr.sealedSegments {
if segment.Level() != datapb.SegmentLevel_L0 {
continue
}
deletePks, deleteTss := segment.(*L0Segment).DeleteRecords()
pks = append(pks, deletePks...)
tss = append(tss, deleteTss...)
}
return pks, tss
}
func (mgr *segmentManager) Empty() bool {
mgr.mu.RLock()
defer mgr.mu.RUnlock()


@ -768,6 +768,9 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
}
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
log := log.With(
zap.Int64("segmentID", segment.ID()),
)
dCodec := storage.DeleteCodec{}
var blobs []*storage.Blob
for _, deltaLog := range deltaLogs {
@ -789,7 +792,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
}
}
if len(blobs) == 0 {
log.Info("there are no delta logs saved with segment, skip loading delete record", zap.Any("segmentID", segment.ID()))
log.Info("there are no delta logs saved with segment, skip loading delete record")
return nil
}
_, _, deltaData, err := dCodec.Deserialize(blobs)
@ -801,6 +804,8 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
if err != nil {
return err
}
log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount))
return nil
}


@ -1840,7 +1840,8 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
delegator, ok := suite.node.delegators.Get(suite.vchannel)
suite.True(ok)
sealedSegments, _ := delegator.GetSegmentInfo(false)
suite.Len(sealedSegments[0].Segments, 3)
// 1 level 0 + 3 sealed segments
suite.Len(sealedSegments[0].Segments, 4)
// data
req := &querypb.SyncDistributionRequest{
@ -1864,7 +1865,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
sealedSegments, _ = delegator.GetSegmentInfo(false)
suite.Len(sealedSegments[0].Segments, 3)
suite.Len(sealedSegments[0].Segments, 4)
releaseAction = &querypb.SyncAction{
Type: querypb.SyncType_Remove,
@ -1878,7 +1879,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
sealedSegments, _ = delegator.GetSegmentInfo(false)
suite.Len(sealedSegments[0].Segments, 2)
suite.Len(sealedSegments[0].Segments, 3)
}
func (suite *ServiceSuite) TestSyncDistribution_Failed() {


@ -841,6 +841,14 @@ type DeleteData struct {
RowCount int64
}
func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData {
return &DeleteData{
Pks: pks,
Tss: tss,
RowCount: int64(len(pks)),
}
}
// Append appends one pk&ts pair to DeleteData
func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) {
data.Pks = append(data.Pks, pk)
@ -848,6 +856,13 @@ func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) {
data.RowCount++
}
// AppendBatch appends a batch of pk&ts pairs to DeleteData
func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) {
data.Pks = append(data.Pks, pks...)
data.Tss = append(data.Tss, tss...)
data.RowCount += int64(len(pks))
}
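A quick usage note for the new helpers (values are illustrative; NewInt64PrimaryKey is the existing primary-key constructor used in the tests above):

dd := NewDeleteData([]PrimaryKey{NewInt64PrimaryKey(1)}, []Timestamp{100})
dd.AppendBatch([]PrimaryKey{NewInt64PrimaryKey(2), NewInt64PrimaryKey(3)}, []Timestamp{101, 102})
// dd.RowCount is now 3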
func (data *DeleteData) Merge(other *DeleteData) {
data.Pks = append(data.Pks, other.Pks...)
data.Tss = append(data.Tss, other.Tss...)