mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
enhance: Remove compaction plans on the datanode (#33548)
issue: #33546 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
703fc73f71
commit
feeb869ff9
@ -266,17 +266,21 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
|
|||||||
func (t *l0CompactionTask) processMetaSaved() bool {
|
func (t *l0CompactionTask) processMetaSaved() bool {
|
||||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
|
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.resetSegmentCompacting()
|
return t.processCompleted()
|
||||||
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
|
|
||||||
log.Info("handleCompactionResult: success to handle l0 compaction result")
|
|
||||||
}
|
}
|
||||||
return err == nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *l0CompactionTask) processCompleted() bool {
|
func (t *l0CompactionTask) processCompleted() bool {
|
||||||
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
|
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||||
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
|
PlanID: t.GetPlanID(),
|
||||||
|
}); err != nil {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.resetSegmentCompacting()
|
||||||
|
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
|
||||||
|
log.Info("handleCompactionResult: success to handle l0 compaction result")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,6 +296,12 @@ func (t *l0CompactionTask) processTimeout() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *l0CompactionTask) processFailed() bool {
|
func (t *l0CompactionTask) processFailed() bool {
|
||||||
|
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||||
|
PlanID: t.GetPlanID(),
|
||||||
|
}); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
t.resetSegmentCompacting()
|
t.resetSegmentCompacting()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
@ -50,11 +50,9 @@ func (t *mixCompactionTask) processPipelining() bool {
|
|||||||
func (t *mixCompactionTask) processMetaSaved() bool {
|
func (t *mixCompactionTask) processMetaSaved() bool {
|
||||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
|
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.resetSegmentCompacting()
|
return t.processCompleted()
|
||||||
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
|
|
||||||
log.Info("handleCompactionResult: success to handle merge compaction result")
|
|
||||||
}
|
}
|
||||||
return err == nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *mixCompactionTask) processExecuting() bool {
|
func (t *mixCompactionTask) processExecuting() bool {
|
||||||
@ -77,11 +75,13 @@ func (t *mixCompactionTask) processExecuting() bool {
|
|||||||
return false
|
return false
|
||||||
case commonpb.CompactionState_Completed:
|
case commonpb.CompactionState_Completed:
|
||||||
t.result = result
|
t.result = result
|
||||||
result := t.result
|
|
||||||
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
|
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
|
||||||
log.Info("illegal compaction results")
|
log.Info("illegal compaction results")
|
||||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
|
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
|
||||||
return err == nil
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return t.processFailed()
|
||||||
}
|
}
|
||||||
saveSuccess := t.saveSegmentMeta()
|
saveSuccess := t.saveSegmentMeta()
|
||||||
if !saveSuccess {
|
if !saveSuccess {
|
||||||
@ -160,10 +160,16 @@ func (t *mixCompactionTask) NeedReAssignNodeID() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *mixCompactionTask) processCompleted() bool {
|
func (t *mixCompactionTask) processCompleted() bool {
|
||||||
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
|
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||||
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
|
PlanID: t.GetPlanID(),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.resetSegmentCompacting()
|
||||||
|
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
|
||||||
|
log.Info("handleCompactionResult: success to handle merge compaction result")
|
||||||
}
|
}
|
||||||
return true
|
|
||||||
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *mixCompactionTask) resetSegmentCompacting() {
|
func (t *mixCompactionTask) resetSegmentCompacting() {
|
||||||
@ -206,8 +212,14 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *mixCompactionTask) processFailed() bool {
|
func (t *mixCompactionTask) processFailed() bool {
|
||||||
|
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||||
|
PlanID: t.GetPlanID(),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
t.resetSegmentCompacting()
|
t.resetSegmentCompacting()
|
||||||
return true
|
}
|
||||||
|
|
||||||
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *mixCompactionTask) checkTimeout() bool {
|
func (t *mixCompactionTask) checkTimeout() bool {
|
||||||
|
|||||||
@ -570,6 +570,8 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
|
|||||||
Segments: []*datapb.CompactionSegment{{PlanID: 6}},
|
Segments: []*datapb.CompactionSegment{{PlanID: 6}},
|
||||||
}, nil).Once()
|
}, nil).Once()
|
||||||
|
|
||||||
|
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
|
||||||
|
|
||||||
inTasks := map[int64]CompactionTask{
|
inTasks := map[int64]CompactionTask{
|
||||||
1: &mixCompactionTask{
|
1: &mixCompactionTask{
|
||||||
CompactionTask: &datapb.CompactionTask{
|
CompactionTask: &datapb.CompactionTask{
|
||||||
@ -775,6 +777,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(111), int64(1)).Return(&compactionResult, nil).Once()
|
s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(111), int64(1)).Return(&compactionResult, nil).Once()
|
||||||
|
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
|
||||||
|
|
||||||
s.handler.submitTask(task)
|
s.handler.submitTask(task)
|
||||||
s.handler.doSchedule()
|
s.handler.doSchedule()
|
||||||
|
|||||||
@ -179,50 +179,6 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
|
|
||||||
func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
|
|
||||||
ret := _m.Called(collectionID)
|
|
||||||
|
|
||||||
var r0 map[int64][]string
|
|
||||||
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
|
|
||||||
r0 = rf(collectionID)
|
|
||||||
} else {
|
|
||||||
if ret.Get(0) != nil {
|
|
||||||
r0 = ret.Get(0).(map[int64][]string)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return r0
|
|
||||||
}
|
|
||||||
|
|
||||||
// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
|
|
||||||
type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct {
|
|
||||||
*mock.Call
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
|
|
||||||
// - collectionID int64
|
|
||||||
func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
|
||||||
return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
|
||||||
_c.Call.Run(func(args mock.Arguments) {
|
|
||||||
run(args[0].(int64))
|
|
||||||
})
|
|
||||||
return _c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
|
||||||
_c.Call.Return(_a0)
|
|
||||||
return _c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
|
||||||
_c.Call.Return(run)
|
|
||||||
return _c
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors
|
// GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors
|
||||||
func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
||||||
_va := make([]interface{}, len(channelSelectors))
|
_va := make([]interface{}, len(channelSelectors))
|
||||||
@ -282,6 +238,50 @@ func (_c *MockRWChannelStore_GetNodeChannelsBy_Call) RunAndReturn(run func(NodeS
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
|
||||||
|
func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
|
||||||
|
ret := _m.Called(collectionID)
|
||||||
|
|
||||||
|
var r0 map[int64][]string
|
||||||
|
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
|
||||||
|
r0 = rf(collectionID)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(map[int64][]string)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
|
||||||
|
type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
|
||||||
|
// - collectionID int64
|
||||||
|
func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||||
|
return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(int64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// GetNodes provides a mock function with given fields:
|
// GetNodes provides a mock function with given fields:
|
||||||
func (_m *MockRWChannelStore) GetNodes() []int64 {
|
func (_m *MockRWChannelStore) GetNodes() []int64 {
|
||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|||||||
@ -376,58 +376,6 @@ func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) RunAndReturn(ru
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodeIDByChannelName provides a mock function with given fields: channel
|
|
||||||
func (_m *MockChannelManager) GetNodeIDByChannelName(channel string) (int64, bool) {
|
|
||||||
ret := _m.Called(channel)
|
|
||||||
|
|
||||||
var r0 int64
|
|
||||||
var r1 bool
|
|
||||||
if rf, ok := ret.Get(0).(func(string) (int64, bool)); ok {
|
|
||||||
return rf(channel)
|
|
||||||
}
|
|
||||||
if rf, ok := ret.Get(0).(func(string) int64); ok {
|
|
||||||
r0 = rf(channel)
|
|
||||||
} else {
|
|
||||||
r0 = ret.Get(0).(int64)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rf, ok := ret.Get(1).(func(string) bool); ok {
|
|
||||||
r1 = rf(channel)
|
|
||||||
} else {
|
|
||||||
r1 = ret.Get(1).(bool)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r0, r1
|
|
||||||
}
|
|
||||||
|
|
||||||
// MockChannelManager_GetNodeIDByChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeIDByChannelName'
|
|
||||||
type MockChannelManager_GetNodeIDByChannelName_Call struct {
|
|
||||||
*mock.Call
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodeIDByChannelName is a helper method to define mock.On call
|
|
||||||
// - channel string
|
|
||||||
func (_e *MockChannelManager_Expecter) GetNodeIDByChannelName(channel interface{}) *MockChannelManager_GetNodeIDByChannelName_Call {
|
|
||||||
return &MockChannelManager_GetNodeIDByChannelName_Call{Call: _e.mock.On("GetNodeIDByChannelName", channel)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Run(run func(channel string)) *MockChannelManager_GetNodeIDByChannelName_Call {
|
|
||||||
_c.Call.Run(func(args mock.Arguments) {
|
|
||||||
run(args[0].(string))
|
|
||||||
})
|
|
||||||
return _c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Return(_a0 int64, _a1 bool) *MockChannelManager_GetNodeIDByChannelName_Call {
|
|
||||||
_c.Call.Return(_a0, _a1)
|
|
||||||
return _c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) RunAndReturn(run func(string) (int64, bool)) *MockChannelManager_GetNodeIDByChannelName_Call {
|
|
||||||
_c.Call.Return(run)
|
|
||||||
return _c
|
|
||||||
}
|
|
||||||
|
|
||||||
// Match provides a mock function with given fields: nodeID, channel
|
// Match provides a mock function with given fields: nodeID, channel
|
||||||
func (_m *MockChannelManager) Match(nodeID int64, channel string) bool {
|
func (_m *MockChannelManager) Match(nodeID int64, channel string) bool {
|
||||||
ret := _m.Called(nodeID, channel)
|
ret := _m.Called(nodeID, channel)
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
// Code generated by mockery v2.30.1. DO NOT EDIT.
|
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||||
|
|
||||||
package datacoord
|
package datacoord
|
||||||
|
|
||||||
|
|||||||
@ -264,6 +264,49 @@ func (_c *MockSessionManager_DeleteSession_Call) RunAndReturn(run func(*NodeInfo
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DropCompactionPlan provides a mock function with given fields: nodeID, req
|
||||||
|
func (_m *MockSessionManager) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error {
|
||||||
|
ret := _m.Called(nodeID, req)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(int64, *datapb.DropCompactionPlanRequest) error); ok {
|
||||||
|
r0 = rf(nodeID, req)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockSessionManager_DropCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropCompactionPlan'
|
||||||
|
type MockSessionManager_DropCompactionPlan_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropCompactionPlan is a helper method to define mock.On call
|
||||||
|
// - nodeID int64
|
||||||
|
// - req *datapb.DropCompactionPlanRequest
|
||||||
|
func (_e *MockSessionManager_Expecter) DropCompactionPlan(nodeID interface{}, req interface{}) *MockSessionManager_DropCompactionPlan_Call {
|
||||||
|
return &MockSessionManager_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", nodeID, req)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockSessionManager_DropCompactionPlan_Call) Run(run func(nodeID int64, req *datapb.DropCompactionPlanRequest)) *MockSessionManager_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(int64), args[1].(*datapb.DropCompactionPlanRequest))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockSessionManager_DropCompactionPlan_Call) Return(_a0 error) *MockSessionManager_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockSessionManager_DropCompactionPlan_Call) RunAndReturn(run func(int64, *datapb.DropCompactionPlanRequest) error) *MockSessionManager_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// DropImport provides a mock function with given fields: nodeID, in
|
// DropImport provides a mock function with given fields: nodeID, in
|
||||||
func (_m *MockSessionManager) DropImport(nodeID int64, in *datapb.DropImportRequest) error {
|
func (_m *MockSessionManager) DropImport(nodeID int64, in *datapb.DropImportRequest) error {
|
||||||
ret := _m.Called(nodeID, in)
|
ret := _m.Called(nodeID, in)
|
||||||
|
|||||||
@ -323,6 +323,10 @@ func (c *mockDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo
|
|||||||
return &datapb.QuerySlotResponse{Status: merr.Success()}, nil
|
return &datapb.QuerySlotResponse{Status: merr.Success()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *mockDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||||
|
return merr.Success(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *mockDataNodeClient) Stop() error {
|
func (c *mockDataNodeClient) Stop() error {
|
||||||
c.state = commonpb.StateCode_Abnormal
|
c.state = commonpb.StateCode_Abnormal
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cockroachdb/errors"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
@ -70,6 +71,7 @@ type SessionManager interface {
|
|||||||
DropImport(nodeID int64, in *datapb.DropImportRequest) error
|
DropImport(nodeID int64, in *datapb.DropImportRequest) error
|
||||||
CheckHealth(ctx context.Context) error
|
CheckHealth(ctx context.Context) error
|
||||||
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
|
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
|
||||||
|
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
|
||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,6 +549,43 @@ func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse,
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *SessionManagerImpl) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error {
|
||||||
|
log := log.With(
|
||||||
|
zap.Int64("nodeID", nodeID),
|
||||||
|
zap.Int64("planID", req.GetPlanID()),
|
||||||
|
)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
|
||||||
|
defer cancel()
|
||||||
|
cli, err := c.getClient(ctx, nodeID)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||||
|
log.Info("node not found, skip dropping compaction plan")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Warn("failed to get client", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = retry.Do(context.Background(), func() error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, err := cli.DropCompactionPlan(ctx, req)
|
||||||
|
if err := VerifyResponse(resp, err); err != nil {
|
||||||
|
log.Warn("failed to drop compaction plan", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("failed to drop compaction plan after retry", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("success to drop compaction plan")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close release sessions
|
// Close release sessions
|
||||||
func (c *SessionManagerImpl) Close() {
|
func (c *SessionManagerImpl) Close() {
|
||||||
c.sessions.Lock()
|
c.sessions.Lock()
|
||||||
|
|||||||
@ -542,3 +542,13 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques
|
|||||||
NumSlots: Params.DataNodeCfg.SlotCap.GetAsInt64() - int64(node.compactionExecutor.executing.Len()),
|
NumSlots: Params.DataNodeCfg.SlotCap.GetAsInt64() - int64(node.compactionExecutor.executing.Len()),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
|
||||||
|
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||||
|
return merr.Status(err), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
node.compactionExecutor.removeTask(req.GetPlanID())
|
||||||
|
log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
|
||||||
|
return merr.Success(), nil
|
||||||
|
}
|
||||||
|
|||||||
@ -529,3 +529,28 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
|
|||||||
s.False(merr.Ok(status))
|
s.False(merr.Ok(status))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
|
||||||
|
s.Run("node not healthy", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
status, err := s.node.DropCompactionPlan(ctx, nil)
|
||||||
|
s.NoError(err)
|
||||||
|
s.False(merr.Ok(status))
|
||||||
|
s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("normal case", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
ctx := context.Background()
|
||||||
|
req := &datapb.DropCompactionPlanRequest{
|
||||||
|
PlanID: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := s.node.DropCompactionPlan(ctx, req)
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(status))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -261,3 +261,9 @@ func (c *Client) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest, op
|
|||||||
return client.QuerySlot(ctx, req)
|
return client.QuerySlot(ctx, req)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||||
|
return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) {
|
||||||
|
return client.DropCompactionPlan(ctx, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -83,6 +83,9 @@ func Test_NewClient(t *testing.T) {
|
|||||||
|
|
||||||
r13, err := client.CheckChannelOperationProgress(ctx, nil)
|
r13, err := client.CheckChannelOperationProgress(ctx, nil)
|
||||||
retCheck(retNotNil, r13, err)
|
retCheck(retNotNil, r13, err)
|
||||||
|
|
||||||
|
r14, err := client.DropCompactionPlan(ctx, nil)
|
||||||
|
retCheck(retNotNil, r14, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
client.(*Client).grpcClient = &mock.GRPCClientBase[datapb.DataNodeClient]{
|
client.(*Client).grpcClient = &mock.GRPCClientBase[datapb.DataNodeClient]{
|
||||||
|
|||||||
@ -406,3 +406,7 @@ func (s *Server) DropImport(ctx context.Context, req *datapb.DropImportRequest)
|
|||||||
func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*datapb.QuerySlotResponse, error) {
|
func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*datapb.QuerySlotResponse, error) {
|
||||||
return s.datanode.QuerySlot(ctx, req)
|
return s.datanode.QuerySlot(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
|
||||||
|
return s.datanode.DropCompactionPlan(ctx, req)
|
||||||
|
}
|
||||||
|
|||||||
@ -181,6 +181,10 @@ func (m *MockDataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReque
|
|||||||
return &datapb.QuerySlotResponse{}, m.err
|
return &datapb.QuerySlotResponse{}, m.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
func Test_NewServer(t *testing.T) {
|
func Test_NewServer(t *testing.T) {
|
||||||
paramtable.Init()
|
paramtable.Init()
|
||||||
@ -317,6 +321,15 @@ func Test_NewServer(t *testing.T) {
|
|||||||
assert.NotNil(t, resp)
|
assert.NotNil(t, resp)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("DropCompactionPlans", func(t *testing.T) {
|
||||||
|
server.datanode = &MockDataNode{
|
||||||
|
status: &commonpb.Status{},
|
||||||
|
}
|
||||||
|
resp, err := server.DropCompactionPlan(ctx, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotNil(t, resp)
|
||||||
|
})
|
||||||
|
|
||||||
err = server.Stop()
|
err = server.Stop()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -142,6 +142,61 @@ func (_c *MockDataNode_Compaction_Call) RunAndReturn(run func(context.Context, *
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DropCompactionPlan provides a mock function with given fields: _a0, _a1
|
||||||
|
func (_m *MockDataNode) DropCompactionPlan(_a0 context.Context, _a1 *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
|
||||||
|
ret := _m.Called(_a0, _a1)
|
||||||
|
|
||||||
|
var r0 *commonpb.Status
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest) (*commonpb.Status, error)); ok {
|
||||||
|
return rf(_a0, _a1)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest) *commonpb.Status); ok {
|
||||||
|
r0 = rf(_a0, _a1)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*commonpb.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *datapb.DropCompactionPlanRequest) error); ok {
|
||||||
|
r1 = rf(_a0, _a1)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockDataNode_DropCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropCompactionPlan'
|
||||||
|
type MockDataNode_DropCompactionPlan_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropCompactionPlan is a helper method to define mock.On call
|
||||||
|
// - _a0 context.Context
|
||||||
|
// - _a1 *datapb.DropCompactionPlanRequest
|
||||||
|
func (_e *MockDataNode_Expecter) DropCompactionPlan(_a0 interface{}, _a1 interface{}) *MockDataNode_DropCompactionPlan_Call {
|
||||||
|
return &MockDataNode_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", _a0, _a1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockDataNode_DropCompactionPlan_Call) Run(run func(_a0 context.Context, _a1 *datapb.DropCompactionPlanRequest)) *MockDataNode_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(context.Context), args[1].(*datapb.DropCompactionPlanRequest))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockDataNode_DropCompactionPlan_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNode_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockDataNode_DropCompactionPlan_Call) RunAndReturn(run func(context.Context, *datapb.DropCompactionPlanRequest) (*commonpb.Status, error)) *MockDataNode_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// DropImport provides a mock function with given fields: _a0, _a1
|
// DropImport provides a mock function with given fields: _a0, _a1
|
||||||
func (_m *MockDataNode) DropImport(_a0 context.Context, _a1 *datapb.DropImportRequest) (*commonpb.Status, error) {
|
func (_m *MockDataNode) DropImport(_a0 context.Context, _a1 *datapb.DropImportRequest) (*commonpb.Status, error) {
|
||||||
ret := _m.Called(_a0, _a1)
|
ret := _m.Called(_a0, _a1)
|
||||||
|
|||||||
@ -212,6 +212,76 @@ func (_c *MockDataNodeClient_Compaction_Call) RunAndReturn(run func(context.Cont
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DropCompactionPlan provides a mock function with given fields: ctx, in, opts
|
||||||
|
func (_m *MockDataNodeClient) DropCompactionPlan(ctx context.Context, in *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||||
|
_va := make([]interface{}, len(opts))
|
||||||
|
for _i := range opts {
|
||||||
|
_va[_i] = opts[_i]
|
||||||
|
}
|
||||||
|
var _ca []interface{}
|
||||||
|
_ca = append(_ca, ctx, in)
|
||||||
|
_ca = append(_ca, _va...)
|
||||||
|
ret := _m.Called(_ca...)
|
||||||
|
|
||||||
|
var r0 *commonpb.Status
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok {
|
||||||
|
return rf(ctx, in, opts...)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) *commonpb.Status); ok {
|
||||||
|
r0 = rf(ctx, in, opts...)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*commonpb.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) error); ok {
|
||||||
|
r1 = rf(ctx, in, opts...)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockDataNodeClient_DropCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropCompactionPlan'
|
||||||
|
type MockDataNodeClient_DropCompactionPlan_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropCompactionPlan is a helper method to define mock.On call
|
||||||
|
// - ctx context.Context
|
||||||
|
// - in *datapb.DropCompactionPlanRequest
|
||||||
|
// - opts ...grpc.CallOption
|
||||||
|
func (_e *MockDataNodeClient_Expecter) DropCompactionPlan(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropCompactionPlan_Call {
|
||||||
|
return &MockDataNodeClient_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan",
|
||||||
|
append([]interface{}{ctx, in}, opts...)...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockDataNodeClient_DropCompactionPlan_Call) Run(run func(ctx context.Context, in *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption)) *MockDataNodeClient_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
variadicArgs := make([]grpc.CallOption, len(args)-2)
|
||||||
|
for i, a := range args[2:] {
|
||||||
|
if a != nil {
|
||||||
|
variadicArgs[i] = a.(grpc.CallOption)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
run(args[0].(context.Context), args[1].(*datapb.DropCompactionPlanRequest), variadicArgs...)
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockDataNodeClient_DropCompactionPlan_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNodeClient_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockDataNodeClient_DropCompactionPlan_Call) RunAndReturn(run func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockDataNodeClient_DropCompactionPlan_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// DropImport provides a mock function with given fields: ctx, in, opts
|
// DropImport provides a mock function with given fields: ctx, in, opts
|
||||||
func (_m *MockDataNodeClient) DropImport(ctx context.Context, in *datapb.DropImportRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
func (_m *MockDataNodeClient) DropImport(ctx context.Context, in *datapb.DropImportRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||||
_va := make([]interface{}, len(opts))
|
_va := make([]interface{}, len(opts))
|
||||||
|
|||||||
@ -129,6 +129,8 @@ service DataNode {
|
|||||||
rpc DropImport(DropImportRequest) returns(common.Status) {}
|
rpc DropImport(DropImportRequest) returns(common.Status) {}
|
||||||
|
|
||||||
rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}
|
rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}
|
||||||
|
|
||||||
|
rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
message FlushRequest {
|
message FlushRequest {
|
||||||
@ -892,3 +894,7 @@ message CompactionTask{
|
|||||||
int64 nodeID = 18;
|
int64 nodeID = 18;
|
||||||
schema.CollectionSchema schema = 19;
|
schema.CollectionSchema schema = 19;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message DropCompactionPlanRequest {
|
||||||
|
int64 planID = 1;
|
||||||
|
}
|
||||||
|
|||||||
@ -108,3 +108,7 @@ func (m *GrpcDataNodeClient) DropImport(ctx context.Context, req *datapb.DropImp
|
|||||||
func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest, opts ...grpc.CallOption) (*datapb.QuerySlotResponse, error) {
|
func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest, opts ...grpc.CallOption) (*datapb.QuerySlotResponse, error) {
|
||||||
return &datapb.QuerySlotResponse{}, m.Err
|
return &datapb.QuerySlotResponse{}, m.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||||
|
return &commonpb.Status{}, m.Err
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user