diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index b3318e1a89..26095080d9 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1660,8 +1660,10 @@ func (m *meta) completeMixCompactionMutation( log = log.With(zap.Int64s("compactFrom", compactFromSegIDs)) resultInvisible := false + targetSegmentLevel := datapb.SegmentLevel_L1 if t.GetType() == datapb.CompactionType_SortCompaction { resultInvisible = compactFromSegInfos[0].GetIsInvisible() + targetSegmentLevel = compactFromSegInfos[0].GetLevel() if !compactFromSegInfos[0].GetCreatedByCompaction() { resultInvisible = false } @@ -1687,7 +1689,7 @@ func (m *meta) completeMixCompactionMutation( CreatedByCompaction: true, CompactionFrom: compactFromSegIDs, LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0), - Level: datapb.SegmentLevel_L1, + Level: targetSegmentLevel, StorageVersion: compactToSegment.GetStorageVersion(), StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { return info.GetStartPosition() diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 22f3ce0a44..c18d246b97 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -425,6 +425,87 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { droppedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)] suite.EqualValues(2, droppedCount) }) + + suite.Run("test L2 sort", func() { + getLatestSegments := func() *SegmentsInfo { + latestSegments := NewSegmentsInfo() + for segID, segment := range map[UniqueID]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, + NumOfRows: 2, + }}, + } { + latestSegments.SetSegment(segID, segment) + } + + return latestSegments + } + + latestSegments := getLatestSegments() + compactToSeg := &datapb.CompactionSegment{ + SegmentID: 2, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)}, + NumOfRows: 2, + } + + result := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{compactToSeg}, + } + task := &datapb.CompactionTask{ + InputSegments: []UniqueID{1}, + Type: datapb.CompactionType_SortCompaction, + } + m := &meta{ + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: latestSegments, + chunkManager: mockChMgr, + } + + infos, mutation, err := m.CompleteCompactionMutation(context.TODO(), task, result) + assert.NoError(suite.T(), err) + suite.Equal(1, len(infos)) + info := infos[0] + suite.NoError(err) + suite.NotNil(info) + suite.NotNil(mutation) + + // check newSegment + suite.EqualValues(2, info.GetID()) + suite.Equal(datapb.SegmentLevel_L2, info.GetLevel()) + suite.Equal(commonpb.SegmentState_Flushed, info.GetState()) + + binlogs := info.GetBinlogs() + for _, fbinlog := range binlogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50000, blog.GetLogID()) + } + } + + statslogs := info.GetStatslogs() + for _, fbinlog := range statslogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50001, blog.GetLogID()) + } + } + + // check compactFrom segments + for _, segID := range []int64{1} { + seg := m.GetSegment(context.TODO(), segID) + suite.Equal(commonpb.SegmentState_Dropped, seg.GetState()) + suite.NotEmpty(seg.GetDroppedAt()) + } + }) } func (suite *MetaBasicSuite) TestSetSegment() {