From ac8c5fcd5d833fa270fb5257d538e61710f1e032 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Wed, 23 Oct 2024 17:15:28 +0800
Subject: [PATCH] enhance: Remove pre-marking segments as L2 during clustering compaction (#36799)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

issue: #36686

This PR removes the pre-marking of segments as L2 during clustering compaction in version 2.5, and ensures compatibility with version 2.4.

The core of this change is to **ensure that the many-to-many lineage derivation logic is correct, so that a parent and its child can never appear in the target segment view at the same time.**

feature:
- Clustering compaction no longer marks the input segments as L2.
- Add a new field `is_invisible` to `segmentInfo`, and mark segments that have completed clustering but have not yet been indexed as `is_invisible`, to prevent them from being loaded prematurely.
- Do not mark the input segments as `Dropped` before the clustering compaction is completed.
- After a compaction fails, only the result segments need to be marked as `Dropped`.

compatibility:
- If the upgraded task has not failed, there are no compatibility issues.
- If the task state after the upgrade is `MetaSaved`, skip the stats task when `TmpSegments` is empty.
- If the failure occurs before `MetaSaved`:
  - there are no ResultSegments, and the InputSegments have not been marked as dropped yet;
  - the level of the input segments needs to be reverted to `LastLevel`.
- If the failure occurs after `MetaSaved`:
  - ResultSegments have already been generated, and InputSegments have been marked as Dropped. At this point, simply make the ResultSegments visible;
  - the level of the ResultSegments needs to be set to L1 (so that they can participate in mixCompaction).

---------

Signed-off-by: Cai Zhang
---
 .../datacoord/compaction_policy_clustering.go |   3 +-
 .../datacoord/compaction_policy_single.go     |   3 +-
 .../datacoord/compaction_task_clustering.go   | 145 ++++++---
 .../compaction_task_clustering_test.go        | 196 +++++++++--
 internal/datacoord/compaction_trigger.go      |   3 +-
 internal/datacoord/handler.go                 |  96 +++---
 internal/datacoord/handler_test.go            | 306 ++++++++++++++++++
 internal/datacoord/meta.go                    |  91 +++---
 internal/datacoord/partition_stats_meta.go    |   2 +
 .../datacoord/partition_stats_meta_test.go    |   2 +-
 internal/datacoord/task_stats_test.go         |   7 +-
 internal/proto/data_coord.proto               |   4 +
 12 files changed, 692 insertions(+), 166 deletions(-)

diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index 093e0caa18..35ac38e3db 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -126,7 +126,8 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 		isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
-			segment.GetLevel() != datapb.SegmentLevel_L0 // ignore level zero segments
+			segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
+			!segment.GetIsInvisible()
 	})
 
 	views := make([]CompactionView, 0)
diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go
index 1311bcb2fa..8a59ee6e2d 100644
--- a/internal/datacoord/compaction_policy_single.go
+++ b/internal/datacoord/compaction_policy_single.go
@@ -94,7 +94,8 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
 		isFlush(segment) &&
!segment.isCompacting && // not compacting now !segment.GetIsImporting() && // not importing now - segment.GetLevel() == datapb.SegmentLevel_L2 // only support L2 for now + segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now + !segment.GetIsInvisible() }) views := make([]CompactionView, 0) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 35bf82a78a..081d46ae8f 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -213,15 +213,7 @@ func (t *clusteringCompactionTask) processPipelining() error { log.Debug("wait for the node to be assigned before proceeding with the subsequent steps") return nil } - var operators []UpdateOperator - for _, segID := range t.InputSegments { - operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2)) - } - err := t.meta.UpdateSegmentsInfo(operators...) - if err != nil { - log.Warn("fail to set segment level to L2", zap.Error(err)) - return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err) - } + // don't mark segment level to L2 before clustering compaction after v2.5.0 if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) { err := t.doAnalyze() @@ -298,6 +290,12 @@ func (t *clusteringCompactionTask) processMetaSaved() error { }); err != nil { log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) } + // to ensure compatibility, if a task upgraded from version 2.4 has a status of MetaSave, + // its TmpSegments will be empty, so skip the stats task, to build index. + if len(t.GetTmpSegments()) == 0 { + log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetPlanID())) + return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing)) + } return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic)) } @@ -418,34 +416,63 @@ func (t *clusteringCompactionTask) processIndexing() error { return nil } +func (t *clusteringCompactionTask) markResultSegmentsVisible() error { + var operators []UpdateOperator + for _, segID := range t.GetResultSegments() { + operators = append(operators, UpdateSegmentVisible(segID)) + operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID())) + } + + err := t.meta.UpdateSegmentsInfo(operators...) + if err != nil { + log.Warn("markResultSegmentVisible UpdateSegmentsInfo fail", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("markResultSegmentVisible UpdateSegmentsInfo", err) + } + return nil +} + +func (t *clusteringCompactionTask) markInputSegmentsDropped() error { + var operators []UpdateOperator + // mark + for _, segID := range t.GetInputSegments() { + operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + } + err := t.meta.UpdateSegmentsInfo(operators...) 
+ if err != nil { + log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err) + } + return nil +} + // indexed is the final state of a clustering compaction task // one task should only run this once func (t *clusteringCompactionTask) completeTask() error { - err := t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{ + var err error + // first mark result segments visible + if err = t.markResultSegmentsVisible(); err != nil { + return err + } + + // update current partition stats version + // at this point, the segment view includes both the input segments and the result segments. + if err = t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{ CollectionID: t.GetCollectionID(), PartitionID: t.GetPartitionID(), VChannel: t.GetChannel(), Version: t.GetPlanID(), SegmentIDs: t.GetResultSegments(), CommitTime: time.Now().Unix(), - }) - if err != nil { + }); err != nil { return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err) } - var operators []UpdateOperator - for _, segID := range t.GetResultSegments() { - operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID())) - } - err = t.meta.UpdateSegmentsInfo(operators...) - if err != nil { - return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err) + // mark input segments as dropped + // now, the segment view only includes the result segments. + if err = t.markInputSegmentsDropped(); err != nil { + return err } - err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()) - if err != nil { - return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err) - } return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) } @@ -479,31 +506,65 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() { func (t *clusteringCompactionTask) processFailedOrTimeout() error { log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String())) - if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) } + isInputDropped := false + for _, segID := range t.GetInputSegments() { + if t.meta.GetHealthySegment(segID) == nil { + isInputDropped = true + break + } + } + if isInputDropped { + log.Info("input segments dropped, doing for compatibility", + zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) + // this task must be generated by v2.4, just for compatibility + // revert segments meta + var operators []UpdateOperator + // revert level of input segments + // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1 + // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2 + for _, segID := range t.GetInputSegments() { + operators = append(operators, RevertSegmentLevelOperator(segID)) + } + // if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats + for _, segID := range t.GetResultSegments() { + operators = append(operators, UpdateSegmentLevelOperator(segID, 
datapb.SegmentLevel_L1)) + operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0)) + } + for _, segID := range t.GetTmpSegments() { + // maybe no necessary, there will be no `TmpSegments` that task was generated by v2.4 + operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1)) + operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0)) + } + err := t.meta.UpdateSegmentsInfo(operators...) + if err != nil { + log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + } + } else { + // after v2.5.0, mark the results segment as dropped + var operators []UpdateOperator + for _, segID := range t.GetResultSegments() { + // Don't worry about them being loaded; they are all invisible. + operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + } + for _, segID := range t.GetTmpSegments() { + // Don't worry about them being loaded; they are all invisible. + // tmpSegment is always invisible + operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + } - // revert segments meta - var operators []UpdateOperator - // revert level of input segments - // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1 - // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2 - for _, segID := range t.InputSegments { - operators = append(operators, RevertSegmentLevelOperator(segID)) - } - // if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats - for _, segID := range t.ResultSegments { - operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1)) - operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0)) - } - err := t.meta.UpdateSegmentsInfo(operators...) - if err != nil { - log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) - return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + err := t.meta.UpdateSegmentsInfo(operators...) 
+ if err != nil { + log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + } } + t.resetSegmentCompacting() // drop partition stats if uploaded @@ -514,7 +575,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error { Version: t.GetPlanID(), SegmentIDs: t.GetResultSegments(), } - err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo) + err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo) if err != nil { log.Warn("gcPartitionStatsInfo fail", zap.Error(err)) } diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 7e215ef362..a318bc8f71 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -107,53 +107,181 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang }, }) s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil) - s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) - task := s.generateBasicTask(false) task.processPipelining() seg11 := s.meta.GetSegment(101) - s.Equal(datapb.SegmentLevel_L2, seg11.Level) + s.Equal(datapb.SegmentLevel_L1, seg11.Level) seg21 := s.meta.GetSegment(102) s.Equal(datapb.SegmentLevel_L2, seg21.Level) s.Equal(int64(10000), seg21.PartitionStatsVersion) - task.ResultSegments = []int64{103, 104} - // fake some compaction result segment - s.meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 103, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L2, - CreatedByCompaction: true, - PartitionStatsVersion: 10001, - }, - }) - s.meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 104, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L2, - CreatedByCompaction: true, - PartitionStatsVersion: 10001, - }, + s.Run("v2.4.x", func() { + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 101, + State: commonpb.SegmentState_Dropped, + LastLevel: datapb.SegmentLevel_L1, + Level: datapb.SegmentLevel_L2, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 102, + State: commonpb.SegmentState_Dropped, + LastLevel: datapb.SegmentLevel_L2, + Level: datapb.SegmentLevel_L2, + PartitionStatsVersion: 10000, + }, + }) + + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 103, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 104, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + }, + }) + + task := s.generateBasicTask(false) + task.sessions = s.mockSessionMgr + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + task.InputSegments = []int64{101, 102} + task.ResultSegments = []int64{103, 104} + + task.processFailedOrTimeout() + + seg12 := s.meta.GetSegment(101) + s.Equal(datapb.SegmentLevel_L1, seg12.Level) + s.Equal(commonpb.SegmentState_Dropped, seg12.State) + + seg22 := s.meta.GetSegment(102) + s.Equal(datapb.SegmentLevel_L2, 
seg22.Level) + s.Equal(int64(10000), seg22.PartitionStatsVersion) + s.Equal(commonpb.SegmentState_Dropped, seg22.State) + + seg32 := s.meta.GetSegment(103) + s.Equal(datapb.SegmentLevel_L1, seg32.Level) + s.Equal(int64(0), seg32.PartitionStatsVersion) + s.Equal(commonpb.SegmentState_Flushed, seg32.State) + + seg42 := s.meta.GetSegment(104) + s.Equal(datapb.SegmentLevel_L1, seg42.Level) + s.Equal(int64(0), seg42.PartitionStatsVersion) + s.Equal(commonpb.SegmentState_Flushed, seg42.State) }) - task.processFailedOrTimeout() + s.Run("v2.5.0", func() { + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 101, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 102, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + PartitionStatsVersion: 10000, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 103, + State: commonpb.SegmentState_Dropped, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + IsInvisible: true, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 104, + State: commonpb.SegmentState_Dropped, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + IsInvisible: true, + }, + }) - seg12 := s.meta.GetSegment(101) - s.Equal(datapb.SegmentLevel_L1, seg12.Level) - seg22 := s.meta.GetSegment(102) - s.Equal(datapb.SegmentLevel_L2, seg22.Level) - s.Equal(int64(10000), seg22.PartitionStatsVersion) + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 105, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + IsInvisible: true, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 106, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + IsInvisible: true, + }, + }) - seg32 := s.meta.GetSegment(103) - s.Equal(datapb.SegmentLevel_L1, seg32.Level) - s.Equal(int64(0), seg32.PartitionStatsVersion) - seg42 := s.meta.GetSegment(104) - s.Equal(datapb.SegmentLevel_L1, seg42.Level) - s.Equal(int64(0), seg42.PartitionStatsVersion) + task := s.generateBasicTask(false) + task.sessions = s.mockSessionMgr + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + task.InputSegments = []int64{101, 102} + task.TmpSegments = []int64{103, 104} + task.ResultSegments = []int64{105, 106} + + task.processFailedOrTimeout() + + seg12 := s.meta.GetSegment(101) + s.Equal(datapb.SegmentLevel_L1, seg12.Level) + seg22 := s.meta.GetSegment(102) + s.Equal(datapb.SegmentLevel_L2, seg22.Level) + s.Equal(int64(10000), seg22.PartitionStatsVersion) + + seg32 := s.meta.GetSegment(103) + s.Equal(datapb.SegmentLevel_L2, seg32.Level) + s.Equal(commonpb.SegmentState_Dropped, seg32.State) + s.True(seg32.IsInvisible) + + seg42 := s.meta.GetSegment(104) + s.Equal(datapb.SegmentLevel_L2, seg42.Level) + s.Equal(commonpb.SegmentState_Dropped, seg42.State) + s.True(seg42.IsInvisible) + + seg52 := s.meta.GetSegment(105) + s.Equal(datapb.SegmentLevel_L2, seg52.Level) + s.Equal(int64(10001), seg52.PartitionStatsVersion) + 
s.Equal(commonpb.SegmentState_Dropped, seg52.State) + s.True(seg52.IsInvisible) + + seg62 := s.meta.GetSegment(106) + s.Equal(datapb.SegmentLevel_L2, seg62.Level) + s.Equal(int64(10001), seg62.PartitionStatsVersion) + s.Equal(commonpb.SegmentState_Dropped, seg62.State) + s.True(seg62.IsInvisible) + }) } func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask { diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 66bd954e06..6e27212d36 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -317,7 +317,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { !segment.isCompacting && // not compacting now !segment.GetIsImporting() && // not importing now segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments - segment.GetLevel() != datapb.SegmentLevel_L2 // ignore l2 segment + segment.GetLevel() != datapb.SegmentLevel_L2 && // ignore l2 segment + !segment.GetIsInvisible() }) // partSegments is list of chanPartSegments, which is channel-partition organized segments if len(partSegments) == 0 { diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 60bec831c3..2c78982996 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -76,6 +76,10 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni // Skip bulk insert segments. continue } + if s.GetIsInvisible() { + // skip invisible segments + continue + } if s.GetState() == commonpb.SegmentState_Dropped { droppedIDs.Insert(s.GetID()) @@ -119,10 +123,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . } var ( - flushedIDs = make(typeutil.UniqueSet) - droppedIDs = make(typeutil.UniqueSet) - growingIDs = make(typeutil.UniqueSet) - levelZeroIDs = make(typeutil.UniqueSet) + flushedIDs = make(typeutil.UniqueSet) + droppedIDs = make(typeutil.UniqueSet) + growingIDs = make(typeutil.UniqueSet) + levelZeroIDs = make(typeutil.UniqueSet) + newFlushedIDs = make(typeutil.UniqueSet) ) // cannot use GetSegmentsByChannel since dropped segments are needed here @@ -132,7 +137,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...) indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...) - unIndexedIDs := make(typeutil.UniqueSet) for _, s := range segments { if s.GetStartPosition() == nil && s.GetDmlPosition() == nil { continue @@ -141,37 +145,22 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // Skip bulk insert segments. 
continue } - - currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), s.GetPartitionID(), channel.GetName()) - if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() != currentPartitionStatsVersion { - // in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan - // is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be - // seen atomically, otherwise users will see intermediate result + if s.GetIsInvisible() { + // skip invisible segments continue } validSegmentInfos[s.GetID()] = s switch { case s.GetState() == commonpb.SegmentState_Dropped: - if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion { - // if segment.partStatsVersion is equal to currentPartitionStatsVersion, - // it must have been indexed, this is guaranteed by clustering compaction process - // this is to ensure that the current valid L2 compaction produce is available to search/query - // to avoid insufficient data - flushedIDs.Insert(s.GetID()) - continue - } droppedIDs.Insert(s.GetID()) case !isFlushState(s.GetState()): growingIDs.Insert(s.GetID()) case s.GetLevel() == datapb.SegmentLevel_L0: levelZeroIDs.Insert(s.GetID()) - case indexed.Contain(s.GetID()) || s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): - // fill in indexed segments into flushed directly - flushedIDs.Insert(s.GetID()) + default: - // unIndexed segments will be checked if it's parents are all indexed - unIndexedIDs.Insert(s.GetID()) + flushedIDs.Insert(s.GetID()) } } @@ -203,36 +192,55 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . return true } - retrieveUnIndexed := func() bool { + var compactionFromExist func(segID UniqueID) bool + + compactionFromExist = func(segID UniqueID) bool { + compactionFrom := validSegmentInfos[segID].GetCompactionFrom() + if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + return false + } + for _, fromID := range compactionFrom { + if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) { + return true + } + if compactionFromExist(fromID) { + return true + } + } + return false + } + + segmentIndexed := func(segID UniqueID) bool { + return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + } + + retrieve := func() bool { continueRetrieve := false - for id := range unIndexedIDs { + for id := range flushedIDs { compactionFrom := validSegmentInfos[id].GetCompactionFrom() - compactTos := []UniqueID{} // neighbors and itself - if len(compactionFrom) > 0 && isValid(compactionFrom...) { + if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + newFlushedIDs.Insert(id) + continue + } + if segmentIndexed(id) && !compactionFromExist(id) { + newFlushedIDs.Insert(id) + } else { for _, fromID := range compactionFrom { - if len(compactTos) == 0 { - compactToInfo, _ := h.s.meta.GetCompactionTo(fromID) - compactTos = lo.Map(compactToInfo, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() }) - } - if indexed.Contain(fromID) { - flushedIDs.Insert(fromID) - } else { - unIndexedIDs.Insert(fromID) - continueRetrieve = true - } + newFlushedIDs.Insert(fromID) + continueRetrieve = true + droppedIDs.Remove(fromID) } - unIndexedIDs.Remove(compactTos...) - flushedIDs.Remove(compactTos...) - droppedIDs.Remove(compactionFrom...) 
} } return continueRetrieve } - for retrieveUnIndexed() { + + for retrieve() { + flushedIDs = newFlushedIDs + newFlushedIDs = make(typeutil.UniqueSet) } - // unindexed is flushed segments as well - flushedIDs.Insert(unIndexedIDs.Collect()...) + flushedIDs = newFlushedIDs log.Info("GetQueryVChanPositions", zap.Int64("collectionID", channel.GetCollectionID()), diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index 6ec69eede6..5814508062 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -560,6 +560,312 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds)) // assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e }) + + t.Run("complex derivation", func(t *testing.T) { + // numbers indicate segmentID, letters indicate segment information + // i: indexed, u: unindexed, g: gced + // 1i, 2i, 3g 4i, 5i, 6i + // | | | | | | + // \ | / \ | / + // \ | / \ | / + // 7u, [8i,9i,10i] [11u, 12i] + // | | | | | | + // \ | / \ / | + // \ | / \ / | + // [13u] [14i, 15u] 12i + // | | | | + // \ / \ / + // \ / \ / + // [16u] [17u] + // all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6] + // should be returned: flushed: [7, 8, 9, 10, 4, 5, 6] + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.indexMeta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + seg1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) + assert.NoError(t, err) + seg2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) + assert.NoError(t, err) + // seg3 was GCed + seg4 := &datapb.SegmentInfo{ + ID: 4, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4)) + assert.NoError(t, err) + seg5 := &datapb.SegmentInfo{ + ID: 5, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) + assert.NoError(t, err) + seg6 := &datapb.SegmentInfo{ + ID: 6, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6)) + 
assert.NoError(t, err) + seg7 := &datapb.SegmentInfo{ + ID: 7, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7)) + assert.NoError(t, err) + seg8 := &datapb.SegmentInfo{ + ID: 8, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{1, 2, 3}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8)) + assert.NoError(t, err) + seg9 := &datapb.SegmentInfo{ + ID: 9, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{1, 2, 3}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9)) + assert.NoError(t, err) + seg10 := &datapb.SegmentInfo{ + ID: 10, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{1, 2, 3}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10)) + assert.NoError(t, err) + seg11 := &datapb.SegmentInfo{ + ID: 11, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{4, 5, 6}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11)) + assert.NoError(t, err) + seg12 := &datapb.SegmentInfo{ + ID: 12, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{4, 5, 6}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12)) + assert.NoError(t, err) + seg13 := &datapb.SegmentInfo{ + ID: 13, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2047, + CompactionFrom: []int64{7, 8, 9}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13)) + assert.NoError(t, err) + seg14 := &datapb.SegmentInfo{ + ID: 14, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{10, 11}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14)) + assert.NoError(t, err) + seg15 := &datapb.SegmentInfo{ + ID: 15, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + 
NumOfRows: 2048, + CompactionFrom: []int64{10, 11}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15)) + assert.NoError(t, err) + seg16 := &datapb.SegmentInfo{ + ID: 16, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{13, 14}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16)) + assert.NoError(t, err) + seg17 := &datapb.SegmentInfo{ + ID: 17, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{12, 15}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17)) + assert.NoError(t, err) + + vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) + assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds) + assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds) + }) + } func TestShouldDropChannel(t *testing.T) { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index e1d8155b7e..a323805a6d 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -704,7 +704,7 @@ func (p *updateSegmentPack) Get(segmentID int64) *SegmentInfo { } segment := p.meta.segments.GetSegment(segmentID) - if segment == nil || !isSegmentHealthy(segment) { + if segment == nil { log.Warn("meta update: get segment failed - segment not found", zap.Int64("segmentID", segmentID), zap.Bool("segment nil", segment == nil), @@ -791,6 +791,19 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator { } } +func UpdateSegmentVisible(segmentID int64) UpdateOperator { + return func(modPack *updateSegmentPack) bool { + segment := modPack.Get(segmentID) + if segment == nil { + log.Warn("meta update: update segment visible fail - segment not found", + zap.Int64("segmentID", segmentID)) + return false + } + segment.IsInvisible = false + return true + } +} + func UpdateSegmentLevelOperator(segmentID int64, level datapb.SegmentLevel) UpdateOperator { return func(modPack *updateSegmentPack) bool { segment := modPack.Get(segmentID) @@ -832,9 +845,13 @@ func RevertSegmentLevelOperator(segmentID int64) UpdateOperator { zap.Int64("segmentID", segmentID)) return false } - segment.Level = segment.LastLevel - log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String())) - return true + // just for compatibility, + if segment.GetLevel() != segment.GetLastLevel() && segment.GetLastLevel() != datapb.SegmentLevel_Legacy { + segment.Level = segment.LastLevel + log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String())) + return true + } + return false } } @@ -1417,14 +1434,9 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul } cloned := segment.Clone() - cloned.DroppedAt = uint64(time.Now().UnixNano()) - cloned.Compacted = true compactFromSegInfos = append(compactFromSegInfos, cloned) compactFromSegIDs = append(compactFromSegIDs, cloned.GetID()) - - // metrics mutation for compaction from segments - updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) } for _, seg := range 
result.GetSegments() { @@ -1448,6 +1460,8 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { return info.GetDmlPosition() })), + // visible after stats and index + IsInvisible: true, } segment := NewSegmentInfo(segmentInfo) compactToSegInfos = append(compactToSegInfos, segment) @@ -1458,10 +1472,6 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs)) log.Debug("meta update: prepare for meta mutation - complete") - compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { - return info.SegmentInfo - }) - compactToInfos := lo.Map(compactToSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { return info.SegmentInfo }) @@ -1470,18 +1480,11 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul for _, seg := range compactToInfos { binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg}) } - // alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments + // only add new segments if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil { log.Warn("fail to alter compactTo segments", zap.Error(err)) return nil, nil, err } - if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil { - log.Warn("fail to alter compactFrom segments", zap.Error(err)) - return nil, nil, err - } - lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) { - m.segments.SetSegment(info.GetID(), info) - }) lo.ForEach(compactToSegInfos, func(info *SegmentInfo, _ int) { m.segments.SetSegment(info.GetID(), info) }) @@ -1977,35 +1980,41 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) segmentInfo := &datapb.SegmentInfo{ - ID: result.GetSegmentID(), - CollectionID: result.GetCollectionID(), - PartitionID: result.GetPartitionID(), - InsertChannel: result.GetChannel(), - NumOfRows: result.GetNumRows(), - State: commonpb.SegmentState_Flushed, - MaxRowNum: cloned.GetMaxRowNum(), - Binlogs: result.GetInsertLogs(), - Statslogs: result.GetStatsLogs(), - Bm25Statslogs: result.GetBm25Logs(), - TextStatsLogs: result.GetTextStatsLogs(), - CreatedByCompaction: true, - CompactionFrom: []int64{oldSegmentID}, - LastExpireTime: cloned.GetLastExpireTime(), - Level: datapb.SegmentLevel_L1, - StartPosition: cloned.GetStartPosition(), - DmlPosition: cloned.GetDmlPosition(), - IsSorted: true, - IsImporting: cloned.GetIsImporting(), + CollectionID: oldSegment.GetCollectionID(), + PartitionID: oldSegment.GetPartitionID(), + InsertChannel: oldSegment.GetInsertChannel(), + MaxRowNum: oldSegment.GetMaxRowNum(), + LastExpireTime: oldSegment.GetLastExpireTime(), + StartPosition: oldSegment.GetStartPosition(), + DmlPosition: oldSegment.GetDmlPosition(), + IsImporting: oldSegment.GetIsImporting(), + StorageVersion: oldSegment.GetStorageVersion(), + State: oldSegment.GetState(), + Level: oldSegment.GetLevel(), + LastLevel: oldSegment.GetLastLevel(), + PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(), + LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(), + IsInvisible: oldSegment.GetIsInvisible(), + ID: result.GetSegmentID(), + NumOfRows: result.GetNumRows(), + 
Binlogs: result.GetInsertLogs(), + Statslogs: result.GetStatsLogs(), + TextStatsLogs: result.GetTextStatsLogs(), + Bm25Statslogs: result.GetBm25Logs(), + Deltalogs: nil, + CreatedByCompaction: true, + CompactionFrom: []int64{oldSegmentID}, + IsSorted: true, } segment := NewSegmentInfo(segmentInfo) if segment.GetNumOfRows() > 0 { metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows()) } else { segment.State = commonpb.SegmentState_Dropped + segment.DroppedAt = uint64(time.Now().UnixNano()) } log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", result.GetNumRows())) - if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil { log.Warn("fail to alter segments and new segment", zap.Error(err)) return nil, err diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go index f379afe67f..27009992ce 100644 --- a/internal/datacoord/partition_stats_meta.go +++ b/internal/datacoord/partition_stats_meta.go @@ -121,6 +121,8 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat } psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info + // after v2.5.0, the current version will be updated when updating the partition stats info, so there’s no need to split it into two steps + psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].currentVersion = info.Version return nil } diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go index 904f6b3d2c..2a124a6471 100644 --- a/internal/datacoord/partition_stats_meta_test.go +++ b/internal/datacoord/partition_stats_meta_test.go @@ -75,7 +75,7 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() { s.NotNil(ps) currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") - s.Equal(emptyPartitionStatsVersion, currentVersion) + s.Equal(int64(100), currentVersion) currentVersion2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-2") s.Equal(emptyPartitionStatsVersion, currentVersion2) diff --git a/internal/datacoord/task_stats_test.go b/internal/datacoord/task_stats_test.go index 34dd0f0c62..992d174d64 100644 --- a/internal/datacoord/task_stats_test.go +++ b/internal/datacoord/task_stats_test.go @@ -67,6 +67,7 @@ func (s *statsTaskSuite) SetupSuite() { NumOfRows: 65535, State: commonpb.SegmentState_Flushed, MaxRowNum: 65535, + Level: datapb.SegmentLevel_L2, }, }, }, @@ -82,6 +83,7 @@ func (s *statsTaskSuite) SetupSuite() { NumOfRows: 65535, State: commonpb.SegmentState_Flushed, MaxRowNum: 65535, + Level: datapb.SegmentLevel_L2, }, }, }, @@ -97,6 +99,7 @@ func (s *statsTaskSuite) SetupSuite() { NumOfRows: 65535, State: commonpb.SegmentState_Flushed, MaxRowNum: 65535, + Level: datapb.SegmentLevel_L2, }, }, }, @@ -545,12 +548,14 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() { catalog := catalogmocks.NewDataCoordCatalog(s.T()) s.mt.catalog = catalog s.mt.statsTaskMeta.catalog = catalog + s.mt.segments.SetState(s.segID, commonpb.SegmentState_Flushed) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil) s.NoError(st.SetJobInfo(s.mt)) - s.NotNil(s.mt.GetHealthySegment(s.segID + 1)) + s.NotNil(s.mt.GetHealthySegment(s.targetID)) 
s.Equal(indexpb.JobState_JobStateFinished, s.mt.statsTaskMeta.tasks[s.taskID].GetState()) + s.Equal(datapb.SegmentLevel_L2, s.mt.GetHealthySegment(s.targetID).GetLevel()) }) }) } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 4b84970de2..47a111d18b 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -357,6 +357,10 @@ message SegmentInfo { // textStatsLogs is used to record tokenization index for fields. map textStatsLogs = 26; repeated FieldBinlog bm25statslogs = 27; + + // This field is used to indicate that some intermediate state segments should not be loaded. + // For example, segments that have been clustered but haven't undergone stats yet. + bool is_invisible = 28; } message SegmentStartPosition {
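
The following sketch is not part of the patch; it is a minimal, self-contained illustration of the lineage-derivation rule that the commit message calls the core of this change and that the rewritten retrieve() loop in handler.go applies: start from the flushed, visible segments, keep a compacted-to segment only if it is indexed and none of its ancestors are already exposed, otherwise expose its parents instead, and skip `is_invisible` segments entirely. All names here (toySegment, deriveFlushedView, ancestorExposed) are invented for illustration and do not exist in the Milvus code base.

package main

import "fmt"

// toySegment is a hypothetical, simplified stand-in for the segment meta used by the
// handler; only the fields needed to illustrate the lineage walk are kept.
type toySegment struct {
	id             int64
	flushed        bool    // stands in for commonpb.SegmentState_Flushed (vs. Dropped)
	invisible      bool    // stands in for the new is_invisible flag
	indexed        bool    // index built, or small enough that no index is required
	compactionFrom []int64 // parent segment IDs
}

// deriveFlushedView sketches the fix-point loop added to GetQueryVChanPositions: start
// from the flushed, visible segments; keep a segment only if it is indexed and none of
// its ancestors are already exposed, otherwise expose its parents instead. The result
// therefore never contains a parent together with one of its children.
func deriveFlushedView(all map[int64]*toySegment) map[int64]bool {
	flushed := map[int64]bool{}
	for id, s := range all {
		if s.flushed && !s.invisible {
			flushed[id] = true
		}
	}

	parentsValid := func(ids []int64) bool {
		if len(ids) == 0 {
			return false // not produced by compaction
		}
		for _, id := range ids {
			if _, ok := all[id]; !ok {
				return false // a parent was GCed, the lineage is unusable
			}
		}
		return true
	}

	var ancestorExposed func(id int64, view map[int64]bool) bool
	ancestorExposed = func(id int64, view map[int64]bool) bool {
		s, ok := all[id]
		if !ok {
			return false
		}
		for _, p := range s.compactionFrom {
			if view[p] || ancestorExposed(p, view) {
				return true
			}
		}
		return false
	}

	for changed := true; changed; {
		changed = false
		next := map[int64]bool{}
		for id := range flushed {
			s := all[id]
			if !parentsValid(s.compactionFrom) {
				next[id] = true // original segment, or its parents were GCed: keep it
				continue
			}
			if s.indexed && !ancestorExposed(id, flushed) {
				next[id] = true // ready child: keep it, its parents stay hidden
			} else {
				for _, p := range s.compactionFrom {
					next[p] = true // child not ready: expose the parents instead
				}
				changed = true
			}
		}
		flushed = next
	}
	return flushed
}

func main() {
	// Segments 1 and 2 were clustering-compacted into 3 and already dropped (as a task
	// carried over from v2.4 may leave them); 3 is visible but not indexed yet, and 4 is
	// a temporary, invisible stats segment. The derived view falls back to parents 1 and 2.
	segs := map[int64]*toySegment{
		1: {id: 1, indexed: true},
		2: {id: 2, indexed: true},
		3: {id: 3, flushed: true, compactionFrom: []int64{1, 2}},
		4: {id: 4, flushed: true, invisible: true, compactionFrom: []int64{1, 2}},
	}
	fmt.Println(deriveFlushedView(segs)) // map[1:true 2:true]
}

This is what lets the patch defer dropping the input segments until completeTask: even while the parents and an already-visible child coexist in meta, only one generation of the lineage is handed to the query side.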