fix: Clear compaction tasks when release channel (#31694)

See also: #31648
pr: #31666

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

parent 4a4a4b1061, commit 055dd7ea1d
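
Context for the change: before this patch, releasing a channel could race with GetCompactionState, which iterated the executor's executing/completed maps directly with no coordination, so results for an already-released channel could still be reported. The diff below serializes the two paths with a new resultGuard RWMutex: the discard side takes the write lock, the result-listing side takes the read lock. The following is a minimal standalone sketch of that pattern, with simplified stand-in types (executor, result, discardPlan, getAllResults are illustrative names, not the Milvus code):

    package main

    import (
        "fmt"
        "sync"
    )

    type result struct {
        planID  int64
        channel string
    }

    type executor struct {
        mu        sync.RWMutex     // plays the role of resultGuard in the patch
        completed map[int64]result // plays the role of the completed plan map
    }

    // discardPlan removes completed results of a released channel under the write
    // lock, so a concurrent reader never observes results for that channel again.
    func (e *executor) discardPlan(channel string) {
        e.mu.Lock()
        defer e.mu.Unlock()
        for id, r := range e.completed {
            if r.channel == channel {
                delete(e.completed, id)
            }
        }
    }

    // getAllResults takes a consistent snapshot under the read lock.
    func (e *executor) getAllResults() []result {
        e.mu.RLock()
        defer e.mu.RUnlock()
        out := make([]result, 0, len(e.completed))
        for _, r := range e.completed {
            out = append(out, r)
        }
        return out
    }

    func main() {
        e := &executor{completed: map[int64]result{
            1: {planID: 1, channel: "ch-1"},
            2: {planID: 2, channel: "ch-2"},
        }}

        var wg sync.WaitGroup
        wg.Add(2)
        go func() { defer wg.Done(); e.discardPlan("ch-1") }()  // release path
        go func() { defer wg.Done(); _ = e.getAllResults() }()  // GetCompactionState path
        wg.Wait()

        fmt.Println("results after release:", e.getAllResults())
    }

In the actual patch the discard side is DataNode.tryToReleaseFlowgraph calling compactionExecutor.discardPlan (write lock), and the read side is GetCompactionState calling getAllCompactionResults (read lock), as the hunks below show.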
@@ -18,9 +18,11 @@ package datanode

 import (
 	"context"
+	"sync"

 	"go.uber.org/zap"

+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -36,6 +38,10 @@ type compactionExecutor struct {
 	completed *typeutil.ConcurrentMap[int64, *datapb.CompactionResult] // planID to CompactionResult
 	taskCh    chan compactor
 	dropped   *typeutil.ConcurrentSet[string] // vchannel dropped
+
+	// To prevent concurrency of release channel and compaction get results
+	// all released channel's compaction tasks will be discarded
+	resultGuard sync.RWMutex
 }

 func newCompactionExecutor() *compactionExecutor {
@@ -120,22 +126,71 @@ func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool {
 	return !c.dropped.Contain(vChannelName)
 }

-func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) {
-	c.dropped.Insert(vChannelName)
+func (c *compactionExecutor) discardByDroppedChannel(channel string) {
+	c.dropped.Insert(channel)
+	c.discardPlan(channel)
+}
+
+func (c *compactionExecutor) discardPlan(channel string) {
+	c.resultGuard.Lock()
+	defer c.resultGuard.Unlock()
+
 	c.executing.Range(func(planID int64, task compactor) bool {
-		if task.getChannelName() == vChannelName {
+		if task.getChannelName() == channel {
 			c.stopTask(planID)
 		}
 		return true
 	})
-	// remove all completed plans for vChannelName
+
+	// remove all completed plans for channel
 	c.completed.Range(func(planID int64, result *datapb.CompactionResult) bool {
-		if result.GetChannel() == vChannelName {
+		if result.GetChannel() == channel {
 			c.injectDone(planID, true)
-			log.Info("remove compaction results for dropped channel",
-				zap.String("channel", vChannelName),
+			log.Info("remove compaction plan",
+				zap.String("channel", channel),
 				zap.Int64("planID", planID))
 		}
 		return true
 	})
 }
+
+func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionStateResult {
+	c.resultGuard.RLock()
+	defer c.resultGuard.RUnlock()
+
+	var (
+		executing []int64
+		completed []int64
+	)
+	results := make([]*datapb.CompactionStateResult, 0)
+	// get executing results
+	c.executing.Range(func(planID int64, task compactor) bool {
+		executing = append(executing, planID)
+		results = append(results, &datapb.CompactionStateResult{
+			State:  commonpb.CompactionState_Executing,
+			PlanID: planID,
+		})
+		return true
+	})
+
+	// get completed results
+	c.completed.Range(func(planID int64, result *datapb.CompactionResult) bool {
+		completed = append(completed, planID)
+		results = append(results, &datapb.CompactionStateResult{
+			State:  commonpb.CompactionState_Completed,
+			PlanID: planID,
+			Result: result,
+		})
+
+		return true
+	})
+
+	if len(results) > 0 {
+		log.Info("DataNode Compaction results",
+			zap.Int64s("executing", executing),
+			zap.Int64s("completed", completed),
+		)
+	}
+
+	return results
+}
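
Reading the new executor API above: discardByDroppedChannel does two things. It records the vchannel in the dropped set, so channelValidateForCompaction rejects any future plan for it, and it calls discardPlan to stop executing tasks and clear completed results under the write lock. A small self-contained sketch of that composition, again with simplified stand-in types rather than the real compactor and ConcurrentSet types:

    package main

    import (
        "fmt"
        "sync"
    )

    type executor struct {
        guard     sync.RWMutex        // stands in for resultGuard
        dropped   map[string]struct{} // stands in for the dropped ConcurrentSet
        executing map[int64]string    // planID -> channel, stands in for executing tasks
    }

    // channelValidateForCompaction mirrors the gate in the hunk above:
    // a dropped vchannel never accepts new compaction plans.
    func (e *executor) channelValidateForCompaction(ch string) bool {
        _, dropped := e.dropped[ch]
        return !dropped
    }

    // discardPlan stops and forgets plans of one channel under the write lock.
    func (e *executor) discardPlan(ch string) {
        e.guard.Lock()
        defer e.guard.Unlock()
        for id, c := range e.executing {
            if c == ch {
                delete(e.executing, id) // stands in for stopTask(planID)
            }
        }
    }

    // discardByDroppedChannel = mark dropped + discard, as in the diff.
    func (e *executor) discardByDroppedChannel(ch string) {
        e.dropped[ch] = struct{}{}
        e.discardPlan(ch)
    }

    func main() {
        e := &executor{
            dropped:   map[string]struct{}{},
            executing: map[int64]string{10: "ch-1", 11: "ch-2"},
        }
        e.discardByDroppedChannel("ch-1")
        fmt.Println(e.channelValidateForCompaction("ch-1")) // false: no new plans accepted
        fmt.Println(e.executing)                            // only ch-2's plan remains
    }

Note that the release path (tryToReleaseFlowgraph, further down) calls only discardPlan and does not mark the channel as dropped; only the drop-collection path in ddNode uses discardByDroppedChannel.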
@@ -83,7 +83,7 @@ func TestCompactionExecutor(t *testing.T) {
 			{expected: false, channel: "ch2", desc: "in dropped"},
 		}
 		ex := newCompactionExecutor()
-		ex.stopExecutingtaskByVChannelName("ch2")
+		ex.discardByDroppedChannel("ch2")
 		for _, test := range tests {
 			t.Run(test.desc, func(t *testing.T) {
 				assert.Equal(t, test.expected, ex.channelValidateForCompaction(test.channel))
@@ -107,7 +107,7 @@ func TestCompactionExecutor(t *testing.T) {
 			found = ex.executing.Contain(mc.getPlanID())
 		}

-		ex.stopExecutingtaskByVChannelName("mock")
+		ex.discardByDroppedChannel("mock")

 		select {
 		case <-mc.ctx.Done():
@@ -278,10 +278,11 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
 	node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value)
 }

-// tryToReleaseFlowgraph tries to release a flowgraph
-func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
-	log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
-	node.flowgraphManager.release(vChanName)
+// tryToReleaseFlowgraph tries to release a flowgraph and tidy channel related meta
+func (node *DataNode) tryToReleaseFlowgraph(channel string) {
+	log.Info("try to release flowgraph", zap.String("channel", channel))
+	node.compactionExecutor.discardPlan(channel)
+	node.flowgraphManager.release(channel)
 }

 // BackGroundGC runs in background to release datanode resources
@@ -154,7 +154,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 			ddn.dropMode.Store(true)

 			log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
-			ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName)
+			ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName)
 			fgMsg.dropCollection = true

 			pChan := funcutil.ToPhysicalChannel(ddn.vChannelName)
@@ -27,7 +27,6 @@ import (
 	"time"

 	"github.com/cockroachdb/errors"
-	"github.com/samber/lo"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -308,29 +307,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
 			Status: merr.Status(err),
 		}, nil
 	}
-	results := make([]*datapb.CompactionStateResult, 0)
-	node.compactionExecutor.executing.Range(func(planID int64, task compactor) bool {
-		results = append(results, &datapb.CompactionStateResult{
-			State:  commonpb.CompactionState_Executing,
-			PlanID: planID,
-		})
-		return true
-	})
-	node.compactionExecutor.completed.Range(func(planID int64, result *datapb.CompactionResult) bool {
-		results = append(results, &datapb.CompactionStateResult{
-			State:  commonpb.CompactionState_Completed,
-			PlanID: planID,
-			Result: result,
-		})
-		return true
-	})
-
-	if len(results) > 0 {
-		planIDs := lo.Map(results, func(result *datapb.CompactionStateResult, i int) UniqueID {
-			return result.GetPlanID()
-		})
-		log.Info("Compaction results", zap.Int64s("planIDs", planIDs))
-	}
+	results := node.compactionExecutor.getAllCompactionResults()
 	return &datapb.CompactionStateResponse{
 		Status:  merr.Success(),
 		Results: results,
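
On the service side, the handler above no longer ranges over the executor's maps itself; it asks the executor for one snapshot taken under the read lock, so the listing of executing and completed plans cannot interleave with a concurrent discard. A hedged sketch of that delegation shape, with stand-in types rather than the datapb protos:

    package main

    import (
        "fmt"
        "sync"
    )

    type stateResult struct {
        planID int64
        state  string // "Executing" or "Completed" in this toy model
    }

    type executor struct {
        guard     sync.RWMutex
        executing map[int64]struct{}
        completed map[int64]struct{}
    }

    // getAllResults is the single read-locked accessor the handler now relies on:
    // both maps are read inside one critical section.
    func (e *executor) getAllResults() []stateResult {
        e.guard.RLock()
        defer e.guard.RUnlock()
        results := make([]stateResult, 0, len(e.executing)+len(e.completed))
        for id := range e.executing {
            results = append(results, stateResult{planID: id, state: "Executing"})
        }
        for id := range e.completed {
            results = append(results, stateResult{planID: id, state: "Completed"})
        }
        return results
    }

    // getCompactionState mirrors the trimmed handler: delegate, then wrap.
    func getCompactionState(e *executor) []stateResult {
        return e.getAllResults()
    }

    func main() {
        e := &executor{
            executing: map[int64]struct{}{7: {}},
            completed: map[int64]struct{}{3: {}},
        }
        fmt.Println(getCompactionState(e))
    }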