fix: Fix compaction task blocking due to executor loop exit (#44543)
1. Use a goroutine pool instead of a semaphore.
2. Remove the compaction executor from the pipeline, since in streaming mode the pipeline should be decoupled from compaction.

issue: https://github.com/milvus-io/milvus/issues/44541

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
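The essence of the fix is the dispatch pattern in the executor loop. Before, each dequeued task had to acquire a weighted semaphore, and a failed acquire made the loop return, so the executor stopped draining its task channel and queued compactions blocked forever. After, tasks are handed to a bounded goroutine pool and the loop only exits on context cancellation. The sketch below is a standalone approximation of that pattern change using a plain worker pool built from a buffered channel; it does not use Milvus's conc.Pool API, and the names simplePool and runLoop are made up for illustration.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// simplePool is a hypothetical stand-in for a goroutine pool: a fixed number
// of workers drain a job channel, so submitting work never aborts the
// caller's dispatch loop.
type simplePool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func newSimplePool(workers int) *simplePool {
	p := &simplePool{jobs: make(chan func(), 1024)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job()
			}
		}()
	}
	return p
}

func (p *simplePool) Submit(job func()) { p.jobs <- job }

func (p *simplePool) Close() {
	close(p.jobs)
	p.wg.Wait()
}

// runLoop mirrors the shape of executor.Start after the change: tasks are
// handed to the pool, and the loop itself only exits on context cancellation.
func runLoop(ctx context.Context, taskCh <-chan int, pool *simplePool) {
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-taskCh:
			pool.Submit(func() {
				// placeholder for e.executeTask(task)
				fmt.Println("compacting plan", task)
			})
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	taskCh := make(chan int, 8)
	for i := 1; i <= 5; i++ {
		taskCh <- i
	}

	pool := newSimplePool(2) // bounded concurrency, analogous to a max-concurrency knob
	runLoop(ctx, taskCh, pool)
	pool.Close()
}

With a pool, cancelling the context only stops new dispatches; work already submitted is still drained by the workers, which is why the executor loop no longer needs the semaphore's acquire/release bookkeeping.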
Commit: f61952adfc
Parent: 1b5191974c
@@ -23,7 +23,6 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
-	"golang.org/x/sync/semaphore"
 
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/metrics"
@@ -34,18 +33,15 @@ import (
 )
 
 const (
-	maxTaskQueueNum    = 1024
-	maxParallelTaskNum = 10
+	maxTaskQueueNum = 1024
 )
 
 type Executor interface {
 	Start(ctx context.Context)
-	Execute(task Compactor) (bool, error)
+	Enqueue(task Compactor) (bool, error)
 	Slots() int64
 	RemoveTask(planID int64)
 	GetResults(planID int64) []*datapb.CompactionPlanResult
-	DiscardByDroppedChannel(channel string)
-	DiscardPlan(channel string)
 }
 
 type executor struct {
@@ -53,7 +49,6 @@ type executor struct {
 	completedCompactor *typeutil.ConcurrentMap[int64, Compactor]                    // planID to compactor
 	completed          *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult
 	taskCh             chan Compactor
-	taskSem            *semaphore.Weighted // todo remove this, unify with slot logic
 	dropped            *typeutil.ConcurrentSet[string] // vchannel dropped
 	usingSlots         int64
 	slotMu             sync.RWMutex
@@ -69,13 +64,12 @@ func NewExecutor() *executor {
 		completedCompactor: typeutil.NewConcurrentMap[int64, Compactor](),
 		completed:          typeutil.NewConcurrentMap[int64, *datapb.CompactionPlanResult](),
 		taskCh:             make(chan Compactor, maxTaskQueueNum),
-		taskSem:            semaphore.NewWeighted(maxParallelTaskNum),
 		dropped:            typeutil.NewConcurrentSet[string](),
 		usingSlots:         0,
 	}
 }
 
-func (e *executor) Execute(task Compactor) (bool, error) {
+func (e *executor) Enqueue(task Compactor) (bool, error) {
 	e.slotMu.Lock()
 	defer e.slotMu.Unlock()
 	newSlotUsage := task.GetSlotUsage()
@@ -143,14 +137,10 @@ func (e *executor) Start(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case task := <-e.taskCh:
-			err := e.taskSem.Acquire(ctx, 1)
-			if err != nil {
-				return
-			}
-			go func() {
-				defer e.taskSem.Release(1)
+			GetExecPool().Submit(func() (any, error) {
 				e.executeTask(task)
-			}()
+				return nil, nil
+			})
 		}
 	}
 }
@@ -213,39 +203,6 @@ func (e *executor) stopTask(planID int64) {
 	}
 }
 
-func (e *executor) isValidChannel(channel string) bool {
-	// if vchannel marked dropped, compaction should not proceed
-	return !e.dropped.Contain(channel)
-}
-
-func (e *executor) DiscardByDroppedChannel(channel string) {
-	e.dropped.Insert(channel)
-	e.DiscardPlan(channel)
-}
-
-func (e *executor) DiscardPlan(channel string) {
-	e.resultGuard.Lock()
-	defer e.resultGuard.Unlock()
-
-	e.executing.Range(func(planID int64, task Compactor) bool {
-		if task.GetChannelName() == channel {
-			e.stopTask(planID)
-		}
-		return true
-	})
-
-	// remove all completed plans of channel
-	e.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
-		if result.GetChannel() == channel {
-			e.RemoveTask(planID)
-			log.Info("remove compaction plan and results",
-				zap.String("channel", channel),
-				zap.Int64("planID", planID))
-		}
-		return true
-	})
-}
-
 func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult {
 	if planID != 0 {
 		result := e.getCompactionResult(planID)
@@ -38,7 +38,7 @@ func TestCompactionExecutor(t *testing.T) {
 		mockC.EXPECT().GetChannelName().Return("ch1")
 		mockC.EXPECT().GetSlotUsage().Return(8)
 		executor := NewExecutor()
-		succeed, err := executor.Execute(mockC)
+		succeed, err := executor.Enqueue(mockC)
 		assert.Equal(t, true, succeed)
 		assert.NoError(t, err)
 		assert.EqualValues(t, 1, len(executor.taskCh))
@@ -56,11 +56,11 @@ func TestCompactionExecutor(t *testing.T) {
 		mockC.EXPECT().GetChannelName().Return("ch1")
 		mockC.EXPECT().GetSlotUsage().Return(8)
 		executor := NewExecutor()
-		succeed, err := executor.Execute(mockC)
+		succeed, err := executor.Enqueue(mockC)
 		assert.Equal(t, true, succeed)
 		assert.NoError(t, err)
 
-		succeed2, err2 := executor.Execute(mockC)
+		succeed2, err2 := executor.Enqueue(mockC)
 		assert.Equal(t, false, succeed2)
 		assert.Error(t, err2)
 		assert.True(t, errors.Is(err2, merr.ErrDuplicatedCompactionTask))
@@ -82,7 +82,7 @@ func TestCompactionExecutor(t *testing.T) {
 		mockC.EXPECT().GetSlotUsage().Return(0)
 		executor := NewExecutor()
 
-		succeed, err := executor.Execute(mockC)
+		succeed, err := executor.Enqueue(mockC)
 		assert.Equal(t, true, succeed)
 		assert.NoError(t, err)
 		assert.Equal(t, int64(4), executor.Slots())
@@ -142,42 +142,6 @@ func TestCompactionExecutor(t *testing.T) {
 		}
 	})
 
-	t.Run("Test channel valid check", func(t *testing.T) {
-		tests := []struct {
-			expected bool
-			channel  string
-			desc     string
-		}{
-			{expected: true, channel: "ch1", desc: "no in dropped"},
-			{expected: false, channel: "ch2", desc: "in dropped"},
-		}
-		ex := NewExecutor()
-		ex.DiscardByDroppedChannel("ch2")
-		for _, test := range tests {
-			t.Run(test.desc, func(t *testing.T) {
-				assert.Equal(t, test.expected, ex.isValidChannel(test.channel))
-			})
-		}
-	})
-
-	t.Run("test stop vchannel tasks", func(t *testing.T) {
-		ex := NewExecutor()
-		mc := NewMockCompactor(t)
-		mc.EXPECT().GetPlanID().Return(int64(1))
-		mc.EXPECT().GetChannelName().Return("mock")
-		mc.EXPECT().Compact().Return(&datapb.CompactionPlanResult{PlanID: 1}, nil).Maybe()
-		mc.EXPECT().GetSlotUsage().Return(8)
-		mc.EXPECT().Stop().Return().Once()
-
-		ex.Execute(mc)
-
-		require.True(t, ex.executing.Contain(int64(1)))
-
-		ex.DiscardByDroppedChannel("mock")
-		assert.True(t, ex.dropped.Contain("mock"))
-		assert.False(t, ex.executing.Contain(int64(1)))
-	})
-
 	t.Run("test GetAllCompactionResults", func(t *testing.T) {
 		ex := NewExecutor()
 
internal/datanode/compactor/pool.go (new file, 65 lines)
@@ -0,0 +1,65 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"context"
+	"sync"
+
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/v2/config"
+	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/util/conc"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+)
+
+var (
+	execPool         *conc.Pool[any]
+	execPoolInitOnce sync.Once
+)
+
+func initExecPool() {
+	pt := paramtable.Get()
+	initPoolSize := pt.DataNodeCfg.MaxCompactionConcurrency.GetAsInt()
+	execPool = conc.NewPool[any](
+		initPoolSize,
+	)
+
+	watchKey := pt.DataNodeCfg.MaxCompactionConcurrency.Key
+	pt.Watch(watchKey, config.NewHandler(watchKey, resizeExecPool))
+	log.Info("init compaction execution pool done", zap.Int("size", initPoolSize))
+}
+
+func resizeExecPool(evt *config.Event) {
+	if evt.HasUpdated {
+		newSize := paramtable.Get().DataNodeCfg.MaxCompactionConcurrency.GetAsInt()
+		log := log.Ctx(context.Background()).With(zap.Int("newSize", newSize))
+
+		err := GetExecPool().Resize(newSize)
+		if err != nil {
+			log.Warn("failed to resize pool", zap.Error(err))
+			return
+		}
+		log.Info("pool resize successfully")
+	}
+}
+
+func GetExecPool() *conc.Pool[any] {
+	execPoolInitOnce.Do(initExecPool)
+	return execPool
+}
internal/datanode/compactor/pool_test.go (new file, 64 lines)
@@ -0,0 +1,64 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compactor
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/v2/config"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+)
+
+func TestResizePools(t *testing.T) {
+	paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
+	pt := paramtable.Get()
+
+	defer func() {
+		_ = pt.Reset(pt.DataNodeCfg.MaxCompactionConcurrency.Key)
+	}()
+
+	t.Run("ExecPool", func(t *testing.T) {
+		expectedCap := pt.DataNodeCfg.MaxCompactionConcurrency.GetAsInt()
+		assert.Equal(t, expectedCap, GetExecPool().Cap())
+		resizeExecPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetExecPool().Cap())
+
+		_ = pt.Save(pt.DataNodeCfg.MaxCompactionConcurrency.Key, fmt.Sprintf("%d", expectedCap*2))
+		expectedCap = pt.DataNodeCfg.MaxCompactionConcurrency.GetAsInt()
+		resizeExecPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetExecPool().Cap())
+
+		_ = pt.Save(pt.DataNodeCfg.MaxCompactionConcurrency.Key, "0")
+		resizeExecPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetExecPool().Cap(), "pool shall not be resized when newSize is 0")
+
+		_ = pt.Save(pt.DataNodeCfg.MaxCompactionConcurrency.Key, "invalid")
+		resizeExecPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetExecPool().Cap())
+	})
+}
deleted file (40 lines):
@@ -1,40 +0,0 @@
-package compactor
-
-import "github.com/milvus-io/milvus/internal/storage"
-
-type PQItem struct {
-	Value *storage.Value
-	Index int
-	Pos   int
-}
-
-type PriorityQueue []*PQItem
-
-func (pq PriorityQueue) Len() int { return len(pq) }
-
-func (pq PriorityQueue) Less(i, j int) bool {
-	return pq[i].Value.PK.LT(pq[j].Value.PK)
-}
-
-func (pq PriorityQueue) Swap(i, j int) {
-	pq[i], pq[j] = pq[j], pq[i]
-	pq[i].Pos = i
-	pq[j].Pos = j
-}
-
-func (pq *PriorityQueue) Push(x interface{}) {
-	n := len(*pq)
-	item := x.(*PQItem)
-	item.Pos = n
-	*pq = append(*pq, item)
-}
-
-func (pq *PriorityQueue) Pop() interface{} {
-	old := *pq
-	n := len(old)
-	item := old[n-1]
-	old[n-1] = nil
-	item.Pos = -1
-	*pq = old[0 : n-1]
-	return item
-}
deleted file (126 lines):
@@ -1,126 +0,0 @@
-package compactor
-
-import (
-	"container/heap"
-	"testing"
-
-	"github.com/stretchr/testify/suite"
-
-	"github.com/milvus-io/milvus/internal/storage"
-)
-
-type PriorityQueueSuite struct {
-	suite.Suite
-}
-
-func (s *PriorityQueueSuite) PriorityQueueMergeSort() {
-	slices := [][]*storage.Value{
-		{
-			{
-				ID: 1,
-				PK: &storage.Int64PrimaryKey{
-					Value: 1,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     1,
-			},
-			{
-				ID: 4,
-				PK: &storage.Int64PrimaryKey{
-					Value: 4,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     4,
-			},
-			{
-				ID: 7,
-				PK: &storage.Int64PrimaryKey{
-					Value: 7,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     7,
-			},
-			{
-				ID: 10,
-				PK: &storage.Int64PrimaryKey{
-					Value: 10,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     10,
-			},
-		},
-		{
-			{
-				ID: 2,
-				PK: &storage.Int64PrimaryKey{
-					Value: 2,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     2,
-			},
-			{
-				ID: 3,
-				PK: &storage.Int64PrimaryKey{
-					Value: 3,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     3,
-			},
-			{
-				ID: 5,
-				PK: &storage.Int64PrimaryKey{
-					Value: 5,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     5,
-			},
-			{
-				ID: 6,
-				PK: &storage.Int64PrimaryKey{
-					Value: 6,
-				},
-				Timestamp: 0,
-				IsDeleted: false,
-				Value:     6,
-			},
-		},
-	}
-
-	var result []*storage.Value
-	pq := make(PriorityQueue, 0)
-	heap.Init(&pq)
-
-	for i, s := range slices {
-		if len(s) > 0 {
-			heap.Push(&pq, &PQItem{
-				Value: s[0],
-				Index: i,
-				Pos:   1,
-			})
-		}
-	}
-
-	for pq.Len() > 0 {
-		smallest := heap.Pop(&pq).(*PQItem)
-		result = append(result, smallest.Value)
-		if smallest.Pos+1 < len(slices[smallest.Index]) {
-			next := &PQItem{
-				Value: slices[smallest.Index][smallest.Pos+1],
-				Index: smallest.Index,
-				Pos:   smallest.Pos + 1,
-			}
-			heap.Push(&pq, next)
-		}
-	}
-}
-
-func TestNewPriorityQueueSuite(t *testing.T) {
-	suite.Run(t, new(PriorityQueueSuite))
-}
@@ -255,7 +255,7 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
 		return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil
 	}
 
-	succeed, err := node.compactionExecutor.Execute(task)
+	succeed, err := node.compactionExecutor.Enqueue(task)
 	if succeed {
 		return merr.Success(), nil
 	} else {
@@ -167,7 +167,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
 		PlanID: 1,
 		State:  datapb.CompactionTaskState_completed,
 	}, nil)
-	s.node.compactionExecutor.Execute(mockC)
+	s.node.compactionExecutor.Enqueue(mockC)
 
 	mockC2 := compactor.NewMockCompactor(s.T())
 	mockC2.EXPECT().GetPlanID().Return(int64(2))
@@ -179,7 +179,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
 		PlanID: 2,
 		State:  datapb.CompactionTaskState_failed,
 	}, nil)
-	s.node.compactionExecutor.Execute(mockC2)
+	s.node.compactionExecutor.Enqueue(mockC2)
 
 	s.Eventually(func() bool {
 		stat, err := s.node.GetCompactionState(s.ctx, nil)
@@ -24,7 +24,6 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/internal/compaction"
-	"github.com/milvus-io/milvus/internal/datanode/compactor"
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
 	"github.com/milvus-io/milvus/internal/flushcommon/io"
 	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
@@ -64,8 +63,7 @@ type DataSyncService struct {
 	broker  broker.Broker
 	syncMgr syncmgr.SyncManager
 
-	timetickSender util.StatsUpdater  // reference to TimeTickSender
-	compactor      compactor.Executor // reference to compaction executor
+	timetickSender util.StatsUpdater // reference to TimeTickSender
 
 	dispClient   msgdispatcher.Client
 	chunkManager storage.ChunkManager
@@ -260,7 +258,6 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
 		serverID: config.serverID,
 
 		chunkManager:   params.ChunkManager,
-		compactor:      params.CompactionExecutor,
 		timetickSender: params.TimeTickSender,
 		syncMgr:        params.SyncMgr,
 
@@ -281,7 +278,6 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
 		info.GetVchan().GetDroppedSegmentIds(),
 		flushed,
 		unflushed,
-		params.CompactionExecutor,
 		params.MsgHandler,
 	)
 	nodeList = append(nodeList, ddNode)
@@ -28,10 +28,8 @@ import (
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-	"github.com/milvus-io/milvus/internal/datanode/compactor"
 	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
-	"github.com/milvus-io/milvus/internal/util/streamingutil"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/metrics"
 	"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
@@ -68,9 +66,8 @@ type ddNode struct {
 	collectionID typeutil.UniqueID
 	vChannelName string
 
-	dropMode           atomic.Value
-	compactionExecutor compactor.Executor
-	msgHandler         util.MsgHandler
+	dropMode   atomic.Value
+	msgHandler util.MsgHandler
 
 	// for recovery
 	growingSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo // segmentID
@@ -154,11 +151,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 			if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
 				log.Info("Receiving DropCollection msg", zap.String("channel", ddn.vChannelName))
 				ddn.dropMode.Store(true)
-
-				log.Info("Stop compaction for dropped channel", zap.String("channel", ddn.vChannelName))
-				if !streamingutil.IsStreamingServiceEnabled() {
-					ddn.compactionExecutor.DiscardByDroppedChannel(ddn.vChannelName)
-				}
 				fgMsg.dropCollection = true
 			}
 
@@ -347,22 +339,21 @@ func (ddn *ddNode) Close() {
 }
 
 func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName string, droppedSegmentIDs []typeutil.UniqueID,
-	sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, executor compactor.Executor, handler util.MsgHandler,
+	sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, handler util.MsgHandler,
 ) *ddNode {
 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(paramtable.Get().DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
 	baseNode.SetMaxParallelism(paramtable.Get().DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
 
 	dd := &ddNode{
-		ctx:                ctx,
-		BaseNode:           baseNode,
-		collectionID:       collID,
-		sealedSegInfo:      make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(sealedSegments)),
-		growingSegInfo:     make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(growingSegments)),
-		droppedSegmentIDs:  droppedSegmentIDs,
-		vChannelName:       vChannelName,
-		compactionExecutor: executor,
-		msgHandler:         handler,
+		ctx:               ctx,
+		BaseNode:          baseNode,
+		collectionID:      collID,
+		sealedSegInfo:     make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(sealedSegments)),
+		growingSegInfo:    make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(growingSegments)),
+		droppedSegmentIDs: droppedSegmentIDs,
+		vChannelName:      vChannelName,
+		msgHandler:        handler,
 	}
 
 	dd.dropMode.Store(false)
@@ -27,7 +27,6 @@ import (
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-	"github.com/milvus-io/milvus/internal/datanode/compactor"
 	"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 	"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/util/mock_message"
@@ -83,7 +82,6 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
 				droppedSegIDs,
 				test.inSealedSegs,
 				test.inGrowingSegs,
-				compactor.NewExecutor(),
 				nil,
 			)
 			require.NotNil(t, ddNode)
@@ -195,10 +193,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.description, func(t *testing.T) {
 			ddn := ddNode{
-				ctx:                context.Background(),
-				collectionID:       test.ddnCollID,
-				vChannelName:       "ddn_drop_msg",
-				compactionExecutor: compactor.NewExecutor(),
+				ctx:          context.Background(),
+				collectionID: test.ddnCollID,
+				vChannelName: "ddn_drop_msg",
 			}
 
 			var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{
@@ -248,10 +245,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.description, func(t *testing.T) {
 			ddn := ddNode{
-				ctx:                context.Background(),
-				collectionID:       test.ddnCollID,
-				vChannelName:       "ddn_drop_msg",
-				compactionExecutor: compactor.NewExecutor(),
+				ctx:          context.Background(),
+				collectionID: test.ddnCollID,
+				vChannelName: "ddn_drop_msg",
 			}
 
 			var dropPartMsg msgstream.TsMsg = &msgstream.DropPartitionMsg{
@@ -23,7 +23,6 @@ import (
 	"go.opentelemetry.io/otel/trace"
 
 	"github.com/milvus-io/milvus/internal/allocator"
-	compactor "github.com/milvus-io/milvus/internal/datanode/compactor"
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
 	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
 	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
@@ -40,8 +39,7 @@ type PipelineParams struct {
 	Ctx                context.Context
 	Broker             broker.Broker
 	SyncMgr            syncmgr.SyncManager
-	TimeTickSender     StatsUpdater       // reference to TimeTickSender
-	CompactionExecutor compactor.Executor // reference to compaction executor
+	TimeTickSender     StatsUpdater // reference to TimeTickSender
 	MsgStreamFactory   dependency.Factory
 	DispClient         msgdispatcher.Client
 	ChunkManager       storage.ChunkManager
@@ -5664,6 +5664,7 @@ type dataNodeConfig struct {
 	L0CompactionMaxBatchSize ParamItem `refreshable:"true"`
 	UseMergeSort             ParamItem `refreshable:"true"`
 	MaxSegmentMergeSort      ParamItem `refreshable:"true"`
+	MaxCompactionConcurrency ParamItem `refreshable:"true"`
 
 	GracefulStopTimeout ParamItem `refreshable:"true"`
 
@@ -6046,6 +6047,15 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.MaxSegmentMergeSort.Init(base.mgr)
 
+	p.MaxCompactionConcurrency = ParamItem{
+		Key:          "dataNode.compaction.maxConcurrency",
+		Version:      "2.6.0",
+		Doc:          "The maximum number of compaction tasks that can run concurrently on a datanode",
+		DefaultValue: "10",
+		Export:       false,
+	}
+	p.MaxCompactionConcurrency.Init(base.mgr)
+
 	p.GracefulStopTimeout = ParamItem{
 		Key:     "dataNode.gracefulStopTimeout",
 		Version: "2.3.7",
@@ -638,6 +638,9 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 		assert.Equal(t, 16, Params.SlotCap.GetAsInt())
 
+		// compaction
+		assert.Equal(t, 10, Params.MaxCompactionConcurrency.GetAsInt())
+
 		// clustering compaction
 		params.Save("datanode.clusteringCompaction.memoryBufferRatio", "0.1")
 		assert.Equal(t, 0.1, Params.ClusteringCompactionMemoryBufferRatio.GetAsFloat())