mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Make compaction rpc timeout and parallel maxium configurable (#25654)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
4140ec0916
commit
f939bd0d2c
@ -331,6 +331,8 @@ dataCoord:
|
||||
|
||||
compaction:
|
||||
enableAutoCompaction: true
|
||||
rpcTimeout: 10 # compaction rpc request timeout in seconds
|
||||
maxParallelTaskNum: 100 # max parallel compaction task number
|
||||
|
||||
gc:
|
||||
interval: 3600 # gc interval in seconds
|
||||
|
||||
@ -34,9 +34,7 @@ import (
|
||||
// TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
|
||||
// TODO we should split compaction into different priorities, small compaction helps to merge segment, large compaction helps to handle delta and expiration of large segments
|
||||
const (
|
||||
maxParallelCompactionTaskNum = 100
|
||||
rpcCompactionTimeout = 10 * time.Second
|
||||
tsTimeout = uint64(1)
|
||||
tsTimeout = uint64(1)
|
||||
)
|
||||
|
||||
type compactionPlanContext interface {
|
||||
@ -374,7 +372,7 @@ func (c *compactionPlanHandler) isFull() bool {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
return c.executingTaskNum >= maxParallelCompactionTaskNum
|
||||
return c.executingTaskNum >= Params.DataCoordCfg.CompactionMaxParallelTasks
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask {
|
||||
|
||||
@ -144,7 +144,7 @@ func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datap
|
||||
|
||||
// Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously.
|
||||
func (c *SessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(Params.DataCoordCfg.CompactionRPCTimeout)*time.Second)
|
||||
defer cancel()
|
||||
cli, err := c.getClient(ctx, nodeID)
|
||||
if err != nil {
|
||||
@ -168,7 +168,7 @@ func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequ
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.Int64("planID", req.GetPlanID()),
|
||||
)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(Params.DataCoordCfg.CompactionRPCTimeout)*time.Second)
|
||||
defer cancel()
|
||||
cli, err := c.getClient(ctx, nodeID)
|
||||
if err != nil {
|
||||
@ -178,7 +178,7 @@ func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequ
|
||||
|
||||
err = retry.Do(context.Background(), func() error {
|
||||
// reset timeout for each sync segments
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(Params.DataCoordCfg.CompactionRPCTimeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := cli.SyncSegments(ctx, req)
|
||||
@ -261,7 +261,7 @@ func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateR
|
||||
log.Info("Cannot Create Client", zap.Int64("NodeID", nodeID))
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, rpcCompactionTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Duration(Params.DataCoordCfg.CompactionRPCTimeout)*time.Second)
|
||||
defer cancel()
|
||||
resp, err := cli.GetCompactionState(ctx, &datapb.CompactionStateRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
|
||||
@ -1454,6 +1454,8 @@ type dataCoordConfig struct {
|
||||
EnableCompaction bool
|
||||
EnableAutoCompaction atomic.Value
|
||||
|
||||
CompactionRPCTimeout int64
|
||||
CompactionMaxParallelTasks int
|
||||
MinSegmentToMerge int
|
||||
MaxSegmentToMerge int
|
||||
SegmentSmallProportion float64
|
||||
@ -1496,6 +1498,8 @@ func (p *dataCoordConfig) init(base *BaseTable) {
|
||||
p.initEnableCompaction()
|
||||
p.initEnableAutoCompaction()
|
||||
|
||||
p.initCompactionRPCTimeout()
|
||||
p.initCompactionMaxParalellTask()
|
||||
p.initCompactionMinSegment()
|
||||
p.initCompactionMaxSegment()
|
||||
p.initSegmentProportion()
|
||||
@ -1581,6 +1585,14 @@ func (p *dataCoordConfig) initEnableCompaction() {
|
||||
p.EnableCompaction = p.Base.ParseBool("dataCoord.enableCompaction", false)
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initCompactionRPCTimeout() {
|
||||
p.CompactionRPCTimeout = p.Base.ParseInt64WithDefault("dataCoord.compaction.rpcTimeout", 10)
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initCompactionMaxParalellTask() {
|
||||
p.CompactionMaxParallelTasks = p.Base.ParseIntWithDefault("dataCoord.compaction.maxParallelTaskNum", 100)
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initEnableAutoCompaction() {
|
||||
p.EnableAutoCompaction.Store(p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", true))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user