enhance: Remove pre-marking segments as L2 during clustering compaction (#37653)

issue: #36686

master pr: #36799 

The core of this change is to **ensure that the many-to-many lineage
derivation logic is correct, so that a parent segment and its child can never
exist simultaneously in the target segment view.**
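
To make the invariant concrete, below is a minimal Go sketch of the check, using simplified, hypothetical types rather than the actual datacoord structs: a view is invalid if it contains any segment together with one of its (transitive) `CompactionFrom` ancestors. The actual implementation in this PR is the recursive `compactionFromExist` check inside `GetQueryVChanPositions` (see the handler diff below).

```go
package main

import "fmt"

// Simplified, hypothetical model of segment lineage: CompactionFrom lists a
// segment's parent segments, forming a DAG across compactions.
type segment struct {
	ID             int64
	CompactionFrom []int64
}

// violatesLineage reports whether the candidate view contains both a segment
// and one of its (transitive) parents, walking CompactionFrom links through
// the full segment metadata.
func violatesLineage(meta map[int64]segment, view map[int64]bool) bool {
	var ancestorInView func(id int64) bool
	ancestorInView = func(id int64) bool {
		for _, parent := range meta[id].CompactionFrom {
			if view[parent] || ancestorInView(parent) {
				return true
			}
		}
		return false
	}
	for id := range view {
		if ancestorInView(id) {
			return true
		}
	}
	return false
}

func main() {
	meta := map[int64]segment{
		1: {ID: 1},                             // parent
		2: {ID: 2, CompactionFrom: []int64{1}}, // child produced by compaction
	}
	// Parent 1 and its child 2 both visible: the view is invalid.
	fmt.Println(violatesLineage(meta, map[int64]bool{1: true, 2: true})) // true
	// Only the child visible: the view is consistent.
	fmt.Println(violatesLineage(meta, map[int64]bool{2: true})) // false
}
```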

feature:
- Clustering compaction no longer marks the input segments as L2.
- Add a new field `is_invisible` to `segmentInfo`, and mark segments that have
  completed clustering compaction but have not yet built indexes as
  `is_invisible`, to prevent them from being loaded prematurely (see the
  sketch after this list).
- Do not mark the input segments as `Dropped` before the clustering compaction
  is completed.
- After compaction fails, only the result segments need to be marked as
  `Dropped`.
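
As a rough illustration of the new flag, the sketch below filters segments before loading; the `segmentInfo` struct and `visibleForLoad` helper are simplified, hypothetical stand-ins, not the actual coordinator code.

```go
package main

import "fmt"

// Hypothetical, simplified stand-in for the real SegmentInfo; only the field
// relevant to visibility is modeled here.
type segmentInfo struct {
	ID          int64
	IsInvisible bool // set by clustering compaction until the index is built
}

// visibleForLoad keeps only segments that are allowed to be loaded: segments
// flagged invisible (clustered but not yet indexed) are skipped.
func visibleForLoad(segs []segmentInfo) []segmentInfo {
	out := make([]segmentInfo, 0, len(segs))
	for _, s := range segs {
		if s.IsInvisible {
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	segs := []segmentInfo{
		{ID: 101},                    // ordinary flushed segment
		{ID: 102, IsInvisible: true}, // clustering result, index not built yet
	}
	fmt.Println(visibleForLoad(segs)) // only segment 101 remains
}
```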

compatibility:
- If a task carried over from before the upgrade has not failed, there are no
  compatibility issues.
- If the state after the upgrade is `MetaSaved`, decide whether to skip the
  stats task based on whether `TmpSegments` is empty.
- If the failure occurred before `MetaSaved`:
  - There are no ResultSegments yet, and the InputSegments have not been
    marked as `Dropped`.
  - The level of the input segments needs to be reverted to `LastLevel`.
- If the failure occurred after `MetaSaved`:
  - ResultSegments have already been generated, and InputSegments have been
    marked as `Dropped`. At this point, simply make the ResultSegments
    visible.
  - The level of the ResultSegments needs to be set to L1 (so that they can
    participate in mixCompaction). A rough sketch of these recovery rules
    follows this list.
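
The sketch below restates these recovery rules in code, assuming simplified, hypothetical segment structs; the real logic lives in `processFailedOrTimeout` and `RevertSegmentLevelOperator` in the diff below.

```go
package main

import "fmt"

// Simplified, hypothetical segment struct; only the fields touched by the
// recovery rules are modeled.
type segmentLevel int

const (
	levelL1 segmentLevel = iota + 1
	levelL2
)

type seg struct {
	ID                    int64
	Level                 segmentLevel
	LastLevel             segmentLevel
	PartitionStatsVersion int64
	Invisible             bool
}

// recoverUpgradedFailedTask applies the compatibility rules above to a task
// that was created before the upgrade and failed afterwards.
func recoverUpgradedFailedTask(inputs, results []*seg, failedAfterMetaSaved bool) {
	if !failedAfterMetaSaved {
		// Before MetaSaved there are no result segments yet; undo the L2
		// pre-marking that older versions applied to the input segments.
		for _, s := range inputs {
			s.Level = s.LastLevel
		}
		return
	}
	// After MetaSaved the input segments are already dropped; make the result
	// segments visible, plain L1 segments without partition stats so that
	// they can participate in mixCompaction.
	for _, s := range results {
		s.Invisible = false
		s.Level = levelL1
		s.PartitionStatsVersion = 0
	}
}

func main() {
	inputs := []*seg{{ID: 1, Level: levelL2, LastLevel: levelL1}}
	results := []*seg{{ID: 2, Level: levelL2, PartitionStatsVersion: 10001, Invisible: true}}

	recoverUpgradedFailedTask(inputs, results, true)
	fmt.Printf("result after recovery: %+v\n", *results[0])
}
```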

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>

@ -193,16 +193,6 @@ func (t *clusteringCompactionTask) processPipelining() error {
log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
return nil
}
var operators []UpdateOperator
for _, segID := range t.InputSegments {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("fail to set segment level to L2", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err)
}
if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
err := t.doAnalyze()
if err != nil {
@ -309,34 +299,49 @@ func (t *clusteringCompactionTask) processIndexing() error {
return nil
}
func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
var operators []UpdateOperator
// mark
for _, segID := range t.GetInputSegments() {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err)
}
return nil
}
// indexed is the final state of a clustering compaction task
// one task should only run this once
func (t *clusteringCompactionTask) completeTask() error {
err := t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
var err error
// update current partition stats version
// at this point, the segment view includes both the input segments and the result segments.
if err = t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
CommitTime: time.Now().Unix(),
})
if err != nil {
}); err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
}
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
}
err = t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err)
}
err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}
// mark input segments as dropped
// now, the segment view only includes the result segments.
if err = t.markInputSegmentsDropped(); err != nil {
log.Warn("mark input segments as Dropped failed, skip it and wait retry",
zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
}
@ -376,25 +381,50 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
isInputDropped = true
break
}
}
if isInputDropped {
log.Info("input segments dropped, doing for compatibility",
zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
// this task must be generated by v2.4, just for compatibility
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
} else {
// after v2.4.16, mark the results segment as dropped
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.InputSegments {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
for _, segID := range t.ResultSegments {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
}
t.resetSegmentCompacting()
// drop partition stats if uploaded
@ -405,7 +435,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
}
err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
}


@ -112,7 +112,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.processPipelining()
seg11 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L2, seg11.Level)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)
@ -147,11 +147,13 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.Equal(int64(10000), seg22.PartitionStatsVersion)
seg32 := s.meta.GetSegment(103)
s.Equal(datapb.SegmentLevel_L1, seg32.Level)
s.Equal(int64(0), seg32.PartitionStatsVersion)
s.Equal(datapb.SegmentLevel_L2, seg32.Level)
s.Equal(int64(10001), seg32.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg32.GetState())
seg42 := s.meta.GetSegment(104)
s.Equal(datapb.SegmentLevel_L1, seg42.Level)
s.Equal(int64(0), seg42.PartitionStatsVersion)
s.Equal(datapb.SegmentLevel_L2, seg42.Level)
s.Equal(int64(10001), seg42.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg42.GetState())
}
func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {


@ -119,10 +119,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
}
var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
newFlushedIDs = make(typeutil.UniqueSet)
)
// cannot use GetSegmentsByChannel since dropped segments are needed here
@ -132,7 +133,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...)
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
unIndexedIDs := make(typeutil.UniqueSet)
for _, s := range segments {
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
@ -142,36 +142,17 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
continue
}
currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), s.GetPartitionID(), channel.GetName())
if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() != currentPartitionStatsVersion {
// in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan
// is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be
// seen atomically, otherwise users will see intermediate result
continue
}
validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
// if segment.partStatsVersion is equal to currentPartitionStatsVersion,
// it must have been indexed, this is guaranteed by clustering compaction process
// this is to ensure that the current valid L2 compaction produce is available to search/query
// to avoid insufficient data
flushedIDs.Insert(s.GetID())
continue
}
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
case indexed.Contain(s.GetID()) || s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64():
// fill in indexed segments into flushed directly
flushedIDs.Insert(s.GetID())
default:
// unIndexed segments will be checked if it's parents are all indexed
unIndexedIDs.Insert(s.GetID())
flushedIDs.Insert(s.GetID())
}
}
@ -203,36 +184,55 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
return true
}
retrieveUnIndexed := func() bool {
var compactionFromExist func(segID UniqueID) bool
compactionFromExist = func(segID UniqueID) bool {
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
return false
}
for _, fromID := range compactionFrom {
if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
return true
}
if compactionFromExist(fromID) {
return true
}
}
return false
}
segmentIndexed := func(segID UniqueID) bool {
return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
}
retrieve := func() bool {
continueRetrieve := false
for id := range unIndexedIDs {
for id := range flushedIDs {
compactionFrom := validSegmentInfos[id].GetCompactionFrom()
compactTos := []UniqueID{} // neighbors and itself
if len(compactionFrom) > 0 && isValid(compactionFrom...) {
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
newFlushedIDs.Insert(id)
continue
}
if segmentIndexed(id) && !compactionFromExist(id) {
newFlushedIDs.Insert(id)
} else {
for _, fromID := range compactionFrom {
if len(compactTos) == 0 {
compactToInfo, _ := h.s.meta.GetCompactionTo(fromID)
compactTos = lo.Map(compactToInfo, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() })
}
if indexed.Contain(fromID) {
flushedIDs.Insert(fromID)
} else {
unIndexedIDs.Insert(fromID)
continueRetrieve = true
}
newFlushedIDs.Insert(fromID)
continueRetrieve = true
droppedIDs.Remove(fromID)
}
unIndexedIDs.Remove(compactTos...)
flushedIDs.Remove(compactTos...)
droppedIDs.Remove(compactionFrom...)
}
}
return continueRetrieve
}
for retrieveUnIndexed() {
for retrieve() {
flushedIDs = newFlushedIDs
newFlushedIDs = make(typeutil.UniqueSet)
}
// unindexed is flushed segments as well
flushedIDs.Insert(unIndexedIDs.Collect()...)
flushedIDs = newFlushedIDs
log.Info("GetQueryVChanPositions",
zap.Int64("collectionID", channel.GetCollectionID()),


@ -560,6 +560,312 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e
})
t.Run("complex derivation", func(t *testing.T) {
// numbers indicate segmentID, letters indicate segment information
// i: indexed, u: unindexed, g: gced
// 1i, 2i, 3g 4i, 5i, 6i
// | | | | | |
// \ | / \ | /
// \ | / \ | /
// 7u, [8i,9i,10i] [11u, 12i]
// | | | | | |
// \ | / \ / |
// \ | / \ / |
// [13u] [14i, 15u] 12i
// | | | |
// \ / \ /
// \ / \ /
// [16u] [17u]
// all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1))
assert.NoError(t, err)
seg2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
// seg3 was GCed
seg4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4))
assert.NoError(t, err)
seg5 := &datapb.SegmentInfo{
ID: 5,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
seg6 := &datapb.SegmentInfo{
ID: 6,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6))
assert.NoError(t, err)
seg7 := &datapb.SegmentInfo{
ID: 7,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7))
assert.NoError(t, err)
seg8 := &datapb.SegmentInfo{
ID: 8,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
assert.NoError(t, err)
seg9 := &datapb.SegmentInfo{
ID: 9,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
assert.NoError(t, err)
seg10 := &datapb.SegmentInfo{
ID: 10,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
assert.NoError(t, err)
seg11 := &datapb.SegmentInfo{
ID: 11,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
assert.NoError(t, err)
seg12 := &datapb.SegmentInfo{
ID: 12,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
assert.NoError(t, err)
seg13 := &datapb.SegmentInfo{
ID: 13,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
assert.NoError(t, err)
seg14 := &datapb.SegmentInfo{
ID: 14,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
assert.NoError(t, err)
seg15 := &datapb.SegmentInfo{
ID: 15,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
assert.NoError(t, err)
seg16 := &datapb.SegmentInfo{
ID: 16,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
assert.NoError(t, err)
seg17 := &datapb.SegmentInfo{
ID: 17,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
})
}
func TestShouldDropChannel(t *testing.T) {


@ -710,7 +710,7 @@ func (p *updateSegmentPack) Get(segmentID int64) *SegmentInfo {
}
segment := p.meta.segments.GetSegment(segmentID)
if segment == nil || !isSegmentHealthy(segment) {
if segment == nil {
log.Warn("meta update: get segment failed - segment not found",
zap.Int64("segmentID", segmentID),
zap.Bool("segment nil", segment == nil),
@ -835,23 +835,13 @@ func RevertSegmentLevelOperator(segmentID int64) UpdateOperator {
zap.Int64("segmentID", segmentID))
return false
}
segment.Level = segment.LastLevel
log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))
return true
}
}
func RevertSegmentPartitionStatsVersionOperator(segmentID int64) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: revert level fail - segment not found",
zap.Int64("segmentID", segmentID))
return false
// just for compatibility,
if segment.GetLevel() != segment.GetLastLevel() && segment.GetLastLevel() != datapb.SegmentLevel_Legacy {
segment.Level = segment.LastLevel
log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))
return true
}
segment.PartitionStatsVersion = segment.LastPartitionStatsVersion
log.Debug("revert segment partition stats version", zap.Int64("segmentID", segmentID), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))
return true
return false
}
}
@ -1413,8 +1403,14 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
compactFromSegIDs := make([]int64, 0)
compactToSegIDs := make([]int64, 0)
compactFromSegInfos := make([]*SegmentInfo, 0)
compactToSegInfos := make([]*SegmentInfo, 0)
var (
collectionID int64
partitionID int64
maxRowNum int64
startPosition *msgpb.MsgPosition
dmlPosition *msgpb.MsgPosition
)
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
@ -1422,38 +1418,35 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
return nil, nil, merr.WrapErrSegmentNotFound(segmentID)
}
cloned := segment.Clone()
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
collectionID = segment.GetCollectionID()
partitionID = segment.GetPartitionID()
maxRowNum = segment.GetMaxRowNum()
if startPosition == nil || segment.GetStartPosition().GetTimestamp() < startPosition.GetTimestamp() {
startPosition = segment.GetStartPosition()
}
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
if dmlPosition == nil || segment.GetDmlPosition().GetTimestamp() < dmlPosition.GetTimestamp() {
dmlPosition = segment.GetDmlPosition()
}
}
for _, seg := range result.GetSegments() {
segmentInfo := &datapb.SegmentInfo{
ID: seg.GetSegmentID(),
CollectionID: compactFromSegInfos[0].CollectionID,
PartitionID: compactFromSegInfos[0].PartitionID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: t.GetChannel(),
NumOfRows: seg.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
MaxRowNum: maxRowNum,
Binlogs: seg.GetInsertLogs(),
Statslogs: seg.GetField2StatslogPaths(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L2,
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
StartPosition: startPosition,
DmlPosition: dmlPosition,
}
segment := NewSegmentInfo(segmentInfo)
compactToSegInfos = append(compactToSegInfos, segment)
@ -1464,10 +1457,6 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
log.Debug("meta update: prepare for meta mutation - complete")
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
compactToInfos := lo.Map(compactToSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
@ -1476,18 +1465,11 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
for _, seg := range compactToInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}
// alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments
// only add new segments
if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err
}
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))
return nil, nil, err
}
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
lo.ForEach(compactToSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})


@ -42,6 +42,7 @@ func (s *PartitionStatsMetaSuite) SetupTest() {
catalog := mocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
s.catalog = catalog
}
@ -74,8 +75,11 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
ps := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 100)
s.NotNil(ps)
err = partitionStatsMeta.SaveCurrentPartitionStatsVersion(1, 2, "ch-1", 100)
s.NoError(err)
currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
s.Equal(emptyPartitionStatsVersion, currentVersion)
s.Equal(int64(100), currentVersion)
currentVersion2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-2")
s.Equal(emptyPartitionStatsVersion, currentVersion2)


@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
@ -42,6 +43,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -891,6 +893,9 @@ func (kc *Catalog) GetCurrentPartitionStatsVersion(ctx context.Context, collID,
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
valueStr, err := kc.MetaKv.Load(key)
if err != nil {
if errors.Is(err, merr.ErrIoKeyNotFound) {
return 0, nil
}
return 0, err
}