From cdc5ce5d6f3a09e15b37f5850e82146e73fde1a0 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Wed, 28 Feb 2024 19:03:01 +0800
Subject: [PATCH] fix: Do not set LogPath when executing compaction (#30537)

Compaction copies the logPaths of the compactFrom segment (segA) into the
compactTo segment (segB). The previous code copied the logPath directly, so
segB's meta ended up holding segA's full logPaths. As a result, when segB was
compacted again after segA had been GCed, Download failed with the error
"The specified key not found".

This PR makes sure the compactTo segment's meta contains logIDs only.

It also refines CompleteCompactionMutation, improving readability and merging
two methods into one.

See also: #30496

Signed-off-by: yangxuan
---
 internal/datacoord/compaction_test.go |  12 +-
 internal/datacoord/meta.go            | 313 ++++++++++----------
 internal/datacoord/meta_test.go       | 190 +++++++---------
 internal/datacoord/services.go        |   5 +
 4 files changed, 220 insertions(+), 300 deletions(-)

diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index e85a78f71b..5977d84f40 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -558,9 +558,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
 	s.Equal(failed, task.state)
 }
 
-func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
+func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
-		FieldID: id,
+		FieldID: fieldID,
 		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
 	}
 	for _, id := range logIDs {
@@ -569,9 +569,9 @@ func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
 	return l
 }
 
-func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
+func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
-		FieldID: id,
+		FieldID: fieldID,
 		Binlogs: make([]*datapb.Binlog, 0, len(paths)),
 	}
 	for _, path := range paths {
@@ -580,9 +580,9 @@ func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
 	return l
 }
 
-func getFieldBinlogIDsWithEntry(id int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
+func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
-		FieldID: id,
+		FieldID: fieldID,
 		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
 	}
 	for _, id := range logIDs {
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index f7cbbaa69e..3d88ada594 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"path"
 	"sync"
 	"time"
 
@@ -35,7 +34,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/metastore"
-	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
@@ -44,6 +42,7 @@
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/lock"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
@@ -978,225 +977,163 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.segments.SetIsCompacting(segmentID, compacting) } -// CompleteCompactionMutation completes compaction mutation. -func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, - result *datapb.CompactionPlanResult, -) ([]*SegmentInfo, *segMetricMutation, error) { +func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { m.Lock() defer m.Unlock() - modSegments, segments, metricMutation, err := m.prepareCompactionMutation(plan, result) - if err != nil { - log.Warn("fail to prepare for complete compaction mutation", zap.Error(err), zap.Int64("planID", plan.GetPlanID())) - return nil, nil, err + + log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.String("type", plan.GetType().String())) + + metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} + var compactFromSegIDs []int64 + var latestCompactFromSegments []*SegmentInfo + for _, segmentBinlogs := range plan.GetSegmentBinlogs() { + segment := m.segments.GetSegment(segmentBinlogs.GetSegmentID()) + if segment == nil { + return nil, nil, merr.WrapErrSegmentNotFound(segmentBinlogs.GetSegmentID()) + } + + cloned := segment.Clone() + cloned.DroppedAt = uint64(time.Now().UnixNano()) + cloned.Compacted = true + + latestCompactFromSegments = append(latestCompactFromSegments, cloned) + compactFromSegIDs = append(compactFromSegIDs, cloned.GetID()) + + // metrics mutation for compaction from segments + updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) } - if err := m.alterMetaStoreAfterCompaction(segments, modSegments); err != nil { - newSegIDs := lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() }) - log.Warn("fail to alert meta store", zap.Error(err), zap.Int64s("segmentIDs", newSegIDs), zap.Int64("planID", plan.GetPlanID())) - return nil, nil, err - } - return segments, metricMutation, err -} - -// prepareCompactionMutation returns -// - the segment info of compactedFrom segments after compaction to alter -// - the segment info of compactedTo segment after compaction to add -// The compactedTo segment could contain 0 numRows -// TODO: too complicated -// TODO: support Major compaction -func (m *meta) prepareCompactionMutation(plan *datapb.CompactionPlan, - result *datapb.CompactionPlanResult, -) ([]*SegmentInfo, []*SegmentInfo, *segMetricMutation, error) { - log.Info("meta update: prepare for complete compaction mutation") - compactionLogs := plan.GetSegmentBinlogs() - - modSegments := make([]*SegmentInfo, 0, len(compactionLogs)) - - metricMutation := &segMetricMutation{ - stateChange: make(map[string]map[string]int), - } - for _, cl := range compactionLogs { - if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil { - cloned := segment.Clone() - err := binlog.DecompressBinLog(storage.DeleteBinlog, cloned.GetCollectionID(), cloned.GetPartitionID(), cloned.GetID(), cloned.GetDeltalogs()) - if err != nil { - return nil, nil, nil, err + logIDsFromPlan := make(map[int64]struct{}) + for _, segBinlogs := range plan.GetSegmentBinlogs() { + for _, fieldBinlog := range segBinlogs.GetDeltalogs() { + for _, binlog := range fieldBinlog.GetBinlogs() { + logIDsFromPlan[binlog.GetLogID()] = struct{}{} } - updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) - cloned.DroppedAt = uint64(time.Now().UnixNano()) - cloned.Compacted = true - modSegments = append(modSegments, cloned) } } - var startPosition, 
dmlPosition *msgpb.MsgPosition - for _, s := range modSegments { - if dmlPosition == nil || - s.GetDmlPosition() != nil && s.GetDmlPosition().GetTimestamp() < dmlPosition.GetTimestamp() { - dmlPosition = s.GetDmlPosition() - } - - if startPosition == nil || - s.GetStartPosition() != nil && s.GetStartPosition().GetTimestamp() < startPosition.GetTimestamp() { - startPosition = s.GetStartPosition() - } - } - - // find new added delta logs when executing compaction - // TODO: won't be needed when enable L0 Segment - var originDeltalogs []*datapb.FieldBinlog - for _, s := range modSegments { - originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...) - } - - var deletedDeltalogs []*datapb.FieldBinlog - for _, l := range compactionLogs { - deletedDeltalogs = append(deletedDeltalogs, l.GetDeltalogs()...) - } - // MixCompaction / MergeCompaction will generates one and only one segment compactToSegment := result.GetSegments()[0] - newAddedDeltalogs := updateDeltalogs(originDeltalogs, deletedDeltalogs) - copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, compactToSegment.GetSegmentID()) + // copy new deltalogs in compactFrom segments to compactTo segments. + // TODO: Not needed when enable L0 segments. + newDeltalogs, err := m.copyNewDeltalogs(latestCompactFromSegments, logIDsFromPlan, compactToSegment.GetSegmentID()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - deltalogs := append(compactToSegment.GetDeltalogs(), copiedDeltalogs...) - - compactionFrom := make([]UniqueID, 0, len(modSegments)) - for _, s := range modSegments { - compactionFrom = append(compactionFrom, s.GetID()) + if len(newDeltalogs) > 0 { + compactToSegment.Deltalogs = append(compactToSegment.GetDeltalogs(), &datapb.FieldBinlog{Binlogs: newDeltalogs}) } - segmentInfo := &datapb.SegmentInfo{ - ID: compactToSegment.GetSegmentID(), - CollectionID: modSegments[0].CollectionID, - PartitionID: modSegments[0].PartitionID, - InsertChannel: modSegments[0].InsertChannel, - NumOfRows: compactToSegment.NumOfRows, - State: commonpb.SegmentState_Flushed, - MaxRowNum: modSegments[0].MaxRowNum, - Binlogs: compactToSegment.GetInsertLogs(), - Statslogs: compactToSegment.GetField2StatslogPaths(), - Deltalogs: deltalogs, - StartPosition: startPosition, - DmlPosition: dmlPosition, - CreatedByCompaction: true, - CompactionFrom: compactionFrom, - LastExpireTime: plan.GetStartTime(), - Level: datapb.SegmentLevel_L1, + getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition { + var minPos *msgpb.MsgPosition + for _, pos := range positions { + if minPos == nil || + pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() { + minPos = pos + } + } + return minPos } - segment := NewSegmentInfo(segmentInfo) + + compactToSegmentInfo := NewSegmentInfo( + &datapb.SegmentInfo{ + ID: compactToSegment.GetSegmentID(), + CollectionID: latestCompactFromSegments[0].CollectionID, + PartitionID: latestCompactFromSegments[0].PartitionID, + InsertChannel: plan.GetChannel(), + NumOfRows: compactToSegment.NumOfRows, + State: commonpb.SegmentState_Flushed, + MaxRowNum: latestCompactFromSegments[0].MaxRowNum, + Binlogs: compactToSegment.GetInsertLogs(), + Statslogs: compactToSegment.GetField2StatslogPaths(), + Deltalogs: compactToSegment.GetDeltalogs(), + + CreatedByCompaction: true, + CompactionFrom: compactFromSegIDs, + LastExpireTime: plan.GetStartTime(), + Level: datapb.SegmentLevel_L1, + + StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, 
func(info *SegmentInfo, _ int) *msgpb.MsgPosition { + return info.GetStartPosition() + })), + DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { + return info.GetDmlPosition() + })), + }) // L1 segment with NumRows=0 will be discarded, so no need to change the metric - if segmentInfo.GetNumOfRows() > 0 { - metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) + if compactToSegmentInfo.GetNumOfRows() > 0 { + // metrics mutation for compactTo segments + metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows()) + } else { + compactToSegmentInfo.State = commonpb.SegmentState_Dropped } - log.Info("meta update: prepare for complete compaction mutation - complete", - zap.Int64("collectionID", segment.GetCollectionID()), - zap.Int64("partitionID", segment.GetPartitionID()), - zap.Int64("new segment ID", segment.GetID()), - zap.String("new segment level", segment.GetLevel().String()), - zap.Int64("new segment num of rows", segment.GetNumOfRows()), - zap.Any("compacted from", segment.GetCompactionFrom())) + log = log.With( + zap.String("channel", plan.GetChannel()), + zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()), + zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()), + zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()), + zap.Any("compactFrom segments(to be updated as dropped)", compactFromSegIDs), + ) - return modSegments, []*SegmentInfo{segment}, metricMutation, nil -} - -func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) { - ret := make([]*datapb.FieldBinlog, 0, len(binlogs)) - for _, fieldBinlog := range binlogs { - fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog) - for _, binlog := range fieldBinlog.Binlogs { - blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID) - blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) - blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath) - if err != nil { - return nil, err - } - err = m.chunkManager.Write(m.ctx, blobPath, blob) - if err != nil { - return nil, err - } - } - ret = append(ret, fieldBinlog) - } - return ret, nil -} - -func (m *meta) alterMetaStoreAfterCompaction(segmentsCompactTo []*SegmentInfo, segmentsCompactFrom []*SegmentInfo) error { - modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom)) - for _, segment := range segmentsCompactFrom { - modInfos = append(modInfos, segment.SegmentInfo) - } - - newSegments := make([]*datapb.SegmentInfo, len(segmentsCompactTo)) - binlogsIncrements := make([]metastore.BinlogsIncrement, len(segmentsCompactTo)) - for i, seg := range segmentsCompactTo { - newSegment := seg.SegmentInfo - if newSegment.GetNumOfRows() == 0 { - newSegment.State = commonpb.SegmentState_Dropped - } - newSegments[i] = newSegment - binlogsIncrements[i] = metastore.BinlogsIncrement{ - Segment: newSegment, - } - } - modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() }) - newSegIDs := lo.Map(newSegments, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() }) + log.Debug("meta update: prepare for meta mutation - complete") + compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { + return info.SegmentInfo + }) 
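+	// Persist the dropped compactFrom segments and the new compactTo segment in
+	// a single AlterSegments call below; BinlogsIncrement marks the compactTo
+	// segment whose binlog entries (recorded by logID, not logPath) are saved.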
log.Debug("meta update: alter meta store for compaction updates", - zap.Int64s("compact from segment IDs", modSegIDs), - zap.Int64s("compact to segment IDs", newSegIDs)) - - err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegments...), binlogsIncrements...) - if err != nil { + zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())), + zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())), + zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())), + ) + if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, compactToSegmentInfo.SegmentInfo), + metastore.BinlogsIncrement{Segment: compactToSegmentInfo.SegmentInfo}, + ); err != nil { log.Warn("fail to alter segments and new segment", zap.Error(err)) - return err + return nil, nil, err } - for _, s := range segmentsCompactFrom { - m.segments.SetSegment(s.GetID(), s) - } - for _, s := range segmentsCompactTo { - m.segments.SetSegment(s.GetID(), s) - } + lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) { + m.segments.SetSegment(info.GetID(), info) + }) + m.segments.SetSegment(compactToSegmentInfo.GetID(), compactToSegmentInfo) - log.Info("meta update: alter in memory meta after compaction - complete", - zap.Int64s("compact from segment IDs", modSegIDs), - zap.Int64s("compact to segment IDs", newSegIDs)) - return nil + log.Info("meta update: alter in memory meta after compaction - complete") + return []*SegmentInfo{compactToSegmentInfo}, metricMutation, nil } -func updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog) []*datapb.FieldBinlog { - res := make([]*datapb.FieldBinlog, 0, len(origin)) - for _, fbl := range origin { - logs := make(map[int64]*datapb.Binlog) - for _, d := range fbl.GetBinlogs() { - logs[d.GetLogID()] = d - } - for _, remove := range removes { - if remove.GetFieldID() == fbl.GetFieldID() { - for _, r := range remove.GetBinlogs() { - delete(logs, r.GetLogID()) +func (m *meta) copyNewDeltalogs(latestCompactFromInfos []*SegmentInfo, logIDsInPlan map[int64]struct{}, toSegment int64) ([]*datapb.Binlog, error) { + newBinlogs := []*datapb.Binlog{} + for _, seg := range latestCompactFromInfos { + for _, fieldLog := range seg.GetDeltalogs() { + for _, l := range fieldLog.GetBinlogs() { + if _, ok := logIDsInPlan[l.GetLogID()]; !ok { + fromKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, seg.ID, l.GetLogID()) + toKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, toSegment, l.GetLogID()) + log.Warn("found new deltalog in compactFrom segment, copying it...", + zap.Any("deltalog", l), + zap.Int64("copyFrom segmentID", seg.GetID()), + zap.Int64("copyTo segmentID", toSegment), + zap.String("copyFrom key", fromKey), + zap.String("copyTo key", toKey), + ) + + blob, err := m.chunkManager.Read(m.ctx, fromKey) + if err != nil { + return nil, err + } + + if err := m.chunkManager.Write(m.ctx, toKey, blob); err != nil { + return nil, err + } + newBinlogs = append(newBinlogs, l) } } } - binlogs := make([]*datapb.Binlog, 0, len(logs)) - for _, l := range logs { - binlogs = append(binlogs, l) - } - if len(binlogs) > 0 { - res = append(res, &datapb.FieldBinlog{ - FieldID: fbl.GetFieldID(), - Binlogs: binlogs, - }) - } } - - return res + return newBinlogs, nil } // buildSegment utility function for compose datapb.SegmentInfo struct with provided info diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 4cd157478a..7a9bbd621c 100644 
--- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" - "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -197,97 +196,129 @@ func (suite *MetaBasicSuite) TestCollection() { suite.MetricsEqual(metrics.DataCoordNumCollections.WithLabelValues(), 1) } -func (suite *MetaBasicSuite) TestPrepareCompleteCompactionMutation() { - prepareSegments := &SegmentsInfo{ +func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { + latestSegments := &SegmentsInfo{ map[UniqueID]*SegmentInfo{ 1: {SegmentInfo: &datapb.SegmentInfo{ ID: 1, CollectionID: 100, PartitionID: 10, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, - NumOfRows: 1, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, + NumOfRows: 2, }}, 2: {SegmentInfo: &datapb.SegmentInfo{ ID: 2, CollectionID: 100, PartitionID: 10, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, - NumOfRows: 1, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, + NumOfRows: 2, }}, }, } - // m := suite.meta + mockChMgr := mocks.NewChunkManager(suite.T()) + mockChMgr.EXPECT().RootPath().Return("mockroot").Times(4) + mockChMgr.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, nil).Twice() + mockChMgr.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() + m := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: prepareSegments, + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: latestSegments, + chunkManager: mockChMgr, } plan := &datapb.CompactionPlan{ SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: 1, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, + FieldBinlogs: m.GetSegment(1).GetBinlogs(), + Field2StatslogPaths: m.GetSegment(1).GetStatslogs(), + Deltalogs: m.GetSegment(1).GetDeltalogs()[:1], // compaction plan use only 1 deltalog }, { SegmentID: 2, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, + FieldBinlogs: m.GetSegment(2).GetBinlogs(), + Field2StatslogPaths: m.GetSegment(2).GetStatslogs(), + Deltalogs: m.GetSegment(2).GetDeltalogs()[:1], // compaction plan use only 1 deltalog }, }, - StartTime: 
15, } - inSegment := &datapb.CompactionSegment{ + compactToSeg := &datapb.CompactionSegment{ SegmentID: 3, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)}, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)}, NumOfRows: 2, } - inCompactionResult := &datapb.CompactionPlanResult{ - Segments: []*datapb.CompactionSegment{inSegment}, + + result := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{compactToSeg}, } - afterCompact, newSegment, metricMutation, err := m.prepareCompactionMutation(plan, inCompactionResult) + + infos, mutation, err := m.CompleteCompactionMutation(plan, result) + suite.Equal(1, len(infos)) + info := infos[0] suite.NoError(err) - suite.NotNil(afterCompact) - suite.NotNil(newSegment) - suite.Equal(2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()])) - suite.Equal(1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()])) - suite.Equal(int64(0), metricMutation.rowCountChange) - suite.Equal(int64(2), metricMutation.rowCountAccChange) + suite.NotNil(info) + suite.NotNil(mutation) - suite.Require().Equal(2, len(afterCompact)) - suite.Equal(commonpb.SegmentState_Dropped, afterCompact[0].GetState()) - suite.Equal(commonpb.SegmentState_Dropped, afterCompact[1].GetState()) - suite.NotZero(afterCompact[0].GetDroppedAt()) - suite.NotZero(afterCompact[1].GetDroppedAt()) + // check newSegment + suite.EqualValues(3, info.GetID()) + suite.Equal(datapb.SegmentLevel_L1, info.GetLevel()) + suite.Equal(commonpb.SegmentState_Flushed, info.GetState()) - suite.Equal(inSegment.SegmentID, newSegment[0].GetID()) - suite.Equal(UniqueID(100), newSegment[0].GetCollectionID()) - suite.Equal(UniqueID(10), newSegment[0].GetPartitionID()) - suite.Equal(inSegment.NumOfRows, newSegment[0].GetNumOfRows()) - suite.Equal(commonpb.SegmentState_Flushed, newSegment[0].GetState()) + binlogs := info.GetBinlogs() + for _, fbinlog := range binlogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50000, blog.GetLogID()) + } + } - suite.EqualValues(inSegment.GetInsertLogs(), newSegment[0].GetBinlogs()) - suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment[0].GetStatslogs()) - suite.EqualValues(inSegment.GetDeltalogs(), newSegment[0].GetDeltalogs()) - suite.NotZero(newSegment[0].lastFlushTime) - suite.Equal(uint64(15), newSegment[0].GetLastExpireTime()) + statslogs := info.GetStatslogs() + for _, fbinlog := range statslogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50001, blog.GetLogID()) + } + } - segmentsDone, metricMutationDone, err := m.CompleteCompactionMutation(plan, inCompactionResult) - suite.NoError(err) - suite.NotNil(segmentsDone) - suite.NotNil(metricMutationDone) + deltalogs := info.GetDeltalogs() + deltalogIDs := []int64{} + for _, fbinlog := range deltalogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + deltalogIDs = append(deltalogIDs, blog.GetLogID()) + } + } + suite.ElementsMatch([]int64{30001, 31001}, deltalogIDs) + + // check compactFrom segments + for _, segID := range []int64{1, 2} { + seg := m.GetSegment(segID) + suite.Equal(commonpb.SegmentState_Dropped, seg.GetState()) + suite.NotEmpty(seg.GetDroppedAt()) + + suite.EqualValues(segID, seg.GetID()) + 
suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs()) + } + + // check mutation metrics + suite.Equal(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()])) + suite.EqualValues(-2, mutation.rowCountChange) + suite.EqualValues(2, mutation.rowCountAccChange) } func TestMeta(t *testing.T) { @@ -731,59 +762,6 @@ func TestUpdateSegmentsInfo(t *testing.T) { }) } -func TestMeta_alterMetaStore(t *testing.T) { - toAlter := []*datapb.SegmentInfo{ - { - CollectionID: 100, - PartitionID: 10, - ID: 1, - NumOfRows: 10, - }, - } - - newSeg := &datapb.SegmentInfo{ - Binlogs: []*datapb.FieldBinlog{ - { - FieldID: 101, - Binlogs: []*datapb.Binlog{}, - }, - }, - Statslogs: []*datapb.FieldBinlog{ - { - FieldID: 101, - Binlogs: []*datapb.Binlog{}, - }, - }, - Deltalogs: []*datapb.FieldBinlog{ - { - FieldID: 101, - Binlogs: []*datapb.Binlog{}, - }, - }, - CollectionID: 100, - PartitionID: 10, - ID: 2, - NumOfRows: 15, - } - - m := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: &SegmentsInfo{map[int64]*SegmentInfo{ - 1: {SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - }}, - }}, - } - - err := m.alterMetaStoreAfterCompaction([]*SegmentInfo{{SegmentInfo: newSeg}}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo { - return &SegmentInfo{SegmentInfo: t} - })) - assert.NoError(t, err) -} - func Test_meta_SetSegmentCompacting(t *testing.T) { type fields struct { client kv.MetaKv diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 10b1c65beb..21e2b3c1ef 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -540,6 +540,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath log.Error("save binlog and checkpoints failed", zap.Error(err)) return merr.Status(err), nil } + log.Info("SaveBinlogPaths sync segment with meta", + zap.Any("binlogs", req.GetField2BinlogPaths()), + zap.Any("deltalogs", req.GetDeltalogs()), + zap.Any("statslogs", req.GetField2StatslogPaths()), + ) if req.GetSegLevel() == datapb.SegmentLevel_L0 { metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
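As a reference for the invariant this patch enforces, here is a minimal, self-contained Go sketch (illustration only, not part of the change): segment meta records only logIDs, and the full storage key is rebuilt from the owning segment's IDs at read time, so a compactTo segment can never be left pointing at a GCed segment's path. buildDeltaLogPath below is a simplified stand-in for metautil.BuildDeltaLogPath, and the types are trimmed to what the example needs.

package main

import (
	"fmt"
	"path"
)

// Binlog keeps only the logID in meta; the logPath is derived when reading.
type Binlog struct {
	LogID int64
}

// Segment is a trimmed-down segment meta entry.
type Segment struct {
	CollectionID, PartitionID, ID int64
	Deltalogs                     []Binlog
}

// buildDeltaLogPath rebuilds the object key from IDs, mirroring how
// metautil.BuildDeltaLogPath is used in copyNewDeltalogs above.
func buildDeltaLogPath(rootPath string, collectionID, partitionID, segmentID, logID int64) string {
	return path.Join(rootPath, "delta_log",
		fmt.Sprint(collectionID), fmt.Sprint(partitionID), fmt.Sprint(segmentID), fmt.Sprint(logID))
}

func main() {
	compactFrom := Segment{CollectionID: 100, PartitionID: 10, ID: 1, Deltalogs: []Binlog{{LogID: 30001}}}
	compactTo := Segment{CollectionID: 100, PartitionID: 10, ID: 3}

	// Compaction carries over only the logID ...
	compactTo.Deltalogs = append(compactTo.Deltalogs, compactFrom.Deltalogs...)

	// ... so the key is always rebuilt against the segment that now owns the
	// binlog; GC of the compactFrom segment cannot invalidate it.
	for _, l := range compactTo.Deltalogs {
		fmt.Println(buildDeltaLogPath("mockroot", compactTo.CollectionID, compactTo.PartitionID, compactTo.ID, l.LogID))
	}
}

Running it prints the key rebuilt under segment 3 (the compactTo segment) rather than segment 1, which is the same property the test above checks with suite.Empty(blog.GetLogPath()).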