enhance: simplify compaction tasks to reduce their memory overhead (#39121)

See #39080

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
This commit is contained in:
Ted Xu 2025-01-15 14:51:00 +08:00 committed by GitHub
parent f5234c3c11
commit e501025bba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 33 additions and 253 deletions

View File

@ -24,7 +24,6 @@ import (
"time"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -592,8 +591,6 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
}
func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
t.SetSpan(span)
if err := c.queueTasks.Enqueue(t); err != nil {
return err
}
@ -603,8 +600,6 @@ func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
// restoreTask used to restore Task from etcd
func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
t.SetSpan(span)
c.executingGuard.Lock()
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
c.executingGuard.Unlock()

View File

@ -17,8 +17,6 @@
package datacoord
import (
"go.opentelemetry.io/otel/trace"
"github.com/milvus-io/milvus/pkg/proto/datapb"
)
@ -40,13 +38,10 @@ type CompactionTask interface {
SetTask(*datapb.CompactionTask)
GetTaskProto() *datapb.CompactionTask
SetPlan(plan *datapb.CompactionPlan)
ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask
SetNodeID(UniqueID) error
NeedReAssignNodeID() bool
GetSpan() trace.Span
SetSpan(trace.Span)
SaveTaskMeta() error
}

View File

@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -52,7 +51,6 @@ type clusteringCompactionTask struct {
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
meta CompactionMeta
sessions session.DataNodeManager
@ -60,7 +58,6 @@ type clusteringCompactionTask struct {
analyzeScheduler *taskScheduler
maxRetryTimes int32
slotUsage int64
}
func (t *clusteringCompactionTask) GetTaskProto() *datapb.CompactionTask {
@ -79,7 +76,6 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
handler: handler,
analyzeScheduler: analyzeScheduler,
maxRetryTimes: 3,
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
@ -272,7 +268,6 @@ func (t *clusteringCompactionTask) processExecuting() error {
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
result := t.result
if len(result.GetSegments()) == 0 {
log.Warn("illegal compaction results, this should not happen")
return merr.WrapErrCompactionResult("compaction result is empty")
@ -766,24 +761,10 @@ func (t *clusteringCompactionTask) GetResult() *datapb.CompactionPlanResult {
return t.result
}
func (t *clusteringCompactionTask) GetSpan() trace.Span {
return t.span
}
func (t *clusteringCompactionTask) EndSpan() {
if t.span != nil {
t.span.End()
}
}
func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}
func (t *clusteringCompactionTask) SetSpan(span trace.Span) {
t.span = span
}
func (t *clusteringCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
@ -805,5 +786,5 @@ func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
}
func (t *clusteringCompactionTask) GetSlotUsage() int64 {
return t.slotUsage
return paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
}

View File

@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -42,15 +41,10 @@ var _ CompactionTask = (*l0CompactionTask)(nil)
type l0CompactionTask struct {
taskProto atomic.Value // *datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
sessions session.DataNodeManager
meta CompactionMeta
slotUsage int64
}
func (t *l0CompactionTask) GetTaskProto() *datapb.CompactionTask {
@ -66,7 +60,6 @@ func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
@ -96,8 +89,7 @@ func (t *l0CompactionTask) processPipelining() bool {
}
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
var err error
t.plan, err = t.BuildCompactionRequest()
plan, err := t.BuildCompactionRequest()
if err != nil {
log.Warn("l0CompactionTask failed to build compaction request", zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
@ -109,7 +101,7 @@ func (t *l0CompactionTask) processPipelining() bool {
return t.processFailed()
}
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), plan)
if err != nil {
log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
@ -132,8 +124,7 @@ func (t *l0CompactionTask) processExecuting() bool {
}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if err := t.saveSegmentMeta(); err != nil {
if err := t.saveSegmentMeta(result); err != nil {
log.Warn("l0CompactionTask failed to save segment meta", zap.Error(err))
return false
}
@ -142,6 +133,7 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
UpdateCompactionSegmentSizeMetrics(result.GetSegments())
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
@ -173,7 +165,6 @@ func (t *l0CompactionTask) processCompleted() bool {
}
t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
task := t.taskProto.Load().(*datapb.CompactionTask)
log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", task.GetPlanID()),
zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
@ -212,40 +203,10 @@ func (t *l0CompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult {
return t.result
}
func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}
func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) {
t.taskProto.Store(task)
}
func (t *l0CompactionTask) GetSpan() trace.Span {
return t.span
}
func (t *l0CompactionTask) SetSpan(span trace.Span) {
t.span = span
}
func (t *l0CompactionTask) EndSpan() {
if t.span != nil {
t.span.End()
}
}
func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}
func (t *l0CompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.GetTaskProto().PartitionID, t.GetTaskProto().GetChannel())
}
@ -373,8 +334,7 @@ func (t *l0CompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
return t.meta.SaveCompactionTask(context.TODO(), task)
}
func (t *l0CompactionTask) saveSegmentMeta() error {
result := t.result
func (t *l0CompactionTask) saveSegmentMeta(result *datapb.CompactionPlanResult) error {
var operators []UpdateOperator
for _, seg := range result.GetSegments() {
operators = append(operators, AddBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs(), nil))
@ -392,5 +352,5 @@ func (t *l0CompactionTask) saveSegmentMeta() error {
}
func (t *l0CompactionTask) GetSlotUsage() int64 {
return t.slotUsage
return paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}

View File

@ -24,7 +24,6 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
@ -444,7 +443,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
@ -461,7 +459,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
@ -475,7 +472,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_completed)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
@ -491,7 +487,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_completed)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
@ -531,38 +526,3 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
s.True(got)
})
}
func (s *L0CompactionTaskSuite) TestSetterGetter() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
span := t.GetSpan()
s.Nil(span)
s.NotPanics(t.EndSpan)
t.SetSpan(trace.SpanFromContext(context.TODO()))
s.NotPanics(t.EndSpan)
rst := t.GetResult()
s.Nil(rst)
t.SetResult(&datapb.CompactionPlanResult{PlanID: 19530})
s.NotNil(t.GetResult())
label := t.GetLabel()
s.Equal("10-ch-1", label)
t.updateAndSaveTaskMeta(setStartTime(100))
s.EqualValues(100, t.GetTaskProto().GetStartTime())
t.SetTask(nil)
t.SetPlan(&datapb.CompactionPlan{PlanID: 19530})
s.NotNil(t.GetPlan())
s.Run("set NodeID", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
t.SetNodeID(1000)
s.EqualValues(1000, t.GetTaskProto().GetNodeID())
})
}

View File

@ -7,7 +7,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -24,15 +23,10 @@ var _ CompactionTask = (*mixCompactionTask)(nil)
type mixCompactionTask struct {
taskProto atomic.Value // *datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
sessions session.DataNodeManager
meta CompactionMeta
newSegmentIDs []int64
slotUsage int64
}
func (t *mixCompactionTask) GetTaskProto() *datapb.CompactionTask {
@ -48,7 +42,6 @@ func newMixCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocato
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
@ -61,8 +54,7 @@ func (t *mixCompactionTask) processPipelining() bool {
return false
}
var err error
t.plan, err = t.BuildCompactionRequest()
plan, err := t.BuildCompactionRequest()
if err != nil {
log.Warn("mixCompactionTask failed to build compaction request", zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
@ -73,7 +65,7 @@ func (t *mixCompactionTask) processPipelining() bool {
return true
}
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), plan)
if err != nil {
// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
// to enable a retry in compaction.checkCompaction().
@ -119,7 +111,6 @@ func (t *mixCompactionTask) processExecuting() bool {
}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
@ -129,7 +120,7 @@ func (t *mixCompactionTask) processExecuting() bool {
}
return true
}
if err := t.saveSegmentMeta(); err != nil {
if err := t.saveSegmentMeta(result); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
@ -141,11 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool {
}
return false
}
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(t.newSegmentIDs))
if err != nil {
log.Warn("mixCompaction failed to setState meta saved", zap.Error(err))
return false
}
UpdateCompactionSegmentSizeMetrics(result.GetSegments())
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
log.Info("mixCompactionTask fail in datanode")
@ -166,16 +153,21 @@ func (t *mixCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.GetTaskProto())
}
func (t *mixCompactionTask) saveSegmentMeta() error {
func (t *mixCompactionTask) saveSegmentMeta(result *datapb.CompactionPlanResult) error {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
// Also prepare metric updates.
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(context.TODO(), t.taskProto.Load().(*datapb.CompactionTask), t.result)
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(context.TODO(), t.taskProto.Load().(*datapb.CompactionTask), result)
if err != nil {
return err
}
// Apply metrics after successful meta update.
t.newSegmentIDs = lo.Map(newSegments, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() })
newSegmentIDs := lo.Map(newSegments, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() })
metricMutation.commit()
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(newSegmentIDs))
if err != nil {
log.Warn("mixCompaction failed to setState meta saved", zap.Error(err))
return err
}
log.Info("mixCompactionTask success to save segment meta")
return nil
}
@ -183,7 +175,10 @@ func (t *mixCompactionTask) saveSegmentMeta() error {
// Note: return True means exit this state machine.
// ONLY return True for Completed, Failed or Timeout
func (t *mixCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
log := log.With(zap.Int64("triggerID",
t.GetTaskProto().GetTriggerID()),
zap.Int64("PlanID", t.GetTaskProto().GetPlanID()),
zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
processResult := true
switch t.GetTaskProto().GetState() {
@ -205,18 +200,6 @@ func (t *mixCompactionTask) Process() bool {
return processResult
}
func (t *mixCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
func (t *mixCompactionTask) GetResult() *datapb.CompactionPlanResult {
return t.result
}
func (t *mixCompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}
func (t *mixCompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.taskProto.Load().(*datapb.CompactionTask).PartitionID, t.GetTaskProto().GetChannel())
}
@ -234,7 +217,6 @@ func (t *mixCompactionTask) processCompleted() bool {
}
t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("mixCompactionTask processCompleted done")
return true
@ -304,18 +286,10 @@ func (t *mixCompactionTask) SetNodeID(id UniqueID) error {
return t.updateAndSaveTaskMeta(setNodeID(id))
}
func (t *mixCompactionTask) GetSpan() trace.Span {
return t.span
}
func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) {
t.taskProto.Store(task)
}
func (t *mixCompactionTask) SetSpan(span trace.Span) {
t.span = span
}
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
beginLogID, _, err := t.allocator.AllocN(1)
@ -362,5 +336,5 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
}
func (t *mixCompactionTask) GetSlotUsage() int64 {
return t.slotUsage
return paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
}

View File

@ -75,7 +75,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
task1 := &mixCompactionTask{
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
}
@ -88,7 +87,6 @@ func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
})
task2 := &mixCompactionTask{
plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
}
@ -101,7 +99,6 @@ func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
})
task3 := &mixCompactionTask{
plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
}
@ -114,7 +111,6 @@ func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
})
task4 := &mixCompactionTask{
plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
sessions: s.mockSessMgr,
meta: s.mockMeta,
}
@ -225,8 +221,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
s.SetupTest()
s.generateInitTasksForSchedule()
// submit the testing tasks
for i, t := range test.tasks {
t.SetPlan(test.plans[i])
for _, t := range test.tasks {
// t.SetPlan(test.plans[i])
s.handler.submitTask(t)
}
@ -300,8 +296,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
}).Maybe()
s.generateInitTasksForSchedule()
// submit the testing tasks
for i, t := range test.tasks {
t.SetPlan(test.plans[i])
for _, t := range test.tasks {
// t.SetPlan(test.plans[i])
s.handler.submitTask(t)
}
assigner := newSlotBasedNodeAssigner(s.cluster)
@ -454,14 +450,13 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
assigner := newSlotBasedNodeAssigner(s.cluster)
assigner.slots = map[int64]int64{
100: 16,
101: 23,
100: 8,
101: 16,
}
task1 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, s.mockMeta, nil)
task1.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
ok := assigner.assign(task1)
s.Equal(true, ok)
s.Equal(int64(101), task1.GetTaskProto().GetNodeID())
@ -469,7 +464,6 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
task2 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, s.mockMeta, nil)
task2.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
ok = assigner.assign(task2)
s.Equal(true, ok)
s.Equal(int64(100), task2.GetTaskProto().GetNodeID())
@ -477,7 +471,6 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
task3 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, s.mockMeta, nil)
task3.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
ok = assigner.assign(task3)
s.Equal(true, ok)
@ -487,22 +480,6 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.Equal(false, ok)
}
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeSlotUsageShouldNotBeZero() {
s.SetupTest()
nodeSlots := map[int64]int64{
100: 16,
101: 23,
}
task1 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, s.mockMeta, nil)
task1.slotUsage = 0
assigner := newSlotBasedNodeAssigner(s.cluster)
assigner.slots = nodeSlots
ok := assigner.assign(task1)
s.Equal(false, ok)
}
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
s.SetupTest()
nodeSlots := map[int64]int64{
@ -515,12 +492,10 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
task1 := newClusteringCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, s.mockMeta, nil, nil, nil)
task1.slotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
task2 := newClusteringCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, s.mockMeta, nil, nil, nil)
task2.slotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
executingTasks[1] = task1
executingTasks[2] = task2
@ -546,11 +521,6 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
Channel: ch,
NodeID: 1,
}, nil, s.mockMeta, s.mockSessMgr)
t1.plan = &datapb.CompactionPlan{
PlanID: 19530,
Channel: ch,
Type: datapb.CompactionType_MixCompaction,
}
t2 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 19531,
@ -558,11 +528,6 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
Channel: ch,
NodeID: 1,
}, nil, s.mockMeta, s.mockSessMgr)
t2.plan = &datapb.CompactionPlan{
PlanID: 19531,
Channel: ch,
Type: datapb.CompactionType_MixCompaction,
}
s.handler.submitTask(t1)
s.handler.restoreTask(t2)
@ -579,11 +544,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
Channel: "ch-01",
State: datapb.CompactionTaskState_executing,
}, nil, s.mockMeta, s.mockSessMgr)
t1.plan = &datapb.CompactionPlan{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}
t2 := newMixCompactionTask(&datapb.CompactionTask{
TriggerID: 1,
@ -592,11 +552,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
Channel: "ch-01",
State: datapb.CompactionTaskState_completed,
}, nil, s.mockMeta, s.mockSessMgr)
t2.plan = &datapb.CompactionPlan{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}
t3 := newL0CompactionTask(&datapb.CompactionTask{
TriggerID: 1,
@ -605,11 +560,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
Channel: "ch-02",
State: datapb.CompactionTaskState_failed,
}, nil, s.mockMeta, s.mockSessMgr)
t3.plan = &datapb.CompactionPlan{
PlanID: 3,
Type: datapb.CompactionType_Level0DeleteCompaction,
Channel: "ch-02",
}
inTasks := map[int64]CompactionTask{
1: t1,
@ -654,11 +604,6 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
Channel: "ch-01",
State: datapb.CompactionTaskState_executing,
}, nil, s.mockMeta, s.mockSessMgr)
t1.plan = &datapb.CompactionPlan{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}
s.NoError(s.handler.submitTask(t1))
@ -669,11 +614,6 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
Channel: "ch-01",
State: datapb.CompactionTaskState_completed,
}, nil, s.mockMeta, s.mockSessMgr)
t2.plan = &datapb.CompactionPlan{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}
s.Error(s.handler.submitTask(t2))
}
@ -730,11 +670,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
State: datapb.CompactionTaskState_executing,
NodeID: 111,
}, nil, s.mockMeta, s.mockSessMgr)
t1.plan = &datapb.CompactionPlan{
PlanID: 1, Channel: "ch-1",
TimeoutInSeconds: 1,
Type: datapb.CompactionType_MixCompaction,
}
t2 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 2,
@ -743,11 +678,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
State: datapb.CompactionTaskState_executing,
NodeID: 111,
}, nil, s.mockMeta, s.mockSessMgr)
t2.plan = &datapb.CompactionPlan{
PlanID: 2,
Channel: "ch-1",
Type: datapb.CompactionType_MixCompaction,
}
t3 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 3,
@ -756,11 +686,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
State: datapb.CompactionTaskState_timeout,
NodeID: 111,
}, nil, s.mockMeta, s.mockSessMgr)
t3.plan = &datapb.CompactionPlan{
PlanID: 3,
Channel: "ch-1",
Type: datapb.CompactionType_MixCompaction,
}
t4 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 4,
@ -769,11 +694,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
State: datapb.CompactionTaskState_timeout,
NodeID: 111,
}, nil, s.mockMeta, s.mockSessMgr)
t4.plan = &datapb.CompactionPlan{
PlanID: 4,
Channel: "ch-1",
Type: datapb.CompactionType_MixCompaction,
}
t6 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 6,
@ -782,11 +702,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
State: datapb.CompactionTaskState_executing,
NodeID: 111,
}, nil, s.mockMeta, s.mockSessMgr)
t6.plan = &datapb.CompactionPlan{
PlanID: 6,
Channel: "ch-2",
Type: datapb.CompactionType_MixCompaction,
}
inTasks := map[int64]CompactionTask{
1: t1,