From d331b403c3c140be7393fcbf48921082e45a223d Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Thu, 6 Jun 2024 17:13:50 +0800
Subject: [PATCH] enhance: Remove l0 delete cache (#33537)

Cherry pick from master pr: #32989
Remove the L0 cache and build the delete pk and ts every time.
This reduces memory usage and also improves code readability.

Signed-off-by: xiaofanluan
---
 internal/querynodev2/delegator/delegator.go  |  12 +-
 .../querynodev2/delegator/delegator_data.go  | 110 +++++-------------
 .../delegator/delegator_data_test.go         |  68 +++++++----
 3 files changed, 79 insertions(+), 111 deletions(-)

diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go
index 7fca3c6acd..7f3cc38d38 100644
--- a/internal/querynodev2/delegator/delegator.go
+++ b/internal/querynodev2/delegator/delegator.go
@@ -106,12 +106,11 @@ type shardDelegator struct {
 
 	lifetime lifetime.Lifetime[lifetime.State]
 
-	distribution    *distribution
-	segmentManager  segments.SegmentManager
-	tsafeManager    tsafe.Manager
-	pkOracle        pkoracle.PkOracle
-	level0Mut       sync.RWMutex
-	level0Deletions map[int64]*storage.DeleteData // partitionID -> deletions
+	distribution   *distribution
+	segmentManager segments.SegmentManager
+	tsafeManager   tsafe.Manager
+	pkOracle       pkoracle.PkOracle
+	level0Mut      sync.RWMutex
 	// stream delete buffer
 	deleteMut    sync.RWMutex
 	deleteBuffer deletebuffer.DeleteBuffer[*deletebuffer.Item]
@@ -876,7 +875,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
 		workerManager:  workerManager,
 		lifetime:       lifetime.NewLifetime(lifetime.Initializing),
 		distribution:   NewDistribution(),
-		level0Deletions: make(map[int64]*storage.DeleteData),
 		deleteBuffer:   deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
 		pkOracle:       pkoracle.NewPkOracle(),
 		tsafeManager:   tsafeManager,
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index 5c4525258c..6daf04e48c 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -365,7 +365,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
 		log := log.With(
 			zap.Int64("segmentID", segment.ID()),
 		)
-		deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition())
+		deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
 		if len(deletedPks) == 0 {
 			continue
 		}
@@ -478,7 +478,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 		}
 	})
 	if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
-		sd.GenerateLevel0DeletionCache()
+		sd.RefreshLevel0DeletionStats()
 	} else {
 		// load bloom filter only when candidate not exists
 		infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
@@ -512,94 +512,51 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 	return nil
 }
 
-func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) {
-	sd.level0Mut.RLock()
-	deleteData, ok1 := sd.level0Deletions[partitionID]
-	allPartitionsDeleteData, ok2 := sd.level0Deletions[common.AllPartitionsID]
-	sd.level0Mut.RUnlock()
-	// we may need to merge the specified partition deletions and the all partitions deletions,
-	// so release the mutex as early as possible.
+func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
+	sd.level0Mut.Lock()
+	defer sd.level0Mut.Unlock()
 
-	if ok1 && ok2 {
-		pks := make([]storage.PrimaryKey, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
-		tss := make([]storage.Timestamp, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
+	// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
+	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
+	pks := make([]storage.PrimaryKey, 0)
+	tss := make([]storage.Timestamp, 0)
 
-		i := 0
-		j := 0
-		for i < int(deleteData.RowCount) || j < int(allPartitionsDeleteData.RowCount) {
-			if i == int(deleteData.RowCount) {
-				pks = append(pks, allPartitionsDeleteData.Pks[j])
-				tss = append(tss, allPartitionsDeleteData.Tss[j])
-				j++
-			} else if j == int(allPartitionsDeleteData.RowCount) {
-				pks = append(pks, deleteData.Pks[i])
-				tss = append(tss, deleteData.Tss[i])
-				i++
-			} else if deleteData.Tss[i] < allPartitionsDeleteData.Tss[j] {
-				pks = append(pks, deleteData.Pks[i])
-				tss = append(tss, deleteData.Tss[i])
-				i++
-			} else {
-				pks = append(pks, allPartitionsDeleteData.Pks[j])
-				tss = append(tss, allPartitionsDeleteData.Tss[j])
-				j++
+	for _, segment := range level0Segments {
+		segment := segment.(*segments.L0Segment)
+		if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
+			segmentPks, segmentTss := segment.DeleteRecords()
+			for i, pk := range segmentPks {
+				if candidate.MayPkExist(pk) {
+					pks = append(pks, pk)
+					tss = append(tss, segmentTss[i])
+				}
 			}
 		}
-
-		return pks, tss
-	} else if ok1 {
-		return deleteData.Pks, deleteData.Tss
-	} else if ok2 {
-		return allPartitionsDeleteData.Pks, allPartitionsDeleteData.Tss
 	}
-	return nil, nil
+	sort.Slice(pks, func(i, j int) bool {
+		return tss[i] < tss[j]
+	})
+
+	return pks, tss
 }
 
-func (sd *shardDelegator) GenerateLevel0DeletionCache() {
+func (sd *shardDelegator) RefreshLevel0DeletionStats() {
+	sd.level0Mut.Lock()
+	defer sd.level0Mut.Unlock()
 	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
-	deletions := make(map[int64]*storage.DeleteData)
+	totalSize := int64(0)
 	for _, segment := range level0Segments {
 		segment := segment.(*segments.L0Segment)
 		pks, tss := segment.DeleteRecords()
-		deleteData, ok := deletions[segment.Partition()]
-		if !ok {
-			deleteData = storage.NewDeleteData(pks, tss)
-		} else {
-			deleteData.AppendBatch(pks, tss)
-		}
-		deletions[segment.Partition()] = deleteData
+		totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
 	}
 
-	type DeletePair struct {
-		Pk storage.PrimaryKey
-		Ts storage.Timestamp
-	}
-	for _, deleteData := range deletions {
-		pairs := make([]DeletePair, deleteData.RowCount)
-		for i := range deleteData.Pks {
-			pairs[i] = DeletePair{deleteData.Pks[i], deleteData.Tss[i]}
-		}
-		sort.Slice(pairs, func(i, j int) bool {
-			return pairs[i].Ts < pairs[j].Ts
-		})
-		for i := range pairs {
-			deleteData.Pks[i], deleteData.Tss[i] = pairs[i].Pk, pairs[i].Ts
-		}
-	}
-
-	sd.level0Mut.Lock()
-	defer sd.level0Mut.Unlock()
-	totalSize := int64(0)
-	for _, delete := range deletions {
-		totalSize += delete.Size()
-	}
 	metrics.QueryNodeLevelZeroSize.WithLabelValues(
 		fmt.Sprint(paramtable.GetNodeID()),
 		fmt.Sprint(sd.collectionID),
 		sd.vchannelName,
 	).Set(float64(totalSize))
-	sd.level0Deletions = deletions
 }
 
 func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
@@ -635,14 +592,9 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 		position = deltaPositions[0]
 	}
 
-	deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition())
+	deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
 	deleteData := &storage.DeleteData{}
-	for i, pk := range deletedPks {
-		if candidate.MayPkExist(pk) {
-			deleteData.Append(pk, deletedTss[i])
-		}
-	}
-
+	deleteData.AppendBatch(deletedPks, deletedTss)
 	if deleteData.RowCount > 0 {
 		log.Info("forward L0 delete to worker...",
 			zap.Int64("deleteRowNum", deleteData.RowCount),
@@ -900,7 +852,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
 	}
 
 	if hasLevel0 {
-		sd.GenerateLevel0DeletionCache()
+		sd.RefreshLevel0DeletionStats()
 	}
 	partitionsToReload := make([]UniqueID, 0)
 	lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) {
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index 6d2ae22411..47a284afd4 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -1110,44 +1110,62 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
 	partitionID := int64(10)
 	partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
 	allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
-	delegator.level0Deletions[partitionID] = partitionDeleteData
-	pks, _ := delegator.GetLevel0Deletions(partitionID)
+	schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
+	collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
+		LoadType: querypb.LoadType_LoadCollection,
+	})
+
+	l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
+		CollectionID:  1,
+		SegmentID:     2,
+		PartitionID:   partitionID,
+		InsertChannel: delegator.vchannelName,
+		Level:         datapb.SegmentLevel_L0,
+		NumOfRows:     1,
+	})
+	l0.LoadDeltaData(context.TODO(), partitionDeleteData)
+	delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0)
+
+	l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{
+		CollectionID:  1,
+		SegmentID:     3,
+		PartitionID:   common.AllPartitionsID,
+		InsertChannel: delegator.vchannelName,
+		Level:         datapb.SegmentLevel_L0,
+		NumOfRows:     int64(1),
+	})
+	l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
+
+	pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
 
-	pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
+	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
 
-	delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.Len(pks, 2)
+	delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
+	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
 	s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))
 
-	delete(delegator.level0Deletions, partitionID)
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
+	bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
+	bfs.UpdateBloomFilter(allPartitionDeleteData.Pks)
+
+	pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
+	// bf filtered segment
+	s.Equal(len(pks), 1)
 	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 
-	// exchange the order
-	delegator.level0Deletions = make(map[int64]*storage.DeleteData)
-	partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
-	delegator.level0Deletions[partitionID] = partitionDeleteData
+	delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
+	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
+	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
+	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
+	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 
-	pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
+	delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
+	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
-
-	delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.Len(pks, 2)
-	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
-	s.True(pks[1].EQ(partitionDeleteData.Pks[0]))
-
-	delete(delegator.level0Deletions, partitionID)
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 }
 
 func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
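
To make the change easier to follow outside the diff: GetLevel0Deletions no longer reads a per-partition cache; on each call it rebuilds the delete primary keys and timestamps from the L0 segments currently resident on the channel, keeps only the keys the caller's candidate (typically a bloom-filter set) may contain, and orders the result by timestamp. The sketch below is a minimal, self-contained illustration of that flow; l0Segment, candidate, and allPartitionsID are simplified stand-ins for the Milvus types, not the actual APIs.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins (hypothetical, not the Milvus types).
type PrimaryKey = int64

const allPartitionsID int64 = -1

// l0Segment mimics an L0 segment holding a batch of delete records.
type l0Segment struct {
	partitionID int64
	pks         []PrimaryKey
	tss         []uint64
}

// candidate mimics pkoracle.Candidate.MayPkExist: a probabilistic membership check.
type candidate func(pk PrimaryKey) bool

// getLevel0Deletions rebuilds the delete view for one partition on demand:
// it scans every L0 segment targeting the partition (or all partitions),
// keeps only pks the candidate may contain, and returns them ordered by timestamp.
func getLevel0Deletions(level0Segments []*l0Segment, partitionID int64, mayExist candidate) ([]PrimaryKey, []uint64) {
	type pair struct {
		pk PrimaryKey
		ts uint64
	}
	pairs := make([]pair, 0)
	for _, seg := range level0Segments {
		if seg.partitionID != partitionID && seg.partitionID != allPartitionsID {
			continue
		}
		for i, pk := range seg.pks {
			if mayExist(pk) {
				pairs = append(pairs, pair{pk, seg.tss[i]})
			}
		}
	}
	// Sort pk and ts together so the two returned slices stay aligned.
	sort.Slice(pairs, func(i, j int) bool { return pairs[i].ts < pairs[j].ts })

	pks := make([]PrimaryKey, len(pairs))
	tss := make([]uint64, len(pairs))
	for i, p := range pairs {
		pks[i], tss[i] = p.pk, p.ts
	}
	return pks, tss
}

func main() {
	segs := []*l0Segment{
		{partitionID: 10, pks: []PrimaryKey{1}, tss: []uint64{100}},
		{partitionID: allPartitionsID, pks: []PrimaryKey{2}, tss: []uint64{101}},
	}
	// A candidate that accepts everything stands in for an always-positive bloom filter.
	pks, tss := getLevel0Deletions(segs, 10, func(PrimaryKey) bool { return true })
	fmt.Println(pks, tss) // [1 2] [100 101]
}
```

RefreshLevel0DeletionStats in the patch walks the same set of L0 segments, but only accumulates the byte size of the resident delete records to report the QueryNodeLevelZeroSize metric, since no merged cache is kept anymore.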