From fe22720ff380e6342d5e7bdbd5bae4e0921f560e Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 6 Jul 2023 18:20:26 +0800 Subject: [PATCH] Avoid rewriting all binlogs every time a segment is updated (#25339) Signed-off-by: sunby Co-authored-by: sunby --- internal/datacoord/meta.go | 8 +- internal/metastore/catalog.go | 9 +- internal/metastore/kv/datacoord/kv_catalog.go | 90 ++++--- .../metastore/kv/datacoord/kv_catalog_test.go | 23 +- internal/metastore/mocks/DataCoordCatalog.go | 226 ++++++++++++----- internal/mocks/mock_datacoord_catalog.go | 229 +++++++++++++----- 6 files changed, 444 insertions(+), 141 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 4f1d82cc7c..396b7b5062 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -593,7 +593,13 @@ func (m *meta) UpdateFlushSegmentsInfo( for _, seg := range modSegments { segments = append(segments, seg.SegmentInfo) } - if err := m.catalog.AlterSegments(m.ctx, segments); err != nil { + if err := m.catalog.AlterSegments(m.ctx, segments, + metastore.BinlogsIncrement{ + Segment: clonedSegment.SegmentInfo, + Insertlogs: binlogs, + Statslogs: statslogs, + Deltalogs: deltalogs, + }); err != nil { log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd", zap.Error(err)) return err diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 9a2b84f4aa..00bef323aa 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -102,12 +102,19 @@ func (t AlterType) String() string { return "" } +type BinlogsIncrement struct { + Segment *datapb.SegmentInfo + Insertlogs []*datapb.FieldBinlog + Statslogs []*datapb.FieldBinlog + Deltalogs []*datapb.FieldBinlog +} + //go:generate mockery --name=DataCoordCatalog --with-expecter type DataCoordCatalog interface { ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error // TODO Remove this later, we should update flush segments info for each segment separately, so far we still need transaction - AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error + AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...BinlogsIncrement) error // AlterSegmentsAndAddNewSegment for transaction AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index e6d1a46b88..40350c62fd 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -253,52 +254,87 @@ func (kc *Catalog) LoadFromSegmentPath(colID, partID, segID typeutil.UniqueID) ( return segInfo, nil } -func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error { +func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error { if len(newSegments) == 0 { return nil } - kvsBySeg := make(map[int64]map[string]string) + kvs := make(map[string]string) for _, segment := range newSegments { kc.collectMetrics(segment) - segmentKvs, err := buildSegmentAndBinlogsKvs(segment) + + // we don't persist binlog fields, but instead store binlogs as independent kvs + cloned := proto.Clone(segment).(*datapb.SegmentInfo) + resetBinlogFields(cloned) + + rowCount := segmentutil.CalcRowCountFromBinLog(segment) + if cloned.GetNumOfRows() != rowCount { + cloned.NumOfRows = rowCount + } + + k, v, err := buildSegmentKv(cloned) if err != nil { return err } - kvsBySeg[segment.GetID()] = make(map[string]string) - maps.Copy(kvsBySeg[segment.GetID()], segmentKvs) + kvs[k] = v } - // Split kvs into multiple operations to avoid over-sized operations. - // Also make sure kvs of the same segment are not split into different operations. - kvsPiece := make(map[string]string) - currSize := 0 + + for _, b := range binlogs { + segment := b.Segment + binlogKvs, err := buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), + segment.GetID(), cloneLogs(b.Insertlogs), cloneLogs(b.Deltalogs), cloneLogs(b.Statslogs), len(segment.GetCompactionFrom()) > 0) + if err != nil { + return err + } + maps.Copy(kvs, binlogKvs) + } + saveFn := func(partialKvs map[string]string) error { return kc.MetaKv.MultiSave(partialKvs) } - for _, kvs := range kvsBySeg { - if currSize+len(kvs) >= maxEtcdTxnNum { - if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil { - log.Error("failed to save by batch", zap.Error(err)) - return err - } - kvsPiece = make(map[string]string) - currSize = 0 - } - maps.Copy(kvsPiece, kvs) - currSize += len(kvs) - if len(kvs) >= maxEtcdTxnNum { - log.Warn("a single segment's Etcd save has over maxEtcdTxnNum operations." + - " Please double check your config") - } - } - if currSize > 0 { - if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil { + if len(kvs) <= maxEtcdTxnNum { + if err := etcd.SaveByBatch(kvs, saveFn); err != nil { log.Error("failed to save by batch", zap.Error(err)) return err } + } else { + // Split kvs into multiple operations to avoid over-sized operations. + // Also make sure kvs of the same segment are not split into different operations. + batch := make(map[string]string) + for k, v := range kvs { + if len(batch) == maxEtcdTxnNum { + if err := etcd.SaveByBatch(batch, saveFn); err != nil { + log.Error("failed to save by batch", zap.Error(err)) + return err + } + maps.Clear(batch) + } + batch[k] = v + } + + if len(batch) > 0 { + if err := etcd.SaveByBatch(batch, saveFn); err != nil { + log.Error("failed to save by batch", zap.Error(err)) + return err + } + } } return nil } +func resetBinlogFields(segment *datapb.SegmentInfo) { + segment.Binlogs = nil + segment.Deltalogs = nil + segment.Statslogs = nil +} + +func cloneLogs(binlogs []*datapb.FieldBinlog) []*datapb.FieldBinlog { + var res []*datapb.FieldBinlog + for _, log := range binlogs { + res = append(res, proto.Clone(log).(*datapb.FieldBinlog)) + } + return res +} + func (kc *Catalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error { kvs := make(map[string]string) segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment) diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index b7ef705523..613c52131a 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/mocks" + "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -374,7 +375,12 @@ func Test_AlterSegments(t *testing.T) { catalog := NewCatalog(txn, rootPath, "") assert.Panics(t, func() { - catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}) + catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{ + Segment: invalidSegment, + Insertlogs: invalidSegment.Binlogs, + Statslogs: invalidSegment.Statslogs, + Deltalogs: invalidSegment.Deltalogs, + }) }) }) @@ -402,7 +408,12 @@ func Test_AlterSegments(t *testing.T) { err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{}) assert.NoError(t, err) - err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment1}) + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment1}, metastore.BinlogsIncrement{ + Segment: segment1, + Insertlogs: segment1.Binlogs, + Statslogs: segment1.Statslogs, + Deltalogs: segment1.Deltalogs, + }) assert.NoError(t, err) _, ok := savedKvs[k4] @@ -459,7 +470,13 @@ func Test_AlterSegments(t *testing.T) { Statslogs: statslogs, } - err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL}) + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL}, + metastore.BinlogsIncrement{ + Segment: segmentXL, + Insertlogs: segmentXL.Binlogs, + Statslogs: segmentXL.Statslogs, + Deltalogs: segmentXL.Deltalogs, + }) assert.NoError(t, err) assert.Equal(t, 255+3, len(savedKvs)) assert.Equal(t, 3, opGroupCount) diff --git a/internal/metastore/mocks/DataCoordCatalog.go b/internal/metastore/mocks/DataCoordCatalog.go index 010edbcd2f..8fd49e52bc 100644 --- a/internal/metastore/mocks/DataCoordCatalog.go +++ b/internal/metastore/mocks/DataCoordCatalog.go @@ -1,10 +1,11 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.30.1. DO NOT EDIT. package mocks import ( context "context" + metastore "github.com/milvus-io/milvus/internal/metastore" datapb "github.com/milvus-io/milvus/internal/proto/datapb" mock "github.com/stretchr/testify/mock" @@ -65,6 +66,11 @@ func (_c *DataCoordCatalog_AddSegment_Call) Return(_a0 error) *DataCoordCatalog_ return _c } +func (_c *DataCoordCatalog_AddSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo) error) *DataCoordCatalog_AddSegment_Call { + _c.Call.Return(run) + return _c +} + // AlterIndex provides a mock function with given fields: ctx, newIndex func (_m *DataCoordCatalog) AlterIndex(ctx context.Context, newIndex *model.Index) error { ret := _m.Called(ctx, newIndex) @@ -103,6 +109,11 @@ func (_c *DataCoordCatalog_AlterIndex_Call) Return(_a0 error) *DataCoordCatalog_ return _c } +func (_c *DataCoordCatalog_AlterIndex_Call) RunAndReturn(run func(context.Context, *model.Index) error) *DataCoordCatalog_AlterIndex_Call { + _c.Call.Return(run) + return _c +} + // AlterIndexes provides a mock function with given fields: ctx, newIndexes func (_m *DataCoordCatalog) AlterIndexes(ctx context.Context, newIndexes []*model.Index) error { ret := _m.Called(ctx, newIndexes) @@ -141,6 +152,11 @@ func (_c *DataCoordCatalog_AlterIndexes_Call) Return(_a0 error) *DataCoordCatalo return _c } +func (_c *DataCoordCatalog_AlterIndexes_Call) RunAndReturn(run func(context.Context, []*model.Index) error) *DataCoordCatalog_AlterIndexes_Call { + _c.Call.Return(run) + return _c +} + // AlterSegment provides a mock function with given fields: ctx, newSegment, oldSegment func (_m *DataCoordCatalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error { ret := _m.Called(ctx, newSegment, oldSegment) @@ -180,6 +196,11 @@ func (_c *DataCoordCatalog_AlterSegment_Call) Return(_a0 error) *DataCoordCatalo return _c } +func (_c *DataCoordCatalog_AlterSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo, *datapb.SegmentInfo) error) *DataCoordCatalog_AlterSegment_Call { + _c.Call.Return(run) + return _c +} + // AlterSegmentIndex provides a mock function with given fields: ctx, newSegIndex func (_m *DataCoordCatalog) AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error { ret := _m.Called(ctx, newSegIndex) @@ -218,6 +239,11 @@ func (_c *DataCoordCatalog_AlterSegmentIndex_Call) Return(_a0 error) *DataCoordC return _c } +func (_c *DataCoordCatalog_AlterSegmentIndex_Call) RunAndReturn(run func(context.Context, *model.SegmentIndex) error) *DataCoordCatalog_AlterSegmentIndex_Call { + _c.Call.Return(run) + return _c +} + // AlterSegmentIndexes provides a mock function with given fields: ctx, newSegIdxes func (_m *DataCoordCatalog) AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error { ret := _m.Called(ctx, newSegIdxes) @@ -256,13 +282,25 @@ func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) Return(_a0 error) *DataCoor return _c } -// AlterSegments provides a mock function with given fields: ctx, newSegments -func (_m *DataCoordCatalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error { - ret := _m.Called(ctx, newSegments) +func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) RunAndReturn(run func(context.Context, []*model.SegmentIndex) error) *DataCoordCatalog_AlterSegmentIndexes_Call { + _c.Call.Return(run) + return _c +} + +// AlterSegments provides a mock function with given fields: ctx, newSegments, binlogs +func (_m *DataCoordCatalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error { + _va := make([]interface{}, len(binlogs)) + for _i := range binlogs { + _va[_i] = binlogs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, newSegments) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo) error); ok { - r0 = rf(ctx, newSegments) + if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo, ...metastore.BinlogsIncrement) error); ok { + r0 = rf(ctx, newSegments, binlogs...) } else { r0 = ret.Error(0) } @@ -278,13 +316,21 @@ type DataCoordCatalog_AlterSegments_Call struct { // AlterSegments is a helper method to define mock.On call // - ctx context.Context // - newSegments []*datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) AlterSegments(ctx interface{}, newSegments interface{}) *DataCoordCatalog_AlterSegments_Call { - return &DataCoordCatalog_AlterSegments_Call{Call: _e.mock.On("AlterSegments", ctx, newSegments)} +// - binlogs ...metastore.BinlogsIncrement +func (_e *DataCoordCatalog_Expecter) AlterSegments(ctx interface{}, newSegments interface{}, binlogs ...interface{}) *DataCoordCatalog_AlterSegments_Call { + return &DataCoordCatalog_AlterSegments_Call{Call: _e.mock.On("AlterSegments", + append([]interface{}{ctx, newSegments}, binlogs...)...)} } -func (_c *DataCoordCatalog_AlterSegments_Call) Run(run func(ctx context.Context, newSegments []*datapb.SegmentInfo)) *DataCoordCatalog_AlterSegments_Call { +func (_c *DataCoordCatalog_AlterSegments_Call) Run(run func(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement)) *DataCoordCatalog_AlterSegments_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo)) + variadicArgs := make([]metastore.BinlogsIncrement, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(metastore.BinlogsIncrement) + } + } + run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo), variadicArgs...) }) return _c } @@ -294,6 +340,11 @@ func (_c *DataCoordCatalog_AlterSegments_Call) Return(_a0 error) *DataCoordCatal return _c } +func (_c *DataCoordCatalog_AlterSegments_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo, ...metastore.BinlogsIncrement) error) *DataCoordCatalog_AlterSegments_Call { + _c.Call.Return(run) + return _c +} + // AlterSegmentsAndAddNewSegment provides a mock function with given fields: ctx, segments, newSegment func (_m *DataCoordCatalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error { ret := _m.Called(ctx, segments, newSegment) @@ -333,6 +384,11 @@ func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error) return _c } +func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo, *datapb.SegmentInfo) error) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call { + _c.Call.Return(run) + return _c +} + // ChannelExists provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool { ret := _m.Called(ctx, channel) @@ -371,6 +427,11 @@ func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalo return _c } +func (_c *DataCoordCatalog_ChannelExists_Call) RunAndReturn(run func(context.Context, string) bool) *DataCoordCatalog_ChannelExists_Call { + _c.Call.Return(run) + return _c +} + // CreateIndex provides a mock function with given fields: ctx, index func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error { ret := _m.Called(ctx, index) @@ -409,6 +470,11 @@ func (_c *DataCoordCatalog_CreateIndex_Call) Return(_a0 error) *DataCoordCatalog return _c } +func (_c *DataCoordCatalog_CreateIndex_Call) RunAndReturn(run func(context.Context, *model.Index) error) *DataCoordCatalog_CreateIndex_Call { + _c.Call.Return(run) + return _c +} + // CreateSegmentIndex provides a mock function with given fields: ctx, segIdx func (_m *DataCoordCatalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error { ret := _m.Called(ctx, segIdx) @@ -447,6 +513,11 @@ func (_c *DataCoordCatalog_CreateSegmentIndex_Call) Return(_a0 error) *DataCoord return _c } +func (_c *DataCoordCatalog_CreateSegmentIndex_Call) RunAndReturn(run func(context.Context, *model.SegmentIndex) error) *DataCoordCatalog_CreateSegmentIndex_Call { + _c.Call.Return(run) + return _c +} + // DropChannel provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) DropChannel(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -485,6 +556,11 @@ func (_c *DataCoordCatalog_DropChannel_Call) Return(_a0 error) *DataCoordCatalog return _c } +func (_c *DataCoordCatalog_DropChannel_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_DropChannel_Call { + _c.Call.Return(run) + return _c +} + // DropChannelCheckpoint provides a mock function with given fields: ctx, vChannel func (_m *DataCoordCatalog) DropChannelCheckpoint(ctx context.Context, vChannel string) error { ret := _m.Called(ctx, vChannel) @@ -523,6 +599,11 @@ func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) Return(_a0 error) *DataCo return _c } +func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_DropChannelCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // DropIndex provides a mock function with given fields: ctx, collID, dropIdxID func (_m *DataCoordCatalog) DropIndex(ctx context.Context, collID int64, dropIdxID int64) error { ret := _m.Called(ctx, collID, dropIdxID) @@ -562,6 +643,11 @@ func (_c *DataCoordCatalog_DropIndex_Call) Return(_a0 error) *DataCoordCatalog_D return _c } +func (_c *DataCoordCatalog_DropIndex_Call) RunAndReturn(run func(context.Context, int64, int64) error) *DataCoordCatalog_DropIndex_Call { + _c.Call.Return(run) + return _c +} + // DropSegment provides a mock function with given fields: ctx, segment func (_m *DataCoordCatalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { ret := _m.Called(ctx, segment) @@ -600,6 +686,11 @@ func (_c *DataCoordCatalog_DropSegment_Call) Return(_a0 error) *DataCoordCatalog return _c } +func (_c *DataCoordCatalog_DropSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo) error) *DataCoordCatalog_DropSegment_Call { + _c.Call.Return(run) + return _c +} + // DropSegmentIndex provides a mock function with given fields: ctx, collID, partID, segID, buildID func (_m *DataCoordCatalog) DropSegmentIndex(ctx context.Context, collID int64, partID int64, segID int64, buildID int64) error { ret := _m.Called(ctx, collID, partID, segID, buildID) @@ -641,6 +732,11 @@ func (_c *DataCoordCatalog_DropSegmentIndex_Call) Return(_a0 error) *DataCoordCa return _c } +func (_c *DataCoordCatalog_DropSegmentIndex_Call) RunAndReturn(run func(context.Context, int64, int64, int64, int64) error) *DataCoordCatalog_DropSegmentIndex_Call { + _c.Call.Return(run) + return _c +} + // GcConfirm provides a mock function with given fields: ctx, collectionID, partitionID func (_m *DataCoordCatalog) GcConfirm(ctx context.Context, collectionID int64, partitionID int64) bool { ret := _m.Called(ctx, collectionID, partitionID) @@ -680,11 +776,20 @@ func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_Gc return _c } +func (_c *DataCoordCatalog_GcConfirm_Call) RunAndReturn(run func(context.Context, int64, int64) bool) *DataCoordCatalog_GcConfirm_Call { + _c.Call.Return(run) + return _c +} + // ListChannelCheckpoint provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { ret := _m.Called(ctx) var r0 map[string]*msgpb.MsgPosition + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[string]*msgpb.MsgPosition, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) map[string]*msgpb.MsgPosition); ok { r0 = rf(ctx) } else { @@ -693,7 +798,6 @@ func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[stri } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -726,11 +830,20 @@ func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) Return(_a0 map[string]*ms return _c } +func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) RunAndReturn(run func(context.Context) (map[string]*msgpb.MsgPosition, error)) *DataCoordCatalog_ListChannelCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // ListIndexes provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { ret := _m.Called(ctx) var r0 []*model.Index + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*model.Index, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*model.Index); ok { r0 = rf(ctx) } else { @@ -739,7 +852,6 @@ func (_m *DataCoordCatalog) ListIndexes(ctx context.Context) ([]*model.Index, er } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -772,11 +884,20 @@ func (_c *DataCoordCatalog_ListIndexes_Call) Return(_a0 []*model.Index, _a1 erro return _c } +func (_c *DataCoordCatalog_ListIndexes_Call) RunAndReturn(run func(context.Context) ([]*model.Index, error)) *DataCoordCatalog_ListIndexes_Call { + _c.Call.Return(run) + return _c +} + // ListSegmentIndexes provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) { ret := _m.Called(ctx) var r0 []*model.SegmentIndex + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*model.SegmentIndex, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*model.SegmentIndex); ok { r0 = rf(ctx) } else { @@ -785,7 +906,6 @@ func (_m *DataCoordCatalog) ListSegmentIndexes(ctx context.Context) ([]*model.Se } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -818,11 +938,20 @@ func (_c *DataCoordCatalog_ListSegmentIndexes_Call) Return(_a0 []*model.SegmentI return _c } +func (_c *DataCoordCatalog_ListSegmentIndexes_Call) RunAndReturn(run func(context.Context) ([]*model.SegmentIndex, error)) *DataCoordCatalog_ListSegmentIndexes_Call { + _c.Call.Return(run) + return _c +} + // ListSegments provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { ret := _m.Called(ctx) var r0 []*datapb.SegmentInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*datapb.SegmentInfo, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*datapb.SegmentInfo); ok { r0 = rf(ctx) } else { @@ -831,7 +960,6 @@ func (_m *DataCoordCatalog) ListSegments(ctx context.Context) ([]*datapb.Segment } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -864,6 +992,11 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo, return _c } +func (_c *DataCoordCatalog_ListSegments_Call) RunAndReturn(run func(context.Context) ([]*datapb.SegmentInfo, error)) *DataCoordCatalog_ListSegments_Call { + _c.Call.Return(run) + return _c +} + // MarkChannelAdded provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -902,6 +1035,11 @@ func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCa return _c } +func (_c *DataCoordCatalog_MarkChannelAdded_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_MarkChannelAdded_Call { + _c.Call.Return(run) + return _c +} + // MarkChannelDeleted provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -940,42 +1078,8 @@ func (_c *DataCoordCatalog_MarkChannelDeleted_Call) Return(_a0 error) *DataCoord return _c } -// RevertAlterSegmentsAndAddNewSegment provides a mock function with given fields: ctx, segments, removalSegment -func (_m *DataCoordCatalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error { - ret := _m.Called(ctx, segments, removalSegment) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo, *datapb.SegmentInfo) error); ok { - r0 = rf(ctx, segments, removalSegment) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RevertAlterSegmentsAndAddNewSegment' -type DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call struct { - *mock.Call -} - -// RevertAlterSegmentsAndAddNewSegment is a helper method to define mock.On call -// - ctx context.Context -// - segments []*datapb.SegmentInfo -// - removalSegment *datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) RevertAlterSegmentsAndAddNewSegment(ctx interface{}, segments interface{}, removalSegment interface{}) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call { - return &DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call{Call: _e.mock.On("RevertAlterSegmentsAndAddNewSegment", ctx, segments, removalSegment)} -} - -func (_c *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call) Run(run func(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo)) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo), args[2].(*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call) Return(_a0 error) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call { - _c.Call.Return(_a0) +func (_c *DataCoordCatalog_MarkChannelDeleted_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_MarkChannelDeleted_Call { + _c.Call.Return(run) return _c } @@ -1018,6 +1122,11 @@ func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) Return(_a0 error) *DataCo return _c } +func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string, *msgpb.MsgPosition) error) *DataCoordCatalog_SaveChannelCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // SaveDroppedSegmentsInBatch provides a mock function with given fields: ctx, segments func (_m *DataCoordCatalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { ret := _m.Called(ctx, segments) @@ -1056,6 +1165,11 @@ func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *D return _c } +func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo) error) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call { + _c.Call.Return(run) + return _c +} + // ShouldDropChannel provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool { ret := _m.Called(ctx, channel) @@ -1094,13 +1208,17 @@ func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCa return _c } -type mockConstructorTestingTNewDataCoordCatalog interface { - mock.TestingT - Cleanup(func()) +func (_c *DataCoordCatalog_ShouldDropChannel_Call) RunAndReturn(run func(context.Context, string) bool) *DataCoordCatalog_ShouldDropChannel_Call { + _c.Call.Return(run) + return _c } // NewDataCoordCatalog creates a new instance of DataCoordCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewDataCoordCatalog(t mockConstructorTestingTNewDataCoordCatalog) *DataCoordCatalog { +// The first argument is typically a *testing.T value. +func NewDataCoordCatalog(t interface { + mock.TestingT + Cleanup(func()) +}) *DataCoordCatalog { mock := &DataCoordCatalog{} mock.Mock.Test(t) diff --git a/internal/mocks/mock_datacoord_catalog.go b/internal/mocks/mock_datacoord_catalog.go index 88a6feed64..8fd49e52bc 100644 --- a/internal/mocks/mock_datacoord_catalog.go +++ b/internal/mocks/mock_datacoord_catalog.go @@ -1,16 +1,18 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.30.1. DO NOT EDIT. package mocks import ( context "context" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + metastore "github.com/milvus-io/milvus/internal/metastore" datapb "github.com/milvus-io/milvus/internal/proto/datapb" mock "github.com/stretchr/testify/mock" model "github.com/milvus-io/milvus/internal/metastore/model" + + msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" ) // DataCoordCatalog is an autogenerated mock type for the DataCoordCatalog type @@ -64,6 +66,11 @@ func (_c *DataCoordCatalog_AddSegment_Call) Return(_a0 error) *DataCoordCatalog_ return _c } +func (_c *DataCoordCatalog_AddSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo) error) *DataCoordCatalog_AddSegment_Call { + _c.Call.Return(run) + return _c +} + // AlterIndex provides a mock function with given fields: ctx, newIndex func (_m *DataCoordCatalog) AlterIndex(ctx context.Context, newIndex *model.Index) error { ret := _m.Called(ctx, newIndex) @@ -102,6 +109,11 @@ func (_c *DataCoordCatalog_AlterIndex_Call) Return(_a0 error) *DataCoordCatalog_ return _c } +func (_c *DataCoordCatalog_AlterIndex_Call) RunAndReturn(run func(context.Context, *model.Index) error) *DataCoordCatalog_AlterIndex_Call { + _c.Call.Return(run) + return _c +} + // AlterIndexes provides a mock function with given fields: ctx, newIndexes func (_m *DataCoordCatalog) AlterIndexes(ctx context.Context, newIndexes []*model.Index) error { ret := _m.Called(ctx, newIndexes) @@ -140,6 +152,11 @@ func (_c *DataCoordCatalog_AlterIndexes_Call) Return(_a0 error) *DataCoordCatalo return _c } +func (_c *DataCoordCatalog_AlterIndexes_Call) RunAndReturn(run func(context.Context, []*model.Index) error) *DataCoordCatalog_AlterIndexes_Call { + _c.Call.Return(run) + return _c +} + // AlterSegment provides a mock function with given fields: ctx, newSegment, oldSegment func (_m *DataCoordCatalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error { ret := _m.Called(ctx, newSegment, oldSegment) @@ -179,6 +196,11 @@ func (_c *DataCoordCatalog_AlterSegment_Call) Return(_a0 error) *DataCoordCatalo return _c } +func (_c *DataCoordCatalog_AlterSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo, *datapb.SegmentInfo) error) *DataCoordCatalog_AlterSegment_Call { + _c.Call.Return(run) + return _c +} + // AlterSegmentIndex provides a mock function with given fields: ctx, newSegIndex func (_m *DataCoordCatalog) AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error { ret := _m.Called(ctx, newSegIndex) @@ -217,6 +239,11 @@ func (_c *DataCoordCatalog_AlterSegmentIndex_Call) Return(_a0 error) *DataCoordC return _c } +func (_c *DataCoordCatalog_AlterSegmentIndex_Call) RunAndReturn(run func(context.Context, *model.SegmentIndex) error) *DataCoordCatalog_AlterSegmentIndex_Call { + _c.Call.Return(run) + return _c +} + // AlterSegmentIndexes provides a mock function with given fields: ctx, newSegIdxes func (_m *DataCoordCatalog) AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error { ret := _m.Called(ctx, newSegIdxes) @@ -255,13 +282,25 @@ func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) Return(_a0 error) *DataCoor return _c } -// AlterSegments provides a mock function with given fields: ctx, newSegments -func (_m *DataCoordCatalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error { - ret := _m.Called(ctx, newSegments) +func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) RunAndReturn(run func(context.Context, []*model.SegmentIndex) error) *DataCoordCatalog_AlterSegmentIndexes_Call { + _c.Call.Return(run) + return _c +} + +// AlterSegments provides a mock function with given fields: ctx, newSegments, binlogs +func (_m *DataCoordCatalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error { + _va := make([]interface{}, len(binlogs)) + for _i := range binlogs { + _va[_i] = binlogs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, newSegments) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo) error); ok { - r0 = rf(ctx, newSegments) + if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo, ...metastore.BinlogsIncrement) error); ok { + r0 = rf(ctx, newSegments, binlogs...) } else { r0 = ret.Error(0) } @@ -277,13 +316,21 @@ type DataCoordCatalog_AlterSegments_Call struct { // AlterSegments is a helper method to define mock.On call // - ctx context.Context // - newSegments []*datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) AlterSegments(ctx interface{}, newSegments interface{}) *DataCoordCatalog_AlterSegments_Call { - return &DataCoordCatalog_AlterSegments_Call{Call: _e.mock.On("AlterSegments", ctx, newSegments)} +// - binlogs ...metastore.BinlogsIncrement +func (_e *DataCoordCatalog_Expecter) AlterSegments(ctx interface{}, newSegments interface{}, binlogs ...interface{}) *DataCoordCatalog_AlterSegments_Call { + return &DataCoordCatalog_AlterSegments_Call{Call: _e.mock.On("AlterSegments", + append([]interface{}{ctx, newSegments}, binlogs...)...)} } -func (_c *DataCoordCatalog_AlterSegments_Call) Run(run func(ctx context.Context, newSegments []*datapb.SegmentInfo)) *DataCoordCatalog_AlterSegments_Call { +func (_c *DataCoordCatalog_AlterSegments_Call) Run(run func(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement)) *DataCoordCatalog_AlterSegments_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo)) + variadicArgs := make([]metastore.BinlogsIncrement, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(metastore.BinlogsIncrement) + } + } + run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo), variadicArgs...) }) return _c } @@ -293,6 +340,11 @@ func (_c *DataCoordCatalog_AlterSegments_Call) Return(_a0 error) *DataCoordCatal return _c } +func (_c *DataCoordCatalog_AlterSegments_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo, ...metastore.BinlogsIncrement) error) *DataCoordCatalog_AlterSegments_Call { + _c.Call.Return(run) + return _c +} + // AlterSegmentsAndAddNewSegment provides a mock function with given fields: ctx, segments, newSegment func (_m *DataCoordCatalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error { ret := _m.Called(ctx, segments, newSegment) @@ -332,6 +384,11 @@ func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error) return _c } +func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo, *datapb.SegmentInfo) error) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call { + _c.Call.Return(run) + return _c +} + // ChannelExists provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool { ret := _m.Called(ctx, channel) @@ -370,6 +427,11 @@ func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalo return _c } +func (_c *DataCoordCatalog_ChannelExists_Call) RunAndReturn(run func(context.Context, string) bool) *DataCoordCatalog_ChannelExists_Call { + _c.Call.Return(run) + return _c +} + // CreateIndex provides a mock function with given fields: ctx, index func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error { ret := _m.Called(ctx, index) @@ -408,6 +470,11 @@ func (_c *DataCoordCatalog_CreateIndex_Call) Return(_a0 error) *DataCoordCatalog return _c } +func (_c *DataCoordCatalog_CreateIndex_Call) RunAndReturn(run func(context.Context, *model.Index) error) *DataCoordCatalog_CreateIndex_Call { + _c.Call.Return(run) + return _c +} + // CreateSegmentIndex provides a mock function with given fields: ctx, segIdx func (_m *DataCoordCatalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error { ret := _m.Called(ctx, segIdx) @@ -446,6 +513,11 @@ func (_c *DataCoordCatalog_CreateSegmentIndex_Call) Return(_a0 error) *DataCoord return _c } +func (_c *DataCoordCatalog_CreateSegmentIndex_Call) RunAndReturn(run func(context.Context, *model.SegmentIndex) error) *DataCoordCatalog_CreateSegmentIndex_Call { + _c.Call.Return(run) + return _c +} + // DropChannel provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) DropChannel(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -484,6 +556,11 @@ func (_c *DataCoordCatalog_DropChannel_Call) Return(_a0 error) *DataCoordCatalog return _c } +func (_c *DataCoordCatalog_DropChannel_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_DropChannel_Call { + _c.Call.Return(run) + return _c +} + // DropChannelCheckpoint provides a mock function with given fields: ctx, vChannel func (_m *DataCoordCatalog) DropChannelCheckpoint(ctx context.Context, vChannel string) error { ret := _m.Called(ctx, vChannel) @@ -522,6 +599,11 @@ func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) Return(_a0 error) *DataCo return _c } +func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_DropChannelCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // DropIndex provides a mock function with given fields: ctx, collID, dropIdxID func (_m *DataCoordCatalog) DropIndex(ctx context.Context, collID int64, dropIdxID int64) error { ret := _m.Called(ctx, collID, dropIdxID) @@ -561,6 +643,11 @@ func (_c *DataCoordCatalog_DropIndex_Call) Return(_a0 error) *DataCoordCatalog_D return _c } +func (_c *DataCoordCatalog_DropIndex_Call) RunAndReturn(run func(context.Context, int64, int64) error) *DataCoordCatalog_DropIndex_Call { + _c.Call.Return(run) + return _c +} + // DropSegment provides a mock function with given fields: ctx, segment func (_m *DataCoordCatalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { ret := _m.Called(ctx, segment) @@ -599,6 +686,11 @@ func (_c *DataCoordCatalog_DropSegment_Call) Return(_a0 error) *DataCoordCatalog return _c } +func (_c *DataCoordCatalog_DropSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo) error) *DataCoordCatalog_DropSegment_Call { + _c.Call.Return(run) + return _c +} + // DropSegmentIndex provides a mock function with given fields: ctx, collID, partID, segID, buildID func (_m *DataCoordCatalog) DropSegmentIndex(ctx context.Context, collID int64, partID int64, segID int64, buildID int64) error { ret := _m.Called(ctx, collID, partID, segID, buildID) @@ -640,6 +732,11 @@ func (_c *DataCoordCatalog_DropSegmentIndex_Call) Return(_a0 error) *DataCoordCa return _c } +func (_c *DataCoordCatalog_DropSegmentIndex_Call) RunAndReturn(run func(context.Context, int64, int64, int64, int64) error) *DataCoordCatalog_DropSegmentIndex_Call { + _c.Call.Return(run) + return _c +} + // GcConfirm provides a mock function with given fields: ctx, collectionID, partitionID func (_m *DataCoordCatalog) GcConfirm(ctx context.Context, collectionID int64, partitionID int64) bool { ret := _m.Called(ctx, collectionID, partitionID) @@ -679,11 +776,20 @@ func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_Gc return _c } +func (_c *DataCoordCatalog_GcConfirm_Call) RunAndReturn(run func(context.Context, int64, int64) bool) *DataCoordCatalog_GcConfirm_Call { + _c.Call.Return(run) + return _c +} + // ListChannelCheckpoint provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { ret := _m.Called(ctx) var r0 map[string]*msgpb.MsgPosition + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[string]*msgpb.MsgPosition, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) map[string]*msgpb.MsgPosition); ok { r0 = rf(ctx) } else { @@ -692,7 +798,6 @@ func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[stri } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -725,11 +830,20 @@ func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) Return(_a0 map[string]*ms return _c } +func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) RunAndReturn(run func(context.Context) (map[string]*msgpb.MsgPosition, error)) *DataCoordCatalog_ListChannelCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // ListIndexes provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { ret := _m.Called(ctx) var r0 []*model.Index + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*model.Index, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*model.Index); ok { r0 = rf(ctx) } else { @@ -738,7 +852,6 @@ func (_m *DataCoordCatalog) ListIndexes(ctx context.Context) ([]*model.Index, er } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -771,11 +884,20 @@ func (_c *DataCoordCatalog_ListIndexes_Call) Return(_a0 []*model.Index, _a1 erro return _c } +func (_c *DataCoordCatalog_ListIndexes_Call) RunAndReturn(run func(context.Context) ([]*model.Index, error)) *DataCoordCatalog_ListIndexes_Call { + _c.Call.Return(run) + return _c +} + // ListSegmentIndexes provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) { ret := _m.Called(ctx) var r0 []*model.SegmentIndex + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*model.SegmentIndex, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*model.SegmentIndex); ok { r0 = rf(ctx) } else { @@ -784,7 +906,6 @@ func (_m *DataCoordCatalog) ListSegmentIndexes(ctx context.Context) ([]*model.Se } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -817,11 +938,20 @@ func (_c *DataCoordCatalog_ListSegmentIndexes_Call) Return(_a0 []*model.SegmentI return _c } +func (_c *DataCoordCatalog_ListSegmentIndexes_Call) RunAndReturn(run func(context.Context) ([]*model.SegmentIndex, error)) *DataCoordCatalog_ListSegmentIndexes_Call { + _c.Call.Return(run) + return _c +} + // ListSegments provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { ret := _m.Called(ctx) var r0 []*datapb.SegmentInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*datapb.SegmentInfo, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*datapb.SegmentInfo); ok { r0 = rf(ctx) } else { @@ -830,7 +960,6 @@ func (_m *DataCoordCatalog) ListSegments(ctx context.Context) ([]*datapb.Segment } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -863,6 +992,11 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo, return _c } +func (_c *DataCoordCatalog_ListSegments_Call) RunAndReturn(run func(context.Context) ([]*datapb.SegmentInfo, error)) *DataCoordCatalog_ListSegments_Call { + _c.Call.Return(run) + return _c +} + // MarkChannelAdded provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -901,6 +1035,11 @@ func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCa return _c } +func (_c *DataCoordCatalog_MarkChannelAdded_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_MarkChannelAdded_Call { + _c.Call.Return(run) + return _c +} + // MarkChannelDeleted provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -939,42 +1078,8 @@ func (_c *DataCoordCatalog_MarkChannelDeleted_Call) Return(_a0 error) *DataCoord return _c } -// RevertAlterSegmentsAndAddNewSegment provides a mock function with given fields: ctx, segments, removalSegment -func (_m *DataCoordCatalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error { - ret := _m.Called(ctx, segments, removalSegment) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo, *datapb.SegmentInfo) error); ok { - r0 = rf(ctx, segments, removalSegment) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RevertAlterSegmentsAndAddNewSegment' -type DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call struct { - *mock.Call -} - -// RevertAlterSegmentsAndAddNewSegment is a helper method to define mock.On call -// - ctx context.Context -// - segments []*datapb.SegmentInfo -// - removalSegment *datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) RevertAlterSegmentsAndAddNewSegment(ctx interface{}, segments interface{}, removalSegment interface{}) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call { - return &DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call{Call: _e.mock.On("RevertAlterSegmentsAndAddNewSegment", ctx, segments, removalSegment)} -} - -func (_c *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call) Run(run func(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo)) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo), args[2].(*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call) Return(_a0 error) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call { - _c.Call.Return(_a0) +func (_c *DataCoordCatalog_MarkChannelDeleted_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_MarkChannelDeleted_Call { + _c.Call.Return(run) return _c } @@ -1017,6 +1122,11 @@ func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) Return(_a0 error) *DataCo return _c } +func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string, *msgpb.MsgPosition) error) *DataCoordCatalog_SaveChannelCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // SaveDroppedSegmentsInBatch provides a mock function with given fields: ctx, segments func (_m *DataCoordCatalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { ret := _m.Called(ctx, segments) @@ -1055,6 +1165,11 @@ func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *D return _c } +func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo) error) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call { + _c.Call.Return(run) + return _c +} + // ShouldDropChannel provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool { ret := _m.Called(ctx, channel) @@ -1093,13 +1208,17 @@ func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCa return _c } -type mockConstructorTestingTNewDataCoordCatalog interface { - mock.TestingT - Cleanup(func()) +func (_c *DataCoordCatalog_ShouldDropChannel_Call) RunAndReturn(run func(context.Context, string) bool) *DataCoordCatalog_ShouldDropChannel_Call { + _c.Call.Return(run) + return _c } // NewDataCoordCatalog creates a new instance of DataCoordCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewDataCoordCatalog(t mockConstructorTestingTNewDataCoordCatalog) *DataCoordCatalog { +// The first argument is typically a *testing.T value. +func NewDataCoordCatalog(t interface { + mock.TestingT + Cleanup(func()) +}) *DataCoordCatalog { mock := &DataCoordCatalog{} mock.Mock.Test(t)