enhance: clean compaction task in compactionHandler (#38170)

issue: #35711

---------

Signed-off-by: wayblink <anyang.wang@zilliz.com>
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Co-authored-by: wayblink <anyang.wang@zilliz.com>
cai.zhang 2024-12-19 12:36:47 +08:00 committed by GitHub
parent 01cfb1fd97
commit 306e5e6898
13 changed files with 549 additions and 147 deletions

View File

@ -84,6 +84,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task
meta CompactionMeta
allocator allocator.Allocator
sessions session.DataNodeManager
@ -193,6 +196,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
@ -416,6 +420,7 @@ func (c *compactionPlanHandler) loopCheck() {
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
}
}
}
@ -447,7 +452,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
triggers := c.meta.GetCompactionTasks(context.TODO())
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
if task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
@ -668,6 +673,11 @@ func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
return nodeID
}
// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed tasks are removed from executingTasks.
// Tasks that fail or time out are moved from executingTasks to cleaningTasks,
// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
@ -709,9 +719,44 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()
// collect tasks that need to be cleaned
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetTaskProto().GetState() == datapb.CompactionTaskState_failed ||
t.GetTaskProto().GetState() == datapb.CompactionTaskState_timeout ||
t.GetTaskProto().GetState() == datapb.CompactionTaskState_completed {
log.Ctx(context.TODO()).Info("task need to clean",
zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("state", t.GetTaskProto().GetState().String()))
c.cleaningTasks[t.GetTaskProto().GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()
return nil
}
// cleanFailedTasks performs the Clean logic defined by each task,
// while compactionPlanHandler.Clean garbage-collects tasks that have already been cleaned
func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetTaskProto().GetPlanID())
}
c.cleaningGuard.Unlock()
}
func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
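
To make the new handler flow above easier to follow, here is a minimal, self-contained sketch of the executing -> cleaning lifecycle. The task type, states, and maps below are simplified stand-ins for illustration only, not the real compactionPlanHandler or CompactionTask types.

package main

import "fmt"

type state int

const (
	executing state = iota
	failed
	cleaned
)

// task is a hypothetical stand-in for CompactionTask.
type task struct {
	planID int64
	state  state
}

// Process mimics the contract: return true when the state machine has ended
// (completed, failed, or timeout), not necessarily when the task succeeded.
func (t *task) Process() bool { return t.state != executing }

// Clean mimics the task-specific clean logic; true means cleaning succeeded.
func (t *task) Clean() bool { t.state = cleaned; return true }

func main() {
	executingTasks := map[int64]*task{1: {planID: 1, state: failed}}
	cleaningTasks := map[int64]*task{}

	// checkCompaction: ended tasks leave executingTasks; failed/timeout
	// (and, for clustering, completed) tasks are queued for cleaning.
	for id, t := range executingTasks {
		if t.Process() {
			delete(executingTasks, id)
			cleaningTasks[id] = t
		}
	}

	// cleanFailedTasks: called on every loopCheck tick; a task is removed
	// from cleaningTasks only once its Clean reports success.
	for id, t := range cleaningTasks {
		if t.Clean() {
			delete(cleaningTasks, id)
		}
	}

	fmt.Println(len(executingTasks), len(cleaningTasks)) // 0 0
}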

View File

@ -23,7 +23,17 @@ import (
)
type CompactionTask interface {
// Process drives the task's state machine.
//
// Returns:
//   - <bool>: whether the task's state machine has ended.
//
// Notes:
//
//	`end` does not mean the task completed; its final state may be completed, failed, or timeout.
Process() bool
// Clean performs the clean logic for a failed or timed-out task
Clean() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetSlotUsage() int64
GetLabel() string
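
A trimmed-down illustration of the Process/Clean contract documented above. The two-method interface and flakyTask below are assumptions made for this sketch; the real CompactionTask interface has more methods.

package main

import "fmt"

// compactionTask is a cut-down, hypothetical version of the interface above.
type compactionTask interface {
	Process() bool // true once the state machine ends (completed, failed, or timeout)
	Clean() bool   // true once the fail/timeout clean logic succeeded
}

// flakyTask ends in the failed state and needs two Clean attempts.
type flakyTask struct{ cleanAttempts int }

func (t *flakyTask) Process() bool { return true } // ended, but not completed

func (t *flakyTask) Clean() bool {
	t.cleanAttempts++
	// pretend the first attempt hits a retryable meta error
	return t.cleanAttempts > 1
}

func main() {
	var t compactionTask = &flakyTask{}
	fmt.Println(t.Process()) // true: the state machine ended
	fmt.Println(t.Clean())   // false: stays in cleaningTasks
	fmt.Println(t.Clean())   // true: can now be dropped from cleaningTasks
}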

View File

@ -85,10 +85,13 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
return task
}
// Note: returning True means exiting this state machine.
// ONLY return True for Completed, Failed, or Timeout
func (t *clusteringCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
ctx := context.TODO()
log := log.Ctx(ctx).With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
err := t.retryableProcess()
err := t.retryableProcess(ctx)
if err != nil {
log.Warn("fail in process task", zap.Error(err))
if merr.IsRetryableErr(err) && t.GetTaskProto().RetryTimes < t.maxRetryTimes {
@ -125,19 +128,26 @@ func (t *clusteringCompactionTask) Process() bool {
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned
return t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout
}
// retryableProcess handles the task's state transitions and returns an error when something does not work as expected.
// The outer Process sets the state and retry count according to the error type (retryable or not-retryable).
func (t *clusteringCompactionTask) retryableProcess() error {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
func (t *clusteringCompactionTask) retryableProcess(ctx context.Context) error {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
return nil
}
coll, err := t.handler.GetCollection(context.Background(), t.GetTaskProto().GetCollectionID())
coll, err := t.handler.GetCollection(ctx, t.GetTaskProto().GetCollectionID())
if err != nil {
// retryable
log.Warn("fail to get collection", zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Error(err))
@ -162,15 +172,15 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processIndexing()
case datapb.CompactionTaskState_statistic:
return t.processStats()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
}
return nil
}
func (t *clusteringCompactionTask) Clean() bool {
log.Ctx(context.TODO()).Info("clean task", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
return t.doClean() == nil
}
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
@ -219,7 +229,9 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
}
func (t *clusteringCompactionTask) processPipelining() error {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().TriggerID), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.GetTaskProto().TriggerID),
zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()))
if t.NeedReAssignNodeID() {
log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
return nil
@ -244,7 +256,7 @@ func (t *clusteringCompactionTask) processPipelining() error {
}
func (t *clusteringCompactionTask) processExecuting() error {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
result, err := t.sessions.GetCompactionPlanResult(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID())
if err != nil || result == nil {
log.Warn("processExecuting clustering compaction", zap.Error(err))
@ -282,12 +294,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
@ -299,21 +306,23 @@ func (t *clusteringCompactionTask) processExecuting() error {
}
func (t *clusteringCompactionTask) processMetaSaved() error {
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Error(err))
}
// To ensure compatibility, if a task upgraded from version 2.4 is in the meta-saved state,
// its TmpSegments will be empty, so skip the stats task and go straight to building the index.
if len(t.GetTaskProto().GetTmpSegments()) == 0 {
log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
log.Info("tmp segments is nil, skip stats task")
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
}
func (t *clusteringCompactionTask) processStats() error {
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
// This is an in-memory step only; if the process crashes here, the state after recovery is still CompactionTaskState_statistic.
resultSegments := make([]int64, 0, len(t.GetTaskProto().GetTmpSegments()))
if Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
@ -338,15 +347,15 @@ func (t *clusteringCompactionTask) processStats() error {
}
if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
log.Warn("regenerate partition stats failed, wait for retry", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
log.Warn("regenerate partition stats failed, wait for retry", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
}
} else {
log.Info("stats task is not enable, set tmp segments to result segments", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
log.Info("stats task is not enable, set tmp segments to result segments")
resultSegments = t.GetTaskProto().GetTmpSegments()
}
log.Info("clustering compaction stats task finished", zap.Int64("planID", t.GetTaskProto().GetPlanID()),
log.Info("clustering compaction stats task finished",
zap.Int64s("tmp segments", t.GetTaskProto().GetTmpSegments()),
zap.Int64s("result segments", resultSegments))
@ -405,7 +414,7 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
}
func (t *clusteringCompactionTask) processIndexing() error {
log := log.Ctx(context.TODO())
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
// wait for segment indexed
collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetTaskProto().GetCollectionID(), "")
if len(collectionIndexes) == 0 {
@ -424,7 +433,8 @@ func (t *clusteringCompactionTask) processIndexing() error {
}
return true
}()
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64s("segments", t.GetTaskProto().ResultSegments))
log.Debug("check compaction result segments index states",
zap.Bool("indexed", indexed), zap.Int64s("segments", t.GetTaskProto().ResultSegments))
if indexed {
return t.completeTask()
}
@ -463,6 +473,7 @@ func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
// indexed is the final state of a clustering compaction task;
// a task should run this only once
func (t *clusteringCompactionTask) completeTask() error {
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
var err error
// first mark result segments visible
if err = t.markResultSegmentsVisible(); err != nil {
@ -482,29 +493,34 @@ func (t *clusteringCompactionTask) completeTask() error {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
}
// mark input segments as dropped
// now, the segment view only includes the result segments.
if err = t.markInputSegmentsDropped(); err != nil {
return err
}
err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetTaskProto().GetCollectionID(),
t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID())
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("completeTask update task state to completed failed", zap.Error(err))
return err
}
// mark input segments as dropped
// now, the segment view only includes the result segments.
if err = t.markInputSegmentsDropped(); err != nil {
log.Warn("mark input segments as Dropped failed, skip it and wait retry")
}
return nil
}
func (t *clusteringCompactionTask) processAnalyzing() error {
log := log.Ctx(context.TODO())
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetTaskProto().GetAnalyzeTaskID())
if analyzeTask == nil {
log.Warn("analyzeTask not found", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()))
return merr.WrapErrAnalyzeTaskNotFound(t.GetTaskProto().GetAnalyzeTaskID()) // retryable
}
log.Info("check analyze task state", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
log.Info("check analyze task state", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()),
zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
switch analyzeTask.State {
case indexpb.JobState_JobStateFinished:
if analyzeTask.GetCentroidsFile() == "" {
@ -526,15 +542,21 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(context.TODO(), t.GetTaskProto().GetInputSegments(), false)
}
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log := log.Ctx(context.TODO())
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String()))
func (t *clusteringCompactionTask) doClean() error {
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
zap.String("state", t.GetTaskProto().GetState().String()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err))
}
if t.GetTaskProto().GetState() == datapb.CompactionTaskState_completed {
if err := t.markInputSegmentsDropped(); err != nil {
return err
}
} else {
isInputDropped := false
for _, segID := range t.GetTaskProto().GetInputSegments() {
if t.meta.GetHealthySegment(context.TODO(), segID) == nil {
@ -549,8 +571,8 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
// L1 : L1 ->(process)-> L2 ->(clean)-> L1
// L2 : L2 ->(process)-> L2 ->(clean)-> L2
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
@ -588,8 +610,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}
}
t.resetSegmentCompacting()
// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetTaskProto().GetCollectionID(),
@ -601,9 +621,21 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err := t.meta.CleanPartitionStatsInfo(context.TODO(), partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID()))
}
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}
// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is called only once;
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("clusteringCompactionTask clean done")
return nil
}
func (t *clusteringCompactionTask) doAnalyze() error {
@ -691,7 +723,7 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
}
t.SetTask(task)
log.Info("updateAndSaveTaskMeta success", zap.String("task state", t.GetTaskProto().GetState().String()))
log.Ctx(context.TODO()).Info("updateAndSaveTaskMeta success", zap.String("task state", t.GetTaskProto().GetState().String()))
return nil
}
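
The doClean added above (and its l0/mix counterparts later in this commit) follows one ordering rule: persist the cleaned state first, release the segment-compacting flags last, so a retried Clean never resets the flags twice. A sketch of just that ordering, with hypothetical saveState/unlockSegments callbacks standing in for updateAndSaveTaskMeta(setState(cleaned)) and resetSegmentCompacting:

package main

import (
	"errors"
	"fmt"
)

// doClean mirrors only the ordering of the real doClean implementations;
// saveState and unlockSegments are hypothetical stand-ins.
func doClean(saveState func() error, unlockSegments func()) error {
	if err := saveState(); err != nil {
		// state not persisted: the handler keeps the task in cleaningTasks
		// and calls Clean again on the next tick
		return err
	}
	// last step, so it runs at most once even across retries
	unlockSegments()
	return nil
}

func main() {
	unlocked := 0
	failOnce := true
	save := func() error {
		if failOnce {
			failOnce = false
			return errors.New("retryable meta error")
		}
		return nil
	}
	_ = doClean(save, func() { unlocked++ }) // first attempt fails, nothing unlocked
	_ = doClean(save, func() { unlocked++ }) // retry succeeds
	fmt.Println(unlocked)                    // 1
}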

View File

@ -109,7 +109,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task := s.generateBasicTask(false)
task.processPipelining()
err := task.processPipelining()
s.NoError(err)
seg11 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
@ -117,6 +118,34 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)
task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104}))
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 103,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 104,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
err = task.doClean()
s.NoError(err)
s.Run("v2.4.x", func() {
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
@ -162,8 +191,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
task.GetTaskProto().InputSegments = []int64{101, 102}
task.GetTaskProto().ResultSegments = []int64{103, 104}
task.processFailedOrTimeout()
task.Clean()
seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@ -252,7 +280,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.GetTaskProto().TmpSegments = []int64{103, 104}
task.GetTaskProto().ResultSegments = []int64{105, 106}
task.processFailedOrTimeout()
task.Clean()
seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@ -336,7 +364,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(false, task.Process())
s.Equal(int32(3), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
}
@ -345,7 +373,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
@ -570,11 +598,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().GetState())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_timeout, task.GetTaskProto().GetState())
})
}
@ -675,7 +702,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
@ -691,7 +718,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
@ -708,7 +735,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

View File

@ -73,7 +73,7 @@ func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
// ONLY return True for Completed or Failed
func (t *l0CompactionTask) Process() bool {
switch t.GetTaskProto().GetState() {
case datapb.CompactionTaskState_pipelining:
@ -188,6 +188,11 @@ func (t *l0CompactionTask) processCompleted() bool {
}
func (t *l0CompactionTask) processFailed() bool {
return true
}
func (t *l0CompactionTask) doClean() error {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
if t.hasAssignedWorker() {
err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
@ -197,15 +202,21 @@ func (t *l0CompactionTask) processFailed() bool {
}
}
t.resetSegmentCompacting()
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
return err
}
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
return true
// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is called only once;
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("l0CompactionTask clean done")
return nil
}
func (t *l0CompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult {

View File

@ -335,7 +335,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetTaskProto().GetPlanID(),
@ -417,12 +416,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).Return().Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState())
})
s.Run("test executing with result failed save compaction meta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil).Once()
@ -510,14 +507,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState())
})
s.Run("test process failed failed", func() {
@ -525,14 +518,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState())
})
s.Run("test unknown task", func() {

View File

@ -70,7 +70,7 @@ func (t *mixCompactionTask) processPipelining() bool {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
}
return t.processFailed()
return true
}
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
@ -78,15 +78,18 @@ func (t *mixCompactionTask) processPipelining() bool {
// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
// to enable a retry in compaction.checkCompaction().
// This is tricky, we should remove the reassignment here.
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
}
return false
}
log.Info("mixCompactionTask notify compaction tasks to DataNode")
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
if err != nil {
log.Warn("mixCompactionTask update task state failed", zap.Error(err))
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
}
return false
@ -129,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
}
return t.processFailed()
return true
}
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
@ -139,7 +142,7 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
}
return t.processFailed()
return true
}
return false
}
@ -155,7 +158,7 @@ func (t *mixCompactionTask) processExecuting() bool {
if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
return false
return true
}
return false
}
@ -183,7 +186,7 @@ func (t *mixCompactionTask) saveSegmentMeta() error {
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
// ONLY return True for Completed, Failed or Timeout
func (t *mixCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
@ -257,21 +260,32 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
}
func (t *mixCompactionTask) processFailed() bool {
return true
}
func (t *mixCompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *mixCompactionTask) doClean() error {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err))
return err
}
log.Info("mixCompactionTask processFailed done")
t.resetSegmentCompacting()
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
log.Warn("mixCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}
return true
// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is called only once;
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("mixCompactionTask clean done")
return nil
}
func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {

View File

@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@ -48,6 +49,7 @@ type CompactionPlanHandlerSuite struct {
mockCm *MockChannelManager
mockSessMgr *session.MockDataNodeManager
handler *compactionPlanHandler
mockHandler *NMockHandler
cluster *MockCluster
}
@ -59,6 +61,8 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.cluster = NewMockCluster(s.T())
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
s.mockHandler = NewNMockHandler(s.T())
s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
}
func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
@ -939,6 +943,179 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.NoError(err)
}
func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
s.SetupTest()
tests := []struct {
task CompactionTask
}{
{
newMixCompactionTask(
&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
nil, s.mockMeta, s.mockSessMgr),
},
{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
nil, s.mockMeta, s.mockSessMgr),
},
}
for _, test := range tests {
task := test.task
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
err := s.handler.checkCompaction()
s.NoError(err)
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
}
func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
s.SetupTest()
task := newClusteringCompactionTask(
&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
CollectionID: 1001,
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil)
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() {
s.SetupTest()
task := newClusteringCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
CollectionID: 1001,
Channel: "ch-1",
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_executing,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
ClusteringKeyField: &schemapb.FieldSchema{
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: true,
IsClusteringKey: true,
},
},
nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil)
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(1), int64(1)).Return(
&datapb.CompactionPlanResult{
PlanID: 1,
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
PlanID: 1,
SegmentID: 101,
},
},
}, nil).Once()
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error"))
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(task.GetTaskProto().GetResultSegments()))
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
// test that compactionHandler keeps cleaning a failed task until it becomes cleaned
func (s *CompactionPlanHandlerSuite) TestKeepClean() {
s.SetupTest()
tests := []struct {
task CompactionTask
}{
{
newClusteringCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil),
},
}
for _, test := range tests {
task := test.task
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
s.Equal(1, len(s.handler.cleaningTasks))
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Once()
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
}
func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: fieldID,

View File

@ -791,6 +791,12 @@ func UpdateStatusOperator(segmentID int64, status commonpb.SegmentState) UpdateO
return false
}
if segment.GetState() == status {
log.Ctx(context.TODO()).Info("meta update: segment stats already is target state",
zap.Int64("segmentID", segmentID), zap.String("status", status.String()))
return false
}
updateSegStateAndPrepareMetrics(segment, status, modPack.metricMutation)
if status == commonpb.SegmentState_Dropped {
segment.DroppedAt = uint64(time.Now().UnixNano())

View File

@ -107,8 +107,8 @@ func (psm *partitionStatsMeta) ListPartitionStatsInfos(collectionID int64, parti
func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStatsInfo) error {
psm.Lock()
defer psm.Unlock()
if err := psm.catalog.SavePartitionStatsInfo(psm.ctx, info); err != nil {
log.Error("meta update: update PartitionStatsInfo info fail", zap.Error(err))
if err := psm.catalog.SavePartitionStatsInfo(context.TODO(), info); err != nil {
log.Ctx(context.TODO()).Error("meta update: update PartitionStatsInfo info fail", zap.Error(err))
return err
}
if _, ok := psm.partitionStatsInfos[info.GetVChannel()]; !ok {
@ -127,8 +127,26 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat
func (psm *partitionStatsMeta) DropPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error {
psm.Lock()
defer psm.Unlock()
// if the partitionStats being dropped is the current version, roll currentPartitionStats back to the next available version
currentVersion := psm.innerGetCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
if currentVersion == info.GetVersion() && currentVersion != emptyPartitionStatsVersion {
infos := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos
if len(infos) > 0 {
var maxVersion int64 = 0
for version := range infos {
if version > maxVersion && version < currentVersion {
maxVersion = version
}
}
err := psm.innerSaveCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel(), maxVersion)
if err != nil {
return err
}
}
}
if err := psm.catalog.DropPartitionStatsInfo(ctx, info); err != nil {
log.Error("meta update: drop PartitionStatsInfo info fail",
log.Ctx(ctx).Error("meta update: drop PartitionStatsInfo info fail",
zap.Int64("collectionID", info.GetCollectionID()),
zap.Int64("partitionID", info.GetPartitionID()),
zap.String("vchannel", info.GetVChannel()),
@ -155,8 +173,11 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(ctx context.Context, info
func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
psm.Lock()
defer psm.Unlock()
return psm.innerSaveCurrentPartitionStatsVersion(collectionID, partitionID, vChannel, currentPartitionStatsVersion)
}
log.Info("update current partition stats version", zap.Int64("collectionID", collectionID),
func (psm *partitionStatsMeta) innerSaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
log.Ctx(context.TODO()).Info("update current partition stats version", zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.String("vChannel", vChannel), zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion))
@ -180,7 +201,10 @@ func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, pa
func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
psm.RLock()
defer psm.RUnlock()
return psm.innerGetCurrentPartitionStatsVersion(collectionID, partitionID, vChannel)
}
func (psm *partitionStatsMeta) innerGetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
if _, ok := psm.partitionStatsInfos[vChannel]; !ok {
return emptyPartitionStatsVersion
}
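
A minimal sketch of the version-rollback rule the DropPartitionStatsInfo change above applies: when the current version is dropped, the current pointer falls back to the largest remaining version below it. The map layout here is a simplification of partitionStatsInfos.

package main

import "fmt"

// nextCurrentVersion picks the replacement current partition-stats version
// after dropping `current`: the largest remaining version below it, or 0
// when no older version is left.
func nextCurrentVersion(versions map[int64]struct{}, current int64) int64 {
	var maxVersion int64
	for v := range versions {
		if v > maxVersion && v < current {
			maxVersion = v
		}
	}
	return maxVersion
}

func main() {
	versions := map[int64]struct{}{100: {}, 101: {}, 102: {}}
	fmt.Println(nextCurrentVersion(versions, 102)) // 101
	delete(versions, 102)
	fmt.Println(nextCurrentVersion(versions, 101)) // 100
}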

View File

@ -91,3 +91,61 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
s.Equal(int64(100), currentVersion4)
}
func (s *PartitionStatsMetaSuite) TestDropPartitionStats() {
ctx := context.Background()
partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog)
s.NoError(err)
collectionID := int64(1)
partitionID := int64(2)
channel := "ch-1"
s.catalog.EXPECT().DropPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
s.catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
partitionStats := []*datapb.PartitionStatsInfo{
{
CollectionID: collectionID,
PartitionID: partitionID,
VChannel: channel,
SegmentIDs: []int64{100000},
Version: 100,
},
{
CollectionID: collectionID,
PartitionID: partitionID,
VChannel: channel,
SegmentIDs: []int64{100000},
Version: 101,
},
{
CollectionID: collectionID,
PartitionID: partitionID,
VChannel: channel,
SegmentIDs: []int64{100000},
Version: 102,
},
}
for _, partitionStats := range partitionStats {
partitionStatsMeta.SavePartitionStatsInfo(partitionStats)
}
partitionStatsMeta.SaveCurrentPartitionStatsVersion(collectionID, partitionID, channel, 102)
version := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(int64(102), version)
err = partitionStatsMeta.DropPartitionStatsInfo(context.Background(), partitionStats[2])
s.NoError(err)
s.Equal(2, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
version2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(int64(101), version2)
err = partitionStatsMeta.DropPartitionStatsInfo(context.Background(), partitionStats[1])
s.Equal(1, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
version3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(int64(100), version3)
err = partitionStatsMeta.DropPartitionStatsInfo(context.Background(), partitionStats[0])
s.NoError(err)
s.Nil(partitionStatsMeta.partitionStatsInfos[channel][partitionID])
version4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(emptyPartitionStatsVersion, version4)
}

View File

@ -215,6 +215,7 @@ var (
ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true)
ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false)
ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false)
ErrCleanPartitionStatsFail = newMilvusError("fail to clean partition Stats", 2316, true)
ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false)

View File

@ -1189,6 +1189,14 @@ func WrapErrClusteringCompactionMetaError(operation string, err error) error {
return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
}
func WrapErrCleanPartitionStatsFail(msg ...string) error {
err := error(ErrCleanPartitionStatsFail)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
func WrapErrAnalyzeTaskNotFound(id int64) error {
return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id))
}
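
A hedged usage sketch for the new WrapErrCleanPartitionStatsFail helper; the merr import path and the errors.Is behaviour are assumptions based on how the other milvusError codes in this file are typically consumed.

package main

import (
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/merr" // assumed import path
)

func cleanPartitionStats() error {
	// pretend CleanPartitionStatsInfo failed for collection 1, partition 2 on ch-1, plan 10
	return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", 1, 2, "ch-1", 10))
}

func main() {
	err := cleanPartitionStats()
	// callers can still match the sentinel through the wrapped message
	fmt.Println(errors.Is(err, merr.ErrCleanPartitionStatsFail))
}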