diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index e85a78f71b..5977d84f40 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -558,9 +558,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
 	s.Equal(failed, task.state)
 }
 
-func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
+func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
-		FieldID: id,
+		FieldID: fieldID,
 		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
 	}
 	for _, id := range logIDs {
@@ -569,9 +569,9 @@ func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
 	return l
 }
 
-func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
+func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
-		FieldID: id,
+		FieldID: fieldID,
 		Binlogs: make([]*datapb.Binlog, 0, len(paths)),
 	}
 	for _, path := range paths {
@@ -580,9 +580,9 @@ func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
 	return l
 }
 
-func getFieldBinlogIDsWithEntry(id int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
+func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
-		FieldID: id,
+		FieldID: fieldID,
 		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
 	}
 	for _, id := range logIDs {
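Note: the rename above only touches the helper signatures; the loop bodies fall outside the hunk context. For reference, a minimal sketch of what the full helper presumably looks like after the rename (the append inside the loop is assumed, since the diff does not show it):

func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
	l := &datapb.FieldBinlog{
		FieldID: fieldID,
		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
	}
	for _, id := range logIDs {
		// assumed body: one Binlog entry per supplied log ID
		l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id})
	}
	return l
}

Before the rename, the loop variable id shadowed the parameter id (the field ID); calling the first argument fieldID removes exactly that ambiguity.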
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index f7cbbaa69e..3d88ada594 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"path"
 	"sync"
 	"time"
 
@@ -35,7 +34,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/metastore"
-	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
@@ -44,6 +42,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/lock"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
@@ -978,225 +977,163 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
 	m.segments.SetIsCompacting(segmentID, compacting)
 }
 
-// CompleteCompactionMutation completes compaction mutation.
-func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan,
-	result *datapb.CompactionPlanResult,
-) ([]*SegmentInfo, *segMetricMutation, error) {
+func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
 	m.Lock()
 	defer m.Unlock()
-	modSegments, segments, metricMutation, err := m.prepareCompactionMutation(plan, result)
-	if err != nil {
-		log.Warn("fail to prepare for complete compaction mutation", zap.Error(err), zap.Int64("planID", plan.GetPlanID()))
-		return nil, nil, err
+
+	log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.String("type", plan.GetType().String()))
+
+	metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
+	var compactFromSegIDs []int64
+	var latestCompactFromSegments []*SegmentInfo
+	for _, segmentBinlogs := range plan.GetSegmentBinlogs() {
+		segment := m.segments.GetSegment(segmentBinlogs.GetSegmentID())
+		if segment == nil {
+			return nil, nil, merr.WrapErrSegmentNotFound(segmentBinlogs.GetSegmentID())
+		}
+
+		cloned := segment.Clone()
+		cloned.DroppedAt = uint64(time.Now().UnixNano())
+		cloned.Compacted = true
+
+		latestCompactFromSegments = append(latestCompactFromSegments, cloned)
+		compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
+
+		// metrics mutation for compaction from segments
+		updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
 	}
 
-	if err := m.alterMetaStoreAfterCompaction(segments, modSegments); err != nil {
-		newSegIDs := lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
-		log.Warn("fail to alert meta store", zap.Error(err), zap.Int64s("segmentIDs", newSegIDs), zap.Int64("planID", plan.GetPlanID()))
-		return nil, nil, err
-	}
-	return segments, metricMutation, err
-}
-
-// prepareCompactionMutation returns
-// - the segment info of compactedFrom segments after compaction to alter
-// - the segment info of compactedTo segment after compaction to add
-// The compactedTo segment could contain 0 numRows
-// TODO: too complicated
-// TODO: support Major compaction
-func (m *meta) prepareCompactionMutation(plan *datapb.CompactionPlan,
-	result *datapb.CompactionPlanResult,
-) ([]*SegmentInfo, []*SegmentInfo, *segMetricMutation, error) {
-	log.Info("meta update: prepare for complete compaction mutation")
-	compactionLogs := plan.GetSegmentBinlogs()
-
-	modSegments := make([]*SegmentInfo, 0, len(compactionLogs))
-
-	metricMutation := &segMetricMutation{
-		stateChange: make(map[string]map[string]int),
-	}
-	for _, cl := range compactionLogs {
-		if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
-			cloned := segment.Clone()
-			err := binlog.DecompressBinLog(storage.DeleteBinlog, cloned.GetCollectionID(), cloned.GetPartitionID(), cloned.GetID(), cloned.GetDeltalogs())
-			if err != nil {
-				return nil, nil, nil, err
+	logIDsFromPlan := make(map[int64]struct{})
+	for _, segBinlogs := range plan.GetSegmentBinlogs() {
+		for _, fieldBinlog := range segBinlogs.GetDeltalogs() {
+			for _, binlog := range fieldBinlog.GetBinlogs() {
+				logIDsFromPlan[binlog.GetLogID()] = struct{}{}
 			}
-			updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
-			cloned.DroppedAt = uint64(time.Now().UnixNano())
-			cloned.Compacted = true
-			modSegments = append(modSegments, cloned)
 		}
 	}
 
-	var startPosition, dmlPosition *msgpb.MsgPosition
-	for _, s := range modSegments {
-		if dmlPosition == nil ||
-			s.GetDmlPosition() != nil && s.GetDmlPosition().GetTimestamp() < dmlPosition.GetTimestamp() {
-			dmlPosition = s.GetDmlPosition()
-		}
-
-		if startPosition == nil ||
-			s.GetStartPosition() != nil && s.GetStartPosition().GetTimestamp() < startPosition.GetTimestamp() {
-			startPosition = s.GetStartPosition()
-		}
-	}
-
-	// find new added delta logs when executing compaction
-	// TODO: won't be needed when enable L0 Segment
-	var originDeltalogs []*datapb.FieldBinlog
-	for _, s := range modSegments {
-		originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...)
-	}
-
-	var deletedDeltalogs []*datapb.FieldBinlog
-	for _, l := range compactionLogs {
-		deletedDeltalogs = append(deletedDeltalogs, l.GetDeltalogs()...)
-	}
-
 	// MixCompaction / MergeCompaction will generates one and only one segment
 	compactToSegment := result.GetSegments()[0]
 
-	newAddedDeltalogs := updateDeltalogs(originDeltalogs, deletedDeltalogs)
-	copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, compactToSegment.GetSegmentID())
+	// copy new deltalogs in compactFrom segments to compactTo segments.
+	// TODO: Not needed when enable L0 segments.
+	newDeltalogs, err := m.copyNewDeltalogs(latestCompactFromSegments, logIDsFromPlan, compactToSegment.GetSegmentID())
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, err
 	}
-	deltalogs := append(compactToSegment.GetDeltalogs(), copiedDeltalogs...)
-
-	compactionFrom := make([]UniqueID, 0, len(modSegments))
-	for _, s := range modSegments {
-		compactionFrom = append(compactionFrom, s.GetID())
+	if len(newDeltalogs) > 0 {
+		compactToSegment.Deltalogs = append(compactToSegment.GetDeltalogs(), &datapb.FieldBinlog{Binlogs: newDeltalogs})
 	}
 
-	segmentInfo := &datapb.SegmentInfo{
-		ID:                  compactToSegment.GetSegmentID(),
-		CollectionID:        modSegments[0].CollectionID,
-		PartitionID:         modSegments[0].PartitionID,
-		InsertChannel:       modSegments[0].InsertChannel,
-		NumOfRows:           compactToSegment.NumOfRows,
-		State:               commonpb.SegmentState_Flushed,
-		MaxRowNum:           modSegments[0].MaxRowNum,
-		Binlogs:             compactToSegment.GetInsertLogs(),
-		Statslogs:           compactToSegment.GetField2StatslogPaths(),
-		Deltalogs:           deltalogs,
-		StartPosition:       startPosition,
-		DmlPosition:         dmlPosition,
-		CreatedByCompaction: true,
-		CompactionFrom:      compactionFrom,
-		LastExpireTime:      plan.GetStartTime(),
-		Level:               datapb.SegmentLevel_L1,
+	getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
+		var minPos *msgpb.MsgPosition
+		for _, pos := range positions {
+			if minPos == nil ||
+				pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
+				minPos = pos
+			}
+		}
+		return minPos
 	}
-	segment := NewSegmentInfo(segmentInfo)
+
+	compactToSegmentInfo := NewSegmentInfo(
+		&datapb.SegmentInfo{
+			ID:            compactToSegment.GetSegmentID(),
+			CollectionID:  latestCompactFromSegments[0].CollectionID,
+			PartitionID:   latestCompactFromSegments[0].PartitionID,
+			InsertChannel: plan.GetChannel(),
+			NumOfRows:     compactToSegment.NumOfRows,
+			State:         commonpb.SegmentState_Flushed,
+			MaxRowNum:     latestCompactFromSegments[0].MaxRowNum,
			Binlogs:       compactToSegment.GetInsertLogs(),
+			Statslogs:     compactToSegment.GetField2StatslogPaths(),
+			Deltalogs:     compactToSegment.GetDeltalogs(),
+
+			CreatedByCompaction: true,
+			CompactionFrom:      compactFromSegIDs,
+			LastExpireTime:      plan.GetStartTime(),
+			Level:               datapb.SegmentLevel_L1,
+
+			StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
+				return info.GetStartPosition()
+			})),
+			DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
+				return info.GetDmlPosition()
+			})),
+		})
 
 	// L1 segment with NumRows=0 will be discarded, so no need to change the metric
-	if segmentInfo.GetNumOfRows() > 0 {
-		metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
+	if compactToSegmentInfo.GetNumOfRows() > 0 {
+		// metrics mutation for compactTo segments
+		metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows())
+	} else {
+		compactToSegmentInfo.State = commonpb.SegmentState_Dropped
 	}
 
-	log.Info("meta update: prepare for complete compaction mutation - complete",
-		zap.Int64("collectionID", segment.GetCollectionID()),
-		zap.Int64("partitionID", segment.GetPartitionID()),
-		zap.Int64("new segment ID", segment.GetID()),
-		zap.String("new segment level", segment.GetLevel().String()),
-		zap.Int64("new segment num of rows", segment.GetNumOfRows()),
-		zap.Any("compacted from", segment.GetCompactionFrom()))
+	log = log.With(
+		zap.String("channel", plan.GetChannel()),
+		zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()),
+		zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()),
+		zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
+		zap.Any("compactFrom segments(to be updated as dropped)", compactFromSegIDs),
+	)
 
-	return modSegments, []*SegmentInfo{segment}, metricMutation, nil
-}
-
-func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) {
-	ret := make([]*datapb.FieldBinlog, 0, len(binlogs))
-	for _, fieldBinlog := range binlogs {
-		fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog)
-		for _, binlog := range fieldBinlog.Binlogs {
-			blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID)
-			blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
-			blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath)
-			if err != nil {
-				return nil, err
-			}
-			err = m.chunkManager.Write(m.ctx, blobPath, blob)
-			if err != nil {
-				return nil, err
-			}
-		}
-		ret = append(ret, fieldBinlog)
-	}
-	return ret, nil
-}
-
-func (m *meta) alterMetaStoreAfterCompaction(segmentsCompactTo []*SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
-	modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom))
-	for _, segment := range segmentsCompactFrom {
-		modInfos = append(modInfos, segment.SegmentInfo)
-	}
-
-	newSegments := make([]*datapb.SegmentInfo, len(segmentsCompactTo))
-	binlogsIncrements := make([]metastore.BinlogsIncrement, len(segmentsCompactTo))
-	for i, seg := range segmentsCompactTo {
-		newSegment := seg.SegmentInfo
-		if newSegment.GetNumOfRows() == 0 {
-			newSegment.State = commonpb.SegmentState_Dropped
-		}
-		newSegments[i] = newSegment
-		binlogsIncrements[i] = metastore.BinlogsIncrement{
-			Segment: newSegment,
-		}
-	}
-	modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
-	newSegIDs := lo.Map(newSegments, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
+	log.Debug("meta update: prepare for meta mutation - complete")
+	compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
+		return info.SegmentInfo
+	})
 
 	log.Debug("meta update: alter meta store for compaction updates",
-		zap.Int64s("compact from segment IDs", modSegIDs),
-		zap.Int64s("compact to segment IDs", newSegIDs))
-
-	err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegments...), binlogsIncrements...)
-	if err != nil {
+		zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())),
+		zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
+		zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
+	)
+	if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, compactToSegmentInfo.SegmentInfo),
+		metastore.BinlogsIncrement{Segment: compactToSegmentInfo.SegmentInfo},
+	); err != nil {
 		log.Warn("fail to alter segments and new segment", zap.Error(err))
-		return err
+		return nil, nil, err
 	}
 
-	for _, s := range segmentsCompactFrom {
-		m.segments.SetSegment(s.GetID(), s)
-	}
-	for _, s := range segmentsCompactTo {
-		m.segments.SetSegment(s.GetID(), s)
-	}
+	lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
+		m.segments.SetSegment(info.GetID(), info)
+	})
+	m.segments.SetSegment(compactToSegmentInfo.GetID(), compactToSegmentInfo)
 
-	log.Info("meta update: alter in memory meta after compaction - complete",
-		zap.Int64s("compact from segment IDs", modSegIDs),
-		zap.Int64s("compact to segment IDs", newSegIDs))
-	return nil
+	log.Info("meta update: alter in memory meta after compaction - complete")
+	return []*SegmentInfo{compactToSegmentInfo}, metricMutation, nil
 }
 
-func updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog) []*datapb.FieldBinlog {
-	res := make([]*datapb.FieldBinlog, 0, len(origin))
-	for _, fbl := range origin {
-		logs := make(map[int64]*datapb.Binlog)
-		for _, d := range fbl.GetBinlogs() {
-			logs[d.GetLogID()] = d
-		}
-		for _, remove := range removes {
-			if remove.GetFieldID() == fbl.GetFieldID() {
-				for _, r := range remove.GetBinlogs() {
-					delete(logs, r.GetLogID())
+func (m *meta) copyNewDeltalogs(latestCompactFromInfos []*SegmentInfo, logIDsInPlan map[int64]struct{}, toSegment int64) ([]*datapb.Binlog, error) {
+	newBinlogs := []*datapb.Binlog{}
+	for _, seg := range latestCompactFromInfos {
+		for _, fieldLog := range seg.GetDeltalogs() {
+			for _, l := range fieldLog.GetBinlogs() {
+				if _, ok := logIDsInPlan[l.GetLogID()]; !ok {
+					fromKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, seg.ID, l.GetLogID())
+					toKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, toSegment, l.GetLogID())
+					log.Warn("found new deltalog in compactFrom segment, copying it...",
+						zap.Any("deltalog", l),
+						zap.Int64("copyFrom segmentID", seg.GetID()),
+						zap.Int64("copyTo segmentID", toSegment),
+						zap.String("copyFrom key", fromKey),
+						zap.String("copyTo key", toKey),
+					)
+
+					blob, err := m.chunkManager.Read(m.ctx, fromKey)
+					if err != nil {
+						return nil, err
+					}
+
+					if err := m.chunkManager.Write(m.ctx, toKey, blob); err != nil {
+						return nil, err
+					}
+					newBinlogs = append(newBinlogs, l)
 				}
 			}
 		}
-		binlogs := make([]*datapb.Binlog, 0, len(logs))
-		for _, l := range logs {
-			binlogs = append(binlogs, l)
-		}
-		if len(binlogs) > 0 {
-			res = append(res, &datapb.FieldBinlog{
-				FieldID: fbl.GetFieldID(),
-				Binlogs: binlogs,
-			})
-		}
 	}
-
-	return res
+	return newBinlogs, nil
 }
 
 // buildSegment utility function for compose datapb.SegmentInfo struct with provided info
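Note: the core behavioral change in meta.go is how deltalogs that land on a compactFrom segment after the plan was built get reconciled. The old path diffed FieldBinlog lists (updateDeltalogs) and re-keyed every surviving delta file (copyDeltaFiles); the new path works purely on log IDs: every delta log ID referenced by the plan goes into logIDsFromPlan, and any deltalog on the latest compactFrom segments that is absent from that set must have been appended while compaction was running, so only those get copied under the compactTo segment's path. A worked example of that set logic, using plain int64 stand-ins for the datapb types (the IDs are taken from the test below):

// Segment 1 holds deltalogs {30000, 30001} but the plan only references 30000;
// segment 2 holds {31000, 31001} but the plan only references 31000.
planIDs := map[int64]struct{}{30000: {}, 31000: {}}
latest := [][]int64{{30000, 30001}, {31000, 31001}}

var fresh []int64 // deltalogs appended after the plan was built
for _, segLogs := range latest {
	for _, logID := range segLogs {
		if _, ok := planIDs[logID]; !ok {
			fresh = append(fresh, logID)
		}
	}
}
// fresh == [30001 31001]: exactly the two logs copyNewDeltalogs copies to
// compactTo segment 3, and the two IDs the test asserts via ElementsMatch.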
"github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" - "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -197,97 +196,129 @@ func (suite *MetaBasicSuite) TestCollection() { suite.MetricsEqual(metrics.DataCoordNumCollections.WithLabelValues(), 1) } -func (suite *MetaBasicSuite) TestPrepareCompleteCompactionMutation() { - prepareSegments := &SegmentsInfo{ +func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { + latestSegments := &SegmentsInfo{ map[UniqueID]*SegmentInfo{ 1: {SegmentInfo: &datapb.SegmentInfo{ ID: 1, CollectionID: 100, PartitionID: 10, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, - NumOfRows: 1, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, + NumOfRows: 2, }}, 2: {SegmentInfo: &datapb.SegmentInfo{ ID: 2, CollectionID: 100, PartitionID: 10, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, - NumOfRows: 1, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, + NumOfRows: 2, }}, }, } - // m := suite.meta + mockChMgr := mocks.NewChunkManager(suite.T()) + mockChMgr.EXPECT().RootPath().Return("mockroot").Times(4) + mockChMgr.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, nil).Twice() + mockChMgr.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() + m := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: prepareSegments, + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: latestSegments, + chunkManager: mockChMgr, } plan := &datapb.CompactionPlan{ SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: 1, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, + FieldBinlogs: m.GetSegment(1).GetBinlogs(), + Field2StatslogPaths: m.GetSegment(1).GetStatslogs(), + Deltalogs: m.GetSegment(1).GetDeltalogs()[:1], // compaction plan use only 1 deltalog }, { SegmentID: 2, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, + FieldBinlogs: m.GetSegment(2).GetBinlogs(), + Field2StatslogPaths: m.GetSegment(2).GetStatslogs(), + Deltalogs: m.GetSegment(2).GetDeltalogs()[:1], // compaction plan use only 1 deltalog }, }, - StartTime: 15, } - inSegment := &datapb.CompactionSegment{ + compactToSeg := &datapb.CompactionSegment{ SegmentID: 
3, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)}, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)}, NumOfRows: 2, } - inCompactionResult := &datapb.CompactionPlanResult{ - Segments: []*datapb.CompactionSegment{inSegment}, + + result := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{compactToSeg}, } - afterCompact, newSegment, metricMutation, err := m.prepareCompactionMutation(plan, inCompactionResult) + + infos, mutation, err := m.CompleteCompactionMutation(plan, result) + suite.Equal(1, len(infos)) + info := infos[0] suite.NoError(err) - suite.NotNil(afterCompact) - suite.NotNil(newSegment) - suite.Equal(2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()])) - suite.Equal(1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()])) - suite.Equal(int64(0), metricMutation.rowCountChange) - suite.Equal(int64(2), metricMutation.rowCountAccChange) + suite.NotNil(info) + suite.NotNil(mutation) - suite.Require().Equal(2, len(afterCompact)) - suite.Equal(commonpb.SegmentState_Dropped, afterCompact[0].GetState()) - suite.Equal(commonpb.SegmentState_Dropped, afterCompact[1].GetState()) - suite.NotZero(afterCompact[0].GetDroppedAt()) - suite.NotZero(afterCompact[1].GetDroppedAt()) + // check newSegment + suite.EqualValues(3, info.GetID()) + suite.Equal(datapb.SegmentLevel_L1, info.GetLevel()) + suite.Equal(commonpb.SegmentState_Flushed, info.GetState()) - suite.Equal(inSegment.SegmentID, newSegment[0].GetID()) - suite.Equal(UniqueID(100), newSegment[0].GetCollectionID()) - suite.Equal(UniqueID(10), newSegment[0].GetPartitionID()) - suite.Equal(inSegment.NumOfRows, newSegment[0].GetNumOfRows()) - suite.Equal(commonpb.SegmentState_Flushed, newSegment[0].GetState()) + binlogs := info.GetBinlogs() + for _, fbinlog := range binlogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50000, blog.GetLogID()) + } + } - suite.EqualValues(inSegment.GetInsertLogs(), newSegment[0].GetBinlogs()) - suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment[0].GetStatslogs()) - suite.EqualValues(inSegment.GetDeltalogs(), newSegment[0].GetDeltalogs()) - suite.NotZero(newSegment[0].lastFlushTime) - suite.Equal(uint64(15), newSegment[0].GetLastExpireTime()) + statslogs := info.GetStatslogs() + for _, fbinlog := range statslogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50001, blog.GetLogID()) + } + } - segmentsDone, metricMutationDone, err := m.CompleteCompactionMutation(plan, inCompactionResult) - suite.NoError(err) - suite.NotNil(segmentsDone) - suite.NotNil(metricMutationDone) + deltalogs := info.GetDeltalogs() + deltalogIDs := []int64{} + for _, fbinlog := range deltalogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + deltalogIDs = append(deltalogIDs, blog.GetLogID()) + } + } + suite.ElementsMatch([]int64{30001, 31001}, deltalogIDs) + + // check compactFrom segments + for _, segID := range []int64{1, 2} { + seg := m.GetSegment(segID) + suite.Equal(commonpb.SegmentState_Dropped, seg.GetState()) + suite.NotEmpty(seg.GetDroppedAt()) + + suite.EqualValues(segID, seg.GetID()) + suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs()) + 
suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs()) + } + + // check mutation metrics + suite.Equal(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()])) + suite.EqualValues(-2, mutation.rowCountChange) + suite.EqualValues(2, mutation.rowCountAccChange) } func TestMeta(t *testing.T) { @@ -731,59 +762,6 @@ func TestUpdateSegmentsInfo(t *testing.T) { }) } -func TestMeta_alterMetaStore(t *testing.T) { - toAlter := []*datapb.SegmentInfo{ - { - CollectionID: 100, - PartitionID: 10, - ID: 1, - NumOfRows: 10, - }, - } - - newSeg := &datapb.SegmentInfo{ - Binlogs: []*datapb.FieldBinlog{ - { - FieldID: 101, - Binlogs: []*datapb.Binlog{}, - }, - }, - Statslogs: []*datapb.FieldBinlog{ - { - FieldID: 101, - Binlogs: []*datapb.Binlog{}, - }, - }, - Deltalogs: []*datapb.FieldBinlog{ - { - FieldID: 101, - Binlogs: []*datapb.Binlog{}, - }, - }, - CollectionID: 100, - PartitionID: 10, - ID: 2, - NumOfRows: 15, - } - - m := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: &SegmentsInfo{map[int64]*SegmentInfo{ - 1: {SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - }}, - }}, - } - - err := m.alterMetaStoreAfterCompaction([]*SegmentInfo{{SegmentInfo: newSeg}}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo { - return &SegmentInfo{SegmentInfo: t} - })) - assert.NoError(t, err) -} - func Test_meta_SetSegmentCompacting(t *testing.T) { type fields struct { client kv.MetaKv diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 10b1c65beb..21e2b3c1ef 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -540,6 +540,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath log.Error("save binlog and checkpoints failed", zap.Error(err)) return merr.Status(err), nil } + log.Info("SaveBinlogPaths sync segment with meta", + zap.Any("binlogs", req.GetField2BinlogPaths()), + zap.Any("deltalogs", req.GetDeltalogs()), + zap.Any("statslogs", req.GetField2StatslogPaths()), + ) if req.GetSegLevel() == datapb.SegmentLevel_L0 { metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
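Note: the mock expectations in TestCompleteCompactionMutation encode the copy path precisely. Two deltalogs (30001 and 31001) are absent from the plan, so copyNewDeltalogs takes its copy branch twice, which is why Read and Write are each expected Twice(); RootPath is expected Times(4) because BuildDeltaLogPath consults it once for the from-key and once for the to-key of each copied log. Judging from the removed copyDeltaFiles code (path.Join(rootPath, common.SegmentDeltaLogPath, metautil.JoinIDPath(...))), the keys involved should look roughly like this; the concrete layout is inferred from that removed code, not confirmed by this diff:

fromKey := metautil.BuildDeltaLogPath("mockroot", 100, 10, 1, 30001)
// assumed layout: mockroot/delta_log/100/10/1/30001  (compactFrom segment 1)
toKey := metautil.BuildDeltaLogPath("mockroot", 100, 10, 3, 30001)
// assumed layout: mockroot/delta_log/100/10/3/30001  (compactTo segment 3)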