mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
feat: refine drop parition through the new interface notifydroppartition in datacoord (#41543)
refine drop parition through the new interface notifydroppartition in datacoord issue: https://github.com/milvus-io/milvus/issues/41542 --------- Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
This commit is contained in:
parent
6e18ededab
commit
21ca05e445
@ -2188,3 +2188,46 @@ func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment {
|
||||
|
||||
return segments
|
||||
}
|
||||
|
||||
func (m *meta) DropSegmentsOfPartition(ctx context.Context, partitionIDs []int64) error {
|
||||
m.segMu.RLock()
|
||||
defer m.segMu.RUnlock()
|
||||
|
||||
// Filter out the segments of the partition to be dropped.
|
||||
metricMutation := &segMetricMutation{
|
||||
stateChange: make(map[string]map[string]map[string]int),
|
||||
}
|
||||
modSegments := make([]*SegmentInfo, 0)
|
||||
segments := make([]*datapb.SegmentInfo, 0)
|
||||
// set existed segments of channel to Dropped
|
||||
for _, seg := range m.segments.segments {
|
||||
if contains(partitionIDs, seg.PartitionID) {
|
||||
clonedSeg := seg.Clone()
|
||||
updateSegStateAndPrepareMetrics(clonedSeg, commonpb.SegmentState_Dropped, metricMutation)
|
||||
modSegments = append(modSegments, clonedSeg)
|
||||
segments = append(segments, clonedSeg.SegmentInfo)
|
||||
}
|
||||
}
|
||||
|
||||
// Save dropped segments in batch into meta.
|
||||
err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segments)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update memory info
|
||||
for _, segment := range modSegments {
|
||||
m.segments.SetSegment(segment.GetID(), segment)
|
||||
}
|
||||
metricMutation.commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func contains(arr []int64, target int64) bool {
|
||||
for _, val := range arr {
|
||||
if val == target {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -1488,3 +1488,37 @@ func TestMeta_GetSegmentsJSON(t *testing.T) {
|
||||
assert.Equal(t, "Sealed", segments[1].State)
|
||||
assert.True(t, segments[1].Compacted)
|
||||
}
|
||||
|
||||
func Test_meta_DropSegmentsOfPartition(t *testing.T) {
|
||||
meta, err := newMemoryMeta(t)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
PartitionID: 1,
|
||||
CollectionID: 1,
|
||||
}))
|
||||
assert.NoError(t, err)
|
||||
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
PartitionID: 1,
|
||||
CollectionID: 1,
|
||||
}))
|
||||
assert.NoError(t, err)
|
||||
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
PartitionID: 2,
|
||||
CollectionID: 1,
|
||||
}))
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = meta.DropSegmentsOfPartition(context.Background(), []int64{1})
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment := meta.GetSegment(context.Background(), 1)
|
||||
assert.Equal(t, commonpb.SegmentState_Dropped, segment.GetState())
|
||||
segment = meta.GetSegment(context.Background(), 2)
|
||||
assert.Equal(t, commonpb.SegmentState_Dropped, segment.GetState())
|
||||
segment = meta.GetSegment(context.Background(), 3)
|
||||
assert.NotEqual(t, commonpb.SegmentState_Dropped, segment.GetState())
|
||||
}
|
||||
|
||||
@ -249,6 +249,41 @@ func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.
|
||||
return _c
|
||||
}
|
||||
|
||||
// DropSegmentsOfPartition provides a mock function with given fields: ctx, channel, partitionID
|
||||
func (_m *MockManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionID []int64) {
|
||||
_m.Called(ctx, channel, partitionID)
|
||||
}
|
||||
|
||||
// MockManager_DropSegmentsOfPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfPartition'
|
||||
type MockManager_DropSegmentsOfPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DropSegmentsOfPartition is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - channel string
|
||||
// - partitionID []int64
|
||||
func (_e *MockManager_Expecter) DropSegmentsOfPartition(ctx interface{}, channel interface{}, partitionID interface{}) *MockManager_DropSegmentsOfPartition_Call {
|
||||
return &MockManager_DropSegmentsOfPartition_Call{Call: _e.mock.On("DropSegmentsOfPartition", ctx, channel, partitionID)}
|
||||
}
|
||||
|
||||
func (_c *MockManager_DropSegmentsOfPartition_Call) Run(run func(ctx context.Context, channel string, partitionID []int64)) *MockManager_DropSegmentsOfPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(string), args[2].([]int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockManager_DropSegmentsOfPartition_Call) Return() *MockManager_DropSegmentsOfPartition_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockManager_DropSegmentsOfPartition_Call) RunAndReturn(run func(context.Context, string, []int64)) *MockManager_DropSegmentsOfPartition_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// ExpireAllocations provides a mock function with given fields: ctx, channel, ts
|
||||
func (_m *MockManager) ExpireAllocations(ctx context.Context, channel string, ts uint64) {
|
||||
_m.Called(ctx, channel, ts)
|
||||
|
||||
@ -92,6 +92,8 @@ type Manager interface {
|
||||
DropSegmentsOfChannel(ctx context.Context, channel string)
|
||||
// CleanZeroSealedSegmentsOfChannel try to clean real empty sealed segments in a channel
|
||||
CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp)
|
||||
|
||||
DropSegmentsOfPartition(ctx context.Context, channel string, partitionID []int64)
|
||||
}
|
||||
|
||||
// Allocation records the allocation info
|
||||
@ -683,3 +685,48 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri
|
||||
})
|
||||
s.channel2Growing.Remove(channel)
|
||||
}
|
||||
|
||||
func (s *SegmentManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionIDs []int64) {
|
||||
s.channelLock.Lock(channel)
|
||||
defer s.channelLock.Unlock(channel)
|
||||
if growing, ok := s.channel2Growing.Get(channel); ok {
|
||||
for sid := range growing {
|
||||
segment := s.meta.GetHealthySegment(ctx, sid)
|
||||
if segment == nil {
|
||||
log.Warn("failed to get segment, remove it",
|
||||
zap.String("channel", channel),
|
||||
zap.Int64("segmentID", sid))
|
||||
growing.Remove(sid)
|
||||
continue
|
||||
}
|
||||
|
||||
if contains(partitionIDs, segment.GetPartitionID()) {
|
||||
growing.Remove(sid)
|
||||
}
|
||||
s.meta.SetAllocations(sid, nil)
|
||||
for _, allocation := range segment.allocations {
|
||||
putAllocation(allocation)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if sealed, ok := s.channel2Sealed.Get(channel); ok {
|
||||
for sid := range sealed {
|
||||
segment := s.meta.GetHealthySegment(ctx, sid)
|
||||
if segment == nil {
|
||||
log.Warn("failed to get segment, remove it",
|
||||
zap.String("channel", channel),
|
||||
zap.Int64("segmentID", sid))
|
||||
sealed.Remove(sid)
|
||||
continue
|
||||
}
|
||||
if contains(partitionIDs, segment.GetPartitionID()) {
|
||||
sealed.Remove(sid)
|
||||
}
|
||||
s.meta.SetAllocations(sid, nil)
|
||||
for _, allocation := range segment.allocations {
|
||||
putAllocation(allocation)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -416,6 +416,29 @@ func TestDropSegment(t *testing.T) {
|
||||
assert.NotNil(t, segment)
|
||||
}
|
||||
|
||||
func TestDropSegmentOfPartition(t *testing.T) {
|
||||
paramtable.Init()
|
||||
mockAllocator := newMockAllocator(t)
|
||||
meta, err := newMemoryMeta(t)
|
||||
assert.NoError(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.AllocID(context.Background())
|
||||
assert.NoError(t, err)
|
||||
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
|
||||
segmentManager, _ := newSegmentManager(meta, mockAllocator)
|
||||
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 1000)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(allocations))
|
||||
segID := allocations[0].SegmentID
|
||||
segment := meta.GetHealthySegment(context.TODO(), segID)
|
||||
assert.NotNil(t, segment)
|
||||
|
||||
segmentManager.DropSegmentsOfPartition(context.Background(), "c1", []int64{100})
|
||||
segment = meta.GetHealthySegment(context.TODO(), segID)
|
||||
assert.NotNil(t, segment)
|
||||
}
|
||||
|
||||
func TestAllocRowsLargerThanOneSegment(t *testing.T) {
|
||||
paramtable.Init()
|
||||
mockAllocator := newMockAllocator(t)
|
||||
|
||||
@ -692,6 +692,25 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) NotifyDropPartition(ctx context.Context, req *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
if err := s.notifyDropPartition(ctx, req); err != nil {
|
||||
return &datapb.NotifyDropPartitionResponse{Status: merr.Status(err)}, nil
|
||||
}
|
||||
return &datapb.NotifyDropPartitionResponse{Status: merr.Success()}, nil
|
||||
}
|
||||
|
||||
func (s *Server) notifyDropPartition(ctx context.Context, req *datapb.NotifyDropPartitionRequest) error {
|
||||
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Ctx(ctx).Info("receive NotifyDropPartition request",
|
||||
zap.String("channelname", req.GetChannel()),
|
||||
zap.Any("partitionID", req.GetPartitionIDs()))
|
||||
s.segmentManager.DropSegmentsOfPartition(ctx, req.GetChannel(), req.GetPartitionIDs())
|
||||
// release all segments of the partition.
|
||||
return s.meta.DropSegmentsOfPartition(ctx, req.GetPartitionIDs())
|
||||
}
|
||||
|
||||
// SetSegmentState reset the state of the given segment.
|
||||
func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
@ -781,3 +781,9 @@ func (c *Client) ListIndexes(ctx context.Context, in *indexpb.ListIndexesRequest
|
||||
return client.ListIndexes(ctx, in)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) NotifyDropPartition(ctx context.Context, in *datapb.NotifyDropPartitionRequest, opts ...grpc.CallOption) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
return client.NotifyDropPartition(ctx, in)
|
||||
})
|
||||
}
|
||||
|
||||
@ -2318,3 +2318,54 @@ func Test_GetChannelRecoveryInfo(t *testing.T) {
|
||||
_, err = client.GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{})
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
}
|
||||
|
||||
func Test_NotifyDroppartition(t *testing.T) {
|
||||
paramtable.Init()
|
||||
|
||||
ctx := context.Background()
|
||||
client, err := NewClient(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, client)
|
||||
defer client.Close()
|
||||
|
||||
mockDC := mocks.NewMockDataCoordClient(t)
|
||||
mockGrpcClient := mocks.NewMockGrpcClient[datapb.DataCoordClient](t)
|
||||
mockGrpcClient.EXPECT().Close().Return(nil)
|
||||
mockGrpcClient.EXPECT().ReCall(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f func(datapb.DataCoordClient) (interface{}, error)) (interface{}, error) {
|
||||
return f(mockDC)
|
||||
})
|
||||
client.(*Client).grpcClient = mockGrpcClient
|
||||
|
||||
// test success
|
||||
mockDC.EXPECT().NotifyDropPartition(mock.Anything, mock.Anything).Return(&datapb.NotifyDropPartitionResponse{
|
||||
Status: merr.Success(),
|
||||
}, nil)
|
||||
_, err = client.NotifyDropPartition(ctx, &datapb.NotifyDropPartitionRequest{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
// test return error status
|
||||
mockDC.ExpectedCalls = nil
|
||||
mockDC.EXPECT().NotifyDropPartition(mock.Anything, mock.Anything).Return(&datapb.NotifyDropPartitionResponse{
|
||||
Status: merr.Status(merr.ErrServiceNotReady),
|
||||
}, nil)
|
||||
|
||||
rsp, err := client.NotifyDropPartition(ctx, &datapb.NotifyDropPartitionRequest{})
|
||||
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
|
||||
assert.Nil(t, err)
|
||||
|
||||
// test return error
|
||||
mockDC.ExpectedCalls = nil
|
||||
mockDC.EXPECT().NotifyDropPartition(mock.Anything, mock.Anything).Return(&datapb.NotifyDropPartitionResponse{
|
||||
Status: merr.Success(),
|
||||
}, mockErr)
|
||||
|
||||
_, err = client.NotifyDropPartition(ctx, &datapb.NotifyDropPartitionRequest{})
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// test ctx done
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
defer cancel()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
_, err = client.NotifyDropPartition(ctx, &datapb.NotifyDropPartitionRequest{})
|
||||
assert.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
@ -514,3 +514,8 @@ func (s *Server) ListImports(ctx context.Context, in *internalpb.ListImportsRequ
|
||||
func (s *Server) ListIndexes(ctx context.Context, in *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) {
|
||||
return s.dataCoord.ListIndexes(ctx, in)
|
||||
}
|
||||
|
||||
// DropVirtualChannel drop virtual channel in datacoord
|
||||
func (s *Server) NotifyDropPartition(ctx context.Context, req *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
return s.dataCoord.NotifyDropPartition(ctx, req)
|
||||
}
|
||||
|
||||
@ -329,6 +329,15 @@ func Test_NewServer(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, merr.Ok(ret.GetStatus()))
|
||||
})
|
||||
|
||||
t.Run("NotifyDropPartition", func(t *testing.T) {
|
||||
mockDataCoord.EXPECT().NotifyDropPartition(mock.Anything, mock.Anything).Return(&datapb.NotifyDropPartitionResponse{
|
||||
Status: merr.Success(),
|
||||
}, nil)
|
||||
ret, err := server.NotifyDropPartition(ctx, &datapb.NotifyDropPartitionRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, merr.Ok(ret.GetStatus()))
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Run(t *testing.T) {
|
||||
|
||||
@ -37,4 +37,5 @@ type DataCoord interface {
|
||||
UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error
|
||||
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error
|
||||
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)
|
||||
NotifyDropPartition(ctx context.Context, req *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error)
|
||||
}
|
||||
|
||||
@ -154,3 +154,14 @@ func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.D
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (dc *dataCoordBroker) NotifyDropPartition(ctx context.Context, req *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
log := log.Ctx(ctx)
|
||||
resp, err := dc.client.NotifyDropPartition(ctx, req)
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
log.Warn("failed to NotifyDropPartition", zap.Error(err))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -245,6 +245,34 @@ func (s *dataCoordSuite) TestSaveBinlogPaths() {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *dataCoordSuite) TestNotifyDropPartition() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
req := &datapb.NotifyDropPartitionRequest{
|
||||
PartitionIDs: []int64{1},
|
||||
}
|
||||
|
||||
s.Run("normal_case", func() {
|
||||
s.dc.EXPECT().NotifyDropPartition(mock.Anything, mock.Anything).
|
||||
Run(func(_ context.Context, req *datapb.NotifyDropPartitionRequest, _ ...grpc.CallOption) {
|
||||
s.Equal([]int64{1}, req.GetPartitionIDs())
|
||||
}).
|
||||
Return(&datapb.NotifyDropPartitionResponse{Status: merr.Status(nil)}, nil)
|
||||
_, err := s.broker.NotifyDropPartition(ctx, req)
|
||||
s.NoError(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_error", func() {
|
||||
s.dc.EXPECT().NotifyDropPartition(mock.Anything, mock.Anything).
|
||||
Return(nil, errors.New("mock"))
|
||||
_, err := s.broker.NotifyDropPartition(ctx, req)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *dataCoordSuite) TestDropVirtualChannel() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package broker
|
||||
|
||||
@ -215,6 +215,65 @@ func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context,
|
||||
return _c
|
||||
}
|
||||
|
||||
// NotifyDropPartition provides a mock function with given fields: ctx, req
|
||||
func (_m *MockBroker) NotifyDropPartition(ctx context.Context, req *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
ret := _m.Called(ctx, req)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for NotifyDropPartition")
|
||||
}
|
||||
|
||||
var r0 *datapb.NotifyDropPartitionResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error)); ok {
|
||||
return rf(ctx, req)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.NotifyDropPartitionRequest) *datapb.NotifyDropPartitionResponse); ok {
|
||||
r0 = rf(ctx, req)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*datapb.NotifyDropPartitionResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *datapb.NotifyDropPartitionRequest) error); ok {
|
||||
r1 = rf(ctx, req)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockBroker_NotifyDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyDropPartition'
|
||||
type MockBroker_NotifyDropPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// NotifyDropPartition is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - req *datapb.NotifyDropPartitionRequest
|
||||
func (_e *MockBroker_Expecter) NotifyDropPartition(ctx interface{}, req interface{}) *MockBroker_NotifyDropPartition_Call {
|
||||
return &MockBroker_NotifyDropPartition_Call{Call: _e.mock.On("NotifyDropPartition", ctx, req)}
|
||||
}
|
||||
|
||||
func (_c *MockBroker_NotifyDropPartition_Call) Run(run func(ctx context.Context, req *datapb.NotifyDropPartitionRequest)) *MockBroker_NotifyDropPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*datapb.NotifyDropPartitionRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_NotifyDropPartition_Call) Return(_a0 *datapb.NotifyDropPartitionResponse, _a1 error) *MockBroker_NotifyDropPartition_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_NotifyDropPartition_Call) RunAndReturn(run func(context.Context, *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error)) *MockBroker_NotifyDropPartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// ReportTimeTick provides a mock function with given fields: ctx, msgs
|
||||
func (_m *MockBroker) ReportTimeTick(ctx context.Context, msgs []*msgpb.DataNodeTtMsg) error {
|
||||
ret := _m.Called(ctx, msgs)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package io
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package metacache
|
||||
|
||||
@ -72,11 +72,11 @@ func (_c *MockMetaCache_AddSegment_Call) Return() *MockMetaCache_AddSegment_Call
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_AddSegment_Call) RunAndReturn(run func(*datapb.SegmentInfo, PkStatsFactory, BM25StatsFactory, ...SegmentAction)) *MockMetaCache_AddSegment_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Collection provides a mock function with given fields:
|
||||
// Collection provides a mock function with no fields
|
||||
func (_m *MockMetaCache) Collection() int64 {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -498,7 +498,7 @@ func (_c *MockMetaCache_RemoveSegments_Call) RunAndReturn(run func(...SegmentFil
|
||||
return _c
|
||||
}
|
||||
|
||||
// Schema provides a mock function with given fields:
|
||||
// Schema provides a mock function with no fields
|
||||
func (_m *MockMetaCache) Schema() *schemapb.CollectionSchema {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -577,7 +577,7 @@ func (_c *MockMetaCache_UpdateSegmentView_Call) Return() *MockMetaCache_UpdateSe
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_UpdateSegmentView_Call) RunAndReturn(run func(int64, []*datapb.SyncSegmentInfo, []*pkoracle.BloomFilterSet, map[int64]struct{})) *MockMetaCache_UpdateSegmentView_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -625,7 +625,7 @@ func (_c *MockMetaCache_UpdateSegments_Call) Return() *MockMetaCache_UpdateSegme
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_UpdateSegments_Call) RunAndReturn(run func(SegmentAction, ...SegmentFilter)) *MockMetaCache_UpdateSegments_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package pipeline
|
||||
|
||||
@ -46,11 +46,11 @@ func (_c *MockFlowgraphManager_AddFlowgraph_Call) Return() *MockFlowgraphManager
|
||||
}
|
||||
|
||||
func (_c *MockFlowgraphManager_AddFlowgraph_Call) RunAndReturn(run func(*DataSyncService)) *MockFlowgraphManager_AddFlowgraph_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// ClearFlowgraphs provides a mock function with given fields:
|
||||
// ClearFlowgraphs provides a mock function with no fields
|
||||
func (_m *MockFlowgraphManager) ClearFlowgraphs() {
|
||||
_m.Called()
|
||||
}
|
||||
@ -78,11 +78,11 @@ func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) Return() *MockFlowgraphMana
|
||||
}
|
||||
|
||||
func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) RunAndReturn(run func()) *MockFlowgraphManager_ClearFlowgraphs_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Close provides a mock function with given fields:
|
||||
// Close provides a mock function with no fields
|
||||
func (_m *MockFlowgraphManager) Close() {
|
||||
_m.Called()
|
||||
}
|
||||
@ -110,7 +110,7 @@ func (_c *MockFlowgraphManager_Close_Call) Return() *MockFlowgraphManager_Close_
|
||||
}
|
||||
|
||||
func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgraphManager_Close_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -160,7 +160,7 @@ func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func(int64
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetCollectionIDs provides a mock function with given fields:
|
||||
// GetCollectionIDs provides a mock function with no fields
|
||||
func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -207,7 +207,7 @@ func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() []
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetFlowgraphCount provides a mock function with given fields:
|
||||
// GetFlowgraphCount provides a mock function with no fields
|
||||
func (_m *MockFlowgraphManager) GetFlowgraphCount() int {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -478,7 +478,7 @@ func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) Return() *MockFlowgraphMana
|
||||
}
|
||||
|
||||
func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) RunAndReturn(run func(string)) *MockFlowgraphManager_RemoveFlowgraph_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
type MetaWriter interface {
|
||||
UpdateSync(context.Context, *SyncTask) error
|
||||
DropChannel(context.Context, string) error
|
||||
NotifyDropPartition(context.Context, int64, string, []int64) error
|
||||
}
|
||||
|
||||
type brokerMetaWriter struct {
|
||||
@ -173,3 +174,37 @@ func (b *brokerMetaWriter) DropChannel(ctx context.Context, channelName string)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *brokerMetaWriter) NotifyDropPartition(ctx context.Context, collectionID int64, channelName string, partitionIDs []int64) error {
|
||||
if len(partitionIDs) == 0 {
|
||||
return merr.WrapErrParameterInvalid("non-empty partitionIDs", "empty partitionIDs")
|
||||
}
|
||||
|
||||
err := retry.Handle(ctx, func() (bool, error) {
|
||||
status, err := b.broker.NotifyDropPartition(context.Background(), &datapb.NotifyDropPartitionRequest{
|
||||
Channel: channelName,
|
||||
CollectionId: collectionID,
|
||||
PartitionIDs: partitionIDs,
|
||||
})
|
||||
err = merr.CheckRPCCall(status, err)
|
||||
if err != nil {
|
||||
return !merr.IsCanceledOrTimeout(err), err
|
||||
}
|
||||
return false, nil
|
||||
}, b.opts...)
|
||||
if err != nil {
|
||||
log.Warn("failed to notify drop partition",
|
||||
zap.String("channel", channelName),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64s("partitionIDs", partitionIDs),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("successfully notified drop partition",
|
||||
zap.String("channel", channelName),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64s("partitionIDs", partitionIDs))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
@ -68,6 +68,55 @@ func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(context.Context
|
||||
return _c
|
||||
}
|
||||
|
||||
// NotifyDropPartition provides a mock function with given fields: _a0, _a1, _a2, _a3
|
||||
func (_m *MockMetaWriter) NotifyDropPartition(_a0 context.Context, _a1 int64, _a2 string, _a3 []int64) error {
|
||||
ret := _m.Called(_a0, _a1, _a2, _a3)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for NotifyDropPartition")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64, string, []int64) error); ok {
|
||||
r0 = rf(_a0, _a1, _a2, _a3)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockMetaWriter_NotifyDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyDropPartition'
|
||||
type MockMetaWriter_NotifyDropPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// NotifyDropPartition is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 int64
|
||||
// - _a2 string
|
||||
// - _a3 []int64
|
||||
func (_e *MockMetaWriter_Expecter) NotifyDropPartition(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}) *MockMetaWriter_NotifyDropPartition_Call {
|
||||
return &MockMetaWriter_NotifyDropPartition_Call{Call: _e.mock.On("NotifyDropPartition", _a0, _a1, _a2, _a3)}
|
||||
}
|
||||
|
||||
func (_c *MockMetaWriter_NotifyDropPartition_Call) Run(run func(_a0 context.Context, _a1 int64, _a2 string, _a3 []int64)) *MockMetaWriter_NotifyDropPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(int64), args[2].(string), args[3].([]int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetaWriter_NotifyDropPartition_Call) Return(_a0 error) *MockMetaWriter_NotifyDropPartition_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetaWriter_NotifyDropPartition_Call) RunAndReturn(run func(context.Context, int64, string, []int64) error) *MockMetaWriter_NotifyDropPartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockMetaWriter) UpdateSync(_a0 context.Context, _a1 *SyncTask) error {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
@ -23,7 +23,7 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter {
|
||||
return &MockSyncManager_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// Close provides a mock function with given fields:
|
||||
// Close provides a mock function with no fields
|
||||
func (_m *MockSyncManager) Close() error {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -142,7 +142,7 @@ func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context,
|
||||
return _c
|
||||
}
|
||||
|
||||
// TaskStatsJSON provides a mock function with given fields:
|
||||
// TaskStatsJSON provides a mock function with no fields
|
||||
func (_m *MockSyncManager) TaskStatsJSON() string {
|
||||
ret := _m.Called()
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
@ -22,7 +22,7 @@ func (_m *MockTask) EXPECT() *MockTask_Expecter {
|
||||
return &MockTask_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// ChannelName provides a mock function with given fields:
|
||||
// ChannelName provides a mock function with no fields
|
||||
func (_m *MockTask) ChannelName() string {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -67,7 +67,7 @@ func (_c *MockTask_ChannelName_Call) RunAndReturn(run func() string) *MockTask_C
|
||||
return _c
|
||||
}
|
||||
|
||||
// Checkpoint provides a mock function with given fields:
|
||||
// Checkpoint provides a mock function with no fields
|
||||
func (_m *MockTask) Checkpoint() *msgpb.MsgPosition {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -143,11 +143,11 @@ func (_c *MockTask_HandleError_Call) Return() *MockTask_HandleError_Call {
|
||||
}
|
||||
|
||||
func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_HandleError_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// IsFlush provides a mock function with given fields:
|
||||
// IsFlush provides a mock function with no fields
|
||||
func (_m *MockTask) IsFlush() bool {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -238,7 +238,7 @@ func (_c *MockTask_Run_Call) RunAndReturn(run func(context.Context) error) *Mock
|
||||
return _c
|
||||
}
|
||||
|
||||
// SegmentID provides a mock function with given fields:
|
||||
// SegmentID provides a mock function with no fields
|
||||
func (_m *MockTask) SegmentID() int64 {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -283,7 +283,7 @@ func (_c *MockTask_SegmentID_Call) RunAndReturn(run func() int64) *MockTask_Segm
|
||||
return _c
|
||||
}
|
||||
|
||||
// StartPosition provides a mock function with given fields:
|
||||
// StartPosition provides a mock function with no fields
|
||||
func (_m *MockTask) StartPosition() *msgpb.MsgPosition {
|
||||
ret := _m.Called()
|
||||
|
||||
|
||||
@ -280,6 +280,5 @@ func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {
|
||||
log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs))
|
||||
return
|
||||
}
|
||||
|
||||
buf.DropPartitions(partitionIDs)
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package writebuffer
|
||||
|
||||
@ -154,7 +154,7 @@ func (_c *MockBufferManager_DropChannel_Call) Return() *MockBufferManager_DropCh
|
||||
}
|
||||
|
||||
func (_c *MockBufferManager_DropChannel_Call) RunAndReturn(run func(string)) *MockBufferManager_DropChannel_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -188,7 +188,7 @@ func (_c *MockBufferManager_DropPartitions_Call) Return() *MockBufferManager_Dro
|
||||
}
|
||||
|
||||
func (_c *MockBufferManager_DropPartitions_Call) RunAndReturn(run func(string, []int64)) *MockBufferManager_DropPartitions_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -335,7 +335,7 @@ func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) Return() *MockBufferMa
|
||||
}
|
||||
|
||||
func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) RunAndReturn(run func(string, uint64)) *MockBufferManager_NotifyCheckpointUpdated_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -430,7 +430,7 @@ func (_c *MockBufferManager_RemoveChannel_Call) Return() *MockBufferManager_Remo
|
||||
}
|
||||
|
||||
func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) *MockBufferManager_RemoveChannel_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -482,7 +482,7 @@ func (_c *MockBufferManager_SealSegments_Call) RunAndReturn(run func(context.Con
|
||||
return _c
|
||||
}
|
||||
|
||||
// Start provides a mock function with given fields:
|
||||
// Start provides a mock function with no fields
|
||||
func (_m *MockBufferManager) Start() {
|
||||
_m.Called()
|
||||
}
|
||||
@ -510,11 +510,11 @@ func (_c *MockBufferManager_Start_Call) Return() *MockBufferManager_Start_Call {
|
||||
}
|
||||
|
||||
func (_c *MockBufferManager_Start_Call) RunAndReturn(run func()) *MockBufferManager_Start_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Stop provides a mock function with given fields:
|
||||
// Stop provides a mock function with no fields
|
||||
func (_m *MockBufferManager) Stop() {
|
||||
_m.Called()
|
||||
}
|
||||
@ -542,7 +542,7 @@ func (_c *MockBufferManager_Stop_Call) Return() *MockBufferManager_Stop_Call {
|
||||
}
|
||||
|
||||
func (_c *MockBufferManager_Stop_Call) RunAndReturn(run func()) *MockBufferManager_Stop_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package writebuffer
|
||||
|
||||
@ -103,7 +103,7 @@ func (_c *MockWriteBuffer_Close_Call) Return() *MockWriteBuffer_Close_Call {
|
||||
}
|
||||
|
||||
func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(context.Context, bool)) *MockWriteBuffer_Close_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -138,7 +138,7 @@ func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) Return() *MockWriteBuffe
|
||||
}
|
||||
|
||||
func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition)) *MockWriteBuffer_CreateNewGrowingSegment_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -171,7 +171,7 @@ func (_c *MockWriteBuffer_DropPartitions_Call) Return() *MockWriteBuffer_DropPar
|
||||
}
|
||||
|
||||
func (_c *MockWriteBuffer_DropPartitions_Call) RunAndReturn(run func([]int64)) *MockWriteBuffer_DropPartitions_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -217,11 +217,11 @@ func (_c *MockWriteBuffer_EvictBuffer_Call) Return() *MockWriteBuffer_EvictBuffe
|
||||
}
|
||||
|
||||
func (_c *MockWriteBuffer_EvictBuffer_Call) RunAndReturn(run func(...SyncPolicy)) *MockWriteBuffer_EvictBuffer_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetCheckpoint provides a mock function with given fields:
|
||||
// GetCheckpoint provides a mock function with no fields
|
||||
func (_m *MockWriteBuffer) GetCheckpoint() *msgpb.MsgPosition {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -268,7 +268,7 @@ func (_c *MockWriteBuffer_GetCheckpoint_Call) RunAndReturn(run func() *msgpb.Msg
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetFlushTimestamp provides a mock function with given fields:
|
||||
// GetFlushTimestamp provides a mock function with no fields
|
||||
func (_m *MockWriteBuffer) GetFlushTimestamp() uint64 {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -359,7 +359,7 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M
|
||||
return _c
|
||||
}
|
||||
|
||||
// MemorySize provides a mock function with given fields:
|
||||
// MemorySize provides a mock function with no fields
|
||||
func (_m *MockWriteBuffer) MemorySize() int64 {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -480,7 +480,7 @@ func (_c *MockWriteBuffer_SetFlushTimestamp_Call) Return() *MockWriteBuffer_SetF
|
||||
}
|
||||
|
||||
func (_c *MockWriteBuffer_SetFlushTimestamp_Call) RunAndReturn(run func(uint64)) *MockWriteBuffer_SetFlushTimestamp_Call {
|
||||
_c.Call.Return(run)
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
|
||||
@ -200,9 +200,13 @@ func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64)
|
||||
}
|
||||
|
||||
func (wb *writeBufferBase) DropPartitions(partitionIDs []int64) {
|
||||
err := wb.metaWriter.NotifyDropPartition(context.Background(), wb.collectionID, wb.channelName, partitionIDs)
|
||||
if err != nil {
|
||||
log.Error("failed to drop partition", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
wb.mut.RLock()
|
||||
defer wb.mut.RUnlock()
|
||||
|
||||
wb.dropPartitions(partitionIDs)
|
||||
}
|
||||
|
||||
|
||||
@ -2559,6 +2559,65 @@ func (_c *MockDataCoord_MarkSegmentsDropped_Call) RunAndReturn(run func(context.
|
||||
return _c
|
||||
}
|
||||
|
||||
// NotifyDropPartition provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockDataCoord) NotifyDropPartition(_a0 context.Context, _a1 *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for NotifyDropPartition")
|
||||
}
|
||||
|
||||
var r0 *datapb.NotifyDropPartitionResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.NotifyDropPartitionRequest) *datapb.NotifyDropPartitionResponse); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*datapb.NotifyDropPartitionResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *datapb.NotifyDropPartitionRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockDataCoord_NotifyDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyDropPartition'
|
||||
type MockDataCoord_NotifyDropPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// NotifyDropPartition is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *datapb.NotifyDropPartitionRequest
|
||||
func (_e *MockDataCoord_Expecter) NotifyDropPartition(_a0 interface{}, _a1 interface{}) *MockDataCoord_NotifyDropPartition_Call {
|
||||
return &MockDataCoord_NotifyDropPartition_Call{Call: _e.mock.On("NotifyDropPartition", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockDataCoord_NotifyDropPartition_Call) Run(run func(_a0 context.Context, _a1 *datapb.NotifyDropPartitionRequest)) *MockDataCoord_NotifyDropPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*datapb.NotifyDropPartitionRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataCoord_NotifyDropPartition_Call) Return(_a0 *datapb.NotifyDropPartitionResponse, _a1 error) *MockDataCoord_NotifyDropPartition_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataCoord_NotifyDropPartition_Call) RunAndReturn(run func(context.Context, *datapb.NotifyDropPartitionRequest) (*datapb.NotifyDropPartitionResponse, error)) *MockDataCoord_NotifyDropPartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Register provides a mock function with no fields
|
||||
func (_m *MockDataCoord) Register() error {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -3186,6 +3186,80 @@ func (_c *MockDataCoordClient_MarkSegmentsDropped_Call) RunAndReturn(run func(co
|
||||
return _c
|
||||
}
|
||||
|
||||
// NotifyDropPartition provides a mock function with given fields: ctx, in, opts
|
||||
func (_m *MockDataCoordClient) NotifyDropPartition(ctx context.Context, in *datapb.NotifyDropPartitionRequest, opts ...grpc.CallOption) (*datapb.NotifyDropPartitionResponse, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
for _i := range opts {
|
||||
_va[_i] = opts[_i]
|
||||
}
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, ctx, in)
|
||||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for NotifyDropPartition")
|
||||
}
|
||||
|
||||
var r0 *datapb.NotifyDropPartitionResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.NotifyDropPartitionRequest, ...grpc.CallOption) (*datapb.NotifyDropPartitionResponse, error)); ok {
|
||||
return rf(ctx, in, opts...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.NotifyDropPartitionRequest, ...grpc.CallOption) *datapb.NotifyDropPartitionResponse); ok {
|
||||
r0 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*datapb.NotifyDropPartitionResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *datapb.NotifyDropPartitionRequest, ...grpc.CallOption) error); ok {
|
||||
r1 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockDataCoordClient_NotifyDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyDropPartition'
|
||||
type MockDataCoordClient_NotifyDropPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// NotifyDropPartition is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - in *datapb.NotifyDropPartitionRequest
|
||||
// - opts ...grpc.CallOption
|
||||
func (_e *MockDataCoordClient_Expecter) NotifyDropPartition(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_NotifyDropPartition_Call {
|
||||
return &MockDataCoordClient_NotifyDropPartition_Call{Call: _e.mock.On("NotifyDropPartition",
|
||||
append([]interface{}{ctx, in}, opts...)...)}
|
||||
}
|
||||
|
||||
func (_c *MockDataCoordClient_NotifyDropPartition_Call) Run(run func(ctx context.Context, in *datapb.NotifyDropPartitionRequest, opts ...grpc.CallOption)) *MockDataCoordClient_NotifyDropPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]grpc.CallOption, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(grpc.CallOption)
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(*datapb.NotifyDropPartitionRequest), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataCoordClient_NotifyDropPartition_Call) Return(_a0 *datapb.NotifyDropPartitionResponse, _a1 error) *MockDataCoordClient_NotifyDropPartition_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataCoordClient_NotifyDropPartition_Call) RunAndReturn(run func(context.Context, *datapb.NotifyDropPartitionRequest, ...grpc.CallOption) (*datapb.NotifyDropPartitionResponse, error)) *MockDataCoordClient_NotifyDropPartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// ReportDataNodeTtMsgs provides a mock function with given fields: ctx, in, opts
|
||||
func (_m *MockDataCoordClient) ReportDataNodeTtMsgs(ctx context.Context, in *datapb.ReportDataNodeTtMsgsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
|
||||
@ -71,6 +71,8 @@ service DataCoord {
|
||||
rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}
|
||||
|
||||
rpc SetSegmentState(SetSegmentStateRequest) returns (SetSegmentStateResponse) {}
|
||||
|
||||
rpc NotifyDropPartition(NotifyDropPartitionRequest) returns (NotifyDropPartitionResponse) {}
|
||||
// Deprecated
|
||||
rpc UpdateSegmentStatistics(UpdateSegmentStatisticsRequest) returns (common.Status) {}
|
||||
rpc UpdateChannelCheckpoint(UpdateChannelCheckpointRequest) returns (common.Status) {}
|
||||
@ -1012,3 +1014,14 @@ message PartitionStatsInfo {
|
||||
message DropCompactionPlanRequest {
|
||||
int64 planID = 1;
|
||||
}
|
||||
|
||||
|
||||
message NotifyDropPartitionRequest {
|
||||
string channel = 1;
|
||||
int64 collection_id = 2;
|
||||
repeated int64 partitionIDs = 3;
|
||||
}
|
||||
|
||||
message NotifyDropPartitionResponse {
|
||||
common.Status status = 1;
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@ -51,6 +51,7 @@ const (
|
||||
DataCoord_GetFlushState_FullMethodName = "/milvus.proto.data.DataCoord/GetFlushState"
|
||||
DataCoord_DropVirtualChannel_FullMethodName = "/milvus.proto.data.DataCoord/DropVirtualChannel"
|
||||
DataCoord_SetSegmentState_FullMethodName = "/milvus.proto.data.DataCoord/SetSegmentState"
|
||||
DataCoord_NotifyDropPartition_FullMethodName = "/milvus.proto.data.DataCoord/NotifyDropPartition"
|
||||
DataCoord_UpdateSegmentStatistics_FullMethodName = "/milvus.proto.data.DataCoord/UpdateSegmentStatistics"
|
||||
DataCoord_UpdateChannelCheckpoint_FullMethodName = "/milvus.proto.data.DataCoord/UpdateChannelCheckpoint"
|
||||
DataCoord_MarkSegmentsDropped_FullMethodName = "/milvus.proto.data.DataCoord/MarkSegmentsDropped"
|
||||
@ -109,6 +110,7 @@ type DataCoordClient interface {
|
||||
GetFlushState(ctx context.Context, in *GetFlushStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushStateResponse, error)
|
||||
DropVirtualChannel(ctx context.Context, in *DropVirtualChannelRequest, opts ...grpc.CallOption) (*DropVirtualChannelResponse, error)
|
||||
SetSegmentState(ctx context.Context, in *SetSegmentStateRequest, opts ...grpc.CallOption) (*SetSegmentStateResponse, error)
|
||||
NotifyDropPartition(ctx context.Context, in *NotifyDropPartitionRequest, opts ...grpc.CallOption) (*NotifyDropPartitionResponse, error)
|
||||
// Deprecated
|
||||
UpdateSegmentStatistics(ctx context.Context, in *UpdateSegmentStatisticsRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
UpdateChannelCheckpoint(ctx context.Context, in *UpdateChannelCheckpointRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
@ -397,6 +399,15 @@ func (c *dataCoordClient) SetSegmentState(ctx context.Context, in *SetSegmentSta
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *dataCoordClient) NotifyDropPartition(ctx context.Context, in *NotifyDropPartitionRequest, opts ...grpc.CallOption) (*NotifyDropPartitionResponse, error) {
|
||||
out := new(NotifyDropPartitionResponse)
|
||||
err := c.cc.Invoke(ctx, DataCoord_NotifyDropPartition_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *dataCoordClient) UpdateSegmentStatistics(ctx context.Context, in *UpdateSegmentStatisticsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
out := new(commonpb.Status)
|
||||
err := c.cc.Invoke(ctx, DataCoord_UpdateSegmentStatistics_FullMethodName, in, out, opts...)
|
||||
@ -621,6 +632,7 @@ type DataCoordServer interface {
|
||||
GetFlushState(context.Context, *GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
|
||||
DropVirtualChannel(context.Context, *DropVirtualChannelRequest) (*DropVirtualChannelResponse, error)
|
||||
SetSegmentState(context.Context, *SetSegmentStateRequest) (*SetSegmentStateResponse, error)
|
||||
NotifyDropPartition(context.Context, *NotifyDropPartitionRequest) (*NotifyDropPartitionResponse, error)
|
||||
// Deprecated
|
||||
UpdateSegmentStatistics(context.Context, *UpdateSegmentStatisticsRequest) (*commonpb.Status, error)
|
||||
UpdateChannelCheckpoint(context.Context, *UpdateChannelCheckpointRequest) (*commonpb.Status, error)
|
||||
@ -736,6 +748,9 @@ func (UnimplementedDataCoordServer) DropVirtualChannel(context.Context, *DropVir
|
||||
func (UnimplementedDataCoordServer) SetSegmentState(context.Context, *SetSegmentStateRequest) (*SetSegmentStateResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SetSegmentState not implemented")
|
||||
}
|
||||
func (UnimplementedDataCoordServer) NotifyDropPartition(context.Context, *NotifyDropPartitionRequest) (*NotifyDropPartitionResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method NotifyDropPartition not implemented")
|
||||
}
|
||||
func (UnimplementedDataCoordServer) UpdateSegmentStatistics(context.Context, *UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method UpdateSegmentStatistics not implemented")
|
||||
}
|
||||
@ -1315,6 +1330,24 @@ func _DataCoord_SetSegmentState_Handler(srv interface{}, ctx context.Context, de
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _DataCoord_NotifyDropPartition_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(NotifyDropPartitionRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(DataCoordServer).NotifyDropPartition(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: DataCoord_NotifyDropPartition_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(DataCoordServer).NotifyDropPartition(ctx, req.(*NotifyDropPartitionRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _DataCoord_UpdateSegmentStatistics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(UpdateSegmentStatisticsRequest)
|
||||
if err := dec(in); err != nil {
|
||||
@ -1812,6 +1845,10 @@ var DataCoord_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "SetSegmentState",
|
||||
Handler: _DataCoord_SetSegmentState_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "NotifyDropPartition",
|
||||
Handler: _DataCoord_NotifyDropPartition_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "UpdateSegmentStatistics",
|
||||
Handler: _DataCoord_UpdateSegmentStatistics_Handler,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user