diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index ec3746fd74..8dba9081c2 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -5,6 +5,9 @@ dir: 'internal/mocks/{{trimPrefix .PackagePath "github.com/milvus-io/milvus/inte mockname: "Mock{{.InterfaceName}}" outpkg: "mock_{{.PackageName}}" packages: + github.com/milvus-io/milvus/internal/distributed/streaming: + interfaces: + WALAccesser: github.com/milvus-io/milvus/internal/streamingcoord/server/balancer: interfaces: Balancer: diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index 43bc0463b4..879475f4e3 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -9,7 +9,11 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/options" ) -var singleton *walAccesserImpl = nil +var singleton WALAccesser = nil + +func SetWAL(w WALAccesser) { + singleton = w +} // Init initializes the wal accesser with the given etcd client. // should be called before any other operations. @@ -19,8 +23,8 @@ func Init(c *clientv3.Client) { // Release releases the resources of the wal accesser. func Release() { - if singleton != nil { - singleton.Close() + if w, ok := singleton.(*walAccesserImpl); ok && w != nil { + w.Close() } } diff --git a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go new file mode 100644 index 0000000000..72234f9a33 --- /dev/null +++ b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go @@ -0,0 +1,141 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_streaming + +import ( + context "context" + + message "github.com/milvus-io/milvus/pkg/streaming/util/message" + mock "github.com/stretchr/testify/mock" + + streaming "github.com/milvus-io/milvus/internal/distributed/streaming" +) + +// MockWALAccesser is an autogenerated mock type for the WALAccesser type +type MockWALAccesser struct { + mock.Mock +} + +type MockWALAccesser_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWALAccesser) EXPECT() *MockWALAccesser_Expecter { + return &MockWALAccesser_Expecter{mock: &_m.Mock} +} + +// Append provides a mock function with given fields: ctx, msgs +func (_m *MockWALAccesser) Append(ctx context.Context, msgs ...message.MutableMessage) streaming.AppendResponses { + _va := make([]interface{}, len(msgs)) + for _i := range msgs { + _va[_i] = msgs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 streaming.AppendResponses + if rf, ok := ret.Get(0).(func(context.Context, ...message.MutableMessage) streaming.AppendResponses); ok { + r0 = rf(ctx, msgs...) + } else { + r0 = ret.Get(0).(streaming.AppendResponses) + } + + return r0 +} + +// MockWALAccesser_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append' +type MockWALAccesser_Append_Call struct { + *mock.Call +} + +// Append is a helper method to define mock.On call +// - ctx context.Context +// - msgs ...message.MutableMessage +func (_e *MockWALAccesser_Expecter) Append(ctx interface{}, msgs ...interface{}) *MockWALAccesser_Append_Call { + return &MockWALAccesser_Append_Call{Call: _e.mock.On("Append", + append([]interface{}{ctx}, msgs...)...)} +} + +func (_c *MockWALAccesser_Append_Call) Run(run func(ctx context.Context, msgs ...message.MutableMessage)) *MockWALAccesser_Append_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]message.MutableMessage, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(message.MutableMessage) + } + } + run(args[0].(context.Context), variadicArgs...) + }) + return _c +} + +func (_c *MockWALAccesser_Append_Call) Return(_a0 streaming.AppendResponses) *MockWALAccesser_Append_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALAccesser_Append_Call) RunAndReturn(run func(context.Context, ...message.MutableMessage) streaming.AppendResponses) *MockWALAccesser_Append_Call { + _c.Call.Return(run) + return _c +} + +// Read provides a mock function with given fields: ctx, opts +func (_m *MockWALAccesser) Read(ctx context.Context, opts streaming.ReadOption) streaming.Scanner { + ret := _m.Called(ctx, opts) + + var r0 streaming.Scanner + if rf, ok := ret.Get(0).(func(context.Context, streaming.ReadOption) streaming.Scanner); ok { + r0 = rf(ctx, opts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(streaming.Scanner) + } + } + + return r0 +} + +// MockWALAccesser_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' +type MockWALAccesser_Read_Call struct { + *mock.Call +} + +// Read is a helper method to define mock.On call +// - ctx context.Context +// - opts streaming.ReadOption +func (_e *MockWALAccesser_Expecter) Read(ctx interface{}, opts interface{}) *MockWALAccesser_Read_Call { + return &MockWALAccesser_Read_Call{Call: _e.mock.On("Read", ctx, opts)} +} + +func (_c *MockWALAccesser_Read_Call) Run(run func(ctx context.Context, opts streaming.ReadOption)) *MockWALAccesser_Read_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(streaming.ReadOption)) + }) + return _c +} + +func (_c *MockWALAccesser_Read_Call) Return(_a0 streaming.Scanner) *MockWALAccesser_Read_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALAccesser_Read_Call) RunAndReturn(run func(context.Context, streaming.ReadOption) streaming.Scanner) *MockWALAccesser_Read_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWALAccesser creates a new instance of MockWALAccesser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWALAccesser(t interface { + mock.TestingT + Cleanup(func()) +}) *MockWALAccesser { + mock := &MockWALAccesser{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/rootcoord/drop_collection_task_test.go b/internal/rootcoord/drop_collection_task_test.go index 543c59b58e..8865ad774b 100644 --- a/internal/rootcoord/drop_collection_task_test.go +++ b/internal/rootcoord/drop_collection_task_test.go @@ -240,14 +240,14 @@ func Test_dropCollectionTask_Execute(t *testing.T) { return true } - gc := newMockGarbageCollector() + gc := mockrootcoord.NewGarbageCollector(t) deleteCollectionCalled := false deleteCollectionChan := make(chan struct{}, 1) - gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection) (Timestamp, error) { + gc.EXPECT().GcCollectionData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, coll *model.Collection) (Timestamp, error) { deleteCollectionCalled = true deleteCollectionChan <- struct{}{} return 0, nil - } + }) core := newTestCore( withValidProxyManager(), diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index 17079a21c6..cc45422db7 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -89,6 +89,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { redoTask.AddAsyncStep(&deletePartitionDataStep{ baseStep: baseStep{core: t.core}, pchans: t.collMeta.PhysicalChannelNames, + vchans: t.collMeta.VirtualChannelNames, partition: &model.Partition{ PartitionID: partID, PartitionName: t.Req.GetPartitionName(), diff --git a/internal/rootcoord/drop_partition_task_test.go b/internal/rootcoord/drop_partition_task_test.go index 2552a16fba..4b3e16703f 100644 --- a/internal/rootcoord/drop_partition_task_test.go +++ b/internal/rootcoord/drop_partition_task_test.go @@ -174,15 +174,15 @@ func Test_dropPartitionTask_Execute(t *testing.T) { return nil }) - gc := newMockGarbageCollector() + gc := mockrootcoord.NewGarbageCollector(t) deletePartitionCalled := false deletePartitionChan := make(chan struct{}, 1) - gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) { + gc.EXPECT().GcPartitionData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pChannels, vchannel []string, coll *model.Partition) (Timestamp, error) { deletePartitionChan <- struct{}{} deletePartitionCalled = true time.Sleep(confirmGCInterval) return 0, nil - } + }) broker := newMockBroker() broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index 6657ca9592..2721fef772 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -21,8 +21,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/util/streamingutil" ms "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/commonpbutil" ) @@ -30,10 +33,10 @@ import ( type GarbageCollector interface { ReDropCollection(collMeta *model.Collection, ts Timestamp) RemoveCreatingCollection(collMeta *model.Collection) - ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts Timestamp) + ReDropPartition(dbID int64, pChannels, vchannels []string, partition *model.Partition, ts Timestamp) RemoveCreatingPartition(dbID int64, partition *model.Partition, ts Timestamp) GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) - GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) + GcPartitionData(ctx context.Context, pChannels, vchannels []string, partition *model.Partition) (ddlTs Timestamp, err error) } type bgGarbageCollector struct { @@ -110,7 +113,7 @@ func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection _ = redo.Execute(context.Background()) } -func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts Timestamp) { +func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels, vchannels []string, partition *model.Partition, ts Timestamp) { // TODO: remove this after data gc can be notified by rpc. c.s.chanTimeTick.addDmlChannels(pChannels...) @@ -118,6 +121,7 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par redo.AddAsyncStep(&deletePartitionDataStep{ baseStep: baseStep{core: c.s}, pchans: pChannels, + vchans: vchannels, partition: partition, isSkip: !Params.CommonCfg.TTMsgEnabled.GetAsBool(), }) @@ -227,6 +231,41 @@ func (c *bgGarbageCollector) notifyPartitionGc(ctx context.Context, pChannels [] return ts, nil } +func (c *bgGarbageCollector) notifyPartitionGcByStreamingService(ctx context.Context, vchannels []string, partition *model.Partition) (uint64, error) { + req := &msgpb.DropPartitionRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_DropPartition), + commonpbutil.WithTimeStamp(0), // ts is given by streamingnode. + commonpbutil.WithSourceID(c.s.session.ServerID), + ), + PartitionName: partition.PartitionName, + CollectionID: partition.CollectionID, + PartitionID: partition.PartitionID, + } + + msgs := make([]message.MutableMessage, 0, len(vchannels)) + for _, vchannel := range vchannels { + msg, err := message.NewDropPartitionMessageBuilderV1(). + WithVChannel(vchannel). + WithHeader(&message.DropPartitionMessageHeader{ + CollectionId: partition.CollectionID, + PartitionId: partition.PartitionID, + }). + WithBody(req). + BuildMutable() + if err != nil { + return 0, err + } + msgs = append(msgs, msg) + } + resp := streaming.WAL().Append(ctx, msgs...) + if err := resp.IsAnyError(); err != nil { + return 0, err + } + // TODO: sheep, return resp.MaxTimeTick(), nil + return c.s.tsoAllocator.GenerateTSO(1) +} + func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) { c.s.ddlTsLockManager.Lock() c.s.ddlTsLockManager.AddRefCnt(1) @@ -241,13 +280,17 @@ func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.C return ddlTs, nil } -func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) { +func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels, vchannels []string, partition *model.Partition) (ddlTs Timestamp, err error) { c.s.ddlTsLockManager.Lock() c.s.ddlTsLockManager.AddRefCnt(1) defer c.s.ddlTsLockManager.AddRefCnt(-1) defer c.s.ddlTsLockManager.Unlock() - ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition) + if streamingutil.IsStreamingServiceEnabled() { + ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition) + } else { + ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition) + } if err != nil { return 0, err } diff --git a/internal/rootcoord/garbage_collector_test.go b/internal/rootcoord/garbage_collector_test.go index b08e4088d4..a42667e06c 100644 --- a/internal/rootcoord/garbage_collector_test.go +++ b/internal/rootcoord/garbage_collector_test.go @@ -26,11 +26,14 @@ import ( "github.com/stretchr/testify/mock" "google.golang.org/grpc" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" "github.com/milvus-io/milvus/internal/proto/querypb" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" mocktso "github.com/milvus-io/milvus/internal/tso/mocks" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -353,7 +356,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc - gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000) }) t.Run("failed to RemovePartition", func(t *testing.T) { @@ -393,7 +396,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc - gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000) <-gcConfirmChan assert.True(t, gcConfirmCalled) <-removePartitionChan @@ -438,7 +441,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc - gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000) <-gcConfirmChan assert.True(t, gcConfirmCalled) <-removePartitionChan @@ -536,3 +539,24 @@ func TestGarbageCollector_RemoveCreatingPartition(t *testing.T) { <-signal }) } + +func TestGcPartitionData(t *testing.T) { + defer cleanTestEnv() + + streamingutil.SetStreamingServiceEnabled() + defer streamingutil.UnsetStreamingServiceEnabled() + + wal := mock_streaming.NewMockWALAccesser(t) + wal.EXPECT().Append(mock.Anything, mock.Anything, mock.Anything).Return(streaming.AppendResponses{}) + streaming.SetWAL(wal) + + tsoAllocator := mocktso.NewAllocator(t) + tsoAllocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil) + + core := newTestCore(withTsoAllocator(tsoAllocator)) + gc := newBgGarbageCollector(core) + core.ddlTsLockManager = newDdlTsLockManager(tsoAllocator) + + _, err := gc.GcPartitionData(context.Background(), nil, []string{"ch-0", "ch-1"}, &model.Partition{}) + assert.NoError(t, err) +} diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index da77ef67c3..1e02adb201 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -941,24 +941,6 @@ func withBroker(b Broker) Opt { } } -type mockGarbageCollector struct { - GarbageCollector - GcCollectionDataFunc func(ctx context.Context, coll *model.Collection) (Timestamp, error) - GcPartitionDataFunc func(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error) -} - -func (m mockGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (Timestamp, error) { - return m.GcCollectionDataFunc(ctx, coll) -} - -func (m mockGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error) { - return m.GcPartitionDataFunc(ctx, pChannels, partition) -} - -func newMockGarbageCollector() *mockGarbageCollector { - return &mockGarbageCollector{} -} - func withGarbageCollector(gc GarbageCollector) Opt { return func(c *Core) { c.garbageCollector = gc diff --git a/internal/rootcoord/mocks/garbage_collector.go b/internal/rootcoord/mocks/garbage_collector.go index 584001cb2c..e602e8be25 100644 --- a/internal/rootcoord/mocks/garbage_collector.go +++ b/internal/rootcoord/mocks/garbage_collector.go @@ -52,8 +52,8 @@ type GarbageCollector_GcCollectionData_Call struct { } // GcCollectionData is a helper method to define mock.On call -// - ctx context.Context -// - coll *model.Collection +// - ctx context.Context +// - coll *model.Collection func (_e *GarbageCollector_Expecter) GcCollectionData(ctx interface{}, coll interface{}) *GarbageCollector_GcCollectionData_Call { return &GarbageCollector_GcCollectionData_Call{Call: _e.mock.On("GcCollectionData", ctx, coll)} } @@ -75,23 +75,23 @@ func (_c *GarbageCollector_GcCollectionData_Call) RunAndReturn(run func(context. return _c } -// GcPartitionData provides a mock function with given fields: ctx, pChannels, partition -func (_m *GarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (uint64, error) { - ret := _m.Called(ctx, pChannels, partition) +// GcPartitionData provides a mock function with given fields: ctx, pChannels, vchannels, partition +func (_m *GarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, vchannels []string, partition *model.Partition) (uint64, error) { + ret := _m.Called(ctx, pChannels, vchannels, partition) var r0 uint64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, []string, *model.Partition) (uint64, error)); ok { - return rf(ctx, pChannels, partition) + if rf, ok := ret.Get(0).(func(context.Context, []string, []string, *model.Partition) (uint64, error)); ok { + return rf(ctx, pChannels, vchannels, partition) } - if rf, ok := ret.Get(0).(func(context.Context, []string, *model.Partition) uint64); ok { - r0 = rf(ctx, pChannels, partition) + if rf, ok := ret.Get(0).(func(context.Context, []string, []string, *model.Partition) uint64); ok { + r0 = rf(ctx, pChannels, vchannels, partition) } else { r0 = ret.Get(0).(uint64) } - if rf, ok := ret.Get(1).(func(context.Context, []string, *model.Partition) error); ok { - r1 = rf(ctx, pChannels, partition) + if rf, ok := ret.Get(1).(func(context.Context, []string, []string, *model.Partition) error); ok { + r1 = rf(ctx, pChannels, vchannels, partition) } else { r1 = ret.Error(1) } @@ -105,16 +105,17 @@ type GarbageCollector_GcPartitionData_Call struct { } // GcPartitionData is a helper method to define mock.On call -// - ctx context.Context -// - pChannels []string -// - partition *model.Partition -func (_e *GarbageCollector_Expecter) GcPartitionData(ctx interface{}, pChannels interface{}, partition interface{}) *GarbageCollector_GcPartitionData_Call { - return &GarbageCollector_GcPartitionData_Call{Call: _e.mock.On("GcPartitionData", ctx, pChannels, partition)} +// - ctx context.Context +// - pChannels []string +// - vchannels []string +// - partition *model.Partition +func (_e *GarbageCollector_Expecter) GcPartitionData(ctx interface{}, pChannels interface{}, vchannels interface{}, partition interface{}) *GarbageCollector_GcPartitionData_Call { + return &GarbageCollector_GcPartitionData_Call{Call: _e.mock.On("GcPartitionData", ctx, pChannels, vchannels, partition)} } -func (_c *GarbageCollector_GcPartitionData_Call) Run(run func(ctx context.Context, pChannels []string, partition *model.Partition)) *GarbageCollector_GcPartitionData_Call { +func (_c *GarbageCollector_GcPartitionData_Call) Run(run func(ctx context.Context, pChannels []string, vchannels []string, partition *model.Partition)) *GarbageCollector_GcPartitionData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]string), args[2].(*model.Partition)) + run(args[0].(context.Context), args[1].([]string), args[2].([]string), args[3].(*model.Partition)) }) return _c } @@ -124,7 +125,7 @@ func (_c *GarbageCollector_GcPartitionData_Call) Return(ddlTs uint64, err error) return _c } -func (_c *GarbageCollector_GcPartitionData_Call) RunAndReturn(run func(context.Context, []string, *model.Partition) (uint64, error)) *GarbageCollector_GcPartitionData_Call { +func (_c *GarbageCollector_GcPartitionData_Call) RunAndReturn(run func(context.Context, []string, []string, *model.Partition) (uint64, error)) *GarbageCollector_GcPartitionData_Call { _c.Call.Return(run) return _c } @@ -140,8 +141,8 @@ type GarbageCollector_ReDropCollection_Call struct { } // ReDropCollection is a helper method to define mock.On call -// - collMeta *model.Collection -// - ts uint64 +// - collMeta *model.Collection +// - ts uint64 func (_e *GarbageCollector_Expecter) ReDropCollection(collMeta interface{}, ts interface{}) *GarbageCollector_ReDropCollection_Call { return &GarbageCollector_ReDropCollection_Call{Call: _e.mock.On("ReDropCollection", collMeta, ts)} } @@ -163,9 +164,9 @@ func (_c *GarbageCollector_ReDropCollection_Call) RunAndReturn(run func(*model.C return _c } -// ReDropPartition provides a mock function with given fields: dbID, pChannels, partition, ts -func (_m *GarbageCollector) ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts uint64) { - _m.Called(dbID, pChannels, partition, ts) +// ReDropPartition provides a mock function with given fields: dbID, pChannels, vchannels, partition, ts +func (_m *GarbageCollector) ReDropPartition(dbID int64, pChannels []string, vchannels []string, partition *model.Partition, ts uint64) { + _m.Called(dbID, pChannels, vchannels, partition, ts) } // GarbageCollector_ReDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReDropPartition' @@ -174,17 +175,18 @@ type GarbageCollector_ReDropPartition_Call struct { } // ReDropPartition is a helper method to define mock.On call -// - dbID int64 -// - pChannels []string -// - partition *model.Partition -// - ts uint64 -func (_e *GarbageCollector_Expecter) ReDropPartition(dbID interface{}, pChannels interface{}, partition interface{}, ts interface{}) *GarbageCollector_ReDropPartition_Call { - return &GarbageCollector_ReDropPartition_Call{Call: _e.mock.On("ReDropPartition", dbID, pChannels, partition, ts)} +// - dbID int64 +// - pChannels []string +// - vchannels []string +// - partition *model.Partition +// - ts uint64 +func (_e *GarbageCollector_Expecter) ReDropPartition(dbID interface{}, pChannels interface{}, vchannels interface{}, partition interface{}, ts interface{}) *GarbageCollector_ReDropPartition_Call { + return &GarbageCollector_ReDropPartition_Call{Call: _e.mock.On("ReDropPartition", dbID, pChannels, vchannels, partition, ts)} } -func (_c *GarbageCollector_ReDropPartition_Call) Run(run func(dbID int64, pChannels []string, partition *model.Partition, ts uint64)) *GarbageCollector_ReDropPartition_Call { +func (_c *GarbageCollector_ReDropPartition_Call) Run(run func(dbID int64, pChannels []string, vchannels []string, partition *model.Partition, ts uint64)) *GarbageCollector_ReDropPartition_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].([]string), args[2].(*model.Partition), args[3].(uint64)) + run(args[0].(int64), args[1].([]string), args[2].([]string), args[3].(*model.Partition), args[4].(uint64)) }) return _c } @@ -194,7 +196,7 @@ func (_c *GarbageCollector_ReDropPartition_Call) Return() *GarbageCollector_ReDr return _c } -func (_c *GarbageCollector_ReDropPartition_Call) RunAndReturn(run func(int64, []string, *model.Partition, uint64)) *GarbageCollector_ReDropPartition_Call { +func (_c *GarbageCollector_ReDropPartition_Call) RunAndReturn(run func(int64, []string, []string, *model.Partition, uint64)) *GarbageCollector_ReDropPartition_Call { _c.Call.Return(run) return _c } @@ -210,7 +212,7 @@ type GarbageCollector_RemoveCreatingCollection_Call struct { } // RemoveCreatingCollection is a helper method to define mock.On call -// - collMeta *model.Collection +// - collMeta *model.Collection func (_e *GarbageCollector_Expecter) RemoveCreatingCollection(collMeta interface{}) *GarbageCollector_RemoveCreatingCollection_Call { return &GarbageCollector_RemoveCreatingCollection_Call{Call: _e.mock.On("RemoveCreatingCollection", collMeta)} } @@ -243,9 +245,9 @@ type GarbageCollector_RemoveCreatingPartition_Call struct { } // RemoveCreatingPartition is a helper method to define mock.On call -// - dbID int64 -// - partition *model.Partition -// - ts uint64 +// - dbID int64 +// - partition *model.Partition +// - ts uint64 func (_e *GarbageCollector_Expecter) RemoveCreatingPartition(dbID interface{}, partition interface{}, ts interface{}) *GarbageCollector_RemoveCreatingPartition_Call { return &GarbageCollector_RemoveCreatingPartition_Call{Call: _e.mock.On("RemoveCreatingPartition", dbID, partition, ts)} } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7cf2743207..dfb5598d24 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -655,7 +655,7 @@ func (c *Core) restore(ctx context.Context) error { for _, part := range coll.Partitions { switch part.State { case pb.PartitionState_PartitionDropping: - go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, part.Clone(), ts) + go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, coll.VirtualChannelNames, part.Clone(), ts) case pb.PartitionState_PartitionCreating: go c.garbageCollector.RemoveCreatingPartition(coll.DBID, part.Clone(), ts) default: diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 5fea892a72..90a0499fda 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1980,7 +1980,7 @@ func (s *RootCoordSuite) TestRestore() { gc := mockrootcoord.NewGarbageCollector(s.T()) finishCh := make(chan struct{}, 4) - gc.EXPECT().ReDropPartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once(). + gc.EXPECT().ReDropPartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once(). Run(func(args mock.Arguments) { finishCh <- struct{}{} }) diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index 7c76715029..feba1b3db4 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -263,6 +263,7 @@ func (s *waitForTsSyncedStep) Weight() stepPriority { type deletePartitionDataStep struct { baseStep pchans []string + vchans []string partition *model.Partition isSkip bool @@ -272,7 +273,7 @@ func (s *deletePartitionDataStep) Execute(ctx context.Context) ([]nestedStep, er if s.isSkip { return nil, nil } - _, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.partition) + _, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.vchans, s.partition) return nil, err } diff --git a/internal/util/streamingutil/checker.go b/internal/util/streamingutil/checker.go index bc9169f489..6572797a0c 100644 --- a/internal/util/streamingutil/checker.go +++ b/internal/util/streamingutil/checker.go @@ -2,10 +2,28 @@ package streamingutil import "os" +const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED" + // IsStreamingServiceEnabled returns whether the streaming service is enabled. func IsStreamingServiceEnabled() bool { // TODO: check if the environment variable MILVUS_STREAMING_SERVICE_ENABLED is set - return os.Getenv("MILVUS_STREAMING_SERVICE_ENABLED") == "1" + return os.Getenv(MilvusStreamingServiceEnabled) == "1" +} + +// SetStreamingServiceEnabled sets the env that indicates whether the streaming service is enabled. +func SetStreamingServiceEnabled() { + err := os.Setenv(MilvusStreamingServiceEnabled, "1") + if err != nil { + panic(err) + } +} + +// UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled. +func UnsetStreamingServiceEnabled() { + err := os.Setenv(MilvusStreamingServiceEnabled, "0") + if err != nil { + panic(err) + } } // MustEnableStreamingService panics if the streaming service is not enabled.