Make compaction rpc timeout and parallel maximum configurable (#25672)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
parent 73512c72fd
commit 8d343bf75a
@@ -332,6 +332,8 @@ dataCoord:
   enableCompaction: true # Enable data segment compaction
   compaction:
     enableAutoCompaction: true
+    rpcTimeout: 10 # compaction rpc request timeout in seconds
+    maxParallelTaskNum: 100 # max parallel compaction task number
   enableGarbageCollection: true
   gc:
     interval: 3600 # gc interval in seconds
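The two new knobs are plain scalars: rpcTimeout is read as a number of seconds and maxParallelTaskNum as a task count. A minimal sketch (not Milvus code; secondsToDuration is a hypothetical helper) of how a seconds-valued config string ends up as a time.Duration, which is the role GetAsDuration(time.Second) plays in the Go hunks below:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// secondsToDuration mimics turning a seconds-valued setting such as
// dataCoord.compaction.rpcTimeout into a time.Duration.
func secondsToDuration(raw string, fallback time.Duration) time.Duration {
	v, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return fallback
	}
	return time.Duration(v * float64(time.Second))
}

func main() {
	fmt.Println(secondsToDuration("10", 10*time.Second)) // 10s
}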
@@ -35,9 +35,7 @@ import (
 // TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
 // TODO we should split compaction into different priorities, small compaction helps to merge segment, large compaction helps to handle delta and expiration of large segments
 const (
-	maxParallelCompactionTaskNum = 100
-	rpcCompactionTimeout         = 10 * time.Second
-	tsTimeout                    = uint64(1)
+	tsTimeout = uint64(1)
 )
 
 type compactionPlanContext interface {
@@ -379,7 +377,7 @@ func (c *compactionPlanHandler) isFull() bool {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 
-	return c.executingTaskNum >= maxParallelCompactionTaskNum
+	return c.executingTaskNum >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
 }
 
 func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask {
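Because the limit is now read through the parameter table on each call instead of being a compile-time constant, a refreshed value of dataCoord.compaction.maxParallelTaskNum takes effect immediately (the field is tagged refreshable:"true" further down). A small standalone sketch of the same pattern, with a hypothetical maxParallelTasks getter standing in for Params.DataCoordCfg:

package main

import (
	"fmt"
	"sync"
)

// maxParallelTasks is a stand-in for the configurable limit; in Milvus it
// would come from Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt().
var maxParallelTasks = func() int { return 100 }

type planHandler struct {
	mu               sync.RWMutex
	executingTaskNum int
}

// isFull re-reads the limit on every call, so a refreshed config value is
// picked up without restarting the handler.
func (h *planHandler) isFull() bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.executingTaskNum >= maxParallelTasks()
}

func main() {
	h := &planHandler{executingTaskNum: 100}
	fmt.Println(h.isFull()) // true with the default limit of 100
}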
@@ -145,7 +145,7 @@ func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datap
 
 // Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously.
 func (c *SessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) error {
-	ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
 	defer cancel()
 	cli, err := c.getClient(ctx, nodeID)
 	if err != nil {
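This and the following hunks apply the same substitution: the per-RPC deadline now comes from CompactionRPCTimeout.GetAsDuration(time.Second), which interprets the configured number as seconds. A hedged sketch of that call shape, with the DataNode client replaced by a hypothetical doRPC function and rpcTimeout standing in for the config getter:

package main

import (
	"context"
	"fmt"
	"time"
)

// rpcTimeout stands in for Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second).
func rpcTimeout() time.Duration { return 10 * time.Second }

// doRPC is a hypothetical stand-in for a DataNode gRPC call such as Compaction.
func doRPC(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulated work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout())
	defer cancel()
	fmt.Println(doRPC(ctx)) // <nil> unless the deadline expires first
}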
@@ -169,7 +169,7 @@ func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequ
 		zap.Int64("nodeID", nodeID),
 		zap.Int64("planID", req.GetPlanID()),
 	)
-	ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
 	defer cancel()
 	cli, err := c.getClient(ctx, nodeID)
 	if err != nil {
@@ -178,6 +178,9 @@ func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequ
 	}
 
 	err = retry.Do(context.Background(), func() error {
+		ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
+		defer cancel()
+
 		resp, err := cli.SyncSegments(ctx, req)
 		if err := VerifyResponse(resp, err); err != nil {
 			log.Warn("failed to sync segments", zap.Error(err))
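Note that inside retry.Do the change creates a fresh timeout context per attempt, so every retry gets the full configured budget rather than all attempts sharing one deadline. A simplified sketch of that per-attempt pattern, using a hypothetical retryDo loop in place of Milvus's retry package:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryDo is a hypothetical stand-in for retry.Do: it simply re-runs fn up to attempts times.
func retryDo(ctx context.Context, fn func() error, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	attempt := 0
	err := retryDo(context.Background(), func() error {
		// A new timeout per attempt: each retry gets the full budget,
		// mirroring the WithTimeout call added inside the closure above.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		attempt++
		if attempt < 3 {
			return errors.New("transient failure")
		}
		return ctx.Err() // nil: the deadline was not hit
	}, 5)
	fmt.Println(attempt, err) // 3 <nil>
}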
@@ -259,7 +262,7 @@ func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateR
 			log.Info("Cannot Create Client", zap.Int64("NodeID", nodeID))
 			return
 		}
-		ctx, cancel := context.WithTimeout(ctx, rpcCompactionTimeout)
+		ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
 		defer cancel()
 		resp, err := cli.GetCompactionState(ctx, &datapb.CompactionStateRequest{
 			Base: commonpbutil.NewMsgBase(
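One difference from the earlier hunks: here the timeout is layered on the caller's ctx rather than context.Background(), so the RPC stops either when the configured timeout expires or when the parent context is cancelled, whichever comes first. A brief, self-contained sketch of that behaviour (no Milvus types involved):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancelParent := context.WithCancel(context.Background())

	// Derived context: inherits cancellation from parent and adds a deadline.
	ctx, cancel := context.WithTimeout(parent, 10*time.Second)
	defer cancel()

	cancelParent() // cancelling the parent ends the derived context too
	<-ctx.Done()
	fmt.Println(ctx.Err()) // prints "context canceled", not a deadline error
}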
@@ -1874,6 +1874,8 @@ type dataCoordConfig struct {
 	EnableCompaction     ParamItem `refreshable:"false"`
 	EnableAutoCompaction ParamItem `refreshable:"true"`
 
+	CompactionRPCTimeout       ParamItem `refreshable:"true"`
+	CompactionMaxParallelTasks ParamItem `refreshable:"true"`
 	MinSegmentToMerge      ParamItem `refreshable:"true"`
 	MaxSegmentToMerge      ParamItem `refreshable:"true"`
 	SegmentSmallProportion ParamItem `refreshable:"true"`
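The refreshable:"true" annotations are ordinary Go struct tags on the ParamItem fields. For illustration only (a trimmed-down ParamItem and config struct, not how Milvus's paramtable necessarily consumes the tag), this is how such a tag can be read back with the standard reflect package:

package main

import (
	"fmt"
	"reflect"
)

// ParamItem is a trimmed-down placeholder for the paramtable type.
type ParamItem struct{ Key, DefaultValue string }

type dataCoordConfig struct {
	CompactionRPCTimeout       ParamItem `refreshable:"true"`
	CompactionMaxParallelTasks ParamItem `refreshable:"true"`
}

func main() {
	t := reflect.TypeOf(dataCoordConfig{})
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fmt.Println(f.Name, "refreshable =", f.Tag.Get("refreshable"))
	}
}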
@@ -2022,6 +2024,22 @@ the number of binlog file reaches to max value.`,
 	}
 	p.EnableAutoCompaction.Init(base.mgr)
 
+	p.CompactionRPCTimeout = ParamItem{
+		Key:          "dataCoord.compaction.rpcTimeout",
+		Version:      "2.2.12",
+		DefaultValue: "10",
+		Export:       true,
+	}
+	p.CompactionRPCTimeout.Init(base.mgr)
+
+	p.CompactionMaxParallelTasks = ParamItem{
+		Key:          "dataCoord.compaction.maxParallelTaskNum",
+		Version:      "2.2.12",
+		DefaultValue: "100",
+		Export:       true,
+	}
+	p.CompactionMaxParallelTasks.Init(base.mgr)
+
 	p.MinSegmentToMerge = ParamItem{
 		Key:     "dataCoord.compaction.min.segment",
 		Version: "2.0.0",
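Putting it together: each new ParamItem binds a YAML key to a default value, is registered via Init, and is then read through typed accessors such as GetAsInt and GetAsDuration. The following is a deliberately simplified, self-contained sketch of that flow; the mgr map, the toy ParamItem, and its methods here are illustrative stand-ins, not the real paramtable implementation:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// mgr is a hypothetical key/value store standing in for the param manager.
type mgr map[string]string

// ParamItem here is a toy version of the paramtable type used in the diff.
type ParamItem struct {
	Key          string
	DefaultValue string
	mgr          mgr
}

func (p *ParamItem) Init(m mgr) { p.mgr = m }

func (p *ParamItem) get() string {
	if v, ok := p.mgr[p.Key]; ok {
		return v
	}
	return p.DefaultValue
}

func (p *ParamItem) GetAsInt() int {
	v, _ := strconv.Atoi(p.get())
	return v
}

// GetAsDuration interprets the stored number in the given unit, e.g. seconds.
func (p *ParamItem) GetAsDuration(unit time.Duration) time.Duration {
	v, _ := strconv.ParseFloat(p.get(), 64)
	return time.Duration(v * float64(unit))
}

func main() {
	m := mgr{} // empty: the defaults from the diff apply
	timeout := ParamItem{Key: "dataCoord.compaction.rpcTimeout", DefaultValue: "10"}
	parallel := ParamItem{Key: "dataCoord.compaction.maxParallelTaskNum", DefaultValue: "100"}
	timeout.Init(m)
	parallel.Init(m)

	fmt.Println(timeout.GetAsDuration(time.Second)) // 10s
	fmt.Println(parallel.GetAsInt())                // 100
}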