From d18c49013b841ba3e51e0daadb342c4e7fb11377 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Tue, 25 Jun 2024 10:08:03 +0800 Subject: [PATCH] enhance: Refine compaction (#33982) issue : https://github.com/milvus-io/milvus/issues/32939 Signed-off-by: zhenshan.cao --- .golangci.yml | 3 + go.mod | 2 +- internal/datacoord/compaction.go | 90 +++--- .../datacoord/compaction_task_clustering.go | 12 +- internal/datacoord/compaction_task_l0.go | 12 +- internal/datacoord/compaction_task_mix.go | 36 ++- internal/datacoord/compaction_test.go | 23 +- internal/datacoord/meta.go | 259 +++++++++--------- internal/datacoord/meta_test.go | 91 +----- internal/datacoord/mock_compaction_meta.go | 58 ++-- internal/mocks/mock_datanode.go | 90 +++--- internal/mocks/mock_datanode_client.go | 120 ++++---- 12 files changed, 376 insertions(+), 420 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 40320aef71..3e5f75d7dd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -8,6 +8,9 @@ run: - scripts - internal/core - cmake_build + - mmap + - data + - ci skip-files: - partial_search_test.go diff --git a/go.mod b/go.mod index 166f21a012..97570a30fc 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 github.com/remeh/sizedwaitgroup v1.0.0 + github.com/valyala/fastjson v1.6.4 github.com/zeebo/xxh3 v1.0.2 google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 @@ -210,7 +211,6 @@ require ( github.com/twmb/murmur3 v1.1.3 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/ugorji/go/codec v1.2.11 // indirect - github.com/valyala/fastjson v1.6.4 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index d2174da0d6..145849bec3 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -274,8 +274,37 @@ func (c *compactionPlanHandler) loadMeta() { triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks() for _, tasks := range triggers { for _, task := range tasks { - if task.State != datapb.CompactionTaskState_completed && task.State != datapb.CompactionTaskState_cleaned { - c.enqueueCompaction(task) + state := task.GetState() + if state == datapb.CompactionTaskState_completed || + state == datapb.CompactionTaskState_cleaned || + state == datapb.CompactionTaskState_unknown { + log.Info("compactionPlanHandler loadMeta abandon compactionTask", + zap.Int64("planID", task.GetPlanID()), + zap.String("State", task.GetState().String())) + continue + } else { + t, err := c.createCompactTask(task) + if err != nil { + log.Warn("compactionPlanHandler loadMeta create compactionTask failed", + zap.Int64("planID", task.GetPlanID()), + zap.String("State", task.GetState().String())) + continue + } + if t.NeedReAssignNodeID() { + c.submitTask(t) + log.Info("compactionPlanHandler loadMeta submitTask", + zap.Int64("planID", t.GetPlanID()), + zap.Int64("triggerID", t.GetTriggerID()), + zap.Int64("collectionID", t.GetCollectionID()), + zap.String("state", t.GetState().String())) + } else { + c.restoreTask(t) + log.Info("compactionPlanHandler loadMeta restoreTask", + zap.Int64("planID", t.GetPlanID()), + zap.Int64("triggerID", t.GetTriggerID()), + zap.Int64("collectionID", t.GetCollectionID()), + zap.String("state", t.GetState().String())) + } } } } @@ -466,6 +495,8 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { } func (c *compactionPlanHandler) submitTask(t CompactionTask) { + _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType())) + t.SetSpan(span) c.mu.Lock() c.queueTasks[t.GetPlanID()] = t c.mu.Unlock() @@ -474,6 +505,8 @@ func (c *compactionPlanHandler) submitTask(t CompactionTask) { // restoreTask used to restore Task from etcd func (c *compactionPlanHandler) restoreTask(t CompactionTask) { + _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType())) + t.SetSpan(span) c.executingMu.Lock() c.executingTasks[t.GetPlanID()] = t c.executingMu.Unlock() @@ -504,38 +537,23 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e if c.isFull() { return errCompactionBusy } - // TODO change to set this on scheduling task - exist, succeed := c.checkAndSetSegmentsCompacting(task) - if !exist { - return merr.WrapErrIllegalCompactionPlan("segment not exist") + t, err := c.createCompactTask(task) + if err != nil { + return err } - if !succeed { - return merr.WrapErrCompactionPlanConflict("segment is compacting") + t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix()))) + err = t.SaveTaskMeta() + if err != nil { + c.meta.SetSegmentsCompacting(t.GetInputSegments(), false) + return err } - - // TODO change to set this on scheduling task - t := c.createCompactTask(task) - if t == nil { - return merr.WrapErrIllegalCompactionPlan("illegal compaction type") - } - if task.StartTime != 0 { - t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix()))) - err := t.SaveTaskMeta() - if err != nil { - return err - } - } - - _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", task.GetType())) - t.SetSpan(span) - c.submitTask(t) log.Info("Compaction plan submitted") return nil } // set segments compacting, one segment can only participate one compactionTask -func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) CompactionTask { +func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (CompactionTask, error) { var task CompactionTask switch t.GetType() { case datapb.CompactionType_MixCompaction: @@ -558,19 +576,17 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) Comp handler: c.handler, analyzeScheduler: c.analyzeScheduler, } + default: + return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type") } - return task -} - -// set segments compacting, one segment can only participate one compactionTask -func (c *compactionPlanHandler) setSegmentsCompacting(task CompactionTask, compacting bool) { - for _, segmentID := range task.GetInputSegments() { - c.meta.SetSegmentCompacting(segmentID, compacting) + exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments()) + if !exist { + return nil, merr.WrapErrIllegalCompactionPlan("segment not exist") } -} - -func (c *compactionPlanHandler) checkAndSetSegmentsCompacting(task *datapb.CompactionTask) (bool, bool) { - return c.meta.CheckAndSetSegmentsCompacting(task.GetInputSegments()) + if !succeed { + return nil, merr.WrapErrCompactionPlanConflict("segment is compacting") + } + return task, nil } func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 063c4ead83..dad1ab9e60 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -212,7 +212,7 @@ func (t *clusteringCompactionTask) processExecuting() error { if errors.Is(err, merr.ErrNodeNotFound) { log.Warn("GetCompactionPlanResult fail", zap.Error(err)) // todo reassign node ID - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return nil } return err @@ -232,7 +232,7 @@ func (t *clusteringCompactionTask) processExecuting() error { return segment.GetSegmentID() }) - _, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result) + _, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result) if err != nil { return err } @@ -336,9 +336,11 @@ func (t *clusteringCompactionTask) processAnalyzing() error { } func (t *clusteringCompactionTask) resetSegmentCompacting() { - for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { - t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) + var segmentIDs []UniqueID + for _, binLogs := range t.GetPlan().GetSegmentBinlogs() { + segmentIDs = append(segmentIDs, binLogs.GetSegmentID()) } + t.meta.SetSegmentsCompacting(segmentIDs, false) } func (t *clusteringCompactionTask) processFailedOrTimeout() error { @@ -414,7 +416,7 @@ func (t *clusteringCompactionTask) doCompact() error { err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return err } t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index fdadfd1371..87b185db0e 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -74,7 +74,7 @@ func (t *l0CompactionTask) processPipelining() bool { err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return false } t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) @@ -85,7 +85,7 @@ func (t *l0CompactionTask) processExecuting() bool { result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) if err != nil || result == nil { if errors.Is(err, merr.ErrNodeNotFound) { - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) } return false } @@ -186,7 +186,7 @@ func (t *l0CompactionTask) SetStartTime(startTime int64) { } func (t *l0CompactionTask) NeedReAssignNodeID() bool { - return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0 + return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID } func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) { @@ -307,9 +307,11 @@ func (t *l0CompactionTask) processCompleted() bool { } func (t *l0CompactionTask) resetSegmentCompacting() { - for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { - t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) + var segmentIDs []UniqueID + for _, binLogs := range t.GetPlan().GetSegmentBinlogs() { + segmentIDs = append(segmentIDs, binLogs.GetSegmentID()) } + t.meta.SetSegmentsCompacting(segmentIDs, false) } func (t *l0CompactionTask) processTimeout() bool { diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index ab291cb89e..ef796cb1fa 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -32,6 +32,7 @@ func (t *mixCompactionTask) processPipelining() bool { } var err error t.plan, err = t.BuildCompactionRequest() + // Segment not found if err != nil { err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) return err2 == nil @@ -39,7 +40,7 @@ func (t *mixCompactionTask) processPipelining() bool { err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return false } t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) @@ -59,7 +60,7 @@ func (t *mixCompactionTask) processExecuting() bool { result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) if err != nil || result == nil { if errors.Is(err, merr.ErrNodeNotFound) { - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) } return false } @@ -82,13 +83,20 @@ func (t *mixCompactionTask) processExecuting() bool { } return t.processFailed() } - saveSuccess := t.saveSegmentMeta() - if !saveSuccess { + err2 := t.saveSegmentMeta() + if err2 != nil { + if errors.Is(err2, merr.ErrIllegalCompactionPlan) { + err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) + if err3 != nil { + log.Warn("fail to updateAndSaveTaskMeta") + } + return true + } return false } segments := []UniqueID{t.newSegment.GetID()} - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments)) - if err == nil { + err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments)) + if err3 == nil { return t.processMetaSaved() } return false @@ -110,18 +118,18 @@ func (t *mixCompactionTask) SaveTaskMeta() error { return t.saveTaskMeta(t.CompactionTask) } -func (t *mixCompactionTask) saveSegmentMeta() bool { +func (t *mixCompactionTask) saveSegmentMeta() error { log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) // Also prepare metric updates. - newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result) + newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result) if err != nil { - return false + return err } // Apply metrics after successful meta update. t.newSegment = newSegments[0] metricMutation.commit() log.Info("mixCompactionTask success to save segment meta") - return true + return nil } func (t *mixCompactionTask) Process() bool { @@ -161,7 +169,7 @@ func (t *mixCompactionTask) GetLabel() string { } func (t *mixCompactionTask) NeedReAssignNodeID() bool { - return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0 + return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID } func (t *mixCompactionTask) processCompleted() bool { @@ -178,9 +186,11 @@ func (t *mixCompactionTask) processCompleted() bool { } func (t *mixCompactionTask) resetSegmentCompacting() { - for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { - t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) + var segmentIDs []UniqueID + for _, binLogs := range t.GetPlan().GetSegmentBinlogs() { + segmentIDs = append(segmentIDs, binLogs.GetSegmentID()) } + t.meta.SetSegmentsCompacting(segmentIDs, false) } func (t *mixCompactionTask) processTimeout() bool { diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 6162ab37da..ea5d41e58b 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -529,6 +529,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.SetupTest() s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil) task := &datapb.CompactionTask{ @@ -569,6 +570,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { }, nil).Once() s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return() inTasks := map[int64]CompactionTask{ 1: &mixCompactionTask{ @@ -658,11 +660,11 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { // s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil) s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn( - func(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { - if plan.GetPlanID() == 2 { + func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { + if t.GetPlanID() == 2 { segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) return []*SegmentInfo{segment}, &segMetricMutation{}, nil - } else if plan.GetPlanID() == 6 { + } else if t.GetPlanID() == 6 { return nil, nil, errors.Errorf("intended error") } return nil, nil, errors.Errorf("unexpected error") @@ -706,7 +708,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { // s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentCompacting(mock.Anything, mock.Anything).Return().Twice() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return( []*SegmentInfo{segment}, @@ -749,13 +751,14 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { task := &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ - PlanID: plan.GetPlanID(), - TriggerID: 1, - Type: plan.GetType(), - State: datapb.CompactionTaskState_executing, - NodeID: dataNodeID, + PlanID: plan.GetPlanID(), + TriggerID: 1, + Type: plan.GetType(), + State: datapb.CompactionTaskState_executing, + NodeID: dataNodeID, + InputSegments: []UniqueID{1, 2}, }, - plan: plan, + // plan: plan, sessions: s.mockSessMgr, meta: s.mockMeta, } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index c38a369cfb..ca0db3357c 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -56,9 +56,9 @@ type CompactionMeta interface { SelectSegments(filters ...SegmentFilter) []*SegmentInfo GetHealthySegment(segID UniqueID) *SegmentInfo UpdateSegmentsInfo(operators ...UpdateOperator) error - SetSegmentCompacting(segmentID int64, compacting bool) + SetSegmentsCompacting(segmentID []int64, compacting bool) CheckAndSetSegmentsCompacting(segmentIDs []int64) (bool, bool) - CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) + CompleteCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) CleanPartitionStatsInfo(info *datapb.PartitionStatsInfo) error SaveCompactionTask(task *datapb.CompactionTask) error @@ -1167,7 +1167,7 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID { }) } -// GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID` +// GetSegmentsIDOfCollectionWithDropped returns all dropped segment ids which collection equals to provided `collectionID` func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID { segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment != nil && @@ -1192,7 +1192,7 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un }) } -// GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID` +// GetSegmentsIDOfPartitionWithDropped returns all dropped segments ids which collection & partition equals to provided `collectionID`, `partitionID` func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID { segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment.GetState() != commonpb.SegmentState_SegmentStateNone && @@ -1309,24 +1309,29 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { // CheckAndSetSegmentsCompacting check all segments are not compacting // if true, set them compacting and return true // if false, skip setting and -func (m *meta) CheckAndSetSegmentsCompacting(segmentIDs []UniqueID) (exist, hasCompactingSegment bool) { +func (m *meta) CheckAndSetSegmentsCompacting(segmentIDs []UniqueID) (exist, canDo bool) { m.Lock() defer m.Unlock() + var hasCompacting bool + exist = true for _, segmentID := range segmentIDs { seg := m.segments.GetSegment(segmentID) if seg != nil { - hasCompactingSegment = seg.isCompacting + if seg.isCompacting { + hasCompacting = true + } } else { - return false, false + exist = false + break } } - if hasCompactingSegment { - return true, false + canDo = exist && !hasCompacting + if canDo { + for _, segmentID := range segmentIDs { + m.segments.SetIsCompacting(segmentID, true) + } } - for _, segmentID := range segmentIDs { - m.segments.SetIsCompacting(segmentID, true) - } - return true, true + return exist, canDo } func (m *meta) SetSegmentsCompacting(segmentIDs []UniqueID, compacting bool) { @@ -1345,19 +1350,16 @@ func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) { m.segments.SetLevel(segmentID, level) } -func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { - m.Lock() - defer m.Unlock() - - log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.String("type", plan.GetType().String())) +func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { + log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} var compactFromSegIDs []int64 var latestCompactFromSegments []*SegmentInfo - for _, segmentBinlogs := range plan.GetSegmentBinlogs() { - segment := m.segments.GetSegment(segmentBinlogs.GetSegmentID()) + for _, segmentID := range t.GetInputSegments() { + segment := m.segments.GetSegment(segmentID) if segment == nil { - return nil, nil, merr.WrapErrSegmentNotFound(segmentBinlogs.GetSegmentID()) + return nil, nil, merr.WrapErrSegmentNotFound(segmentID) } cloned := segment.Clone() @@ -1371,13 +1373,100 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) } - logIDsFromPlan := make(map[int64]struct{}) - for _, segBinlogs := range plan.GetSegmentBinlogs() { - for _, fieldBinlog := range segBinlogs.GetDeltalogs() { - for _, binlog := range fieldBinlog.GetBinlogs() { - logIDsFromPlan[binlog.GetLogID()] = struct{}{} + getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition { + var minPos *msgpb.MsgPosition + for _, pos := range positions { + if minPos == nil || + pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() { + minPos = pos } } + return minPos + } + newSegments := make([]*SegmentInfo, 0) + for _, seg := range result.GetSegments() { + segmentInfo := &datapb.SegmentInfo{ + ID: seg.GetSegmentID(), + CollectionID: latestCompactFromSegments[0].CollectionID, + PartitionID: latestCompactFromSegments[0].PartitionID, + InsertChannel: t.GetChannel(), + NumOfRows: seg.NumOfRows, + State: commonpb.SegmentState_Flushed, + MaxRowNum: latestCompactFromSegments[0].MaxRowNum, + Binlogs: seg.GetInsertLogs(), + Statslogs: seg.GetField2StatslogPaths(), + CreatedByCompaction: true, + CompactionFrom: compactFromSegIDs, + LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0), + Level: datapb.SegmentLevel_L2, + StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { + return info.GetStartPosition() + })), + DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { + return info.GetDmlPosition() + })), + } + segment := NewSegmentInfo(segmentInfo) + newSegments = append(newSegments, segment) + metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) + } + compactionTo := make([]UniqueID, 0, len(newSegments)) + for _, s := range newSegments { + compactionTo = append(compactionTo, s.GetID()) + } + + log.Info("meta update: prepare for complete compaction mutation - complete", + zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID), + zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID), + zap.Any("compacted from", compactFromSegIDs), + zap.Any("compacted to", compactionTo)) + + compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { + return info.SegmentInfo + }) + + newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { + return info.SegmentInfo + }) + + binlogs := make([]metastore.BinlogsIncrement, 0) + for _, seg := range newSegmentInfos { + binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg}) + } + if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil { + log.Warn("fail to alter segments and new segment", zap.Error(err)) + return nil, nil, err + } + lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) { + m.segments.SetSegment(info.GetID(), info) + }) + lo.ForEach(newSegments, func(info *SegmentInfo, _ int) { + m.segments.SetSegment(info.GetID(), info) + }) + return newSegments, metricMutation, nil +} + +func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { + log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) + + metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} + var compactFromSegIDs []int64 + var latestCompactFromSegments []*SegmentInfo + for _, segmentID := range t.GetInputSegments() { + segment := m.segments.GetSegment(segmentID) + if segment == nil { + return nil, nil, merr.WrapErrSegmentNotFound(segmentID) + } + + cloned := segment.Clone() + cloned.DroppedAt = uint64(time.Now().UnixNano()) + cloned.Compacted = true + + latestCompactFromSegments = append(latestCompactFromSegments, cloned) + compactFromSegIDs = append(compactFromSegIDs, cloned.GetID()) + + // metrics mutation for compaction from segments + updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) } getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition { @@ -1391,89 +1480,15 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d return minPos } - if plan.GetType() == datapb.CompactionType_ClusteringCompaction { - newSegments := make([]*SegmentInfo, 0) - for _, seg := range result.GetSegments() { - segmentInfo := &datapb.SegmentInfo{ - ID: seg.GetSegmentID(), - CollectionID: latestCompactFromSegments[0].CollectionID, - PartitionID: latestCompactFromSegments[0].PartitionID, - InsertChannel: plan.GetChannel(), - NumOfRows: seg.NumOfRows, - State: commonpb.SegmentState_Flushed, - MaxRowNum: latestCompactFromSegments[0].MaxRowNum, - Binlogs: seg.GetInsertLogs(), - Statslogs: seg.GetField2StatslogPaths(), - CreatedByCompaction: true, - CompactionFrom: compactFromSegIDs, - LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(plan.GetStartTime(), 0), 0), - Level: datapb.SegmentLevel_L2, - StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { - return info.GetStartPosition() - })), - DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { - return info.GetDmlPosition() - })), - } - segment := NewSegmentInfo(segmentInfo) - newSegments = append(newSegments, segment) - metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) - } - compactionTo := make([]UniqueID, 0, len(newSegments)) - for _, s := range newSegments { - compactionTo = append(compactionTo, s.GetID()) - } - - log.Info("meta update: prepare for complete compaction mutation - complete", - zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID), - zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID), - zap.Any("compacted from", compactFromSegIDs), - zap.Any("compacted to", compactionTo)) - - compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { - return info.SegmentInfo - }) - - newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo { - return info.SegmentInfo - }) - - binlogs := make([]metastore.BinlogsIncrement, 0) - for _, seg := range newSegmentInfos { - binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg}) - } - if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil { - log.Warn("fail to alter segments and new segment", zap.Error(err)) - return nil, nil, err - } - lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) { - m.segments.SetSegment(info.GetID(), info) - }) - lo.ForEach(newSegments, func(info *SegmentInfo, _ int) { - m.segments.SetSegment(info.GetID(), info) - }) - return newSegments, metricMutation, nil - } - // MixCompaction / MergeCompaction will generates one and only one segment compactToSegment := result.GetSegments()[0] - // copy new deltalogs in compactFrom segments to compactTo segments. - // TODO: Not needed when enable L0 segments. - newDeltalogs, err := m.copyNewDeltalogs(latestCompactFromSegments, logIDsFromPlan, compactToSegment.GetSegmentID()) - if err != nil { - return nil, nil, err - } - if len(newDeltalogs) > 0 { - compactToSegment.Deltalogs = append(compactToSegment.GetDeltalogs(), &datapb.FieldBinlog{Binlogs: newDeltalogs}) - } - compactToSegmentInfo := NewSegmentInfo( &datapb.SegmentInfo{ ID: compactToSegment.GetSegmentID(), CollectionID: latestCompactFromSegments[0].CollectionID, PartitionID: latestCompactFromSegments[0].PartitionID, - InsertChannel: plan.GetChannel(), + InsertChannel: t.GetChannel(), NumOfRows: compactToSegment.NumOfRows, State: commonpb.SegmentState_Flushed, MaxRowNum: latestCompactFromSegments[0].MaxRowNum, @@ -1483,7 +1498,7 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d CreatedByCompaction: true, CompactionFrom: compactFromSegIDs, - LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(plan.GetStartTime(), 0), 0), + LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0), Level: datapb.SegmentLevel_L1, StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { @@ -1503,7 +1518,7 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d } log = log.With( - zap.String("channel", plan.GetChannel()), + zap.String("channel", t.GetChannel()), zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()), zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()), zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()), @@ -1536,36 +1551,16 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d return []*SegmentInfo{compactToSegmentInfo}, metricMutation, nil } -func (m *meta) copyNewDeltalogs(latestCompactFromInfos []*SegmentInfo, logIDsInPlan map[int64]struct{}, toSegment int64) ([]*datapb.Binlog, error) { - newBinlogs := []*datapb.Binlog{} - for _, seg := range latestCompactFromInfos { - for _, fieldLog := range seg.GetDeltalogs() { - for _, l := range fieldLog.GetBinlogs() { - if _, ok := logIDsInPlan[l.GetLogID()]; !ok { - fromKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, seg.ID, l.GetLogID()) - toKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, toSegment, l.GetLogID()) - log.Warn("found new deltalog in compactFrom segment, copying it...", - zap.Any("deltalog", l), - zap.Int64("copyFrom segmentID", seg.GetID()), - zap.Int64("copyTo segmentID", toSegment), - zap.String("copyFrom key", fromKey), - zap.String("copyTo key", toKey), - ) - - blob, err := m.chunkManager.Read(m.ctx, fromKey) - if err != nil { - return nil, err - } - - if err := m.chunkManager.Write(m.ctx, toKey, blob); err != nil { - return nil, err - } - newBinlogs = append(newBinlogs, l) - } - } - } +func (m *meta) CompleteCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { + m.Lock() + defer m.Unlock() + switch t.GetType() { + case datapb.CompactionType_MixCompaction: + return m.completeMixCompactionMutation(t, result) + case datapb.CompactionType_ClusteringCompaction: + return m.completeClusterCompactionMutation(t, result) } - return newBinlogs, nil + return nil, nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type") } // buildSegment utility function for compose datapb.SegmentInfo struct with provided info diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index aa0a23d84a..b6191da3ef 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -209,33 +209,12 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { } mockChMgr := mocks.NewChunkManager(suite.T()) - mockChMgr.EXPECT().RootPath().Return("mockroot").Times(4) - mockChMgr.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, nil).Twice() - mockChMgr.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() - m := &meta{ catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, segments: latestSegments, chunkManager: mockChMgr, } - plan := &datapb.CompactionPlan{ - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: 1, - FieldBinlogs: m.GetSegment(1).GetBinlogs(), - Field2StatslogPaths: m.GetSegment(1).GetStatslogs(), - Deltalogs: m.GetSegment(1).GetDeltalogs()[:1], // compaction plan use only 1 deltalog - }, - { - SegmentID: 2, - FieldBinlogs: m.GetSegment(2).GetBinlogs(), - Field2StatslogPaths: m.GetSegment(2).GetStatslogs(), - Deltalogs: m.GetSegment(2).GetDeltalogs()[:1], // compaction plan use only 1 deltalog - }, - }, - } - compactToSeg := &datapb.CompactionSegment{ SegmentID: 3, InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, @@ -246,8 +225,13 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { result := &datapb.CompactionPlanResult{ Segments: []*datapb.CompactionSegment{compactToSeg}, } + task := &datapb.CompactionTask{ + InputSegments: []UniqueID{1, 2}, + Type: datapb.CompactionType_MixCompaction, + } - infos, mutation, err := m.CompleteCompactionMutation(plan, result) + infos, mutation, err := m.CompleteCompactionMutation(task, result) + assert.NoError(suite.T(), err) suite.Equal(1, len(infos)) info := infos[0] suite.NoError(err) @@ -275,16 +259,6 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { } } - deltalogs := info.GetDeltalogs() - deltalogIDs := []int64{} - for _, fbinlog := range deltalogs { - for _, blog := range fbinlog.GetBinlogs() { - suite.Empty(blog.GetLogPath()) - deltalogIDs = append(deltalogIDs, blog.GetLogID()) - } - } - suite.ElementsMatch([]int64{30001, 31001}, deltalogIDs) - // check compactFrom segments for _, segID := range []int64{1, 2} { seg := m.GetSegment(segID) @@ -856,7 +830,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { }) } -func Test_meta_SetSegmentCompacting(t *testing.T) { +func Test_meta_SetSegmentsCompacting(t *testing.T) { type fields struct { client kv.MetaKv segments *SegmentsInfo @@ -899,62 +873,13 @@ func Test_meta_SetSegmentCompacting(t *testing.T) { catalog: &datacoord.Catalog{MetaKv: tt.fields.client}, segments: tt.fields.segments, } - m.SetSegmentCompacting(tt.args.segmentID, tt.args.compacting) + m.SetSegmentsCompacting([]UniqueID{tt.args.segmentID}, tt.args.compacting) segment := m.GetHealthySegment(tt.args.segmentID) assert.Equal(t, tt.args.compacting, segment.isCompacting) }) } } -func Test_meta_SetSegmentImporting(t *testing.T) { - type fields struct { - client kv.MetaKv - segments *SegmentsInfo - } - type args struct { - segmentID UniqueID - importing bool - } - tests := []struct { - name string - fields fields - args args - }{ - { - "test set segment importing", - fields{ - NewMetaMemoryKV(), - &SegmentsInfo{ - segments: map[int64]*SegmentInfo{ - 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - State: commonpb.SegmentState_Flushed, - IsImporting: false, - }, - }, - }, - }, - }, - args{ - segmentID: 1, - importing: true, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := &meta{ - catalog: &datacoord.Catalog{MetaKv: tt.fields.client}, - segments: tt.fields.segments, - } - m.SetSegmentCompacting(tt.args.segmentID, tt.args.importing) - segment := m.GetHealthySegment(tt.args.segmentID) - assert.Equal(t, tt.args.importing, segment.isCompacting) - }) - } -} - func Test_meta_GetSegmentsOfCollection(t *testing.T) { storedSegments := NewSegmentsInfo() diff --git a/internal/datacoord/mock_compaction_meta.go b/internal/datacoord/mock_compaction_meta.go index e47624c3f6..ec90d4b216 100644 --- a/internal/datacoord/mock_compaction_meta.go +++ b/internal/datacoord/mock_compaction_meta.go @@ -114,34 +114,34 @@ func (_c *MockCompactionMeta_CleanPartitionStatsInfo_Call) RunAndReturn(run func return _c } -// CompleteCompactionMutation provides a mock function with given fields: plan, result -func (_m *MockCompactionMeta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { - ret := _m.Called(plan, result) +// CompleteCompactionMutation provides a mock function with given fields: t, result +func (_m *MockCompactionMeta) CompleteCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { + ret := _m.Called(t, result) var r0 []*SegmentInfo var r1 *segMetricMutation var r2 error - if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)); ok { - return rf(plan, result) + if rf, ok := ret.Get(0).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)); ok { + return rf(t, result) } - if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) []*SegmentInfo); ok { - r0 = rf(plan, result) + if rf, ok := ret.Get(0).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) []*SegmentInfo); ok { + r0 = rf(t, result) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*SegmentInfo) } } - if rf, ok := ret.Get(1).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) *segMetricMutation); ok { - r1 = rf(plan, result) + if rf, ok := ret.Get(1).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) *segMetricMutation); ok { + r1 = rf(t, result) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(*segMetricMutation) } } - if rf, ok := ret.Get(2).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) error); ok { - r2 = rf(plan, result) + if rf, ok := ret.Get(2).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) error); ok { + r2 = rf(t, result) } else { r2 = ret.Error(2) } @@ -155,15 +155,15 @@ type MockCompactionMeta_CompleteCompactionMutation_Call struct { } // CompleteCompactionMutation is a helper method to define mock.On call -// - plan *datapb.CompactionPlan +// - t *datapb.CompactionTask // - result *datapb.CompactionPlanResult -func (_e *MockCompactionMeta_Expecter) CompleteCompactionMutation(plan interface{}, result interface{}) *MockCompactionMeta_CompleteCompactionMutation_Call { - return &MockCompactionMeta_CompleteCompactionMutation_Call{Call: _e.mock.On("CompleteCompactionMutation", plan, result)} +func (_e *MockCompactionMeta_Expecter) CompleteCompactionMutation(t interface{}, result interface{}) *MockCompactionMeta_CompleteCompactionMutation_Call { + return &MockCompactionMeta_CompleteCompactionMutation_Call{Call: _e.mock.On("CompleteCompactionMutation", t, result)} } -func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Run(run func(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult)) *MockCompactionMeta_CompleteCompactionMutation_Call { +func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Run(run func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult)) *MockCompactionMeta_CompleteCompactionMutation_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*datapb.CompactionPlan), args[1].(*datapb.CompactionPlanResult)) + run(args[0].(*datapb.CompactionTask), args[1].(*datapb.CompactionPlanResult)) }) return _c } @@ -173,7 +173,7 @@ func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Return(_a0 []*Segm return _c } -func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_CompleteCompactionMutation_Call { +func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionTask, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_CompleteCompactionMutation_Call { _c.Call.Return(run) return _c } @@ -666,36 +666,36 @@ func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(...Segme return _c } -// SetSegmentCompacting provides a mock function with given fields: segmentID, compacting -func (_m *MockCompactionMeta) SetSegmentCompacting(segmentID int64, compacting bool) { +// SetSegmentsCompacting provides a mock function with given fields: segmentID, compacting +func (_m *MockCompactionMeta) SetSegmentsCompacting(segmentID []int64, compacting bool) { _m.Called(segmentID, compacting) } -// MockCompactionMeta_SetSegmentCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentCompacting' -type MockCompactionMeta_SetSegmentCompacting_Call struct { +// MockCompactionMeta_SetSegmentsCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentsCompacting' +type MockCompactionMeta_SetSegmentsCompacting_Call struct { *mock.Call } -// SetSegmentCompacting is a helper method to define mock.On call -// - segmentID int64 +// SetSegmentsCompacting is a helper method to define mock.On call +// - segmentID []int64 // - compacting bool -func (_e *MockCompactionMeta_Expecter) SetSegmentCompacting(segmentID interface{}, compacting interface{}) *MockCompactionMeta_SetSegmentCompacting_Call { - return &MockCompactionMeta_SetSegmentCompacting_Call{Call: _e.mock.On("SetSegmentCompacting", segmentID, compacting)} +func (_e *MockCompactionMeta_Expecter) SetSegmentsCompacting(segmentID interface{}, compacting interface{}) *MockCompactionMeta_SetSegmentsCompacting_Call { + return &MockCompactionMeta_SetSegmentsCompacting_Call{Call: _e.mock.On("SetSegmentsCompacting", segmentID, compacting)} } -func (_c *MockCompactionMeta_SetSegmentCompacting_Call) Run(run func(segmentID int64, compacting bool)) *MockCompactionMeta_SetSegmentCompacting_Call { +func (_c *MockCompactionMeta_SetSegmentsCompacting_Call) Run(run func(segmentID []int64, compacting bool)) *MockCompactionMeta_SetSegmentsCompacting_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(bool)) + run(args[0].([]int64), args[1].(bool)) }) return _c } -func (_c *MockCompactionMeta_SetSegmentCompacting_Call) Return() *MockCompactionMeta_SetSegmentCompacting_Call { +func (_c *MockCompactionMeta_SetSegmentsCompacting_Call) Return() *MockCompactionMeta_SetSegmentsCompacting_Call { _c.Call.Return() return _c } -func (_c *MockCompactionMeta_SetSegmentCompacting_Call) RunAndReturn(run func(int64, bool)) *MockCompactionMeta_SetSegmentCompacting_Call { +func (_c *MockCompactionMeta_SetSegmentsCompacting_Call) RunAndReturn(run func([]int64, bool)) *MockCompactionMeta_SetSegmentsCompacting_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index 6ba06b0009..190da75be0 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -64,8 +64,8 @@ type MockDataNode_CheckChannelOperationProgress_Call struct { } // CheckChannelOperationProgress is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ChannelWatchInfo +// - _a0 context.Context +// - _a1 *datapb.ChannelWatchInfo func (_e *MockDataNode_Expecter) CheckChannelOperationProgress(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckChannelOperationProgress_Call { return &MockDataNode_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", _a0, _a1)} } @@ -119,8 +119,8 @@ type MockDataNode_CompactionV2_Call struct { } // CompactionV2 is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.CompactionPlan +// - _a0 context.Context +// - _a1 *datapb.CompactionPlan func (_e *MockDataNode_Expecter) CompactionV2(_a0 interface{}, _a1 interface{}) *MockDataNode_CompactionV2_Call { return &MockDataNode_CompactionV2_Call{Call: _e.mock.On("CompactionV2", _a0, _a1)} } @@ -174,8 +174,8 @@ type MockDataNode_DropCompactionPlan_Call struct { } // DropCompactionPlan is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.DropCompactionPlanRequest +// - _a0 context.Context +// - _a1 *datapb.DropCompactionPlanRequest func (_e *MockDataNode_Expecter) DropCompactionPlan(_a0 interface{}, _a1 interface{}) *MockDataNode_DropCompactionPlan_Call { return &MockDataNode_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", _a0, _a1)} } @@ -229,8 +229,8 @@ type MockDataNode_DropImport_Call struct { } // DropImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.DropImportRequest +// - _a0 context.Context +// - _a1 *datapb.DropImportRequest func (_e *MockDataNode_Expecter) DropImport(_a0 interface{}, _a1 interface{}) *MockDataNode_DropImport_Call { return &MockDataNode_DropImport_Call{Call: _e.mock.On("DropImport", _a0, _a1)} } @@ -284,8 +284,8 @@ type MockDataNode_FlushChannels_Call struct { } // FlushChannels is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.FlushChannelsRequest +// - _a0 context.Context +// - _a1 *datapb.FlushChannelsRequest func (_e *MockDataNode_Expecter) FlushChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushChannels_Call { return &MockDataNode_FlushChannels_Call{Call: _e.mock.On("FlushChannels", _a0, _a1)} } @@ -339,8 +339,8 @@ type MockDataNode_FlushSegments_Call struct { } // FlushSegments is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.FlushSegmentsRequest +// - _a0 context.Context +// - _a1 *datapb.FlushSegmentsRequest func (_e *MockDataNode_Expecter) FlushSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushSegments_Call { return &MockDataNode_FlushSegments_Call{Call: _e.mock.On("FlushSegments", _a0, _a1)} } @@ -435,8 +435,8 @@ type MockDataNode_GetCompactionState_Call struct { } // GetCompactionState is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.CompactionStateRequest +// - _a0 context.Context +// - _a1 *datapb.CompactionStateRequest func (_e *MockDataNode_Expecter) GetCompactionState(_a0 interface{}, _a1 interface{}) *MockDataNode_GetCompactionState_Call { return &MockDataNode_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", _a0, _a1)} } @@ -490,8 +490,8 @@ type MockDataNode_GetComponentStates_Call struct { } // GetComponentStates is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *milvuspb.GetComponentStatesRequest +// - _a0 context.Context +// - _a1 *milvuspb.GetComponentStatesRequest func (_e *MockDataNode_Expecter) GetComponentStates(_a0 interface{}, _a1 interface{}) *MockDataNode_GetComponentStates_Call { return &MockDataNode_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", _a0, _a1)} } @@ -545,8 +545,8 @@ type MockDataNode_GetMetrics_Call struct { } // GetMetrics is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *milvuspb.GetMetricsRequest +// - _a0 context.Context +// - _a1 *milvuspb.GetMetricsRequest func (_e *MockDataNode_Expecter) GetMetrics(_a0 interface{}, _a1 interface{}) *MockDataNode_GetMetrics_Call { return &MockDataNode_GetMetrics_Call{Call: _e.mock.On("GetMetrics", _a0, _a1)} } @@ -682,8 +682,8 @@ type MockDataNode_GetStatisticsChannel_Call struct { } // GetStatisticsChannel is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *internalpb.GetStatisticsChannelRequest +// - _a0 context.Context +// - _a1 *internalpb.GetStatisticsChannelRequest func (_e *MockDataNode_Expecter) GetStatisticsChannel(_a0 interface{}, _a1 interface{}) *MockDataNode_GetStatisticsChannel_Call { return &MockDataNode_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", _a0, _a1)} } @@ -737,8 +737,8 @@ type MockDataNode_ImportV2_Call struct { } // ImportV2 is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ImportRequest +// - _a0 context.Context +// - _a1 *datapb.ImportRequest func (_e *MockDataNode_Expecter) ImportV2(_a0 interface{}, _a1 interface{}) *MockDataNode_ImportV2_Call { return &MockDataNode_ImportV2_Call{Call: _e.mock.On("ImportV2", _a0, _a1)} } @@ -833,8 +833,8 @@ type MockDataNode_NotifyChannelOperation_Call struct { } // NotifyChannelOperation is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ChannelOperationsRequest +// - _a0 context.Context +// - _a1 *datapb.ChannelOperationsRequest func (_e *MockDataNode_Expecter) NotifyChannelOperation(_a0 interface{}, _a1 interface{}) *MockDataNode_NotifyChannelOperation_Call { return &MockDataNode_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", _a0, _a1)} } @@ -888,8 +888,8 @@ type MockDataNode_PreImport_Call struct { } // PreImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.PreImportRequest +// - _a0 context.Context +// - _a1 *datapb.PreImportRequest func (_e *MockDataNode_Expecter) PreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_PreImport_Call { return &MockDataNode_PreImport_Call{Call: _e.mock.On("PreImport", _a0, _a1)} } @@ -943,8 +943,8 @@ type MockDataNode_QueryImport_Call struct { } // QueryImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.QueryImportRequest +// - _a0 context.Context +// - _a1 *datapb.QueryImportRequest func (_e *MockDataNode_Expecter) QueryImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryImport_Call { return &MockDataNode_QueryImport_Call{Call: _e.mock.On("QueryImport", _a0, _a1)} } @@ -998,8 +998,8 @@ type MockDataNode_QueryPreImport_Call struct { } // QueryPreImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.QueryPreImportRequest +// - _a0 context.Context +// - _a1 *datapb.QueryPreImportRequest func (_e *MockDataNode_Expecter) QueryPreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryPreImport_Call { return &MockDataNode_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", _a0, _a1)} } @@ -1053,8 +1053,8 @@ type MockDataNode_QuerySlot_Call struct { } // QuerySlot is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.QuerySlotRequest +// - _a0 context.Context +// - _a1 *datapb.QuerySlotRequest func (_e *MockDataNode_Expecter) QuerySlot(_a0 interface{}, _a1 interface{}) *MockDataNode_QuerySlot_Call { return &MockDataNode_QuerySlot_Call{Call: _e.mock.On("QuerySlot", _a0, _a1)} } @@ -1149,8 +1149,8 @@ type MockDataNode_ResendSegmentStats_Call struct { } // ResendSegmentStats is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ResendSegmentStatsRequest +// - _a0 context.Context +// - _a1 *datapb.ResendSegmentStatsRequest func (_e *MockDataNode_Expecter) ResendSegmentStats(_a0 interface{}, _a1 interface{}) *MockDataNode_ResendSegmentStats_Call { return &MockDataNode_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats", _a0, _a1)} } @@ -1183,7 +1183,7 @@ type MockDataNode_SetAddress_Call struct { } // SetAddress is a helper method to define mock.On call -// - address string +// - address string func (_e *MockDataNode_Expecter) SetAddress(address interface{}) *MockDataNode_SetAddress_Call { return &MockDataNode_SetAddress_Call{Call: _e.mock.On("SetAddress", address)} } @@ -1225,7 +1225,7 @@ type MockDataNode_SetDataCoordClient_Call struct { } // SetDataCoordClient is a helper method to define mock.On call -// - dataCoord types.DataCoordClient +// - dataCoord types.DataCoordClient func (_e *MockDataNode_Expecter) SetDataCoordClient(dataCoord interface{}) *MockDataNode_SetDataCoordClient_Call { return &MockDataNode_SetDataCoordClient_Call{Call: _e.mock.On("SetDataCoordClient", dataCoord)} } @@ -1258,7 +1258,7 @@ type MockDataNode_SetEtcdClient_Call struct { } // SetEtcdClient is a helper method to define mock.On call -// - etcdClient *clientv3.Client +// - etcdClient *clientv3.Client func (_e *MockDataNode_Expecter) SetEtcdClient(etcdClient interface{}) *MockDataNode_SetEtcdClient_Call { return &MockDataNode_SetEtcdClient_Call{Call: _e.mock.On("SetEtcdClient", etcdClient)} } @@ -1300,7 +1300,7 @@ type MockDataNode_SetRootCoordClient_Call struct { } // SetRootCoordClient is a helper method to define mock.On call -// - rootCoord types.RootCoordClient +// - rootCoord types.RootCoordClient func (_e *MockDataNode_Expecter) SetRootCoordClient(rootCoord interface{}) *MockDataNode_SetRootCoordClient_Call { return &MockDataNode_SetRootCoordClient_Call{Call: _e.mock.On("SetRootCoordClient", rootCoord)} } @@ -1354,8 +1354,8 @@ type MockDataNode_ShowConfigurations_Call struct { } // ShowConfigurations is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *internalpb.ShowConfigurationsRequest +// - _a0 context.Context +// - _a1 *internalpb.ShowConfigurationsRequest func (_e *MockDataNode_Expecter) ShowConfigurations(_a0 interface{}, _a1 interface{}) *MockDataNode_ShowConfigurations_Call { return &MockDataNode_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", _a0, _a1)} } @@ -1491,8 +1491,8 @@ type MockDataNode_SyncSegments_Call struct { } // SyncSegments is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.SyncSegmentsRequest +// - _a0 context.Context +// - _a1 *datapb.SyncSegmentsRequest func (_e *MockDataNode_Expecter) SyncSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_SyncSegments_Call { return &MockDataNode_SyncSegments_Call{Call: _e.mock.On("SyncSegments", _a0, _a1)} } @@ -1525,7 +1525,7 @@ type MockDataNode_UpdateStateCode_Call struct { } // UpdateStateCode is a helper method to define mock.On call -// - stateCode commonpb.StateCode +// - stateCode commonpb.StateCode func (_e *MockDataNode_Expecter) UpdateStateCode(stateCode interface{}) *MockDataNode_UpdateStateCode_Call { return &MockDataNode_UpdateStateCode_Call{Call: _e.mock.On("UpdateStateCode", stateCode)} } @@ -1579,8 +1579,8 @@ type MockDataNode_WatchDmChannels_Call struct { } // WatchDmChannels is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.WatchDmChannelsRequest +// - _a0 context.Context +// - _a1 *datapb.WatchDmChannelsRequest func (_e *MockDataNode_Expecter) WatchDmChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_WatchDmChannels_Call { return &MockDataNode_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels", _a0, _a1)} } diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index 0b4f876803..91661051c3 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -70,9 +70,9 @@ type MockDataNodeClient_CheckChannelOperationProgress_Call struct { } // CheckChannelOperationProgress is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ChannelWatchInfo -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ChannelWatchInfo +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) CheckChannelOperationProgress(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckChannelOperationProgress_Call { return &MockDataNodeClient_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", append([]interface{}{ctx, in}, opts...)...)} @@ -181,9 +181,9 @@ type MockDataNodeClient_CompactionV2_Call struct { } // CompactionV2 is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.CompactionPlan -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.CompactionPlan +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) CompactionV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CompactionV2_Call { return &MockDataNodeClient_CompactionV2_Call{Call: _e.mock.On("CompactionV2", append([]interface{}{ctx, in}, opts...)...)} @@ -251,9 +251,9 @@ type MockDataNodeClient_DropCompactionPlan_Call struct { } // DropCompactionPlan is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.DropCompactionPlanRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.DropCompactionPlanRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) DropCompactionPlan(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropCompactionPlan_Call { return &MockDataNodeClient_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", append([]interface{}{ctx, in}, opts...)...)} @@ -321,9 +321,9 @@ type MockDataNodeClient_DropImport_Call struct { } // DropImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.DropImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.DropImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) DropImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropImport_Call { return &MockDataNodeClient_DropImport_Call{Call: _e.mock.On("DropImport", append([]interface{}{ctx, in}, opts...)...)} @@ -391,9 +391,9 @@ type MockDataNodeClient_FlushChannels_Call struct { } // FlushChannels is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.FlushChannelsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.FlushChannelsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) FlushChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushChannels_Call { return &MockDataNodeClient_FlushChannels_Call{Call: _e.mock.On("FlushChannels", append([]interface{}{ctx, in}, opts...)...)} @@ -461,9 +461,9 @@ type MockDataNodeClient_FlushSegments_Call struct { } // FlushSegments is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.FlushSegmentsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.FlushSegmentsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) FlushSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushSegments_Call { return &MockDataNodeClient_FlushSegments_Call{Call: _e.mock.On("FlushSegments", append([]interface{}{ctx, in}, opts...)...)} @@ -531,9 +531,9 @@ type MockDataNodeClient_GetCompactionState_Call struct { } // GetCompactionState is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.CompactionStateRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.CompactionStateRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetCompactionState(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetCompactionState_Call { return &MockDataNodeClient_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", append([]interface{}{ctx, in}, opts...)...)} @@ -601,9 +601,9 @@ type MockDataNodeClient_GetComponentStates_Call struct { } // GetComponentStates is a helper method to define mock.On call -// - ctx context.Context -// - in *milvuspb.GetComponentStatesRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *milvuspb.GetComponentStatesRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetComponentStates(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetComponentStates_Call { return &MockDataNodeClient_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", append([]interface{}{ctx, in}, opts...)...)} @@ -671,9 +671,9 @@ type MockDataNodeClient_GetMetrics_Call struct { } // GetMetrics is a helper method to define mock.On call -// - ctx context.Context -// - in *milvuspb.GetMetricsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *milvuspb.GetMetricsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetMetrics(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetMetrics_Call { return &MockDataNodeClient_GetMetrics_Call{Call: _e.mock.On("GetMetrics", append([]interface{}{ctx, in}, opts...)...)} @@ -741,9 +741,9 @@ type MockDataNodeClient_GetStatisticsChannel_Call struct { } // GetStatisticsChannel is a helper method to define mock.On call -// - ctx context.Context -// - in *internalpb.GetStatisticsChannelRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *internalpb.GetStatisticsChannelRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetStatisticsChannel(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetStatisticsChannel_Call { return &MockDataNodeClient_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", append([]interface{}{ctx, in}, opts...)...)} @@ -811,9 +811,9 @@ type MockDataNodeClient_ImportV2_Call struct { } // ImportV2 is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) ImportV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ImportV2_Call { return &MockDataNodeClient_ImportV2_Call{Call: _e.mock.On("ImportV2", append([]interface{}{ctx, in}, opts...)...)} @@ -881,9 +881,9 @@ type MockDataNodeClient_NotifyChannelOperation_Call struct { } // NotifyChannelOperation is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ChannelOperationsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ChannelOperationsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) NotifyChannelOperation(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_NotifyChannelOperation_Call { return &MockDataNodeClient_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", append([]interface{}{ctx, in}, opts...)...)} @@ -951,9 +951,9 @@ type MockDataNodeClient_PreImport_Call struct { } // PreImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.PreImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.PreImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) PreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_PreImport_Call { return &MockDataNodeClient_PreImport_Call{Call: _e.mock.On("PreImport", append([]interface{}{ctx, in}, opts...)...)} @@ -1021,9 +1021,9 @@ type MockDataNodeClient_QueryImport_Call struct { } // QueryImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.QueryImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.QueryImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) QueryImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryImport_Call { return &MockDataNodeClient_QueryImport_Call{Call: _e.mock.On("QueryImport", append([]interface{}{ctx, in}, opts...)...)} @@ -1091,9 +1091,9 @@ type MockDataNodeClient_QueryPreImport_Call struct { } // QueryPreImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.QueryPreImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.QueryPreImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) QueryPreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryPreImport_Call { return &MockDataNodeClient_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", append([]interface{}{ctx, in}, opts...)...)} @@ -1161,9 +1161,9 @@ type MockDataNodeClient_QuerySlot_Call struct { } // QuerySlot is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.QuerySlotRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.QuerySlotRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) QuerySlot(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QuerySlot_Call { return &MockDataNodeClient_QuerySlot_Call{Call: _e.mock.On("QuerySlot", append([]interface{}{ctx, in}, opts...)...)} @@ -1231,9 +1231,9 @@ type MockDataNodeClient_ResendSegmentStats_Call struct { } // ResendSegmentStats is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ResendSegmentStatsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ResendSegmentStatsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) ResendSegmentStats(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ResendSegmentStats_Call { return &MockDataNodeClient_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats", append([]interface{}{ctx, in}, opts...)...)} @@ -1301,9 +1301,9 @@ type MockDataNodeClient_ShowConfigurations_Call struct { } // ShowConfigurations is a helper method to define mock.On call -// - ctx context.Context -// - in *internalpb.ShowConfigurationsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *internalpb.ShowConfigurationsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) ShowConfigurations(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ShowConfigurations_Call { return &MockDataNodeClient_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", append([]interface{}{ctx, in}, opts...)...)} @@ -1371,9 +1371,9 @@ type MockDataNodeClient_SyncSegments_Call struct { } // SyncSegments is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.SyncSegmentsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.SyncSegmentsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) SyncSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_SyncSegments_Call { return &MockDataNodeClient_SyncSegments_Call{Call: _e.mock.On("SyncSegments", append([]interface{}{ctx, in}, opts...)...)} @@ -1441,9 +1441,9 @@ type MockDataNodeClient_WatchDmChannels_Call struct { } // WatchDmChannels is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.WatchDmChannelsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.WatchDmChannelsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) WatchDmChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_WatchDmChannels_Call { return &MockDataNodeClient_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels", append([]interface{}{ctx, in}, opts...)...)}