enhance: Fail compaction task to prevent data loss (#43545)

We’ve frequently observed data loss caused by broken mutual exclusion in
compaction tasks. This PR introduces a post-check: before modifying
metadata upon compaction task completion, it verifies the state of the
input segments. If any input segment has been dropped, the compaction
task will be marked as failed.

issue: https://github.com/milvus-io/milvus/issues/43513

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-07-25 16:24:54 +08:00 committed by GitHub
parent 078ccf5e08
commit 0e1f367164
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 110 additions and 0 deletions

View File

@ -294,6 +294,13 @@ func (c *compactionInspector) schedule() []CompactionTask {
c.executingGuard.Lock()
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
c.scheduler.Enqueue(t)
log.Info("compaction task enqueued",
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("type", t.GetTaskProto().GetType().String()),
zap.String("channel", t.GetTaskProto().GetChannel()),
zap.String("label", t.GetLabel()),
zap.Int64s("inputSegments", t.GetTaskProto().GetInputSegments()),
)
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
@ -630,6 +637,16 @@ func (c *compactionInspector) checkCompaction() error {
c.executingGuard.Lock()
for _, t := range finishedTasks {
delete(c.executingTasks, t.GetTaskProto().GetPlanID())
log.Info("compaction task finished",
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("type", t.GetTaskProto().GetType().String()),
zap.String("state", t.GetTaskProto().GetState().String()),
zap.String("channel", t.GetTaskProto().GetChannel()),
zap.String("label", t.GetLabel()),
zap.Int64("nodeID", t.GetTaskProto().GetNodeID()),
zap.Int64s("inputSegments", t.GetTaskProto().GetInputSegments()),
zap.String("reason", t.GetTaskProto().GetFailReason()),
)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc()
}

View File

@ -600,6 +600,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
// s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
// s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().ValidateSegmentStateBeforeCompleteCompactionMutation(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
if t.GetPlanID() == 2 {
@ -693,6 +694,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
// s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Twice()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
s.mockMeta.EXPECT().ValidateSegmentStateBeforeCompleteCompactionMutation(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything, mock.Anything).Return(
[]*SegmentInfo{segment},
&segMetricMutation{}, nil).Once()
@ -887,6 +889,7 @@ func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() {
},
},
}, nil).Once()
s.mockMeta.EXPECT().ValidateSegmentStateBeforeCompleteCompactionMutation(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error"))
s.handler.submitTask(task)

View File

@ -178,6 +178,12 @@ func (t *clusteringCompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
return segment.GetSegmentID()
})
err = t.meta.ValidateSegmentStateBeforeCompleteCompactionMutation(t.GetTaskProto())
if err != nil {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return
}
var metricMutation *segMetricMutation
_, metricMutation, err = t.meta.CompleteCompactionMutation(context.TODO(), t.GetTaskProto(), t.result)
if err != nil {

View File

@ -134,6 +134,12 @@ func (t *l0CompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
err = t.meta.ValidateSegmentStateBeforeCompleteCompactionMutation(t.GetTaskProto())
if err != nil {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return
}
if err = t.saveSegmentMeta(result); err != nil {
log.Warn("l0CompactionTask failed to save segment meta", zap.Error(err))
return

View File

@ -350,6 +350,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockMeta.EXPECT().ValidateSegmentStateBeforeCompleteCompactionMutation(mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil).Times(2)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).Return().Once()
@ -370,6 +371,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockMeta.EXPECT().ValidateSegmentStateBeforeCompleteCompactionMutation(mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("mock error")).Once()
@ -389,6 +391,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockMeta.EXPECT().ValidateSegmentStateBeforeCompleteCompactionMutation(mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()

View File

@ -144,6 +144,11 @@ func (t *mixCompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
}
return
}
err = t.meta.ValidateSegmentStateBeforeCompleteCompactionMutation(t.GetTaskProto())
if err != nil {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return
}
if err := t.saveSegmentMeta(result); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {

View File

@ -65,6 +65,7 @@ type CompactionMeta interface {
SetSegmentsCompacting(ctx context.Context, segmentID []int64, compacting bool)
CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []int64) (bool, bool)
CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)
ValidateSegmentStateBeforeCompleteCompactionMutation(t *datapb.CompactionTask) error
CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error
SaveCompactionTask(ctx context.Context, task *datapb.CompactionTask) error
@ -1740,6 +1741,29 @@ func (m *meta) completeMixCompactionMutation(
return compactToSegments, metricMutation, nil
}
func (m *meta) ValidateSegmentStateBeforeCompleteCompactionMutation(t *datapb.CompactionTask) error {
m.segMu.RLock()
defer m.segMu.RUnlock()
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if !isSegmentHealthy(segment) {
// SHOULD NOT HAPPEN: input segment was dropped.
// This indicates that compaction tasks, which should be mutually exclusive,
// may have executed concurrently.
log.Warn("should not happen! input segment was dropped",
zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.String("channel", t.GetChannel()),
zap.Int64("partitionID", t.GetPartitionID()),
zap.Int64("segmentID", segmentID),
)
return merr.WrapErrSegmentNotFound(segmentID, "input segment was dropped")
}
}
return nil
}
func (m *meta) CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
m.segMu.Lock()
defer m.segMu.Unlock()

View File

@ -879,6 +879,52 @@ func (_c *MockCompactionMeta_UpdateSegmentsInfo_Call) RunAndReturn(run func(cont
return _c
}
// ValidateSegmentStateBeforeCompleteCompactionMutation provides a mock function with given fields: t
func (_m *MockCompactionMeta) ValidateSegmentStateBeforeCompleteCompactionMutation(t *datapb.CompactionTask) error {
ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for ValidateSegmentStateBeforeCompleteCompactionMutation")
}
var r0 error
if rf, ok := ret.Get(0).(func(*datapb.CompactionTask) error); ok {
r0 = rf(t)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ValidateSegmentStateBeforeCompleteCompactionMutation'
type MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call struct {
*mock.Call
}
// ValidateSegmentStateBeforeCompleteCompactionMutation is a helper method to define mock.On call
// - t *datapb.CompactionTask
func (_e *MockCompactionMeta_Expecter) ValidateSegmentStateBeforeCompleteCompactionMutation(t interface{}) *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call {
return &MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call{Call: _e.mock.On("ValidateSegmentStateBeforeCompleteCompactionMutation", t)}
}
func (_c *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call) Run(run func(t *datapb.CompactionTask)) *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*datapb.CompactionTask))
})
return _c
}
func (_c *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call) Return(_a0 error) *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionTask) error) *MockCompactionMeta_ValidateSegmentStateBeforeCompleteCompactionMutation_Call {
_c.Call.Return(run)
return _c
}
// NewMockCompactionMeta creates a new instance of MockCompactionMeta. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockCompactionMeta(t interface {