diff --git a/internal/datanode/broker/broker.go b/internal/datanode/broker/broker.go index 456bce8782..234d62dd7b 100644 --- a/internal/datanode/broker/broker.go +++ b/internal/datanode/broker/broker.go @@ -48,7 +48,7 @@ type DataCoord interface { GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error - DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error + DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) error } diff --git a/internal/datanode/broker/datacoord.go b/internal/datanode/broker/datacoord.go index 7814ccb828..4290b543e3 100644 --- a/internal/datanode/broker/datacoord.go +++ b/internal/datanode/broker/datacoord.go @@ -117,19 +117,16 @@ func (dc *dataCoordBroker) SaveBinlogPaths(ctx context.Context, req *datapb.Save return nil } -func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error { +func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { log := log.Ctx(ctx) resp, err := dc.client.DropVirtualChannel(ctx, req) if err := merr.CheckRPCCall(resp, err); err != nil { - if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_MetaFailed { - err = merr.WrapErrChannelNotFound(req.GetChannelName()) - } log.Warn("failed to SaveBinlogPaths", zap.Error(err)) - return err + return resp, err } - return nil + return resp, nil } func (dc *dataCoordBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error { diff --git a/internal/datanode/broker/datacoord_test.go b/internal/datanode/broker/datacoord_test.go index bb5dd5f4a8..b4564aba38 100644 --- a/internal/datanode/broker/datacoord_test.go +++ b/internal/datanode/broker/datacoord_test.go @@ -260,7 +260,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() { s.Equal("dml_0", req.GetChannelName()) }). Return(&datapb.DropVirtualChannelResponse{Status: merr.Status(nil)}, nil) - err := s.broker.DropVirtualChannel(ctx, req) + _, err := s.broker.DropVirtualChannel(ctx, req) s.NoError(err) s.resetMock() }) @@ -268,7 +268,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() { s.Run("datacoord_return_error", func() { s.dc.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything). Return(nil, errors.New("mock")) - err := s.broker.DropVirtualChannel(ctx, req) + _, err := s.broker.DropVirtualChannel(ctx, req) s.Error(err) s.resetMock() }) @@ -276,7 +276,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() { s.Run("datacoord_return_failure_status", func() { s.dc.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything). Return(&datapb.DropVirtualChannelResponse{Status: merr.Status(errors.New("mock"))}, nil) - err := s.broker.DropVirtualChannel(ctx, req) + _, err := s.broker.DropVirtualChannel(ctx, req) s.Error(err) s.resetMock() }) @@ -284,7 +284,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() { s.Run("datacoord_return_legacy_MetaFailed", func() { s.dc.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything). Return(&datapb.DropVirtualChannelResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_MetaFailed}}, nil) - err := s.broker.DropVirtualChannel(ctx, req) + _, err := s.broker.DropVirtualChannel(ctx, req) s.Error(err) s.ErrorIs(err, merr.ErrChannelNotFound) s.resetMock() diff --git a/internal/datanode/broker/mock_broker.go b/internal/datanode/broker/mock_broker.go index 15b86a9954..894380acdd 100644 --- a/internal/datanode/broker/mock_broker.go +++ b/internal/datanode/broker/mock_broker.go @@ -214,17 +214,29 @@ func (_c *MockBroker_DescribeCollection_Call) RunAndReturn(run func(context.Cont } // DropVirtualChannel provides a mock function with given fields: ctx, req -func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error { +func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { ret := _m.Called(ctx, req) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) error); ok { + var r0 *datapb.DropVirtualChannelResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)); ok { + return rf(ctx, req) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) *datapb.DropVirtualChannelResponse); ok { r0 = rf(ctx, req) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.DropVirtualChannelResponse) + } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, *datapb.DropVirtualChannelRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // MockBroker_DropVirtualChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropVirtualChannel' @@ -246,12 +258,12 @@ func (_c *MockBroker_DropVirtualChannel_Call) Run(run func(ctx context.Context, return _c } -func (_c *MockBroker_DropVirtualChannel_Call) Return(_a0 error) *MockBroker_DropVirtualChannel_Call { - _c.Call.Return(_a0) +func (_c *MockBroker_DropVirtualChannel_Call) Return(_a0 *datapb.DropVirtualChannelResponse, _a1 error) *MockBroker_DropVirtualChannel_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockBroker_DropVirtualChannel_Call) RunAndReturn(run func(context.Context, *datapb.DropVirtualChannelRequest) error) *MockBroker_DropVirtualChannel_Call { +func (_c *MockBroker_DropVirtualChannel_Call) RunAndReturn(run func(context.Context, *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)) *MockBroker_DropVirtualChannel_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 114797ae20..75f9cc5896 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -198,7 +198,7 @@ func TestDataSyncService_Start(t *testing.T) { broker := broker.NewMockBroker(t) broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() node.broker = broker @@ -376,7 +376,7 @@ func TestDataSyncService_Close(t *testing.T) { broker := broker.NewMockBroker(t) broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() node.broker = broker diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index f41421246d..05d8bc4013 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -66,7 +66,7 @@ func TestWatchChannel(t *testing.T) { broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() - broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() node.broker = broker diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 88098a7eb6..b0ecbaedc3 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -60,7 +60,7 @@ func TestFlowGraphManager(t *testing.T) { broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() - broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 9822682f0e..3dbf1febd3 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -819,10 +820,12 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl req.Segments = segments err := retry.Do(context.Background(), func() error { - err := dsService.broker.DropVirtualChannel(context.Background(), req) - if err != nil { + resp, err := dsService.broker.DropVirtualChannel(context.Background(), req) + if err != nil || + resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { // meta error, datanode handles a virtual channel does not belong here - if errors.Is(err, merr.ErrChannelNotFound) { + if errors.Is(err, merr.ErrChannelNotFound) || + resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_MetaFailed { log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName)) return nil } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 5c499c90a1..64f5e4d3a0 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -663,7 +663,7 @@ func TestFlushNotifyFunc(t *testing.T) { Schema: meta.GetSchema(), }, nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -748,7 +748,7 @@ func TestDropVirtualChannelFunc(t *testing.T) { Schema: meta.GetSchema(), }, nil) broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() vchanName := "vchan_01" ctx, cancel := context.WithCancel(context.Background()) @@ -811,7 +811,7 @@ func TestDropVirtualChannelFunc(t *testing.T) { t.Run("datacoord_return_error", func(t *testing.T) { broker.ExpectedCalls = nil broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything). - Return(errors.New("mock")) + Return(nil, errors.New("mock")) assert.Panics(t, func() { dropFunc(nil) }) @@ -822,7 +822,7 @@ func TestDropVirtualChannelFunc(t *testing.T) { t.Run("datacoord_return_channel_not_found", func(t *testing.T) { broker.ExpectedCalls = nil broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything). - Return(merr.WrapErrChannelNotFound("channel")) + Return(nil, merr.WrapErrChannelNotFound("channel")) assert.NotPanics(t, func() { dropFunc(nil) })