fix: Clear DN unknown compaction tasks (#30850)

If DC restarts, the compaction tasks it no longer knows about never get a
callback in DN, so the segments in those tasks stay locked and can neither
sync nor be compacted again, blocking checkpoint advance and compaction
execution.

See also: #30137
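
In outline, DataCoord now compares the plans that DataNodes report against the plans it still tracks, and asks a DataNode to drop any completed plan that DataCoord no longer recognizes. Below is a minimal Go sketch of that comparison using simplified stand-in types, not the actual Milvus code; in the real change the "clear" call is sessions.SyncSegments with a request carrying only PlanID and ChannelName (empty CompactedFrom), which the DataNode treats as "clear this plan".

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// reportedPlan is a simplified stand-in for the (nodeID, *CompactionPlanResult)
// pair that DataCoord collects from each DataNode.
type reportedPlan struct {
	nodeID    int64
	completed bool
	channel   string
}

// clearUnknownPlans mirrors the idea of the fix: completed plans reported by
// DataNodes that DataCoord no longer tracks are "unknown" and must be cleared
// on the DataNode, otherwise their segments stay locked forever.
func clearUnknownPlans(reported map[int64]reportedPlan, cachedPlanIDs []int64,
	clearOnNode func(nodeID, planID int64, channel string) error,
) error {
	completed := lo.PickBy(reported, func(planID int64, p reportedPlan) bool {
		return p.completed
	})
	// plan IDs the DataNodes know about but DataCoord does not
	unknown, _ := lo.Difference(lo.Keys(completed), cachedPlanIDs)
	for _, planID := range unknown {
		p := completed[planID]
		if err := clearOnNode(p.nodeID, planID, p.channel); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	reported := map[int64]reportedPlan{
		1: {nodeID: 111, completed: false, channel: "ch-1"},
		5: {nodeID: 222, completed: true, channel: "ch-1"}, // unknown to DC after restart
	}
	_ = clearUnknownPlans(reported, []int64{1}, func(nodeID, planID int64, channel string) error {
		// in the real code this is SyncSegments with only PlanID and ChannelName
		// set, which makes the DataNode call injectDone(planID)
		fmt.Printf("clear plan %d on node %d (channel %s)\n", planID, nodeID, channel)
		return nil
	})
}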

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-03-01 11:31:00 +08:00 committed by GitHub
parent 72ac2b1e93
commit 2867f50fcc
9 changed files with 345 additions and 82 deletions

View File

@@ -329,12 +329,14 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
} }
if plan.GetType() == datapb.CompactionType_MixCompaction { if plan.GetType() == datapb.CompactionType_MixCompaction {
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
for _, seg := range plan.GetSegmentBinlogs() { for _, seg := range plan.GetSegmentBinlogs() {
if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil { if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil {
seg.Deltalogs = info.GetDeltalogs() seg.Deltalogs = info.GetDeltalogs()
segIDMap[seg.SegmentID] = info.GetDeltalogs()
} }
} }
log.Info("Compaction handler refreshed mix compaction plan") log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap))
return return
} }
} }
@@ -488,73 +490,95 @@ func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
return c.plans[planID] return c.plans[planID]
} }
// expireCompaction set the compaction state to expired
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState. // for DC might add new task while GetCompactionState.
executingTasks := c.getTasksByState(executing) executingTasks := c.getTasksByState(executing)
timeoutTasks := c.getTasksByState(timeout) timeoutTasks := c.getTasksByState(timeout)
planStates := c.sessions.GetCompactionPlansResults() planStates := c.sessions.GetCompactionPlansResults()
cachedPlans := []int64{}
// TODO reduce the lock range
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
for _, task := range executingTasks { for _, task := range executingTasks {
planResult, ok := planStates[task.plan.PlanID] log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
state := planResult.GetState()
planID := task.plan.PlanID planID := task.plan.PlanID
// check whether the state of CompactionPlan is working cachedPlans = append(cachedPlans, planID)
if ok { if nodePlan, ok := planStates[planID]; ok {
if state == commonpb.CompactionState_Completed { planResult := nodePlan.B
log.Info("complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID)) switch planResult.GetState() {
err := c.completeCompaction(planResult) case commonpb.CompactionState_Completed:
if err != nil { log.Info("start to complete compaction")
log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err)) if err := c.completeCompaction(planResult); err != nil {
log.Warn("fail to complete compaction", zap.Error(err))
}
case commonpb.CompactionState_Executing:
if c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int32("timeout in seconds", task.plan.GetTimeoutInSeconds()),
zap.Uint64("startTime", task.plan.GetStartTime()),
zap.Uint64("now", ts),
)
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout), endSpan())
} }
continue
} }
// check wether the CompactionPlan is timeout } else {
if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) { // compaction task in DC but not found in DN means the compaction plan has failed
continue log.Info("compaction failed")
}
log.Warn("compaction timeout",
zap.Int64("planID", task.plan.PlanID),
zap.Int64("nodeID", task.dataNodeID),
zap.Uint64("startTime", task.plan.GetStartTime()),
zap.Uint64("now", ts),
)
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout), endSpan())
continue
}
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
// Timeout tasks will be timeout and failed in DataNode
// need to wait for DataNode reporting failure and
// clean the status.
for _, task := range timeoutTasks {
stateResult, ok := planStates[task.plan.PlanID]
planID := task.plan.PlanID
if !ok {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan()) c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false) c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan) c.scheduler.Finish(task.dataNodeID, task.plan)
} }
}
// DataNode will check if plan's are timeout but not as sensitive as DataCoord, // Timeout tasks will be timeout and failed in DataNode
// just wait another round. // need to wait for DataNode reporting failure and clean the status.
if ok && stateResult.GetState() == commonpb.CompactionState_Executing { for _, task := range timeoutTasks {
log.Info("compaction timeout in DataCoord yet DataNode is still running", log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
zap.Int64("planID", planID), planID := task.plan.PlanID
zap.Int64("nodeID", task.dataNodeID)) cachedPlans = append(cachedPlans, planID)
continue if nodePlan, ok := planStates[planID]; ok {
if nodePlan.B.GetState() == commonpb.CompactionState_Executing {
log.RatedInfo(1, "compaction timeout in DataCoord yet DataNode is still running")
}
} else {
// compaction task in DC but not found in DN means the compaction plan has failed
log.Info("compaction failed for timeout")
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
} }
} }
c.mu.Unlock()
// Compaction plans in DN but not in DC are unknown plans; notify DN to clear them.
// No locks needed, because there are no changes to DC memory
completedPlans := lo.PickBy(planStates, func(planID int64, planState *typeutil.Pair[int64, *datapb.CompactionPlanResult]) bool {
return planState.B.GetState() == commonpb.CompactionState_Completed
})
unkonwnPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
for _, planID := range unkonwnPlansInWorker {
if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
req := &datapb.SyncSegmentsRequest{
PlanID: planID,
ChannelName: plan.GetChannel(),
}
log.Info("compaction syncing unknown plan with node")
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
}
}
}
return nil return nil
} }

View File

@@ -76,12 +76,14 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
} }
func (s *CompactionPlanHandlerSuite) TestCheckResult() { func (s *CompactionPlanHandlerSuite) TestCheckResult() {
s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]{
1: {PlanID: 1, State: commonpb.CompactionState_Executing}, 1: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 1, State: commonpb.CompactionState_Executing}},
2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}, 2: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
3: {PlanID: 3, State: commonpb.CompactionState_Executing}, 3: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
4: {PlanID: 4, State: commonpb.CompactionState_Executing}, 4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
}) })
s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
{ {
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
@@ -510,10 +512,11 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
} }
func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]{
1: {PlanID: 1, State: commonpb.CompactionState_Executing}, 1: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 1, State: commonpb.CompactionState_Executing}},
2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}, 2: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
3: {PlanID: 3, State: commonpb.CompactionState_Executing}, 3: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
5: {A: 222, B: &datapb.CompactionPlanResult{PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 5}}}},
}) })
inPlans := map[int64]*compactionTask{ inPlans := map[int64]*compactionTask{
@@ -521,27 +524,42 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
triggerInfo: &compactionSignal{}, triggerInfo: &compactionSignal{},
plan: &datapb.CompactionPlan{PlanID: 1}, plan: &datapb.CompactionPlan{PlanID: 1},
state: executing, state: executing,
dataNodeID: 111,
}, },
2: { 2: {
triggerInfo: &compactionSignal{}, triggerInfo: &compactionSignal{},
plan: &datapb.CompactionPlan{PlanID: 2}, plan: &datapb.CompactionPlan{PlanID: 2},
state: executing, state: executing,
dataNodeID: 111,
}, },
3: { 3: {
triggerInfo: &compactionSignal{}, triggerInfo: &compactionSignal{},
plan: &datapb.CompactionPlan{PlanID: 3}, plan: &datapb.CompactionPlan{PlanID: 3},
state: timeout, state: timeout,
dataNodeID: 111,
}, },
4: { 4: {
triggerInfo: &compactionSignal{}, triggerInfo: &compactionSignal{},
plan: &datapb.CompactionPlan{PlanID: 4}, plan: &datapb.CompactionPlan{PlanID: 4},
state: timeout, state: timeout,
dataNodeID: 111,
}, },
} }
s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
s.EqualValues(nodeID, 222)
s.NotNil(req)
s.Empty(req.GetCompactedFrom())
s.EqualValues(5, req.GetPlanID())
return nil
}).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans = inPlans handler.plans = inPlans
_, ok := handler.plans[5]
s.Require().False(ok)
err := handler.updateCompaction(0) err := handler.updateCompaction(0)
s.NoError(err) s.NoError(err)

View File

@@ -465,13 +465,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
continue continue
} }
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
for _, seg := range plan.SegmentBinlogs {
segIDMap[seg.SegmentID] = seg.Deltalogs
}
log.Info("time cost of generating global compaction", log.Info("time cost of generating global compaction",
zap.Any("segID2DeltaLogs", segIDMap),
zap.Int64("planID", plan.PlanID), zap.Int64("planID", plan.PlanID),
zap.Int64("time cost", time.Since(start).Milliseconds()), zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID), zap.Int64("collectionID", signal.collectionID),

View File

@@ -7,6 +7,8 @@ import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb" datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
// MockSessionManager is an autogenerated mock type for the SessionManager type // MockSessionManager is an autogenerated mock type for the SessionManager type
@@ -398,15 +400,15 @@ func (_c *MockSessionManager_FlushChannels_Call) RunAndReturn(run func(context.C
} }
// GetCompactionPlansResults provides a mock function with given fields: // GetCompactionPlansResults provides a mock function with given fields:
func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult { func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] {
ret := _m.Called() ret := _m.Called()
var r0 map[int64]*datapb.CompactionPlanResult var r0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]
if rf, ok := ret.Get(0).(func() map[int64]*datapb.CompactionPlanResult); ok { if rf, ok := ret.Get(0).(func() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]); ok {
r0 = rf() r0 = rf()
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]*datapb.CompactionPlanResult) r0 = ret.Get(0).(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult])
} }
} }
@@ -430,12 +432,12 @@ func (_c *MockSessionManager_GetCompactionPlansResults_Call) Run(run func()) *Mo
return _c return _c
} }
func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*datapb.CompactionPlanResult) *MockSessionManager_GetCompactionPlansResults_Call { func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) *MockSessionManager_GetCompactionPlansResults_Call {
_c.Call.Return(_a0) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() map[int64]*datapb.CompactionPlanResult) *MockSessionManager_GetCompactionPlansResults_Call { func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) *MockSessionManager_GetCompactionPlansResults_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }

View File

@@ -58,7 +58,7 @@ type SessionManager interface {
Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error
SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error
Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)
GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]
NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error
CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)
AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)
@@ -263,11 +263,12 @@ func (c *SessionManagerImpl) execImport(ctx context.Context, nodeID int64, itr *
log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr)) log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
} }
func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult { // GetCompactionPlanResults returns map[planID]*pair[nodeID, *CompactionPlanResults]
func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
ctx := context.Background() ctx := context.Background()
plans := typeutil.NewConcurrentMap[int64, *datapb.CompactionPlanResult]() plans := typeutil.NewConcurrentMap[int64, *typeutil.Pair[int64, *datapb.CompactionPlanResult]]()
c.sessions.RLock() c.sessions.RLock()
for nodeID, s := range c.sessions.data { for nodeID, s := range c.sessions.data {
wg.Add(1) wg.Add(1)
@@ -296,15 +297,16 @@ func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*datapb.Compa
// for compatibility issue, before 2.3.4, resp has only logpath // for compatibility issue, before 2.3.4, resp has only logpath
// try to parse path and fill logid // try to parse path and fill logid
binlog.CompressCompactionBinlogs(rst.GetSegments()) binlog.CompressCompactionBinlogs(rst.GetSegments())
plans.Insert(rst.PlanID, rst) nodeRst := typeutil.NewPair(nodeID, rst)
plans.Insert(rst.PlanID, &nodeRst)
} }
}(nodeID, s) }(nodeID, s)
} }
c.sessions.RUnlock() c.sessions.RUnlock()
wg.Wait() wg.Wait()
rst := make(map[int64]*datapb.CompactionPlanResult) rst := make(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult])
plans.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { plans.Range(func(planID int64, result *typeutil.Pair[int64, *datapb.CompactionPlanResult]) bool {
rst[planID] = result rst[planID] = result
return true return true
}) })
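
For reference, the new return shape keys each result by planID and carries the reporting node alongside it, so DataCoord can later address the right DataNode when clearing an unknown plan. A tiny consumption sketch follows, using local stand-in types (the A/B field names mirror typeutil.Pair, but neither pair nor planResult here is the Milvus type):

package main

import "fmt"

// pair is a local stand-in mirroring typeutil.Pair's exported A/B fields.
type pair[T, U any] struct {
	A T // reporting DataNode ID
	B U // that node's plan result
}

// planResult is a simplified placeholder for *datapb.CompactionPlanResult.
type planResult struct {
	planID    int64
	completed bool
}

func main() {
	// shape returned by GetCompactionPlansResults after this change:
	// map[planID] -> (nodeID, result)
	planStates := map[int64]*pair[int64, planResult]{
		5: {A: 222, B: planResult{planID: 5, completed: true}},
	}
	for planID, nodePlan := range planStates {
		nodeID, result := nodePlan.A, nodePlan.B
		fmt.Printf("plan %d reported by node %d, completed=%v\n", planID, nodeID, result.completed)
	}
}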

View File

@@ -68,6 +68,7 @@ func (c *compactionExecutor) injectDone(planID UniqueID) {
c.completed.GetAndRemove(planID) c.completed.GetAndRemove(planID)
task, loaded := c.completedCompactor.GetAndRemove(planID) task, loaded := c.completedCompactor.GetAndRemove(planID)
if loaded { if loaded {
log.Info("Compaction task inject done", zap.Int64("planID", planID), zap.String("channel", task.getChannelName()))
task.injectDone() task.injectDone()
} }
} }

View File

@@ -321,21 +321,26 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
// SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN // SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN
func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) { func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) {
log.Ctx(ctx).Info("DataNode receives SyncSegments", log := log.Ctx(ctx).With(
zap.Int64("planID", req.GetPlanID()), zap.Int64("planID", req.GetPlanID()),
zap.Int64("nodeID", node.GetNodeID()),
zap.Int64("target segmentID", req.GetCompactedTo()), zap.Int64("target segmentID", req.GetCompactedTo()),
zap.Int64s("compacted from", req.GetCompactedFrom()), zap.Int64s("compacted from", req.GetCompactedFrom()),
zap.Int64("numOfRows", req.GetNumOfRows()), zap.Int64("numOfRows", req.GetNumOfRows()),
zap.String("channelName", req.GetChannelName()), zap.String("channelName", req.GetChannelName()),
) )
log.Info("DataNode receives SyncSegments")
if err := merr.CheckHealthy(node.GetStateCode()); err != nil { if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err)) log.Warn("DataNode.SyncSegments failed", zap.Error(err))
return merr.Status(err), nil return merr.Status(err), nil
} }
if len(req.GetCompactedFrom()) <= 0 { if len(req.GetCompactedFrom()) <= 0 {
return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil log.Info("SyncSegments with empty compactedFrom, clearing the plan")
node.compactionExecutor.injectDone(req.GetPlanID())
return merr.Success(), nil
} }
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName())

View File

@@ -802,7 +802,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 201, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 201, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 300, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 300, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
s.Run("invalid compacted from", func() { s.Run("empty compactedFrom", func() {
req := &datapb.SyncSegmentsRequest{ req := &datapb.SyncSegmentsRequest{
CompactedTo: 400, CompactedTo: 400,
NumOfRows: 100, NumOfRows: 100,
@@ -811,10 +811,18 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
req.CompactedFrom = []UniqueID{} req.CompactedFrom = []UniqueID{}
status, err := s.node.SyncSegments(s.ctx, req) status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err) s.Assert().NoError(err)
s.Assert().False(merr.Ok(status)) s.Assert().True(merr.Ok(status))
})
s.Run("invalid compacted from", func() {
req := &datapb.SyncSegmentsRequest{
CompactedTo: 400,
NumOfRows: 100,
CompactedFrom: []UniqueID{101, 201},
}
req.CompactedFrom = []UniqueID{101, 201} req.CompactedFrom = []UniqueID{101, 201}
status, err = s.node.SyncSegments(s.ctx, req) status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err) s.Assert().NoError(err)
s.Assert().False(merr.Ok(status)) s.Assert().False(merr.Ok(status))
}) })

View File

@@ -0,0 +1,209 @@
package datanode
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/tests/integration"
)
// issue: https://github.com/milvus-io/milvus/issues/30137
func (s *DataNodeSuite) TestClearCompactionTask() {
s.dim = 128
collName := "test_yx"
// generate 1 segment
pks := s.generateSegment(collName, 1)
// triggers a compaction
// restart a datacoord
s.compactAndReboot(collName)
// delete data
// flush -> won't timeout
s.deleteAndFlush(pks, collName)
}
func (s *DataNodeSuite) deleteAndFlush(pks []int64, collection string) {
ctx := context.Background()
expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ","))
log.Info("========================delete expr==================",
zap.String("expr", expr),
)
deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
CollectionName: collection,
Expr: expr,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(deleteResp.GetStatus()))
s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt())
log.Info("=========================Data flush=========================")
flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.NoError(err)
segmentLongArr, has := flushResp.GetCollSegIDs()[collection]
s.Require().True(has)
segmentIDs := segmentLongArr.GetData()
s.Require().Empty(segmentLongArr)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.True(has)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
log.Info("=========================Wait for flush for 2min=========================")
s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection)
log.Info("=========================Data flush done=========================")
}
func (s *DataNodeSuite) compactAndReboot(collection string) {
ctx := context.Background()
// create index
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collection,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(s.dim, "FLAT", metric.IP),
})
s.Require().NoError(err)
s.Require().True(merr.Ok(createIndexStatus))
coll, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
CollectionName: collection,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(coll.GetStatus()))
s.Require().EqualValues(coll.GetCollectionName(), collection)
collID := coll.GetCollectionID()
compactionResp, err := s.Cluster.Proxy.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{
CollectionID: collID,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(coll.GetStatus()))
s.NotEqualValues(-1, compactionResp.GetCompactionID())
s.EqualValues(1, compactionResp.GetCompactionPlanCount())
compactID := compactionResp.GetCompactionID()
stateResp, err := s.Cluster.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
CompactionID: compactID,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(stateResp.GetStatus()))
// sleep to ensure compaction tasks are submitted to DN
time.Sleep(time.Second)
planResp, err := s.Cluster.Proxy.GetCompactionStateWithPlans(ctx, &milvuspb.GetCompactionPlansRequest{
CompactionID: compactID,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(planResp.GetStatus()))
s.Require().Equal(1, len(planResp.GetMergeInfos()))
// Reboot
if planResp.GetMergeInfos()[0].GetTarget() == int64(-1) {
s.Cluster.DataCoord.Stop()
s.Cluster.DataCoord = grpcdatacoord.NewServer(ctx, s.Cluster.GetFactory())
err = s.Cluster.DataCoord.Run()
s.Require().NoError(err)
stateResp, err = s.Cluster.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
CompactionID: compactID,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(stateResp.GetStatus()))
s.Require().EqualValues(0, stateResp.GetTimeoutPlanNo())
s.Require().EqualValues(0, stateResp.GetExecutingPlanNo())
s.Require().EqualValues(0, stateResp.GetCompletedPlanNo())
s.Require().EqualValues(0, stateResp.GetFailedPlanNo())
}
}
func (s *DataNodeSuite) generateSegment(collection string, segmentCount int) []int64 {
c := s.Cluster
schema := integration.ConstructSchema(collection, s.dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
createCollectionStatus, err := c.Proxy.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{
CollectionName: collection,
Schema: marshaledSchema,
ShardsNum: 1,
})
s.Require().NoError(err)
err = merr.Error(createCollectionStatus)
s.Require().NoError(err)
rowNum := 3000
pks := []int64{}
for i := 0; i < segmentCount; i++ {
log.Info("=========================Data insertion=========================", zap.Any("count", i))
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, s.dim)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{
CollectionName: collection,
FieldsData: []*schemapb.FieldData{fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
})
s.NoError(err)
s.True(merr.Ok(insertResult.GetStatus()))
s.Require().EqualValues(rowNum, insertResult.GetInsertCnt())
s.Require().EqualValues(rowNum, len(insertResult.GetIDs().GetIntId().GetData()))
pks = append(pks, insertResult.GetIDs().GetIntId().GetData()...)
log.Info("=========================Data flush=========================", zap.Any("count", i))
flushResp, err := c.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.NoError(err)
segmentLongArr, has := flushResp.GetCollSegIDs()[collection]
s.Require().True(has)
segmentIDs := segmentLongArr.GetData()
s.Require().NotEmpty(segmentLongArr)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collection]
s.True(has)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection)
log.Info("=========================Data flush done=========================", zap.Any("count", i))
}
log.Info("=========================Data insertion finished=========================")
segments, err := c.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().Equal(segmentCount, len(segments))
lo.ForEach(segments, func(info *datapb.SegmentInfo, _ int) {
s.Require().Equal(commonpb.SegmentState_Flushed, info.GetState())
s.Require().EqualValues(3000, info.GetNumOfRows())
})
return pks[:300]
}