enhance: Refine compaction (#33982)

issue: https://github.com/milvus-io/milvus/issues/32939

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao 2024-06-25 10:08:03 +08:00 committed by GitHub
parent 506a915272
commit d18c49013b
12 changed files with 376 additions and 420 deletions


@ -8,6 +8,9 @@ run:
- scripts - scripts
- internal/core - internal/core
- cmake_build - cmake_build
- mmap
- data
- ci
skip-files: skip-files:
- partial_search_test.go - partial_search_test.go

go.mod

@ -70,6 +70,7 @@ require (
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0 github.com/remeh/sizedwaitgroup v1.0.0
github.com/valyala/fastjson v1.6.4
github.com/zeebo/xxh3 v1.0.2 github.com/zeebo/xxh3 v1.0.2
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
@ -210,7 +211,6 @@ require (
github.com/twmb/murmur3 v1.1.3 // indirect github.com/twmb/murmur3 v1.1.3 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/ugorji/go/codec v1.2.11 // indirect github.com/ugorji/go/codec v1.2.11 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
github.com/x448/float16 v0.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect


@ -274,8 +274,37 @@ func (c *compactionPlanHandler) loadMeta() {
triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks() triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks()
for _, tasks := range triggers { for _, tasks := range triggers {
for _, task := range tasks { for _, task := range tasks {
if task.State != datapb.CompactionTaskState_completed && task.State != datapb.CompactionTaskState_cleaned { state := task.GetState()
c.enqueueCompaction(task) if state == datapb.CompactionTaskState_completed ||
state == datapb.CompactionTaskState_cleaned ||
state == datapb.CompactionTaskState_unknown {
log.Info("compactionPlanHandler loadMeta abandon compactionTask",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
continue
} else {
t, err := c.createCompactTask(task)
if err != nil {
log.Warn("compactionPlanHandler loadMeta create compactionTask failed",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
continue
}
if t.NeedReAssignNodeID() {
c.submitTask(t)
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("state", t.GetState().String()))
} else {
c.restoreTask(t)
log.Info("compactionPlanHandler loadMeta restoreTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("state", t.GetState().String()))
}
} }
} }
} }
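Recovery at start-up now triages persisted tasks by state before deciding where they go. A minimal runnable sketch of that decision, with toy state names standing in for the datapb enum and a simplified node check:

```go
package main

import "fmt"

// Toy states standing in for datapb.CompactionTaskState values.
type state int

const (
	pipelining state = iota
	executing
	completed
	cleaned
	unknown
)

// recoverTask mirrors the triage in loadMeta: terminal/unknown tasks are
// abandoned, pipelining tasks without a node go back into the queue, and
// everything else is restored into the executing set.
func recoverTask(s state, hasNode bool) string {
	switch {
	case s == completed || s == cleaned || s == unknown:
		return "abandon"
	case s == pipelining && !hasNode:
		return "submit to queue" // NeedReAssignNodeID() == true
	default:
		return "restore as executing"
	}
}

func main() {
	fmt.Println(recoverTask(completed, true))   // abandon
	fmt.Println(recoverTask(pipelining, false)) // submit to queue
	fmt.Println(recoverTask(executing, true))   // restore as executing
}
```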
@ -466,6 +495,8 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
} }
func (c *compactionPlanHandler) submitTask(t CompactionTask) { func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.mu.Lock() c.mu.Lock()
c.queueTasks[t.GetPlanID()] = t c.queueTasks[t.GetPlanID()] = t
c.mu.Unlock() c.mu.Unlock()
@ -474,6 +505,8 @@ func (c *compactionPlanHandler) submitTask(t CompactionTask) {
// restoreTask used to restore Task from etcd // restoreTask used to restore Task from etcd
func (c *compactionPlanHandler) restoreTask(t CompactionTask) { func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.executingMu.Lock() c.executingMu.Lock()
c.executingTasks[t.GetPlanID()] = t c.executingTasks[t.GetPlanID()] = t
c.executingMu.Unlock() c.executingMu.Unlock()
@ -504,38 +537,23 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
if c.isFull() { if c.isFull() {
return errCompactionBusy return errCompactionBusy
} }
// TODO change to set this on scheduling task t, err := c.createCompactTask(task)
exist, succeed := c.checkAndSetSegmentsCompacting(task) if err != nil {
if !exist { return err
return merr.WrapErrIllegalCompactionPlan("segment not exist")
} }
if !succeed { t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
return merr.WrapErrCompactionPlanConflict("segment is compacting") err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
return err
} }
// TODO change to set this on scheduling task
t := c.createCompactTask(task)
if t == nil {
return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
if task.StartTime != 0 {
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err := t.SaveTaskMeta()
if err != nil {
return err
}
}
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", task.GetType()))
t.SetSpan(span)
c.submitTask(t) c.submitTask(t)
log.Info("Compaction plan submitted") log.Info("Compaction plan submitted")
return nil return nil
} }
// set segments compacting, one segment can only participate one compactionTask // set segments compacting, one segment can only participate one compactionTask
func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) CompactionTask { func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (CompactionTask, error) {
var task CompactionTask var task CompactionTask
switch t.GetType() { switch t.GetType() {
case datapb.CompactionType_MixCompaction: case datapb.CompactionType_MixCompaction:
@ -558,19 +576,17 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) Comp
handler: c.handler, handler: c.handler,
analyzeScheduler: c.analyzeScheduler, analyzeScheduler: c.analyzeScheduler,
} }
default:
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
} }
return task exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments())
} if !exist {
return nil, merr.WrapErrIllegalCompactionPlan("segment not exist")
// set segments compacting, one segment can only participate one compactionTask
func (c *compactionPlanHandler) setSegmentsCompacting(task CompactionTask, compacting bool) {
for _, segmentID := range task.GetInputSegments() {
c.meta.SetSegmentCompacting(segmentID, compacting)
} }
} if !succeed {
return nil, merr.WrapErrCompactionPlanConflict("segment is compacting")
func (c *compactionPlanHandler) checkAndSetSegmentsCompacting(task *datapb.CompactionTask) (bool, bool) { }
return c.meta.CheckAndSetSegmentsCompacting(task.GetInputSegments()) return task, nil
} }
func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
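The enqueue path now claims the input segments when the task object is created and releases that claim if persisting the task fails. A toy, self-contained illustration of that claim-and-rollback ordering (names are stand-ins, not the real datacoord types):

```go
package main

import (
	"errors"
	"fmt"
)

// toyHandler models only the compacting flags kept per segment.
type toyHandler struct {
	compacting map[int64]bool
}

// createTask claims the input segments, failing if any is already compacting.
func (h *toyHandler) createTask(segments []int64) ([]int64, error) {
	for _, id := range segments {
		if h.compacting[id] {
			return nil, errors.New("segment is compacting")
		}
	}
	for _, id := range segments {
		h.compacting[id] = true
	}
	return segments, nil
}

// enqueue claims first, then persists; a failed save rolls the claim back.
func (h *toyHandler) enqueue(segments []int64, saveMeta func() error) error {
	claimed, err := h.createTask(segments)
	if err != nil {
		return err
	}
	if err := saveMeta(); err != nil {
		for _, id := range claimed {
			h.compacting[id] = false
		}
		return err
	}
	return nil // the real handler would submit the task here
}

func main() {
	h := &toyHandler{compacting: map[int64]bool{}}
	fmt.Println(h.enqueue([]int64{1, 2}, func() error { return errors.New("etcd down") })) // claim rolled back
	fmt.Println(h.compacting[1])                                                           // false
	fmt.Println(h.enqueue([]int64{1, 2}, func() error { return nil }))                     // nil, segments now compacting
}
```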


@ -212,7 +212,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
if errors.Is(err, merr.ErrNodeNotFound) { if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err)) log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// todo reassign node ID // todo reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil return nil
} }
return err return err
@ -232,7 +232,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID() return segment.GetSegmentID()
}) })
_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result) _, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil { if err != nil {
return err return err
} }
@ -336,9 +336,11 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
} }
func (t *clusteringCompactionTask) resetSegmentCompacting() { func (t *clusteringCompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { var segmentIDs []UniqueID
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
} }
t.meta.SetSegmentsCompacting(segmentIDs, false)
} }
func (t *clusteringCompactionTask) processFailedOrTimeout() error { func (t *clusteringCompactionTask) processFailedOrTimeout() error {
@ -414,7 +416,7 @@ func (t *clusteringCompactionTask) doCompact() error {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil { if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return err return err
} }
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))


@ -74,7 +74,7 @@ func (t *l0CompactionTask) processPipelining() bool {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil { if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false return false
} }
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
@ -85,7 +85,7 @@ func (t *l0CompactionTask) processExecuting() bool {
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil { if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) { if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
} }
return false return false
} }
@ -186,7 +186,7 @@ func (t *l0CompactionTask) SetStartTime(startTime int64) {
} }
func (t *l0CompactionTask) NeedReAssignNodeID() bool { func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0 return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID
} }
func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) { func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
@ -307,9 +307,11 @@ func (t *l0CompactionTask) processCompleted() bool {
} }
func (t *l0CompactionTask) resetSegmentCompacting() { func (t *l0CompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { var segmentIDs []UniqueID
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
} }
t.meta.SetSegmentsCompacting(segmentIDs, false)
} }
func (t *l0CompactionTask) processTimeout() bool { func (t *l0CompactionTask) processTimeout() bool {
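The magic value 0 for "no DataNode assigned" is replaced by a NullNodeID sentinel across the task types. A small runnable sketch of the reassignment predicate; the sentinel's concrete value is an assumption here:

```go
package main

import "fmt"

// NullNodeID stands in for the sentinel used by this change; the exact value
// is an assumption (the point is that 0 is no longer the "unassigned" marker).
const NullNodeID int64 = -1

type taskState int

const (
	pipelining taskState = iota
	executing
)

// needReAssignNodeID mirrors the predicate in the l0/mix tasks: a task is
// eligible for node reassignment only while pipelining with no node assigned.
func needReAssignNodeID(state taskState, nodeID int64) bool {
	return state == pipelining && nodeID == NullNodeID
}

func main() {
	fmt.Println(needReAssignNodeID(pipelining, NullNodeID)) // true
	fmt.Println(needReAssignNodeID(pipelining, 7))          // false
	fmt.Println(needReAssignNodeID(executing, NullNodeID))  // false
}
```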


@ -32,6 +32,7 @@ func (t *mixCompactionTask) processPipelining() bool {
} }
var err error var err error
t.plan, err = t.BuildCompactionRequest() t.plan, err = t.BuildCompactionRequest()
// Segment not found
if err != nil { if err != nil {
err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return err2 == nil return err2 == nil
@ -39,7 +40,7 @@ func (t *mixCompactionTask) processPipelining() bool {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil { if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false return false
} }
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
@ -59,7 +60,7 @@ func (t *mixCompactionTask) processExecuting() bool {
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil { if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) { if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
} }
return false return false
} }
@ -82,13 +83,20 @@ func (t *mixCompactionTask) processExecuting() bool {
} }
return t.processFailed() return t.processFailed()
} }
saveSuccess := t.saveSegmentMeta() err2 := t.saveSegmentMeta()
if !saveSuccess { if err2 != nil {
if errors.Is(err2, merr.ErrIllegalCompactionPlan) {
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err3 != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
return true
}
return false return false
} }
segments := []UniqueID{t.newSegment.GetID()} segments := []UniqueID{t.newSegment.GetID()}
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments)) err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err == nil { if err3 == nil {
return t.processMetaSaved() return t.processMetaSaved()
} }
return false return false
@ -110,18 +118,18 @@ func (t *mixCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask) return t.saveTaskMeta(t.CompactionTask)
} }
func (t *mixCompactionTask) saveSegmentMeta() bool { func (t *mixCompactionTask) saveSegmentMeta() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
// Also prepare metric updates. // Also prepare metric updates.
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result) newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil { if err != nil {
return false return err
} }
// Apply metrics after successful meta update. // Apply metrics after successful meta update.
t.newSegment = newSegments[0] t.newSegment = newSegments[0]
metricMutation.commit() metricMutation.commit()
log.Info("mixCompactionTask success to save segment meta") log.Info("mixCompactionTask success to save segment meta")
return true return nil
} }
func (t *mixCompactionTask) Process() bool { func (t *mixCompactionTask) Process() bool {
@ -161,7 +169,7 @@ func (t *mixCompactionTask) GetLabel() string {
} }
func (t *mixCompactionTask) NeedReAssignNodeID() bool { func (t *mixCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0 return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID
} }
func (t *mixCompactionTask) processCompleted() bool { func (t *mixCompactionTask) processCompleted() bool {
@ -178,9 +186,11 @@ func (t *mixCompactionTask) processCompleted() bool {
} }
func (t *mixCompactionTask) resetSegmentCompacting() { func (t *mixCompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { var segmentIDs []UniqueID
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
} }
t.meta.SetSegmentsCompacting(segmentIDs, false)
} }
func (t *mixCompactionTask) processTimeout() bool { func (t *mixCompactionTask) processTimeout() bool {
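saveSegmentMeta now returns an error instead of a bool, so the caller can distinguish a permanently illegal plan (mark the task failed, stop retrying) from a transient failure (retry later). A runnable toy of that branching, using a stand-in sentinel rather than the real merr error:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for merr.ErrIllegalCompactionPlan.
var errIllegalCompactionPlan = errors.New("illegal compaction plan")

// processExecuting sketches the new handling: an illegal-plan error marks the
// task failed and ends processing (true); any other error defers to a retry
// (false); success continues processing (true).
func processExecuting(saveSegmentMeta func() error, markFailed func()) bool {
	if err := saveSegmentMeta(); err != nil {
		if errors.Is(err, errIllegalCompactionPlan) {
			markFailed()
			return true
		}
		return false
	}
	return true
}

func main() {
	fmt.Println(processExecuting(func() error { return nil }, func() {}))                      // true
	fmt.Println(processExecuting(func() error { return errIllegalCompactionPlan }, func() {})) // true, marked failed
	fmt.Println(processExecuting(func() error { return errors.New("transient") }, func() {}))  // false, retry later
}
```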


@ -529,6 +529,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest() s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Once() s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil) handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
task := &datapb.CompactionTask{ task := &datapb.CompactionTask{
@ -569,6 +570,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
}, nil).Once() }, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return()
inTasks := map[int64]CompactionTask{ inTasks := map[int64]CompactionTask{
1: &mixCompactionTask{ 1: &mixCompactionTask{
@ -658,11 +660,11 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
// s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil) // s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn( s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn(
func(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
if plan.GetPlanID() == 2 { if t.GetPlanID() == 2 {
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
return []*SegmentInfo{segment}, &segMetricMutation{}, nil return []*SegmentInfo{segment}, &segMetricMutation{}, nil
} else if plan.GetPlanID() == 6 { } else if t.GetPlanID() == 6 {
return nil, nil, errors.Errorf("intended error") return nil, nil, errors.Errorf("intended error")
} }
return nil, nil, errors.Errorf("unexpected error") return nil, nil, errors.Errorf("unexpected error")
@ -706,7 +708,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
// s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() // s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentCompacting(mock.Anything, mock.Anything).Return().Twice() s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return( s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{segment}, []*SegmentInfo{segment},
@ -749,13 +751,14 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
task := &mixCompactionTask{ task := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{ CompactionTask: &datapb.CompactionTask{
PlanID: plan.GetPlanID(), PlanID: plan.GetPlanID(),
TriggerID: 1, TriggerID: 1,
Type: plan.GetType(), Type: plan.GetType(),
State: datapb.CompactionTaskState_executing, State: datapb.CompactionTaskState_executing,
NodeID: dataNodeID, NodeID: dataNodeID,
InputSegments: []UniqueID{1, 2},
}, },
plan: plan, // plan: plan,
sessions: s.mockSessMgr, sessions: s.mockSessMgr,
meta: s.mockMeta, meta: s.mockMeta,
} }


@ -56,9 +56,9 @@ type CompactionMeta interface {
SelectSegments(filters ...SegmentFilter) []*SegmentInfo SelectSegments(filters ...SegmentFilter) []*SegmentInfo
GetHealthySegment(segID UniqueID) *SegmentInfo GetHealthySegment(segID UniqueID) *SegmentInfo
UpdateSegmentsInfo(operators ...UpdateOperator) error UpdateSegmentsInfo(operators ...UpdateOperator) error
SetSegmentCompacting(segmentID int64, compacting bool) SetSegmentsCompacting(segmentID []int64, compacting bool)
CheckAndSetSegmentsCompacting(segmentIDs []int64) (bool, bool) CheckAndSetSegmentsCompacting(segmentIDs []int64) (bool, bool)
CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) CompleteCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)
CleanPartitionStatsInfo(info *datapb.PartitionStatsInfo) error CleanPartitionStatsInfo(info *datapb.PartitionStatsInfo) error
SaveCompactionTask(task *datapb.CompactionTask) error SaveCompactionTask(task *datapb.CompactionTask) error
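For reference, a trimmed, assumed view of the CompactionMeta surface after this change, with stub types in place of the real datapb/datacoord definitions:

```go
package datacoordstub

// Stubs standing in for the real datapb / datacoord types; only the method
// shapes matter here.
type (
	CompactionTask       struct{}
	CompactionPlanResult struct{}
	SegmentInfo          struct{}
	segMetricMutation    struct{}
)

// The per-segment compacting setter becomes a batched one, and
// CompleteCompactionMutation is keyed off the task instead of the plan.
type CompactionMeta interface {
	SetSegmentsCompacting(segmentIDs []int64, compacting bool)
	CheckAndSetSegmentsCompacting(segmentIDs []int64) (exist bool, canDo bool)
	CompleteCompactionMutation(t *CompactionTask, result *CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)
	SaveCompactionTask(task *CompactionTask) error
}
```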
@ -1167,7 +1167,7 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID {
}) })
} }
// GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID` // GetSegmentsIDOfCollectionWithDropped returns all dropped segment ids which collection equals to provided `collectionID`
func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID { func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID {
segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment != nil && return segment != nil &&
@ -1192,7 +1192,7 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un
}) })
} }
// GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID` // GetSegmentsIDOfPartitionWithDropped returns all dropped segments ids which collection & partition equals to provided `collectionID`, `partitionID`
func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID { func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID {
segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment.GetState() != commonpb.SegmentState_SegmentStateNone && return segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
@ -1309,24 +1309,29 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
// CheckAndSetSegmentsCompacting check all segments are not compacting // CheckAndSetSegmentsCompacting check all segments are not compacting
// if true, set them compacting and return true // if true, set them compacting and return true
// if false, skip setting and // if false, skip setting and
func (m *meta) CheckAndSetSegmentsCompacting(segmentIDs []UniqueID) (exist, hasCompactingSegment bool) { func (m *meta) CheckAndSetSegmentsCompacting(segmentIDs []UniqueID) (exist, canDo bool) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
var hasCompacting bool
exist = true
for _, segmentID := range segmentIDs { for _, segmentID := range segmentIDs {
seg := m.segments.GetSegment(segmentID) seg := m.segments.GetSegment(segmentID)
if seg != nil { if seg != nil {
hasCompactingSegment = seg.isCompacting if seg.isCompacting {
hasCompacting = true
}
} else { } else {
return false, false exist = false
break
} }
} }
if hasCompactingSegment { canDo = exist && !hasCompacting
return true, false if canDo {
for _, segmentID := range segmentIDs {
m.segments.SetIsCompacting(segmentID, true)
}
} }
for _, segmentID := range segmentIDs { return exist, canDo
m.segments.SetIsCompacting(segmentID, true)
}
return true, true
} }
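The rewritten check is all-or-nothing: the compacting flag is set only when every input segment exists and none is already compacting. A runnable toy of the (exist, canDo) semantics, using a plain map and no locking:

```go
package main

import "fmt"

// Toy stand-in for the segments meta; the real method holds m.Lock().
type toyMeta struct {
	segments map[int64]bool // segmentID -> isCompacting
}

// checkAndSetSegmentsCompacting mirrors the new semantics: exist is true only
// when every requested segment is present, canDo is true only when they all
// exist and none is already compacting, and flags are set only when canDo.
func (m *toyMeta) checkAndSetSegmentsCompacting(ids []int64) (exist, canDo bool) {
	exist = true
	hasCompacting := false
	for _, id := range ids {
		compacting, ok := m.segments[id]
		if !ok {
			exist = false
			break
		}
		if compacting {
			hasCompacting = true
		}
	}
	canDo = exist && !hasCompacting
	if canDo {
		for _, id := range ids {
			m.segments[id] = true
		}
	}
	return exist, canDo
}

func main() {
	m := &toyMeta{segments: map[int64]bool{1: false, 2: true}}
	fmt.Println(m.checkAndSetSegmentsCompacting([]int64{1, 2})) // true false: 2 is compacting
	fmt.Println(m.checkAndSetSegmentsCompacting([]int64{1, 3})) // false false: 3 does not exist
	fmt.Println(m.checkAndSetSegmentsCompacting([]int64{1}))    // true true: 1 is now marked compacting
}
```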
func (m *meta) SetSegmentsCompacting(segmentIDs []UniqueID, compacting bool) { func (m *meta) SetSegmentsCompacting(segmentIDs []UniqueID, compacting bool) {
@ -1345,19 +1350,16 @@ func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) {
m.segments.SetLevel(segmentID, level) m.segments.SetLevel(segmentID, level)
} }
func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
m.Lock() log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
defer m.Unlock()
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.String("type", plan.GetType().String()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64 var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo var latestCompactFromSegments []*SegmentInfo
for _, segmentBinlogs := range plan.GetSegmentBinlogs() { for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentBinlogs.GetSegmentID()) segment := m.segments.GetSegment(segmentID)
if segment == nil { if segment == nil {
return nil, nil, merr.WrapErrSegmentNotFound(segmentBinlogs.GetSegmentID()) return nil, nil, merr.WrapErrSegmentNotFound(segmentID)
} }
cloned := segment.Clone() cloned := segment.Clone()
@ -1371,13 +1373,100 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
} }
logIDsFromPlan := make(map[int64]struct{}) getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
for _, segBinlogs := range plan.GetSegmentBinlogs() { var minPos *msgpb.MsgPosition
for _, fieldBinlog := range segBinlogs.GetDeltalogs() { for _, pos := range positions {
for _, binlog := range fieldBinlog.GetBinlogs() { if minPos == nil ||
logIDsFromPlan[binlog.GetLogID()] = struct{}{} pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
} }
} }
return minPos
}
newSegments := make([]*SegmentInfo, 0)
for _, seg := range result.GetSegments() {
segmentInfo := &datapb.SegmentInfo{
ID: seg.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: seg.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
Binlogs: seg.GetInsertLogs(),
Statslogs: seg.GetField2StatslogPaths(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L2,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
}
segment := NewSegmentInfo(segmentInfo)
newSegments = append(newSegments, segment)
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
}
compactionTo := make([]UniqueID, 0, len(newSegments))
for _, s := range newSegments {
compactionTo = append(compactionTo, s.GetID())
}
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID),
zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID),
zap.Any("compacted from", compactFromSegIDs),
zap.Any("compacted to", compactionTo))
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
binlogs := make([]metastore.BinlogsIncrement, 0)
for _, seg := range newSegmentInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
return nil, nil, err
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
lo.ForEach(newSegments, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
return newSegments, metricMutation, nil
}
func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
return nil, nil, merr.WrapErrSegmentNotFound(segmentID)
}
cloned := segment.Clone()
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
} }
getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition { getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
@ -1391,89 +1480,15 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d
return minPos return minPos
} }
if plan.GetType() == datapb.CompactionType_ClusteringCompaction {
newSegments := make([]*SegmentInfo, 0)
for _, seg := range result.GetSegments() {
segmentInfo := &datapb.SegmentInfo{
ID: seg.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
InsertChannel: plan.GetChannel(),
NumOfRows: seg.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
Binlogs: seg.GetInsertLogs(),
Statslogs: seg.GetField2StatslogPaths(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(plan.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L2,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
}
segment := NewSegmentInfo(segmentInfo)
newSegments = append(newSegments, segment)
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
}
compactionTo := make([]UniqueID, 0, len(newSegments))
for _, s := range newSegments {
compactionTo = append(compactionTo, s.GetID())
}
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID),
zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID),
zap.Any("compacted from", compactFromSegIDs),
zap.Any("compacted to", compactionTo))
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
binlogs := make([]metastore.BinlogsIncrement, 0)
for _, seg := range newSegmentInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
return nil, nil, err
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
lo.ForEach(newSegments, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
return newSegments, metricMutation, nil
}
// MixCompaction / MergeCompaction will generates one and only one segment // MixCompaction / MergeCompaction will generates one and only one segment
compactToSegment := result.GetSegments()[0] compactToSegment := result.GetSegments()[0]
// copy new deltalogs in compactFrom segments to compactTo segments.
// TODO: Not needed when enable L0 segments.
newDeltalogs, err := m.copyNewDeltalogs(latestCompactFromSegments, logIDsFromPlan, compactToSegment.GetSegmentID())
if err != nil {
return nil, nil, err
}
if len(newDeltalogs) > 0 {
compactToSegment.Deltalogs = append(compactToSegment.GetDeltalogs(), &datapb.FieldBinlog{Binlogs: newDeltalogs})
}
compactToSegmentInfo := NewSegmentInfo( compactToSegmentInfo := NewSegmentInfo(
&datapb.SegmentInfo{ &datapb.SegmentInfo{
ID: compactToSegment.GetSegmentID(), ID: compactToSegment.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID, CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID, PartitionID: latestCompactFromSegments[0].PartitionID,
InsertChannel: plan.GetChannel(), InsertChannel: t.GetChannel(),
NumOfRows: compactToSegment.NumOfRows, NumOfRows: compactToSegment.NumOfRows,
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum, MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
@ -1483,7 +1498,7 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d
CreatedByCompaction: true, CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs, CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(plan.GetStartTime(), 0), 0), LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L1, Level: datapb.SegmentLevel_L1,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
@ -1503,7 +1518,7 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d
} }
log = log.With( log = log.With(
zap.String("channel", plan.GetChannel()), zap.String("channel", t.GetChannel()),
zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()), zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()),
zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()), zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()),
zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()), zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
@ -1536,36 +1551,16 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *d
return []*SegmentInfo{compactToSegmentInfo}, metricMutation, nil return []*SegmentInfo{compactToSegmentInfo}, metricMutation, nil
} }
func (m *meta) copyNewDeltalogs(latestCompactFromInfos []*SegmentInfo, logIDsInPlan map[int64]struct{}, toSegment int64) ([]*datapb.Binlog, error) { func (m *meta) CompleteCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
newBinlogs := []*datapb.Binlog{} m.Lock()
for _, seg := range latestCompactFromInfos { defer m.Unlock()
for _, fieldLog := range seg.GetDeltalogs() { switch t.GetType() {
for _, l := range fieldLog.GetBinlogs() { case datapb.CompactionType_MixCompaction:
if _, ok := logIDsInPlan[l.GetLogID()]; !ok { return m.completeMixCompactionMutation(t, result)
fromKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, seg.ID, l.GetLogID()) case datapb.CompactionType_ClusteringCompaction:
toKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, toSegment, l.GetLogID()) return m.completeClusterCompactionMutation(t, result)
log.Warn("found new deltalog in compactFrom segment, copying it...",
zap.Any("deltalog", l),
zap.Int64("copyFrom segmentID", seg.GetID()),
zap.Int64("copyTo segmentID", toSegment),
zap.String("copyFrom key", fromKey),
zap.String("copyTo key", toKey),
)
blob, err := m.chunkManager.Read(m.ctx, fromKey)
if err != nil {
return nil, err
}
if err := m.chunkManager.Write(m.ctx, toKey, blob); err != nil {
return nil, err
}
newBinlogs = append(newBinlogs, l)
}
}
}
} }
return newBinlogs, nil return nil, nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
} }
// buildSegment utility function for compose datapb.SegmentInfo struct with provided info // buildSegment utility function for compose datapb.SegmentInfo struct with provided info
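CompleteCompactionMutation now takes the CompactionTask and dispatches on its type, rejecting anything other than mix and clustering compaction. A self-contained sketch of that dispatch shape with stub types (the real mutation logic is omitted):

```go
package main

import "fmt"

// Stub types standing in for datapb.CompactionTask; only the dispatch shape
// is illustrated here, not the actual segment mutation.
type compactionType int

const (
	mixCompaction compactionType = iota
	clusteringCompaction
	l0Compaction
)

type compactionTask struct {
	planID        int64
	typ           compactionType
	inputSegments []int64
}

// completeCompactionMutation sketches the new entry point: it is keyed off the
// task's type and InputSegments rather than the plan's segment binlogs, and
// unsupported types are rejected with an error.
func completeCompactionMutation(t *compactionTask) (string, error) {
	switch t.typ {
	case mixCompaction:
		return "merge input segments into one L1 segment", nil
	case clusteringCompaction:
		return "drop input segments, register result segments as L2", nil
	default:
		return "", fmt.Errorf("illegal compaction type: %d", t.typ)
	}
}

func main() {
	for _, task := range []*compactionTask{
		{planID: 1, typ: mixCompaction, inputSegments: []int64{1, 2}},
		{planID: 2, typ: clusteringCompaction, inputSegments: []int64{3, 4}},
		{planID: 3, typ: l0Compaction},
	} {
		action, err := completeCompactionMutation(task)
		fmt.Println(task.planID, action, err)
	}
}
```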


@ -209,33 +209,12 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
} }
mockChMgr := mocks.NewChunkManager(suite.T()) mockChMgr := mocks.NewChunkManager(suite.T())
mockChMgr.EXPECT().RootPath().Return("mockroot").Times(4)
mockChMgr.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, nil).Twice()
mockChMgr.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m := &meta{ m := &meta{
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: latestSegments, segments: latestSegments,
chunkManager: mockChMgr, chunkManager: mockChMgr,
} }
plan := &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 1,
FieldBinlogs: m.GetSegment(1).GetBinlogs(),
Field2StatslogPaths: m.GetSegment(1).GetStatslogs(),
Deltalogs: m.GetSegment(1).GetDeltalogs()[:1], // compaction plan use only 1 deltalog
},
{
SegmentID: 2,
FieldBinlogs: m.GetSegment(2).GetBinlogs(),
Field2StatslogPaths: m.GetSegment(2).GetStatslogs(),
Deltalogs: m.GetSegment(2).GetDeltalogs()[:1], // compaction plan use only 1 deltalog
},
},
}
compactToSeg := &datapb.CompactionSegment{ compactToSeg := &datapb.CompactionSegment{
SegmentID: 3, SegmentID: 3,
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
@ -246,8 +225,13 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
result := &datapb.CompactionPlanResult{ result := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{compactToSeg}, Segments: []*datapb.CompactionSegment{compactToSeg},
} }
task := &datapb.CompactionTask{
InputSegments: []UniqueID{1, 2},
Type: datapb.CompactionType_MixCompaction,
}
infos, mutation, err := m.CompleteCompactionMutation(plan, result) infos, mutation, err := m.CompleteCompactionMutation(task, result)
assert.NoError(suite.T(), err)
suite.Equal(1, len(infos)) suite.Equal(1, len(infos))
info := infos[0] info := infos[0]
suite.NoError(err) suite.NoError(err)
@ -275,16 +259,6 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
} }
} }
deltalogs := info.GetDeltalogs()
deltalogIDs := []int64{}
for _, fbinlog := range deltalogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
deltalogIDs = append(deltalogIDs, blog.GetLogID())
}
}
suite.ElementsMatch([]int64{30001, 31001}, deltalogIDs)
// check compactFrom segments // check compactFrom segments
for _, segID := range []int64{1, 2} { for _, segID := range []int64{1, 2} {
seg := m.GetSegment(segID) seg := m.GetSegment(segID)
@ -856,7 +830,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
}) })
} }
func Test_meta_SetSegmentCompacting(t *testing.T) { func Test_meta_SetSegmentsCompacting(t *testing.T) {
type fields struct { type fields struct {
client kv.MetaKv client kv.MetaKv
segments *SegmentsInfo segments *SegmentsInfo
@ -899,62 +873,13 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
catalog: &datacoord.Catalog{MetaKv: tt.fields.client}, catalog: &datacoord.Catalog{MetaKv: tt.fields.client},
segments: tt.fields.segments, segments: tt.fields.segments,
} }
m.SetSegmentCompacting(tt.args.segmentID, tt.args.compacting) m.SetSegmentsCompacting([]UniqueID{tt.args.segmentID}, tt.args.compacting)
segment := m.GetHealthySegment(tt.args.segmentID) segment := m.GetHealthySegment(tt.args.segmentID)
assert.Equal(t, tt.args.compacting, segment.isCompacting) assert.Equal(t, tt.args.compacting, segment.isCompacting)
}) })
} }
} }
func Test_meta_SetSegmentImporting(t *testing.T) {
type fields struct {
client kv.MetaKv
segments *SegmentsInfo
}
type args struct {
segmentID UniqueID
importing bool
}
tests := []struct {
name string
fields fields
args args
}{
{
"test set segment importing",
fields{
NewMetaMemoryKV(),
&SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Flushed,
IsImporting: false,
},
},
},
},
},
args{
segmentID: 1,
importing: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &meta{
catalog: &datacoord.Catalog{MetaKv: tt.fields.client},
segments: tt.fields.segments,
}
m.SetSegmentCompacting(tt.args.segmentID, tt.args.importing)
segment := m.GetHealthySegment(tt.args.segmentID)
assert.Equal(t, tt.args.importing, segment.isCompacting)
})
}
}
func Test_meta_GetSegmentsOfCollection(t *testing.T) { func Test_meta_GetSegmentsOfCollection(t *testing.T) {
storedSegments := NewSegmentsInfo() storedSegments := NewSegmentsInfo()


@ -114,34 +114,34 @@ func (_c *MockCompactionMeta_CleanPartitionStatsInfo_Call) RunAndReturn(run func
return _c return _c
} }
// CompleteCompactionMutation provides a mock function with given fields: plan, result // CompleteCompactionMutation provides a mock function with given fields: t, result
func (_m *MockCompactionMeta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { func (_m *MockCompactionMeta) CompleteCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
ret := _m.Called(plan, result) ret := _m.Called(t, result)
var r0 []*SegmentInfo var r0 []*SegmentInfo
var r1 *segMetricMutation var r1 *segMetricMutation
var r2 error var r2 error
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)); ok { if rf, ok := ret.Get(0).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)); ok {
return rf(plan, result) return rf(t, result)
} }
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) []*SegmentInfo); ok { if rf, ok := ret.Get(0).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) []*SegmentInfo); ok {
r0 = rf(plan, result) r0 = rf(t, result)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).([]*SegmentInfo) r0 = ret.Get(0).([]*SegmentInfo)
} }
} }
if rf, ok := ret.Get(1).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) *segMetricMutation); ok { if rf, ok := ret.Get(1).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) *segMetricMutation); ok {
r1 = rf(plan, result) r1 = rf(t, result)
} else { } else {
if ret.Get(1) != nil { if ret.Get(1) != nil {
r1 = ret.Get(1).(*segMetricMutation) r1 = ret.Get(1).(*segMetricMutation)
} }
} }
if rf, ok := ret.Get(2).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) error); ok { if rf, ok := ret.Get(2).(func(*datapb.CompactionTask, *datapb.CompactionPlanResult) error); ok {
r2 = rf(plan, result) r2 = rf(t, result)
} else { } else {
r2 = ret.Error(2) r2 = ret.Error(2)
} }
@ -155,15 +155,15 @@ type MockCompactionMeta_CompleteCompactionMutation_Call struct {
} }
// CompleteCompactionMutation is a helper method to define mock.On call // CompleteCompactionMutation is a helper method to define mock.On call
// - plan *datapb.CompactionPlan // - t *datapb.CompactionTask
// - result *datapb.CompactionPlanResult // - result *datapb.CompactionPlanResult
func (_e *MockCompactionMeta_Expecter) CompleteCompactionMutation(plan interface{}, result interface{}) *MockCompactionMeta_CompleteCompactionMutation_Call { func (_e *MockCompactionMeta_Expecter) CompleteCompactionMutation(t interface{}, result interface{}) *MockCompactionMeta_CompleteCompactionMutation_Call {
return &MockCompactionMeta_CompleteCompactionMutation_Call{Call: _e.mock.On("CompleteCompactionMutation", plan, result)} return &MockCompactionMeta_CompleteCompactionMutation_Call{Call: _e.mock.On("CompleteCompactionMutation", t, result)}
} }
func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Run(run func(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult)) *MockCompactionMeta_CompleteCompactionMutation_Call { func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Run(run func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult)) *MockCompactionMeta_CompleteCompactionMutation_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(*datapb.CompactionPlan), args[1].(*datapb.CompactionPlanResult)) run(args[0].(*datapb.CompactionTask), args[1].(*datapb.CompactionPlanResult))
}) })
return _c return _c
} }
@ -173,7 +173,7 @@ func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Return(_a0 []*Segm
return _c return _c
} }
func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_CompleteCompactionMutation_Call { func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionTask, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_CompleteCompactionMutation_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
@ -666,36 +666,36 @@ func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(...Segme
return _c return _c
} }
// SetSegmentCompacting provides a mock function with given fields: segmentID, compacting // SetSegmentsCompacting provides a mock function with given fields: segmentID, compacting
func (_m *MockCompactionMeta) SetSegmentCompacting(segmentID int64, compacting bool) { func (_m *MockCompactionMeta) SetSegmentsCompacting(segmentID []int64, compacting bool) {
_m.Called(segmentID, compacting) _m.Called(segmentID, compacting)
} }
// MockCompactionMeta_SetSegmentCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentCompacting' // MockCompactionMeta_SetSegmentsCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentsCompacting'
type MockCompactionMeta_SetSegmentCompacting_Call struct { type MockCompactionMeta_SetSegmentsCompacting_Call struct {
*mock.Call *mock.Call
} }
// SetSegmentCompacting is a helper method to define mock.On call // SetSegmentsCompacting is a helper method to define mock.On call
// - segmentID int64 // - segmentID []int64
// - compacting bool // - compacting bool
func (_e *MockCompactionMeta_Expecter) SetSegmentCompacting(segmentID interface{}, compacting interface{}) *MockCompactionMeta_SetSegmentCompacting_Call { func (_e *MockCompactionMeta_Expecter) SetSegmentsCompacting(segmentID interface{}, compacting interface{}) *MockCompactionMeta_SetSegmentsCompacting_Call {
return &MockCompactionMeta_SetSegmentCompacting_Call{Call: _e.mock.On("SetSegmentCompacting", segmentID, compacting)} return &MockCompactionMeta_SetSegmentsCompacting_Call{Call: _e.mock.On("SetSegmentsCompacting", segmentID, compacting)}
} }
func (_c *MockCompactionMeta_SetSegmentCompacting_Call) Run(run func(segmentID int64, compacting bool)) *MockCompactionMeta_SetSegmentCompacting_Call { func (_c *MockCompactionMeta_SetSegmentsCompacting_Call) Run(run func(segmentID []int64, compacting bool)) *MockCompactionMeta_SetSegmentsCompacting_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(bool)) run(args[0].([]int64), args[1].(bool))
}) })
return _c return _c
} }
func (_c *MockCompactionMeta_SetSegmentCompacting_Call) Return() *MockCompactionMeta_SetSegmentCompacting_Call { func (_c *MockCompactionMeta_SetSegmentsCompacting_Call) Return() *MockCompactionMeta_SetSegmentsCompacting_Call {
_c.Call.Return() _c.Call.Return()
return _c return _c
} }
func (_c *MockCompactionMeta_SetSegmentCompacting_Call) RunAndReturn(run func(int64, bool)) *MockCompactionMeta_SetSegmentCompacting_Call { func (_c *MockCompactionMeta_SetSegmentsCompacting_Call) RunAndReturn(run func([]int64, bool)) *MockCompactionMeta_SetSegmentsCompacting_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }


@ -64,8 +64,8 @@ type MockDataNode_CheckChannelOperationProgress_Call struct {
} }
// CheckChannelOperationProgress is a helper method to define mock.On call // CheckChannelOperationProgress is a helper method to define mock.On call
// - _a0 context.Context // - _a0 context.Context
// - _a1 *datapb.ChannelWatchInfo // - _a1 *datapb.ChannelWatchInfo
func (_e *MockDataNode_Expecter) CheckChannelOperationProgress(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckChannelOperationProgress_Call { func (_e *MockDataNode_Expecter) CheckChannelOperationProgress(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckChannelOperationProgress_Call {
return &MockDataNode_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", _a0, _a1)} return &MockDataNode_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", _a0, _a1)}
} }
@ -119,8 +119,8 @@ type MockDataNode_CompactionV2_Call struct {
} }
// CompactionV2 is a helper method to define mock.On call // CompactionV2 is a helper method to define mock.On call
// - _a0 context.Context // - _a0 context.Context
// - _a1 *datapb.CompactionPlan // - _a1 *datapb.CompactionPlan
func (_e *MockDataNode_Expecter) CompactionV2(_a0 interface{}, _a1 interface{}) *MockDataNode_CompactionV2_Call { func (_e *MockDataNode_Expecter) CompactionV2(_a0 interface{}, _a1 interface{}) *MockDataNode_CompactionV2_Call {
return &MockDataNode_CompactionV2_Call{Call: _e.mock.On("CompactionV2", _a0, _a1)} return &MockDataNode_CompactionV2_Call{Call: _e.mock.On("CompactionV2", _a0, _a1)}
} }
@ -174,8 +174,8 @@ type MockDataNode_DropCompactionPlan_Call struct {
} }
// DropCompactionPlan is a helper method to define mock.On call // DropCompactionPlan is a helper method to define mock.On call
// - _a0 context.Context // - _a0 context.Context
// - _a1 *datapb.DropCompactionPlanRequest // - _a1 *datapb.DropCompactionPlanRequest
func (_e *MockDataNode_Expecter) DropCompactionPlan(_a0 interface{}, _a1 interface{}) *MockDataNode_DropCompactionPlan_Call { func (_e *MockDataNode_Expecter) DropCompactionPlan(_a0 interface{}, _a1 interface{}) *MockDataNode_DropCompactionPlan_Call {
return &MockDataNode_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", _a0, _a1)} return &MockDataNode_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", _a0, _a1)}
} }
@ -229,8 +229,8 @@ type MockDataNode_DropImport_Call struct {
} }
// DropImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.DropImportRequest
func (_e *MockDataNode_Expecter) DropImport(_a0 interface{}, _a1 interface{}) *MockDataNode_DropImport_Call {
	return &MockDataNode_DropImport_Call{Call: _e.mock.On("DropImport", _a0, _a1)}
}
@@ -284,8 +284,8 @@ type MockDataNode_FlushChannels_Call struct {
}
// FlushChannels is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.FlushChannelsRequest
func (_e *MockDataNode_Expecter) FlushChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushChannels_Call {
	return &MockDataNode_FlushChannels_Call{Call: _e.mock.On("FlushChannels", _a0, _a1)}
}
@@ -339,8 +339,8 @@ type MockDataNode_FlushSegments_Call struct {
}
// FlushSegments is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.FlushSegmentsRequest
func (_e *MockDataNode_Expecter) FlushSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushSegments_Call {
	return &MockDataNode_FlushSegments_Call{Call: _e.mock.On("FlushSegments", _a0, _a1)}
}
@@ -435,8 +435,8 @@ type MockDataNode_GetCompactionState_Call struct {
}
// GetCompactionState is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.CompactionStateRequest
func (_e *MockDataNode_Expecter) GetCompactionState(_a0 interface{}, _a1 interface{}) *MockDataNode_GetCompactionState_Call {
	return &MockDataNode_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", _a0, _a1)}
}
@@ -490,8 +490,8 @@ type MockDataNode_GetComponentStates_Call struct {
}
// GetComponentStates is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.GetComponentStatesRequest
func (_e *MockDataNode_Expecter) GetComponentStates(_a0 interface{}, _a1 interface{}) *MockDataNode_GetComponentStates_Call {
	return &MockDataNode_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", _a0, _a1)}
}
@@ -545,8 +545,8 @@ type MockDataNode_GetMetrics_Call struct {
}
// GetMetrics is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.GetMetricsRequest
func (_e *MockDataNode_Expecter) GetMetrics(_a0 interface{}, _a1 interface{}) *MockDataNode_GetMetrics_Call {
	return &MockDataNode_GetMetrics_Call{Call: _e.mock.On("GetMetrics", _a0, _a1)}
}
@@ -682,8 +682,8 @@ type MockDataNode_GetStatisticsChannel_Call struct {
}
// GetStatisticsChannel is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *internalpb.GetStatisticsChannelRequest
func (_e *MockDataNode_Expecter) GetStatisticsChannel(_a0 interface{}, _a1 interface{}) *MockDataNode_GetStatisticsChannel_Call {
	return &MockDataNode_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", _a0, _a1)}
}
@@ -737,8 +737,8 @@ type MockDataNode_ImportV2_Call struct {
}
// ImportV2 is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ImportRequest
func (_e *MockDataNode_Expecter) ImportV2(_a0 interface{}, _a1 interface{}) *MockDataNode_ImportV2_Call {
	return &MockDataNode_ImportV2_Call{Call: _e.mock.On("ImportV2", _a0, _a1)}
}
@@ -833,8 +833,8 @@ type MockDataNode_NotifyChannelOperation_Call struct {
}
// NotifyChannelOperation is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ChannelOperationsRequest
func (_e *MockDataNode_Expecter) NotifyChannelOperation(_a0 interface{}, _a1 interface{}) *MockDataNode_NotifyChannelOperation_Call {
	return &MockDataNode_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", _a0, _a1)}
}
@@ -888,8 +888,8 @@ type MockDataNode_PreImport_Call struct {
}
// PreImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.PreImportRequest
func (_e *MockDataNode_Expecter) PreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_PreImport_Call {
	return &MockDataNode_PreImport_Call{Call: _e.mock.On("PreImport", _a0, _a1)}
}
@@ -943,8 +943,8 @@ type MockDataNode_QueryImport_Call struct {
}
// QueryImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.QueryImportRequest
func (_e *MockDataNode_Expecter) QueryImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryImport_Call {
	return &MockDataNode_QueryImport_Call{Call: _e.mock.On("QueryImport", _a0, _a1)}
}
@@ -998,8 +998,8 @@ type MockDataNode_QueryPreImport_Call struct {
}
// QueryPreImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.QueryPreImportRequest
func (_e *MockDataNode_Expecter) QueryPreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryPreImport_Call {
	return &MockDataNode_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", _a0, _a1)}
}
@@ -1053,8 +1053,8 @@ type MockDataNode_QuerySlot_Call struct {
}
// QuerySlot is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.QuerySlotRequest
func (_e *MockDataNode_Expecter) QuerySlot(_a0 interface{}, _a1 interface{}) *MockDataNode_QuerySlot_Call {
	return &MockDataNode_QuerySlot_Call{Call: _e.mock.On("QuerySlot", _a0, _a1)}
}
@@ -1149,8 +1149,8 @@ type MockDataNode_ResendSegmentStats_Call struct {
}
// ResendSegmentStats is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ResendSegmentStatsRequest
func (_e *MockDataNode_Expecter) ResendSegmentStats(_a0 interface{}, _a1 interface{}) *MockDataNode_ResendSegmentStats_Call {
	return &MockDataNode_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats", _a0, _a1)}
}
@@ -1183,7 +1183,7 @@ type MockDataNode_SetAddress_Call struct {
}
// SetAddress is a helper method to define mock.On call
// - address string
func (_e *MockDataNode_Expecter) SetAddress(address interface{}) *MockDataNode_SetAddress_Call {
	return &MockDataNode_SetAddress_Call{Call: _e.mock.On("SetAddress", address)}
}
@@ -1225,7 +1225,7 @@ type MockDataNode_SetDataCoordClient_Call struct {
}
// SetDataCoordClient is a helper method to define mock.On call
// - dataCoord types.DataCoordClient
func (_e *MockDataNode_Expecter) SetDataCoordClient(dataCoord interface{}) *MockDataNode_SetDataCoordClient_Call {
	return &MockDataNode_SetDataCoordClient_Call{Call: _e.mock.On("SetDataCoordClient", dataCoord)}
}
@@ -1258,7 +1258,7 @@ type MockDataNode_SetEtcdClient_Call struct {
}
// SetEtcdClient is a helper method to define mock.On call
// - etcdClient *clientv3.Client
func (_e *MockDataNode_Expecter) SetEtcdClient(etcdClient interface{}) *MockDataNode_SetEtcdClient_Call {
	return &MockDataNode_SetEtcdClient_Call{Call: _e.mock.On("SetEtcdClient", etcdClient)}
}
@@ -1300,7 +1300,7 @@ type MockDataNode_SetRootCoordClient_Call struct {
}
// SetRootCoordClient is a helper method to define mock.On call
// - rootCoord types.RootCoordClient
func (_e *MockDataNode_Expecter) SetRootCoordClient(rootCoord interface{}) *MockDataNode_SetRootCoordClient_Call {
	return &MockDataNode_SetRootCoordClient_Call{Call: _e.mock.On("SetRootCoordClient", rootCoord)}
}
@@ -1354,8 +1354,8 @@ type MockDataNode_ShowConfigurations_Call struct {
}
// ShowConfigurations is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *internalpb.ShowConfigurationsRequest
func (_e *MockDataNode_Expecter) ShowConfigurations(_a0 interface{}, _a1 interface{}) *MockDataNode_ShowConfigurations_Call {
	return &MockDataNode_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", _a0, _a1)}
}
@@ -1491,8 +1491,8 @@ type MockDataNode_SyncSegments_Call struct {
}
// SyncSegments is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.SyncSegmentsRequest
func (_e *MockDataNode_Expecter) SyncSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_SyncSegments_Call {
	return &MockDataNode_SyncSegments_Call{Call: _e.mock.On("SyncSegments", _a0, _a1)}
}
@@ -1525,7 +1525,7 @@ type MockDataNode_UpdateStateCode_Call struct {
}
// UpdateStateCode is a helper method to define mock.On call
// - stateCode commonpb.StateCode
func (_e *MockDataNode_Expecter) UpdateStateCode(stateCode interface{}) *MockDataNode_UpdateStateCode_Call {
	return &MockDataNode_UpdateStateCode_Call{Call: _e.mock.On("UpdateStateCode", stateCode)}
}
@@ -1579,8 +1579,8 @@ type MockDataNode_WatchDmChannels_Call struct {
}
// WatchDmChannels is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.WatchDmChannelsRequest
func (_e *MockDataNode_Expecter) WatchDmChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_WatchDmChannels_Call {
	return &MockDataNode_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels", _a0, _a1)}
}
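The expecter helpers above only register the underlying mock.On call; a test still chains Return on the returned *_Call value to stub the response. Below is a minimal usage sketch, not part of this change, assuming the standard mockery output for this mock (a NewMockDataNode constructor, an EXPECT() accessor, and the mocks package/import paths named here are assumptions, not taken from the diff):

package mocks // assumed package of the generated mock

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

func TestFlushSegmentsExpectation(t *testing.T) {
	// Hypothetical constructor name, following mockery's default template.
	dn := NewMockDataNode(t)

	// Match any context and any request, and stub a success status.
	dn.EXPECT().FlushSegments(mock.Anything, mock.Anything).
		Return(&commonpb.Status{}, nil)

	// The call under test then receives the stubbed values.
	status, err := dn.FlushSegments(context.Background(), &datapb.FlushSegmentsRequest{})
	if err != nil || status == nil {
		t.Fatalf("unexpected result: %v, %v", status, err)
	}
}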

View File

@@ -70,9 +70,9 @@ type MockDataNodeClient_CheckChannelOperationProgress_Call struct {
}
// CheckChannelOperationProgress is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ChannelWatchInfo
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) CheckChannelOperationProgress(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckChannelOperationProgress_Call {
	return &MockDataNodeClient_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -181,9 +181,9 @@ type MockDataNodeClient_CompactionV2_Call struct {
}
// CompactionV2 is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.CompactionPlan
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) CompactionV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CompactionV2_Call {
	return &MockDataNodeClient_CompactionV2_Call{Call: _e.mock.On("CompactionV2",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -251,9 +251,9 @@ type MockDataNodeClient_DropCompactionPlan_Call struct {
}
// DropCompactionPlan is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.DropCompactionPlanRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) DropCompactionPlan(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropCompactionPlan_Call {
	return &MockDataNodeClient_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -321,9 +321,9 @@ type MockDataNodeClient_DropImport_Call struct {
}
// DropImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.DropImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) DropImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropImport_Call {
	return &MockDataNodeClient_DropImport_Call{Call: _e.mock.On("DropImport",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -391,9 +391,9 @@ type MockDataNodeClient_FlushChannels_Call struct {
}
// FlushChannels is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.FlushChannelsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) FlushChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushChannels_Call {
	return &MockDataNodeClient_FlushChannels_Call{Call: _e.mock.On("FlushChannels",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -461,9 +461,9 @@ type MockDataNodeClient_FlushSegments_Call struct {
}
// FlushSegments is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.FlushSegmentsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) FlushSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushSegments_Call {
	return &MockDataNodeClient_FlushSegments_Call{Call: _e.mock.On("FlushSegments",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -531,9 +531,9 @@ type MockDataNodeClient_GetCompactionState_Call struct {
}
// GetCompactionState is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.CompactionStateRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetCompactionState(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetCompactionState_Call {
	return &MockDataNodeClient_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -601,9 +601,9 @@ type MockDataNodeClient_GetComponentStates_Call struct {
}
// GetComponentStates is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.GetComponentStatesRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetComponentStates(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetComponentStates_Call {
	return &MockDataNodeClient_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -671,9 +671,9 @@ type MockDataNodeClient_GetMetrics_Call struct {
}
// GetMetrics is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.GetMetricsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetMetrics(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetMetrics_Call {
	return &MockDataNodeClient_GetMetrics_Call{Call: _e.mock.On("GetMetrics",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -741,9 +741,9 @@ type MockDataNodeClient_GetStatisticsChannel_Call struct {
}
// GetStatisticsChannel is a helper method to define mock.On call
// - ctx context.Context
// - in *internalpb.GetStatisticsChannelRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetStatisticsChannel(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetStatisticsChannel_Call {
	return &MockDataNodeClient_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -811,9 +811,9 @@ type MockDataNodeClient_ImportV2_Call struct {
}
// ImportV2 is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) ImportV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ImportV2_Call {
	return &MockDataNodeClient_ImportV2_Call{Call: _e.mock.On("ImportV2",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -881,9 +881,9 @@ type MockDataNodeClient_NotifyChannelOperation_Call struct {
}
// NotifyChannelOperation is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ChannelOperationsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) NotifyChannelOperation(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_NotifyChannelOperation_Call {
	return &MockDataNodeClient_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -951,9 +951,9 @@ type MockDataNodeClient_PreImport_Call struct {
}
// PreImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.PreImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) PreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_PreImport_Call {
	return &MockDataNodeClient_PreImport_Call{Call: _e.mock.On("PreImport",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1021,9 +1021,9 @@ type MockDataNodeClient_QueryImport_Call struct {
}
// QueryImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.QueryImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) QueryImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryImport_Call {
	return &MockDataNodeClient_QueryImport_Call{Call: _e.mock.On("QueryImport",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1091,9 +1091,9 @@ type MockDataNodeClient_QueryPreImport_Call struct {
}
// QueryPreImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.QueryPreImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) QueryPreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryPreImport_Call {
	return &MockDataNodeClient_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1161,9 +1161,9 @@ type MockDataNodeClient_QuerySlot_Call struct {
}
// QuerySlot is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.QuerySlotRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) QuerySlot(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QuerySlot_Call {
	return &MockDataNodeClient_QuerySlot_Call{Call: _e.mock.On("QuerySlot",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1231,9 +1231,9 @@ type MockDataNodeClient_ResendSegmentStats_Call struct {
}
// ResendSegmentStats is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ResendSegmentStatsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) ResendSegmentStats(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ResendSegmentStats_Call {
	return &MockDataNodeClient_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1301,9 +1301,9 @@ type MockDataNodeClient_ShowConfigurations_Call struct {
}
// ShowConfigurations is a helper method to define mock.On call
// - ctx context.Context
// - in *internalpb.ShowConfigurationsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) ShowConfigurations(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ShowConfigurations_Call {
	return &MockDataNodeClient_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1371,9 +1371,9 @@ type MockDataNodeClient_SyncSegments_Call struct {
}
// SyncSegments is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.SyncSegmentsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) SyncSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_SyncSegments_Call {
	return &MockDataNodeClient_SyncSegments_Call{Call: _e.mock.On("SyncSegments",
		append([]interface{}{ctx, in}, opts...)...)}
@@ -1441,9 +1441,9 @@ type MockDataNodeClient_WatchDmChannels_Call struct {
}
// WatchDmChannels is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.WatchDmChannelsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) WatchDmChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_WatchDmChannels_Call {
	return &MockDataNodeClient_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels",
		append([]interface{}{ctx, in}, opts...)...)}