milvus/internal/datacoord/compaction.go
XuanYang-cn fb5e09d94d
fix: call injectDone after compaction failed (#30277)
syncMgr.Block() will lock the segment when executing compaction.

Previous implementation was unable to Unblock thoese segments when
compaction failed. If next compaction of the same segments arrives,
it'll stuck forever and block all later compation tasks.

This PR makes sure compaction executor would Unblock these segments
after a failure compaction.

Apart form that, this PR also refines some logs and clean some codes of
compaction, compactor:

1. Log segment count instead of segmentIDs to avoid logging too many
segments
2. Flush RPC returns L1 segments only, skip L0 and L2
3. CompactionType is checked in `Compaction`, no need to check again
inside compactor
4. Use ligter method to replace `getSegmentMeta`
5. Log information for L0 compaction when encounters an error

See also: #30213

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2024-02-01 14:25:04 +08:00

680 lines
21 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
// TODO we should split compaction into different priorities, small compaction helps to merge segment, large compaction helps to handle delta and expiration of large segments
const (
tsTimeout = uint64(1)
)
type compactionPlanContext interface {
start()
stop()
// execCompactionPlan start to execute plan and return immediately
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
getCompactionTasksBySignalID(signalID int64) []*compactionTask
removeTasksByChannel(channel string)
}
type compactionTaskState int8
const (
executing compactionTaskState = iota + 1
pipelining
completed
failed
timeout
)
var (
errChannelNotWatched = errors.New("channel is not watched")
errChannelInBuffer = errors.New("channel is in buffer")
)
type CompactionMeta interface {
SelectSegments(selector SegmentInfoSelector) []*SegmentInfo
GetHealthySegment(segID UniqueID) *SegmentInfo
UpdateSegmentsInfo(operators ...UpdateOperator) error
SetSegmentCompacting(segmentID int64, compacting bool)
CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) (*SegmentInfo, *segMetricMutation, error)
}
var _ CompactionMeta = (*meta)(nil)
type compactionTask struct {
triggerInfo *compactionSignal
plan *datapb.CompactionPlan
state compactionTaskState
dataNodeID int64
result *datapb.CompactionPlanResult
span trace.Span
}
func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask {
task := &compactionTask{
triggerInfo: t.triggerInfo,
plan: t.plan,
state: t.state,
dataNodeID: t.dataNodeID,
span: t.span,
}
for _, opt := range opts {
opt(task)
}
return task
}
var _ compactionPlanContext = (*compactionPlanHandler)(nil)
type compactionPlanHandler struct {
mu sync.RWMutex
plans map[int64]*compactionTask // planID -> task
meta CompactionMeta
allocator allocator
chManager ChannelManager
scheduler Scheduler
sessions SessionManager
stopCh chan struct{}
stopOnce sync.Once
stopWg sync.WaitGroup
}
func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator,
) *compactionPlanHandler {
return &compactionPlanHandler{
plans: make(map[int64]*compactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
scheduler: NewCompactionScheduler(),
}
}
func (c *compactionPlanHandler) checkResult() {
// deal results
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
_ = c.updateCompaction(ts)
}
func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}
func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}
func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
c.stopWg.Add(3)
go func() {
defer c.stopWg.Done()
checkResultTicker := time.NewTicker(interval)
log.Info("Compaction handler check result loop start", zap.Any("check result interval", interval))
defer checkResultTicker.Stop()
for {
select {
case <-c.stopCh:
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
c.checkResult()
}
}
}()
// saperate check results and schedule goroutine so that check results doesn't
// influence the schedule
go func() {
defer c.stopWg.Done()
scheduleTicker := time.NewTicker(200 * time.Millisecond)
defer scheduleTicker.Stop()
log.Info("compaction handler start schedule")
for {
select {
case <-c.stopCh:
log.Info("Compaction handler quit schedule")
return
case <-scheduleTicker.C:
c.schedule()
}
}
}()
go func() {
defer c.stopWg.Done()
cleanTicker := time.NewTicker(30 * time.Minute)
defer cleanTicker.Stop()
for {
select {
case <-c.stopCh:
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
}
}
}()
}
func (c *compactionPlanHandler) Clean() {
current := tsoutil.GetCurrentTime()
c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
if task.state == executing || task.state == pipelining {
continue
}
// after timeout + 1h, the plan will be cleaned
if c.isTimeout(current, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()+60*60) {
delete(c.plans, id)
}
}
}
func (c *compactionPlanHandler) stop() {
c.stopOnce.Do(func() {
close(c.stopCh)
})
c.stopWg.Wait()
}
func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
if task.triggerInfo.channel == channel {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
zap.Int64("planID", task.plan.GetPlanID()),
zap.Int64("node", task.dataNodeID),
)
c.scheduler.Finish(task.dataNodeID, task.plan)
delete(c.plans, id)
}
}
}
func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) {
c.mu.Lock()
defer c.mu.Unlock()
if plan, ok := c.plans[planID]; ok {
c.plans[planID] = plan.shadowClone(opts...)
}
}
func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
if err != nil {
log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
return err
}
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID))
c.setSegmentsCompacting(plan, true)
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", plan.GetType()))
task := &compactionTask{
triggerInfo: signal,
plan: plan,
state: pipelining,
dataNodeID: nodeID,
span: span,
}
c.mu.Lock()
c.plans[plan.PlanID] = task
c.mu.Unlock()
c.scheduler.Submit(task)
log.Info("Compaction plan submited")
return nil
}
func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
plan := task.plan
log := log.With(zap.Int64("taskID", task.triggerInfo.id), zap.Int64("planID", plan.GetPlanID()))
if plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
sealedSegments := c.meta.SelectSegments(func(info *SegmentInfo) bool {
return info.GetCollectionID() == task.triggerInfo.collectionID &&
(task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.isCompacting &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp()
})
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Level: datapb.SegmentLevel_L1,
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
}
})
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("Compaction handler refreshed level zero compaction plan", zap.Any("target segments count", len(sealedSegBinlogs)))
return
}
if plan.GetType() == datapb.CompactionType_MixCompaction {
for _, seg := range plan.GetSegmentBinlogs() {
if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil {
seg.Deltalogs = info.GetDeltalogs()
}
}
log.Info("Compaction handler refreshed mix compaction plan")
return
}
}
func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
for _, task := range tasks {
// avoid closure capture iteration variable
innerTask := task
c.RefreshPlan(innerTask)
getOrCreateIOPool().Submit(func() (any, error) {
ctx := tracer.SetupSpan(context.Background(), innerTask.span)
plan := innerTask.plan
log := log.Ctx(ctx).With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID))
log.Info("Notify compaction task to DataNode")
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
// update plan ts to TIMEOUT ts
c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
return nil, err
}
c.updateTask(plan.PlanID, setStartTime(ts))
err = c.sessions.Compaction(ctx, innerTask.dataNodeID, plan)
c.updateTask(plan.PlanID, setState(executing))
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
return nil, err
}
log.Info("Compaction start")
return nil, nil
})
}
}
// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
return c.enqueuePlan(signal, plan)
}
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
for _, segmentBinlogs := range plan.GetSegmentBinlogs() {
c.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), compacting)
}
}
// complete a compaction task
// not threadsafe, only can be used internally
func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlanResult) error {
planID := result.PlanID
if _, ok := c.plans[planID]; !ok {
return fmt.Errorf("plan %d is not found", planID)
}
if c.plans[planID].state != executing {
return fmt.Errorf("plan %d's state is %v", planID, c.plans[planID].state)
}
plan := c.plans[planID].plan
nodeID := c.plans[planID].dataNodeID
defer c.scheduler.Finish(nodeID, plan)
switch plan.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
return err
}
case datapb.CompactionType_Level0DeleteCompaction:
if err := c.handleL0CompactionResult(plan, result); err != nil {
return err
}
default:
return errors.New("unknown compaction type")
}
UpdateCompactionSegmentSizeMetrics(result.GetSegments())
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result), cleanLogPath(), endSpan())
return nil
}
func (c *compactionPlanHandler) handleL0CompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error {
var operators []UpdateOperator
for _, seg := range result.GetSegments() {
operators = append(operators, UpdateBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs()))
}
levelZeroSegments := lo.Filter(plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) bool {
return b.GetLevel() == datapb.SegmentLevel_L0
})
for _, seg := range levelZeroSegments {
operators = append(operators, UpdateStatusOperator(seg.GetSegmentID(), commonpb.SegmentState_Dropped), UpdateCompactedOperator(seg.GetSegmentID()))
}
log.Info("meta update: update segments info for level zero compaction",
zap.Int64("planID", plan.GetPlanID()),
)
return c.meta.UpdateSegmentsInfo(operators...)
}
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error {
log := log.With(zap.Int64("planID", plan.GetPlanID()))
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
// should never happen
log.Warn("illegal compaction results")
return fmt.Errorf("Illegal compaction results: %v", result)
}
// Merge compaction has one and only one segment
newSegmentInfo := c.meta.GetHealthySegment(result.GetSegments()[0].SegmentID)
if newSegmentInfo != nil {
log.Info("meta has already been changed, skip meta change and retry sync segments")
} else {
// Also prepare metric updates.
newSegment, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
if err != nil {
return err
}
// Apply metrics after successful meta update.
metricMutation.commit()
newSegmentInfo = newSegment
}
nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
CompactedTo: newSegmentInfo.GetID(),
CompactedFrom: newSegmentInfo.GetCompactionFrom(),
NumOfRows: newSegmentInfo.GetNumOfRows(),
StatsLogs: newSegmentInfo.GetStatslogs(),
ChannelName: plan.GetChannel(),
PartitionId: newSegmentInfo.GetPartitionID(),
CollectionId: newSegmentInfo.GetCollectionID(),
}
log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("handleCompactionResult: fail to sync segments with node",
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}
log.Info("handleCompactionResult: success to handle merge compaction result")
return nil
}
// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
return c.plans[planID]
}
// expireCompaction set the compaction state to expired
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
executingTasks := c.getTasksByState(executing)
timeoutTasks := c.getTasksByState(timeout)
planStates := c.sessions.GetCompactionPlansResults()
c.mu.Lock()
defer c.mu.Unlock()
for _, task := range executingTasks {
planResult, ok := planStates[task.plan.PlanID]
state := planResult.GetState()
planID := task.plan.PlanID
// check whether the state of CompactionPlan is working
if ok {
if state == commonpb.CompactionState_Completed {
log.Info("complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
err := c.completeCompaction(planResult)
if err != nil {
log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err))
}
continue
}
// check wether the CompactionPlan is timeout
if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
continue
}
log.Warn("compaction timeout",
zap.Int64("planID", task.plan.PlanID),
zap.Int64("nodeID", task.dataNodeID),
zap.Uint64("startTime", task.plan.GetStartTime()),
zap.Uint64("now", ts),
)
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout), endSpan())
continue
}
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
// Timeout tasks will be timeout and failed in DataNode
// need to wait for DataNode reporting failure and
// clean the status.
for _, task := range timeoutTasks {
stateResult, ok := planStates[task.plan.PlanID]
planID := task.plan.PlanID
if !ok {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
// just wait another round.
if ok && stateResult.GetState() == commonpb.CompactionState_Executing {
log.Info("compaction timeout in DataCoord yet DataNode is still running",
zap.Int64("planID", planID),
zap.Int64("nodeID", task.dataNodeID))
continue
}
}
return nil
}
func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeout int32) bool {
startTime, _ := tsoutil.ParseTS(start)
ts, _ := tsoutil.ParseTS(now)
return int32(ts.Sub(startTime).Seconds()) >= timeout
}
// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}
func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*compactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
tasks := make([]*compactionTask, 0, len(c.plans))
for _, plan := range c.plans {
if plan.state == state {
tasks = append(tasks, plan)
}
}
return tasks
}
// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
var tasks []*compactionTask
for _, t := range c.plans {
if signalID == 0 {
tasks = append(tasks, t)
continue
}
if t.triggerInfo.id != signalID {
continue
}
tasks = append(tasks, t)
}
return tasks
}
type compactionTaskOpt func(task *compactionTask)
func setState(state compactionTaskState) compactionTaskOpt {
return func(task *compactionTask) {
task.state = state
}
}
func endSpan() compactionTaskOpt {
return func(task *compactionTask) {
if task.span != nil {
task.span.End()
}
}
}
func setStartTime(startTime uint64) compactionTaskOpt {
return func(task *compactionTask) {
task.plan.StartTime = startTime
}
}
func setResult(result *datapb.CompactionPlanResult) compactionTaskOpt {
return func(task *compactionTask) {
task.result = result
}
}
// cleanLogPath clean the log info in the compactionTask object for avoiding the memory leak
func cleanLogPath() compactionTaskOpt {
return func(task *compactionTask) {
if task.plan.GetSegmentBinlogs() != nil {
for _, binlogs := range task.plan.GetSegmentBinlogs() {
binlogs.FieldBinlogs = nil
binlogs.Field2StatslogPaths = nil
binlogs.Deltalogs = nil
}
}
if task.result.GetSegments() != nil {
for _, segment := range task.result.GetSegments() {
segment.InsertLogs = nil
segment.Deltalogs = nil
segment.Field2StatslogPaths = nil
}
}
}
}
// 0.5*min(8, NumCPU/2)
func calculateParallel() int {
// TODO after node memory management enabled, use this config as hard limit
return Params.DataCoordCfg.CompactionWorkerParalleTasks.GetAsInt()
//cores := hardware.GetCPUNum()
//if cores < 16 {
//return 4
//}
//return cores / 2
}
var (
ioPool *conc.Pool[any]
ioPoolInitOnce sync.Once
)
func initIOPool() {
capacity := Params.DataNodeCfg.IOConcurrency.GetAsInt()
if capacity > 32 {
capacity = 32
}
// error only happens with negative expiry duration or with negative pre-alloc size.
ioPool = conc.NewPool[any](capacity)
}
func getOrCreateIOPool() *conc.Pool[any] {
ioPoolInitOnce.Do(initIOPool)
return ioPool
}