feat: Add force merge (#45556)

See also: #46043
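
For orientation, a minimal, self-contained sketch (not code from this PR) of the routing rule this change adds to CompactionTriggerManager.ManualTrigger: a non-zero targetSize now selects the force-merge path ahead of the existing manual L0 and clustering paths. The route helper below is a hypothetical stand-in for illustration only.

package main

import "fmt"

// route mirrors the precedence used by ManualTrigger after this change:
// a non-zero targetSize wins, then the L0 flag, then the clustering flag.
func route(targetSize int64, isL0, isClustering bool) string {
	switch {
	case targetSize != 0:
		return "force merge" // new in this PR
	case isL0:
		return "manual L0 compaction"
	case isClustering:
		return "clustering compaction"
	default:
		return "no compaction triggered"
	}
}

func main() {
	fmt.Println(route(512<<20, false, false)) // force merge toward ~512 MiB target segments
	fmt.Println(route(0, true, false))        // unchanged manual L0 path
}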

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2025-12-19 18:03:18 +08:00 committed by GitHub
parent ab9bec0a6d
commit 0507db2015
GPG Key ID: B5690EEEBB952194
18 changed files with 1898 additions and 85 deletions

View File

@ -520,14 +520,14 @@ queryNode:
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
mmap:
vectorField: true # Enable mmap for loading vector data
vectorIndex: false # Enable mmap for loading vector index
scalarField: false # Enable mmap for loading scalar data
scalarIndex: false # Enable mmap for loading scalar index
vectorIndex: true # Enable mmap for loading vector index
scalarField: true # Enable mmap for loading scalar data
scalarIndex: true # Enable mmap for loading scalar index
jsonShredding: true # Enable mmap for loading json stats
# Enable memory mapping (mmap) to optimize the handling of growing raw data.
# By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
# However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments.
growingMmapEnabled: false
growingMmapEnabled: true
fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
lazyload:

View File

@ -33,6 +33,7 @@ packages:
SubCluster:
StatsJobManager:
ImportMeta:
CollectionTopologyQuerier:
github.com/milvus-io/milvus/internal/datacoord/allocator:
interfaces:
Allocator:

View File

@ -0,0 +1,274 @@
package datacoord
import (
"context"
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
const (
// Fallback memory for pooling DataNode (returns 0 from GetMetrics)
defaultPoolingDataNodeMemory = 32 * 1024 * 1024 * 1024 // 32GB
)
// CollectionTopology captures memory constraints for a collection
type CollectionTopology struct {
CollectionID int64
NumReplicas int
IsStandaloneMode bool
IsPooling bool
QueryNodeMemory map[int64]uint64
DataNodeMemory map[int64]uint64
}
// CollectionTopologyQuerier queries collection topology including replicas and memory info
type CollectionTopologyQuerier interface {
GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error)
}
type forceMergeCompactionPolicy struct {
meta *meta
allocator allocator.Allocator
handler Handler
topologyQuerier CollectionTopologyQuerier
}
func newForceMergeCompactionPolicy(meta *meta, allocator allocator.Allocator, handler Handler) *forceMergeCompactionPolicy {
return &forceMergeCompactionPolicy{
meta: meta,
allocator: allocator,
handler: handler,
topologyQuerier: nil,
}
}
func (policy *forceMergeCompactionPolicy) SetTopologyQuerier(querier CollectionTopologyQuerier) {
policy.topologyQuerier = querier
}
func (policy *forceMergeCompactionPolicy) triggerOneCollection(
ctx context.Context,
collectionID int64,
targetSize int64,
) ([]CompactionView, int64, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64("targetSize", targetSize))
collection, err := policy.handler.GetCollection(ctx, collectionID)
if err != nil {
return nil, 0, err
}
triggerID, err := policy.allocator.AllocID(ctx)
if err != nil {
return nil, 0, err
}
collectionTTL, err := getCollectionTTL(collection.Properties)
if err != nil {
log.Warn("failed to get collection ttl, use default", zap.Error(err))
collectionTTL = 0
}
configMaxSize := getExpectedSegmentSize(policy.meta, collectionID, collection.Schema)
segments := policy.meta.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlushed(segment) &&
!segment.isCompacting &&
!segment.GetIsImporting() &&
segment.GetLevel() != datapb.SegmentLevel_L0
}))
if len(segments) == 0 {
log.Info("no eligible segments for force merge")
return nil, 0, nil
}
topology, err := policy.topologyQuerier.GetCollectionTopology(ctx, collectionID)
if err != nil {
return nil, 0, err
}
views := []CompactionView{}
for label, groups := range groupByPartitionChannel(GetViewsByInfo(segments...)) {
view := &ForceMergeSegmentView{
label: label,
segments: groups,
triggerID: triggerID,
collectionTTL: collectionTTL,
configMaxSize: float64(configMaxSize),
topology: topology,
}
views = append(views, view)
}
log.Info("force merge triggered", zap.Int("viewCount", len(views)))
return views, triggerID, nil
}
func groupByPartitionChannel(segments []*SegmentView) map[*CompactionGroupLabel][]*SegmentView {
result := make(map[*CompactionGroupLabel][]*SegmentView)
for _, seg := range segments {
label := seg.label
key := label.Key()
var foundLabel *CompactionGroupLabel
for l := range result {
if l.Key() == key {
foundLabel = l
break
}
}
if foundLabel == nil {
foundLabel = label
}
result[foundLabel] = append(result[foundLabel], seg)
}
return result
}
type metricsNodeMemoryQuerier struct {
nodeManager session.NodeManager
mixCoord types.MixCoord
session sessionutil.SessionInterface
}
func newMetricsNodeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface) *metricsNodeMemoryQuerier {
return &metricsNodeMemoryQuerier{
nodeManager: nodeManager,
mixCoord: mixCoord,
session: session,
}
}
var _ CollectionTopologyQuerier = (*metricsNodeMemoryQuerier)(nil)
func (q *metricsNodeMemoryQuerier) GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
if q.mixCoord == nil {
return nil, fmt.Errorf("mixCoord not available for topology query")
}
// 1. Get replica information
replicasResp, err := q.mixCoord.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
CollectionID: collectionID,
})
if err != nil {
return nil, err
}
numReplicas := len(replicasResp.GetReplicas())
// 2. Get QueryNode metrics for memory info
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
if err != nil {
return nil, err
}
// Get QueryNode sessions from etcd to filter out embedded nodes
sessions, _, err := q.session.GetSessions(ctx, typeutil.QueryNodeRole)
if err != nil {
log.Warn("failed to get QueryNode sessions", zap.Error(err))
return nil, err
}
// Build set of embedded QueryNode IDs to exclude
embeddedNodeIDs := make(map[int64]struct{})
for _, sess := range sessions {
// Check if this is an embedded QueryNode in streaming node
if labels := sess.ServerLabels; labels != nil {
if labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" {
embeddedNodeIDs[sess.ServerID] = struct{}{}
}
}
}
log.Info("excluding embedded QueryNode", zap.Int64s("nodeIDs", lo.Keys(embeddedNodeIDs)))
rsp, err := q.mixCoord.GetQcMetrics(ctx, req)
if err = merr.CheckRPCCall(rsp, err); err != nil {
return nil, err
}
topology := &metricsinfo.QueryCoordTopology{}
if err := metricsinfo.UnmarshalTopology(rsp.GetResponse(), topology); err != nil {
return nil, err
}
// Build QueryNode memory map: nodeID → memory size (exclude embedded nodes)
queryNodeMemory := make(map[int64]uint64)
for _, node := range topology.Cluster.ConnectedNodes {
if _, ok := embeddedNodeIDs[node.ID]; ok {
continue
}
queryNodeMemory[node.ID] = node.HardwareInfos.Memory
}
// 3. Get DataNode memory info
dataNodeMemory := make(map[int64]uint64)
isPooling := false
nodes := q.nodeManager.GetClientIDs()
for _, nodeID := range nodes {
cli, err := q.nodeManager.GetClient(nodeID)
if err != nil {
continue
}
resp, err := cli.GetMetrics(ctx, req)
if err != nil {
continue
}
var infos metricsinfo.DataNodeInfos
if err := metricsinfo.UnmarshalComponentInfos(resp.GetResponse(), &infos); err != nil {
continue
}
if infos.HardwareInfos.Memory > 0 {
dataNodeMemory[nodeID] = infos.HardwareInfos.Memory
} else {
// Pooling DataNode returns 0 from GetMetrics
// Use default fallback: 32GB
isPooling = true
log.Warn("DataNode returned 0 memory (pooling mode?), using default",
zap.Int64("nodeID", nodeID),
zap.Uint64("defaultMemory", defaultPoolingDataNodeMemory))
dataNodeMemory[nodeID] = defaultPoolingDataNodeMemory
}
}
isStandaloneMode := paramtable.GetRole() == typeutil.StandaloneRole
log.Info("Collection topology",
zap.Int64("collectionID", collectionID),
zap.Int("numReplicas", numReplicas),
zap.Any("querynodes", queryNodeMemory),
zap.Any("datanodes", dataNodeMemory),
zap.Bool("isStandaloneMode", isStandaloneMode),
zap.Bool("isPooling", isPooling))
return &CollectionTopology{
CollectionID: collectionID,
NumReplicas: numReplicas,
QueryNodeMemory: queryNodeMemory,
DataNodeMemory: dataNodeMemory,
IsStandaloneMode: isStandaloneMode,
IsPooling: isPooling,
}, nil
}
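
As a rough illustration of how the CollectionTopologyQuerier interface above can be satisfied outside of the metrics-backed implementation, the standalone snippet below defines a fixed (static) querier. The lower-case types are simplified stand-ins for CollectionTopology and CollectionTopologyQuerier and are not part of this PR.

package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins mirroring the types introduced above (illustration only).
type collectionTopology struct {
	CollectionID    int64
	NumReplicas     int
	QueryNodeMemory map[int64]uint64 // nodeID -> memory in bytes
	DataNodeMemory  map[int64]uint64 // nodeID -> memory in bytes
}

type topologyQuerier interface {
	GetCollectionTopology(ctx context.Context, collectionID int64) (*collectionTopology, error)
}

// staticQuerier returns a fixed topology, e.g. for tests or experiments.
type staticQuerier struct{ topo collectionTopology }

func (q *staticQuerier) GetCollectionTopology(_ context.Context, id int64) (*collectionTopology, error) {
	t := q.topo
	t.CollectionID = id
	return &t, nil
}

func main() {
	var q topologyQuerier = &staticQuerier{topo: collectionTopology{
		NumReplicas:     1,
		QueryNodeMemory: map[int64]uint64{1: 8 << 30, 2: 16 << 30},
		DataNodeMemory:  map[int64]uint64{3: 32 << 30},
	}}
	topo, err := q.GetCollectionTopology(context.Background(), 100)
	if err != nil {
		panic(err)
	}
	fmt.Printf("collection=%d replicas=%d querynodes=%d datanodes=%d\n",
		topo.CollectionID, topo.NumReplicas, len(topo.QueryNodeMemory), len(topo.DataNodeMemory))
}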

View File

@ -0,0 +1,320 @@
package datacoord
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
func TestForceMergeCompactionPolicySuite(t *testing.T) {
suite.Run(t, new(ForceMergeCompactionPolicySuite))
}
type ForceMergeCompactionPolicySuite struct {
suite.Suite
mockAlloc *allocator.MockAllocator
mockHandler *NMockHandler
mockQuerier *MockCollectionTopologyQuerier
testLabel *CompactionGroupLabel
policy *forceMergeCompactionPolicy
}
func (s *ForceMergeCompactionPolicySuite) SetupTest() {
s.testLabel = &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch-1",
}
segments := genSegmentsForMeta(s.testLabel)
meta, err := newMemoryMeta(s.T())
s.Require().NoError(err)
for id, segment := range segments {
meta.segments.SetSegment(id, segment)
}
s.mockAlloc = allocator.NewMockAllocator(s.T())
s.mockHandler = NewNMockHandler(s.T())
s.mockQuerier = NewMockCollectionTopologyQuerier(s.T())
s.policy = newForceMergeCompactionPolicy(meta, s.mockAlloc, s.mockHandler)
s.policy.SetTopologyQuerier(s.mockQuerier)
}
func (s *ForceMergeCompactionPolicySuite) TestNewForceMergeCompactionPolicy() {
meta, err := newMemoryMeta(s.T())
s.Require().NoError(err)
policy := newForceMergeCompactionPolicy(meta, s.mockAlloc, s.mockHandler)
s.NotNil(policy)
s.Equal(meta, policy.meta)
s.Equal(s.mockAlloc, policy.allocator)
s.Equal(s.mockHandler, policy.handler)
s.Nil(policy.topologyQuerier)
}
func (s *ForceMergeCompactionPolicySuite) TestSetTopologyQuerier() {
policy := newForceMergeCompactionPolicy(s.policy.meta, s.mockAlloc, s.mockHandler)
s.Nil(policy.topologyQuerier)
policy.SetTopologyQuerier(s.mockQuerier)
s.Equal(s.mockQuerier, policy.topologyQuerier)
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_Success() {
ctx := context.Background()
collectionID := int64(1)
targetSize := int64(1024 * 1024 * 512)
triggerID := int64(100)
coll := &collectionInfo{
ID: collectionID,
Schema: newTestSchema(),
Properties: nil,
}
topology := &CollectionTopology{
CollectionID: collectionID,
NumReplicas: 1,
}
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil)
s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil)
s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(topology, nil)
views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.NoError(err)
s.Equal(triggerID, gotTriggerID)
s.NotNil(views)
s.Greater(len(views), 0)
for _, view := range views {
s.NotNil(view)
s.NotNil(view.GetGroupLabel())
}
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_GetCollectionError() {
ctx := context.Background()
collectionID := int64(999)
targetSize := int64(1024 * 1024 * 512)
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(nil, merr.ErrCollectionNotFound)
views, triggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.Error(err)
s.Nil(views)
s.Equal(int64(0), triggerID)
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_AllocIDError() {
ctx := context.Background()
collectionID := int64(1)
targetSize := int64(1024 * 1024 * 512)
coll := &collectionInfo{
ID: collectionID,
Schema: newTestSchema(),
Properties: nil,
}
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil)
s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(0), merr.ErrServiceUnavailable)
views, triggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.Error(err)
s.Nil(views)
s.Equal(int64(0), triggerID)
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_NoEligibleSegments() {
ctx := context.Background()
collectionID := int64(999)
targetSize := int64(1024 * 1024 * 512)
triggerID := int64(100)
coll := &collectionInfo{
ID: collectionID,
Schema: newTestSchema(),
Properties: nil,
}
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil)
s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil)
views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.NoError(err)
s.Equal(int64(0), gotTriggerID)
s.Nil(views)
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_TopologyError() {
ctx := context.Background()
collectionID := int64(1)
targetSize := int64(1024 * 1024 * 512)
triggerID := int64(100)
coll := &collectionInfo{
ID: collectionID,
Schema: newTestSchema(),
Properties: nil,
}
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil)
s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil)
s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(nil, merr.ErrServiceUnavailable)
views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.Error(err)
s.Nil(views)
s.Equal(int64(0), gotTriggerID)
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_WithCollectionProperties() {
ctx := context.Background()
collectionID := int64(1)
targetSize := int64(1024 * 1024 * 512)
triggerID := int64(100)
coll := &collectionInfo{
ID: collectionID,
Schema: newTestSchema(),
Properties: map[string]string{
"collection.ttl.seconds": "86400",
},
}
topology := &CollectionTopology{
CollectionID: collectionID,
NumReplicas: 2,
}
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil)
s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil)
s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(topology, nil)
views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.NoError(err)
s.Equal(triggerID, gotTriggerID)
s.NotNil(views)
}
func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_EmptySegments() {
segments := []*SegmentView{}
result := groupByPartitionChannel(segments)
s.NotNil(result)
s.Equal(0, len(result))
}
func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_SingleGroup() {
segmentInfo := genTestSegmentInfo(s.testLabel, 100, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
segments := GetViewsByInfo(segmentInfo)
result := groupByPartitionChannel(segments)
s.NotNil(result)
s.Equal(1, len(result))
for label, segs := range result {
s.Equal(s.testLabel.CollectionID, label.CollectionID)
s.Equal(s.testLabel.PartitionID, label.PartitionID)
s.Equal(s.testLabel.Channel, label.Channel)
s.Equal(1, len(segs))
}
}
func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_MultipleGroups() {
label1 := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch-1",
}
label2 := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 11,
Channel: "ch-1",
}
label3 := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch-2",
}
seg1 := genTestSegmentInfo(label1, 100, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
seg2 := genTestSegmentInfo(label2, 101, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
seg3 := genTestSegmentInfo(label3, 102, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
segments := GetViewsByInfo(seg1, seg2, seg3)
result := groupByPartitionChannel(segments)
s.NotNil(result)
s.Equal(3, len(result))
}
func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_SameGroupMultipleSegments() {
seg1 := genTestSegmentInfo(s.testLabel, 100, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
seg2 := genTestSegmentInfo(s.testLabel, 101, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
seg3 := genTestSegmentInfo(s.testLabel, 102, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed)
segments := GetViewsByInfo(seg1, seg2, seg3)
result := groupByPartitionChannel(segments)
s.NotNil(result)
s.Equal(1, len(result))
for label, segs := range result {
s.Equal(s.testLabel.Key(), label.Key())
s.Equal(3, len(segs))
}
}
func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_FilterSegments() {
ctx := context.Background()
collectionID := int64(1)
targetSize := int64(1024 * 1024 * 512)
triggerID := int64(100)
coll := &collectionInfo{
ID: collectionID,
Schema: newTestSchema(),
Properties: nil,
}
topology := &CollectionTopology{
CollectionID: collectionID,
NumReplicas: 1,
}
s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil)
s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil)
s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(topology, nil)
views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize)
s.NoError(err)
s.Equal(triggerID, gotTriggerID)
s.NotNil(views)
for _, view := range views {
for _, seg := range view.GetSegmentsView() {
s.NotEqual(datapb.SegmentLevel_L0, seg.Level)
s.Equal(commonpb.SegmentState_Flushed, seg.State)
}
}
}

View File

@ -407,7 +407,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
ResultSegments: []int64{},
TotalRows: totalRows,
Schema: coll.Schema,
MaxSize: getExpandedSize(expectedSize),
MaxSize: expectedSize,
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
@ -425,6 +425,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
log.Info("time cost of generating compaction",
zap.Int64("planID", task.GetPlanID()),
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("target size", task.GetMaxSize()),
zap.Int64s("inputSegments", inputSegmentIDs))
}
}
@ -809,10 +810,6 @@ func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo,
return small
}
func getExpandedSize(size int64) int64 {
return int64(float64(size) * Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat())
}
func canTriggerSortCompaction(segment *SegmentInfo, isPartitionIsolationEnabled bool) bool {
return segment.GetState() == commonpb.SegmentState_Flushed &&
segment.GetLevel() != datapb.SegmentLevel_L0 &&

View File

@ -631,7 +631,7 @@ func Test_compactionTrigger_force(t *testing.T) {
Schema: schema,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 100, End: 200},
MaxSize: 1342177280,
MaxSize: 1073741824,
SlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
JsonParams: params,
},

View File

@ -26,6 +26,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
@ -45,8 +48,28 @@ const (
TriggerTypeSort
TriggerTypePartitionKeySort
TriggerTypeClusteringPartitionKeySort
TriggerTypeForceMerge
)
func (t CompactionTriggerType) GetCompactionType() datapb.CompactionType {
switch t {
case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE, TriggerTypeLevelZeroViewManual:
return datapb.CompactionType_Level0DeleteCompaction
case TriggerTypeSegmentSizeViewChange, TriggerTypeSingle, TriggerTypeForceMerge:
return datapb.CompactionType_MixCompaction
case TriggerTypeClustering:
return datapb.CompactionType_ClusteringCompaction
case TriggerTypeSort:
return datapb.CompactionType_SortCompaction
case TriggerTypePartitionKeySort:
return datapb.CompactionType_PartitionKeySortCompaction
case TriggerTypeClusteringPartitionKeySort:
return datapb.CompactionType_ClusteringPartitionKeySortCompaction
default:
return datapb.CompactionType_MixCompaction
}
}
func (t CompactionTriggerType) String() string {
switch t {
case TriggerTypeLevelZeroViewChange:
@ -67,6 +90,8 @@ func (t CompactionTriggerType) String() string {
return "PartitionKeySort"
case TriggerTypeClusteringPartitionKeySort:
return "ClusteringPartitionKeySort"
case TriggerTypeForceMerge:
return "ForceMerge"
default:
return ""
}
@ -76,9 +101,10 @@ type TriggerManager interface {
Start()
Stop()
OnCollectionUpdate(collectionID int64)
ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (UniqueID, error)
ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64) (UniqueID, error)
GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{}
GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{}
InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface)
}
var _ TriggerManager = (*CompactionTriggerManager)(nil)
@ -86,13 +112,6 @@ var _ TriggerManager = (*CompactionTriggerManager)(nil)
// CompactionTriggerManager registers Triggers to TriggerType
// so that when the certain TriggerType happens, the corresponding triggers can
// trigger the correct compaction plans.
// Trigger types:
// 1. Change of Views
// - LevelZeroViewTrigger
// - SegmentSizeViewTrigger
//
// 2. SystemIDLE & schedulerIDLE
// 3. Manual Compaction
type CompactionTriggerManager struct {
inspector CompactionInspector
handler Handler
@ -103,6 +122,7 @@ type CompactionTriggerManager struct {
l0Policy *l0CompactionPolicy
clusteringPolicy *clusteringCompactionPolicy
singlePolicy *singleCompactionPolicy
forceMergePolicy *forceMergeCompactionPolicy
cancel context.CancelFunc
closeWg sync.WaitGroup
@ -131,9 +151,18 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, ins
m.l0Policy = newL0CompactionPolicy(meta, alloc)
m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler)
m.singlePolicy = newSingleCompactionPolicy(meta, m.allocator, m.handler)
m.forceMergePolicy = newForceMergeCompactionPolicy(meta, m.allocator, m.handler)
return m
}
// InitForceMergeMemoryQuerier initializes the topology querier for force merge auto calculation
func (m *CompactionTriggerManager) InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface) {
if m.forceMergePolicy != nil {
querier := newMetricsNodeMemoryQuerier(nodeManager, mixCoord, session)
m.forceMergePolicy.SetTopologyQuerier(querier)
}
}
// OnCollectionUpdate notifies L0Policy about latest collection's L0 segment changes
// This tells the l0 triggers about which collections are active
func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) {
@ -310,21 +339,27 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) {
}
}
func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (UniqueID, error) {
log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID),
zap.Bool("clusteringCompaction", clusteringCompaction), zap.Bool("l0Compaction", l0Compaction))
func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, isClustering bool, isL0 bool, targetSize int64) (UniqueID, error) {
log.Ctx(ctx).Info("receive manual trigger",
zap.Int64("collectionID", collectionID),
zap.Bool("is clustering", isClustering),
zap.Bool("is l0", isL0),
zap.Int64("targetSize", targetSize))
var triggerID UniqueID
var err error
var views []CompactionView
events := make(map[CompactionTriggerType][]CompactionView, 0)
if l0Compaction {
if targetSize != 0 {
views, triggerID, err = m.forceMergePolicy.triggerOneCollection(ctx, collectionID, targetSize)
events[TriggerTypeForceMerge] = views
} else if isL0 {
m.setL0Triggering(true)
defer m.setL0Triggering(false)
views, triggerID, err = m.l0Policy.triggerOneCollection(ctx, collectionID)
events[TriggerTypeLevelZeroViewManual] = views
} else if clusteringCompaction {
} else if isClustering {
views, triggerID, err = m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true)
events[TriggerTypeClustering] = views
}
@ -342,14 +377,18 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection
func (m *CompactionTriggerManager) triggerViewForCompaction(ctx context.Context, eventType CompactionTriggerType,
view CompactionView,
) ([]CompactionView, string) {
if eventType == TriggerTypeLevelZeroViewIDLE {
switch eventType {
case TriggerTypeLevelZeroViewIDLE:
view, reason := view.ForceTrigger()
return []CompactionView{view}, reason
} else if eventType == TriggerTypeLevelZeroViewManual {
case TriggerTypeLevelZeroViewManual, TriggerTypeForceMerge:
return view.ForceTriggerAll()
default:
outView, reason := view.Trigger()
return []CompactionView{outView}, reason
}
outView, reason := view.Trigger()
return []CompactionView{outView}, reason
}
func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) {
@ -370,14 +409,10 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact
m.SubmitL0ViewToScheduler(ctx, outView)
case TriggerTypeClustering:
m.SubmitClusteringViewToScheduler(ctx, outView)
case TriggerTypeSingle:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_MixCompaction)
case TriggerTypeSort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_SortCompaction)
case TriggerTypePartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_PartitionKeySortCompaction)
case TriggerTypeClusteringPartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_ClusteringPartitionKeySortCompaction)
case TriggerTypeSingle, TriggerTypeSort, TriggerTypePartitionKeySort, TriggerTypeClusteringPartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, eventType)
case TriggerTypeForceMerge:
m.SubmitForceMergeViewToScheduler(ctx, outView)
}
}
}
@ -591,14 +626,15 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
)
}
func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Context, view CompactionView, compactionType datapb.CompactionType) {
log := log.Ctx(ctx).With(zap.String("view", view.String()))
func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Context, view CompactionView, triggerType CompactionTriggerType) {
// single view is definitely a one-to-one mapping
log := log.Ctx(ctx).With(zap.String("trigger type", triggerType.String()), zap.String("view", view.String()))
// TODO[GOOSE], 11 = 1 planID + 10 segmentID, this is a hack need to be removed.
// Any plan that output segment number greater than 10 will be marked as invalid plan for now.
n := 11 * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64()
startID, endID, err := m.allocator.AllocN(n)
if err != nil {
log.Warn("fFailed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
return
}
@ -619,7 +655,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
State: datapb.CompactionTaskState_pipelining,
StartTime: time.Now().Unix(),
CollectionTtl: view.(*MixSegmentView).collectionTTL.Nanoseconds(),
Type: compactionType, // todo: use SingleCompaction
Type: triggerType.GetCompactionType(),
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
Channel: view.GetGroupLabel().Channel,
@ -628,7 +664,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
ResultSegments: []int64{},
TotalRows: totalRows,
LastStateStartTime: time.Now().Unix(),
MaxSize: getExpandedSize(expectedSize),
MaxSize: expectedSize,
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
@ -641,11 +677,74 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
zap.Int64("planID", task.GetPlanID()),
zap.Int64s("segmentIDs", task.GetInputSegments()),
zap.Error(err))
return
}
log.Info("Finish to submit a single compaction task",
zap.Int64("triggerID", task.GetTriggerID()),
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.Int64("targetSize", task.GetMaxSize()),
)
}
func (m *CompactionTriggerManager) SubmitForceMergeViewToScheduler(ctx context.Context, view CompactionView) {
log := log.Ctx(ctx).With(zap.String("view", view.String()))
taskID, err := m.allocator.AllocID(ctx)
if err != nil {
log.Warn("Failed to allocate task ID", zap.Error(err))
return
}
collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
if err != nil {
log.Warn("Failed to get collection", zap.Error(err))
return
}
totalRows := lo.SumBy(view.GetSegmentsView(), func(v *SegmentView) int64 { return v.NumOfRows })
targetCount := view.(*ForceMergeSegmentView).targetCount
n := targetCount * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64()
startID, endID, err := m.allocator.AllocN(n)
if err != nil {
log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
return
}
task := &datapb.CompactionTask{
PlanID: taskID,
TriggerID: view.GetTriggerID(),
State: datapb.CompactionTaskState_pipelining,
StartTime: time.Now().Unix(),
CollectionTtl: view.(*ForceMergeSegmentView).collectionTTL.Nanoseconds(),
Type: datapb.CompactionType_MixCompaction,
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
Channel: view.GetGroupLabel().Channel,
Schema: collection.Schema,
InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
ResultSegments: []int64{},
TotalRows: totalRows,
LastStateStartTime: time.Now().Unix(),
MaxSize: int64(view.(*ForceMergeSegmentView).targetSize),
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
},
}
err = m.inspector.enqueueCompaction(task)
if err != nil {
log.Warn("Failed to enqueue task", zap.Error(err))
return
}
log.Info("Finish to submit force merge task",
zap.Int64("planID", taskID),
zap.Int64("triggerID", task.GetTriggerID()),
zap.Int64("collectionID", task.GetCollectionID()),
zap.Int64("targetSize", task.GetMaxSize()),
)
}

View File

@ -432,14 +432,14 @@ func (s *CompactionTriggerManagerSuite) TestManualTriggerL0Compaction() {
}).Return(nil).Once()
// Test L0 manual trigger
triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, true)
triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, true, 0)
s.NoError(err)
s.Equal(int64(12345), triggerID)
}
func (s *CompactionTriggerManagerSuite) TestManualTriggerInvalidParams() {
// Test with both clustering and L0 compaction false
triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, false)
triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, false, 0)
s.NoError(err)
s.Equal(int64(0), triggerID)
}

View File

@ -0,0 +1,309 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"fmt"
"math"
"time"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/samber/lo"
"go.uber.org/zap"
)
const (
defaultToleranceMB = 0.05
)
// static segment view, only algorithms here, no IO
type ForceMergeSegmentView struct {
label *CompactionGroupLabel
segments []*SegmentView
triggerID int64
collectionTTL time.Duration
configMaxSize float64
topology *CollectionTopology
targetSize float64
targetCount int64
}
func (v *ForceMergeSegmentView) GetGroupLabel() *CompactionGroupLabel {
return v.label
}
func (v *ForceMergeSegmentView) GetSegmentsView() []*SegmentView {
return v.segments
}
func (v *ForceMergeSegmentView) Append(segments ...*SegmentView) {
v.segments = append(v.segments, segments...)
}
func (v *ForceMergeSegmentView) String() string {
return fmt.Sprintf("ForceMerge: %s, segments=%d, triggerID=%d",
v.label, len(v.segments), v.triggerID)
}
func (v *ForceMergeSegmentView) Trigger() (CompactionView, string) {
panic("implement me")
}
func (v *ForceMergeSegmentView) ForceTrigger() (CompactionView, string) {
panic("implement me")
}
func (v *ForceMergeSegmentView) GetTriggerID() int64 {
return v.triggerID
}
func (v *ForceMergeSegmentView) calculateTargetSizeCount() (maxSafeSize float64, targetCount int64) {
log := log.With(zap.Int64("triggerID", v.triggerID), zap.String("label", v.label.String()))
maxSafeSize = v.calculateMaxSafeSize()
if maxSafeSize < v.configMaxSize {
log.Info("maxSafeSize is less than configMaxSize, set to configMaxSize",
zap.Float64("targetSize", maxSafeSize),
zap.Float64("configMaxSize", v.configMaxSize))
maxSafeSize = v.configMaxSize
}
targetCount = max(1, int64(sumSegmentSize(v.segments)/maxSafeSize))
log.Info("topology-aware force merge calculation",
zap.Int64("targetSegmentCount", targetCount),
zap.Float64("maxSafeSize", maxSafeSize))
return maxSafeSize, targetCount
}
func (v *ForceMergeSegmentView) ForceTriggerAll() ([]CompactionView, string) {
targetSizePerSegment, targetCount := v.calculateTargetSizeCount()
groups := adaptiveGroupSegments(v.segments, float64(targetSizePerSegment))
results := make([]CompactionView, 0, len(groups))
for _, group := range groups {
results = append(results, &ForceMergeSegmentView{
label: v.label,
segments: group,
triggerID: v.triggerID,
collectionTTL: v.collectionTTL,
configMaxSize: v.configMaxSize,
targetSize: targetSizePerSegment,
targetCount: targetCount,
topology: v.topology,
})
}
return results, "force merge trigger"
}
// adaptiveGroupSegments automatically selects the best grouping algorithm based on segment count
// For small segment counts (≤ threshold), use maxFull for optimal full segment count
// For large segment counts, use larger for better performance
func adaptiveGroupSegments(segments []*SegmentView, targetSize float64) [][]*SegmentView {
if len(segments) == 0 {
return nil
}
n := len(segments)
// Get threshold from config, fallback to default if not available
threshold := paramtable.Get().DataCoordCfg.CompactionMaxFullSegmentThreshold.GetAsInt()
// Use maxFull for small segment counts to maximize full segments
// Use larger for large segment counts for O(n) performance
if n <= threshold {
return maxFullSegmentsGrouping(segments, targetSize)
}
return largerGroupingSegments(segments, targetSize)
}
// largerGroupingSegments groups segments to minimize number of tasks
// Strategy: Create larger groups that produce multiple full target-sized segments
// This approach favors fewer compaction tasks with larger batches
func largerGroupingSegments(segments []*SegmentView, targetSize float64) [][]*SegmentView {
if len(segments) == 0 {
return nil
}
n := len(segments)
// Pre-allocate with estimated capacity to reduce allocations
estimatedGroups := max(1, n/10)
groups := make([][]*SegmentView, 0, estimatedGroups)
i := 0
for i < n {
groupStart := i
groupSize := 0.0
// Accumulate segments to form multiple target-sized outputs
for i < n {
groupSize += segments[i].Size
i++
// Check if we should stop
if i < n {
nextSize := groupSize + segments[i].Size
currentFull := int(groupSize / targetSize)
nextFull := int(nextSize / targetSize)
// Stop if we have full segments and next addition won't give another full segment
if currentFull > 0 && nextFull == currentFull {
currentRemainder := math.Mod(groupSize, targetSize)
if currentRemainder < targetSize*defaultToleranceMB {
break
}
}
}
}
groups = append(groups, segments[groupStart:i])
}
return groups
}
// maxFullSegmentsGrouping groups segments to maximize number of full target-sized outputs
// Strategy: Use dynamic programming to find partitioning that produces most full segments
// This approach minimizes tail segments and achieves best space utilization
func maxFullSegmentsGrouping(segments []*SegmentView, targetSize float64) [][]*SegmentView {
if len(segments) == 0 {
return nil
}
n := len(segments)
// Pre-compute prefix sums to avoid repeated summation
prefixSum := make([]float64, n+1)
for i := 0; i < n; i++ {
prefixSum[i+1] = prefixSum[i] + segments[i].Size
}
// dp[i] = best result for segments[0:i]
type dpState struct {
fullSegments int
tailSegments int
numGroups int
groupIndices []int
}
dp := make([]dpState, n+1)
dp[0] = dpState{fullSegments: 0, tailSegments: 0, numGroups: 0, groupIndices: make([]int, 0, n/10)}
for i := 1; i <= n; i++ {
dp[i] = dpState{fullSegments: -1, tailSegments: math.MaxInt32, numGroups: 0, groupIndices: nil}
// Try different starting positions for the last group
for j := 0; j < i; j++ {
// Calculate group size from j to i-1 using prefix sums (O(1) instead of O(n))
groupSize := prefixSum[i] - prefixSum[j]
numFull := int(groupSize / targetSize)
remainder := math.Mod(groupSize, targetSize)
hasTail := 0
if remainder > 0.01 {
hasTail = 1
}
newFull := dp[j].fullSegments + numFull
newTails := dp[j].tailSegments + hasTail
newGroups := dp[j].numGroups + 1
// Prioritize: more full segments, then fewer tails, then more groups (for parallelism)
isBetter := false
if newFull > dp[i].fullSegments {
isBetter = true
} else if newFull == dp[i].fullSegments && newTails < dp[i].tailSegments {
isBetter = true
} else if newFull == dp[i].fullSegments && newTails == dp[i].tailSegments && newGroups > dp[i].numGroups {
isBetter = true
}
if isBetter {
// Pre-allocate with exact capacity to avoid reallocation
newIndices := make([]int, 0, len(dp[j].groupIndices)+1)
newIndices = append(newIndices, dp[j].groupIndices...)
newIndices = append(newIndices, j)
dp[i] = dpState{
fullSegments: newFull,
tailSegments: newTails,
numGroups: newGroups,
groupIndices: newIndices,
}
}
}
}
// Reconstruct groups from indices
if dp[n].groupIndices == nil {
return [][]*SegmentView{segments}
}
groupStarts := append(dp[n].groupIndices, n)
groups := make([][]*SegmentView, 0, len(groupStarts))
for i := 0; i < len(groupStarts); i++ {
start := groupStarts[i]
end := n
if i+1 < len(groupStarts) {
end = groupStarts[i+1]
}
if start < end {
groups = append(groups, segments[start:end])
}
}
return groups
}
func (v *ForceMergeSegmentView) calculateMaxSafeSize() float64 {
log := log.With(zap.Int64("triggerID", v.triggerID), zap.String("label", v.label.String()))
if len(v.topology.QueryNodeMemory) == 0 || len(v.topology.DataNodeMemory) == 0 {
log.Warn("No querynodes or datanodes in topology, using config size")
return v.configMaxSize
}
// QueryNode constraint: use global minimum memory
querynodeMemoryFactor := paramtable.Get().DataCoordCfg.CompactionForceMergeQueryNodeMemoryFactor.GetAsFloat()
qnMaxSafeSize := float64(lo.Min(lo.Values(v.topology.QueryNodeMemory))) / querynodeMemoryFactor
// DataNode constraint: segments must fit in smallest DataNode
datanodeMemoryFactor := paramtable.Get().DataCoordCfg.CompactionForceMergeDataNodeMemoryFactor.GetAsFloat()
dnMaxSafeSize := float64(lo.Min(lo.Values(v.topology.DataNodeMemory))) / datanodeMemoryFactor
maxSafeSize := min(qnMaxSafeSize, dnMaxSafeSize)
if v.topology.IsStandaloneMode && !v.topology.IsPooling {
log.Info("force merge on standalone not pooling mode, half the max size",
zap.Float64("qnMaxSafeSize", qnMaxSafeSize),
zap.Float64("dnMaxSafeSize", dnMaxSafeSize),
zap.Float64("maxSafeSize/2", maxSafeSize/2),
zap.Float64("configMaxSize", v.configMaxSize))
// dn and qn are co-located, half the min
return maxSafeSize * 0.5
}
log.Info("force merge on cluster/pooling mode",
zap.Float64("qnMaxSafeSize", qnMaxSafeSize),
zap.Float64("dnMaxSafeSize", dnMaxSafeSize),
zap.Float64("maxSafeSize", maxSafeSize),
zap.Float64("configMaxSize", v.configMaxSize))
return maxSafeSize
}
func sumSegmentSize(views []*SegmentView) float64 {
return lo.SumBy(views, func(v *SegmentView) float64 { return v.Size })
}
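
To make the sizing arithmetic above concrete, the standalone sketch below walks through calculateMaxSafeSize and the target-count step for a small standalone deployment. The memory factors are assumed values for illustration; the real ones come from DataCoordCfg (CompactionForceMergeQueryNodeMemoryFactor / CompactionForceMergeDataNodeMemoryFactor) and are not shown in this diff.

package main

import "fmt"

func main() {
	const gib = float64(1 << 30)

	// Assumed inputs: smallest QueryNode / DataNode memory in the topology,
	// and hypothetical memory factors (the real defaults come from paramtable).
	minQNMem, minDNMem := 8*gib, 8*gib
	qnFactor, dnFactor := 4.0, 4.0

	qnSafe := minQNMem / qnFactor // 2 GiB
	dnSafe := minDNMem / dnFactor // 2 GiB
	maxSafe := min(qnSafe, dnSafe)

	// Standalone, non-pooling: QueryNode and DataNode share the same memory,
	// so the policy halves the budget.
	maxSafe /= 2 // 1 GiB

	// calculateTargetSizeCount would also raise maxSafe to the configured max
	// segment size if that were larger; assume it is 1 GiB here, so no change.

	// Six 1.5 GiB flushed segments -> 9 GiB total -> 9 output segments of ~1 GiB.
	totalSize := 6 * 1.5 * gib
	targetCount := max(1, int64(totalSize/maxSafe))

	fmt.Printf("maxSafeSize=%.1f GiB targetCount=%d\n", maxSafe/gib, targetCount)
}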

View File

@ -0,0 +1,631 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"fmt"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
)
func TestForceMergeSegmentView_GetGroupLabel(t *testing.T) {
label := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch1",
}
view := &ForceMergeSegmentView{
label: label,
}
assert.Equal(t, label, view.GetGroupLabel())
}
func TestForceMergeSegmentView_GetSegmentsView(t *testing.T) {
segments := []*SegmentView{
{ID: 1, Size: 1024},
{ID: 2, Size: 2048},
}
view := &ForceMergeSegmentView{
segments: segments,
}
assert.Equal(t, segments, view.GetSegmentsView())
assert.Len(t, view.GetSegmentsView(), 2)
}
func TestForceMergeSegmentView_Append(t *testing.T) {
view := &ForceMergeSegmentView{
segments: []*SegmentView{
{ID: 1, Size: 1024},
},
}
newSegments := []*SegmentView{
{ID: 2, Size: 2048},
{ID: 3, Size: 3072},
}
view.Append(newSegments...)
assert.Len(t, view.segments, 3)
assert.Equal(t, int64(1), view.segments[0].ID)
assert.Equal(t, int64(2), view.segments[1].ID)
assert.Equal(t, int64(3), view.segments[2].ID)
}
func TestForceMergeSegmentView_String(t *testing.T) {
label := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch1",
}
view := &ForceMergeSegmentView{
label: label,
segments: []*SegmentView{
{ID: 1},
{ID: 2},
},
triggerID: 12345,
}
str := view.String()
assert.Contains(t, str, "ForceMerge")
assert.Contains(t, str, "segments=2")
assert.Contains(t, str, "triggerID=12345")
}
func TestForceMergeSegmentView_Trigger(t *testing.T) {
view := &ForceMergeSegmentView{
triggerID: 100,
}
assert.Panics(t, func() {
view.Trigger()
})
}
func TestForceMergeSegmentView_ForceTrigger(t *testing.T) {
view := &ForceMergeSegmentView{
triggerID: 100,
}
assert.Panics(t, func() {
view.ForceTrigger()
})
}
func TestForceMergeSegmentView_GetTriggerID(t *testing.T) {
view := &ForceMergeSegmentView{
triggerID: 12345,
}
assert.Equal(t, int64(12345), view.GetTriggerID())
}
func TestForceMergeSegmentView_Complete(t *testing.T) {
label := &CompactionGroupLabel{
CollectionID: 100,
PartitionID: 200,
Channel: "test-channel",
}
segments := []*SegmentView{
{ID: 1, Size: 1024 * 1024 * 1024},
{ID: 2, Size: 512 * 1024 * 1024},
}
topology := &CollectionTopology{
CollectionID: 100,
NumReplicas: 1,
IsStandaloneMode: false,
QueryNodeMemory: map[int64]uint64{1: 8 * 1024 * 1024 * 1024},
DataNodeMemory: map[int64]uint64{1: 8 * 1024 * 1024 * 1024},
}
view := &ForceMergeSegmentView{
label: label,
segments: segments,
triggerID: 99999,
collectionTTL: 24 * time.Hour,
targetSize: 2048 * 1024 * 1024,
topology: topology,
}
// Test String output
str := view.String()
assert.Contains(t, str, "ForceMerge")
views, r3 := view.ForceTriggerAll()
assert.Len(t, views, 1)
assert.NotEmpty(t, r3)
}
func TestGroupingAlgorithmsComparison(t *testing.T) {
type testCase struct {
name string
segments []float64
targetSize float64
}
testCases := []testCase{
{
name: "perfect fit - 5x2GB to 5GB",
segments: []float64{2, 2, 2, 2, 2},
targetSize: 5,
},
{
name: "varying sizes - example from discussion",
segments: []float64{1.2, 1.3, 1.4, 1.8, 1.8, 1.8, 1.8, 1.8},
targetSize: 3,
},
{
name: "small segments",
segments: []float64{0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5},
targetSize: 2,
},
{
name: "large segments",
segments: []float64{3, 3, 3, 3},
targetSize: 5,
},
{
name: "mixed sizes",
segments: []float64{0.5, 1, 1.5, 2, 2.5, 3},
targetSize: 4,
},
{
name: "many small segments",
segments: []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
targetSize: 3,
},
{
name: "uneven distribution",
segments: []float64{0.3, 0.4, 2.5, 0.3, 2.8, 0.5, 2.2},
targetSize: 3,
},
{
name: "single segment",
segments: []float64{5},
targetSize: 3,
},
{
name: "two segments perfect",
segments: []float64{2.5, 2.5},
targetSize: 5,
},
{
name: "fibonacci-like sizes",
segments: []float64{1, 1, 2, 3, 5, 8},
targetSize: 10,
},
{
name: "near-perfect split - tests greedy vs optimal",
segments: []float64{1.5, 1.5, 1.5, 1.5, 1.5, 1.5},
targetSize: 3,
},
{
name: "strategic grouping - [2.8,0.3] vs [2.8,0.2,0.1]",
segments: []float64{2.8, 0.2, 0.1, 2.8, 0.3},
targetSize: 3,
},
{
name: "tail optimization - many small + one large",
segments: []float64{0.5, 0.5, 0.5, 0.5, 0.5, 2.5},
targetSize: 3,
},
{
name: "alternating sizes for different strategies",
segments: []float64{1.0, 2.5, 1.0, 2.5, 1.0, 2.5},
targetSize: 4,
},
{
name: "edge case - slightly over target creates decision point",
segments: []float64{2.1, 2.1, 2.1, 2.1, 2.1},
targetSize: 4,
},
{
name: "optimal vs greedy - can fit 3 full or 2 full + small tail",
segments: []float64{1.8, 1.8, 1.8, 1.8, 1.8, 1.5},
targetSize: 3,
},
{
name: "many segments with complex optimal solution",
segments: []float64{0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8},
targetSize: 2,
},
{
name: "greedy stops early, optimal continues",
segments: []float64{2.8, 0.2, 0.1, 2.8, 0.3},
targetSize: 3.0,
},
{
name: "Balanced vs Larger - distribution vs grouping",
segments: []float64{1.0, 1.0, 1.0, 1.0, 1.0, 1.0},
targetSize: 2.5,
},
{
name: "MaxFull achieves theoretical maximum when possible",
// Perfect case: 6x1.5GB segments, target=3GB
// Total=9GB, theoretical max = 3 full segments
segments: []float64{1.5, 1.5, 1.5, 1.5, 1.5, 1.5},
targetSize: 3.0,
},
{
name: "Larger creates fewer compaction tasks",
segments: lo.Times(20, func(i int) float64 { return 0.5 }),
targetSize: 2.0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Convert to SegmentView
segments := make([]*SegmentView, len(tc.segments))
for i, size := range tc.segments {
segments[i] = &SegmentView{
ID: int64(i + 1),
Size: size * 1024 * 1024 * 1024,
}
}
targetSize := tc.targetSize * 1024 * 1024 * 1024
totalSize := sumSegmentSize(segments)
theoreticalMaxFull := int(totalSize / targetSize)
// Test all three algorithms
groupsBalanced := adaptiveGroupSegments(segments, targetSize)
groupsLarger := largerGroupingSegments(segments, targetSize)
groupsMax := maxFullSegmentsGrouping(segments, targetSize)
// Helper to count full segments and tails
countMetrics := func(groups [][]*SegmentView) (numGroups, numFull, numTails int) {
numGroups = len(groups)
for _, group := range groups {
groupSize := sumSegmentSize(group)
full := int(groupSize / targetSize)
remainder := groupSize - float64(full)*targetSize
numFull += full
if remainder > 0.01 {
numTails++
}
}
return
}
// Helper to verify all segments used exactly once
verifyAllSegmentsUsed := func(groups [][]*SegmentView) bool {
seen := make(map[int64]int)
for _, group := range groups {
for _, seg := range group {
seen[seg.ID]++
}
}
if len(seen) != len(segments) {
return false
}
for _, count := range seen {
if count != 1 {
return false
}
}
return true
}
// Verify all algorithms use each segment exactly once
assert.True(t, verifyAllSegmentsUsed(groupsBalanced), "adaptiveGroupSegments: all segments must be used exactly once")
assert.True(t, verifyAllSegmentsUsed(groupsLarger), "largerGroupingSegments: all segments must be used exactly once")
assert.True(t, verifyAllSegmentsUsed(groupsMax), "maxFullSegmentsGrouping: all segments must be used exactly once")
// Get metrics
adaptiveGroups, adaptiveFull, adaptiveTails := countMetrics(groupsBalanced)
largerGroups, largerFull, largerTails := countMetrics(groupsLarger)
maxGroups, maxFull, maxTails := countMetrics(groupsMax)
t.Logf("Total size: %.1f GB, Target: %.1f GB, Theoretical max full: %d",
totalSize/(1024*1024*1024), targetSize/(1024*1024*1024), theoreticalMaxFull)
t.Logf("Adaptive: %d groups, %d full, %d tails", adaptiveGroups, adaptiveFull, adaptiveTails)
t.Logf("Larger: %d groups, %d full, %d tails", largerGroups, largerFull, largerTails)
t.Logf("MaxFull: %d groups, %d full, %d tails", maxGroups, maxFull, maxTails)
// Assertions
// 1. maxFullSegmentsGrouping should produce most full segments
assert.GreaterOrEqual(t, maxFull, largerFull, "maxFullSegmentsGrouping should produce >= full segments than largerGroupingSegments")
// 2. maxFullSegmentsGrouping should not exceed theoretical maximum
assert.LessOrEqual(t, maxFull, theoreticalMaxFull, "cannot exceed theoretical maximum")
// 3. All algorithms should process all segments
for _, groups := range [][][]*SegmentView{groupsBalanced, groupsLarger, groupsMax} {
totalProcessed := 0
for _, group := range groups {
totalProcessed += len(group)
}
assert.Equal(t, len(segments), totalProcessed)
}
})
}
}
func TestAdaptiveGroupSegments(t *testing.T) {
t.Run("empty segments", func(t *testing.T) {
groups := adaptiveGroupSegments(nil, 5*1024*1024*1024)
assert.Nil(t, groups)
})
t.Run("uses maxFull for small segment count", func(t *testing.T) {
segments := []*SegmentView{
{ID: 1, Size: 1.5 * 1024 * 1024 * 1024},
{ID: 2, Size: 1.5 * 1024 * 1024 * 1024},
{ID: 3, Size: 1.5 * 1024 * 1024 * 1024},
{ID: 4, Size: 1.5 * 1024 * 1024 * 1024},
}
groups := adaptiveGroupSegments(segments, 3*1024*1024*1024)
// Should produce 2 groups with 2 full segments
assert.Equal(t, 2, len(groups))
})
t.Run("uses larger for large segment count", func(t *testing.T) {
// Create 200 segments (> defaultMaxFullSegmentThreshold)
segments := make([]*SegmentView, 200)
for i := 0; i < 200; i++ {
segments[i] = &SegmentView{
ID: int64(i),
Size: 1 * 1024 * 1024 * 1024,
}
}
groups := adaptiveGroupSegments(segments, 3*1024*1024*1024)
// Should use larger algorithm
assert.NotNil(t, groups)
assert.Greater(t, len(groups), 0)
})
}
func TestLargerGroupingSegments(t *testing.T) {
t.Run("empty segments", func(t *testing.T) {
groups := largerGroupingSegments(nil, 5*1024*1024*1024)
assert.Nil(t, groups)
})
t.Run("single segment", func(t *testing.T) {
segments := []*SegmentView{
{ID: 1, Size: 3 * 1024 * 1024 * 1024},
}
groups := largerGroupingSegments(segments, 5*1024*1024*1024)
assert.Equal(t, 1, len(groups))
assert.Equal(t, 1, len(groups[0]))
})
}
func TestMaxFullSegmentsGrouping(t *testing.T) {
t.Run("empty segments", func(t *testing.T) {
groups := maxFullSegmentsGrouping(nil, 5*1024*1024*1024)
assert.Nil(t, groups)
})
t.Run("single segment", func(t *testing.T) {
segments := []*SegmentView{
{ID: 1, Size: 3 * 1024 * 1024 * 1024},
}
groups := maxFullSegmentsGrouping(segments, 5*1024*1024*1024)
assert.Equal(t, 1, len(groups))
assert.Equal(t, 1, len(groups[0]))
})
t.Run("perfect fit achieves theoretical maximum", func(t *testing.T) {
segments := []*SegmentView{
{ID: 1, Size: 2.5 * 1024 * 1024 * 1024},
{ID: 2, Size: 2.5 * 1024 * 1024 * 1024},
{ID: 3, Size: 2.5 * 1024 * 1024 * 1024},
{ID: 4, Size: 2.5 * 1024 * 1024 * 1024},
}
targetSize := 5.0 * 1024 * 1024 * 1024
groups := maxFullSegmentsGrouping(segments, targetSize)
totalFull := 0
for _, group := range groups {
groupSize := sumSegmentSize(group)
totalFull += int(groupSize / targetSize)
}
// Total is 10GB, should produce exactly 2 full 5GB segments
assert.Equal(t, 2, totalFull)
})
}
func TestSumSegmentSize(t *testing.T) {
segments := []*SegmentView{
{ID: 1, Size: 1024 * 1024 * 1024},
{ID: 2, Size: 512 * 1024 * 1024},
}
totalSize := sumSegmentSize(segments)
expected := 1.5 * 1024 * 1024 * 1024
assert.InDelta(t, expected, totalSize, 1)
}
func TestGroupByPartitionChannel(t *testing.T) {
label1 := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch1",
}
label2 := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 20,
Channel: "ch1",
}
segments := []*SegmentView{
{ID: 1, label: label1},
{ID: 2, label: label1},
{ID: 3, label: label2},
}
groups := groupByPartitionChannel(segments)
assert.Equal(t, 2, len(groups))
var count1, count2 int
for _, segs := range groups {
if len(segs) == 2 {
count1++
} else if len(segs) == 1 {
count2++
}
}
assert.Equal(t, 1, count1)
assert.Equal(t, 1, count2)
}
func TestGroupByPartitionChannel_EmptySegments(t *testing.T) {
groups := groupByPartitionChannel([]*SegmentView{})
assert.Empty(t, groups)
}
func TestGroupByPartitionChannel_SameLabel(t *testing.T) {
label := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch1",
}
segments := []*SegmentView{
{ID: 1, label: label},
{ID: 2, label: label},
{ID: 3, label: label},
}
groups := groupByPartitionChannel(segments)
assert.Equal(t, 1, len(groups))
for _, segs := range groups {
assert.Equal(t, 3, len(segs))
}
}
// Benchmark tests
func BenchmarkLargerGroupingSegments(b *testing.B) {
sizes := []int{10, 50, 100, 500}
for _, n := range sizes {
b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) {
segments := make([]*SegmentView, n)
for i := 0; i < n; i++ {
segments[i] = &SegmentView{
ID: int64(i),
Size: float64((i%10+1)*100*1024*1024 + i*1024*1024),
}
}
targetSize := float64(3 * 1024 * 1024 * 1024)
b.ResetTimer()
for i := 0; i < b.N; i++ {
largerGroupingSegments(segments, targetSize)
}
})
}
}
func BenchmarkMaxFullSegmentsGrouping(b *testing.B) {
sizes := []int{10, 50, 100}
for _, n := range sizes {
b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) {
segments := make([]*SegmentView, n)
for i := 0; i < n; i++ {
segments[i] = &SegmentView{
ID: int64(i),
Size: float64((i%10+1)*100*1024*1024 + i*1024*1024),
}
}
targetSize := float64(3 * 1024 * 1024 * 1024)
b.ResetTimer()
for i := 0; i < b.N; i++ {
maxFullSegmentsGrouping(segments, targetSize)
}
})
}
}
func BenchmarkGroupingAlgorithmsComparison(b *testing.B) {
sizes := []int{10, 50, 100, 200, 500}
targetSize := float64(3 * 1024 * 1024 * 1024)
for _, n := range sizes {
segments := make([]*SegmentView, n)
for i := 0; i < n; i++ {
segments[i] = &SegmentView{
ID: int64(i),
Size: float64((i%10+1)*100*1024*1024 + i*1024*1024),
}
}
b.Run(fmt.Sprintf("adaptive/n=%d", n), func(b *testing.B) {
for i := 0; i < b.N; i++ {
adaptiveGroupSegments(segments, targetSize)
}
})
b.Run(fmt.Sprintf("larger/n=%d", n), func(b *testing.B) {
for i := 0; i < b.N; i++ {
largerGroupingSegments(segments, targetSize)
}
})
// Only test maxFull with smaller sizes due to O(n³) complexity
if n <= 200 {
b.Run(fmt.Sprintf("maxFull/n=%d", n), func(b *testing.B) {
for i := 0; i < b.N; i++ {
maxFullSegmentsGrouping(segments, targetSize)
}
})
}
}
}
func BenchmarkGroupByPartitionChannel(b *testing.B) {
sizes := []int{10, 100, 1000}
for _, n := range sizes {
b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) {
segments := make([]*SegmentView, n)
for i := 0; i < n; i++ {
label := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: int64(i % 5),
Channel: fmt.Sprintf("ch%d", i%3),
}
segments[i] = &SegmentView{
ID: int64(i),
label: label,
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = groupByPartitionChannel(segments)
}
})
}
}
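// Hedged end-to-end sketch (not a test from this PR): how the helpers exercised
// above might compose in a force-merge trigger. The composition itself and the
// assumed [][]*SegmentView return shape of adaptiveGroupSegments are not taken
// from the implementation.
func sketchForceMergeGrouping(segments []*SegmentView, targetSize float64) [][]*SegmentView {
	var plans [][]*SegmentView
	// Segments can only be merged within the same partition and channel...
	for _, group := range groupByPartitionChannel(segments) {
		// ...and within each group they are packed into bins of roughly targetSize.
		plans = append(plans, adaptiveGroupSegments(group, targetSize)...)
	}
	return plans
}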

View File

@ -903,7 +903,7 @@ func createSortCompactionTask(ctx context.Context,
ResultSegments: []int64{},
TotalRows: originSegment.GetNumOfRows(),
LastStateStartTime: time.Now().Unix(),
MaxSize: getExpandedSize(expectedSize),
MaxSize: expectedSize,
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: targetSegmentID,
End: targetSegmentID + 1,

View File

@ -0,0 +1,95 @@
// Code generated by mockery v2.53.3. DO NOT EDIT.
package datacoord
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockCollectionTopologyQuerier is an autogenerated mock type for the CollectionTopologyQuerier type
type MockCollectionTopologyQuerier struct {
mock.Mock
}
type MockCollectionTopologyQuerier_Expecter struct {
mock *mock.Mock
}
func (_m *MockCollectionTopologyQuerier) EXPECT() *MockCollectionTopologyQuerier_Expecter {
return &MockCollectionTopologyQuerier_Expecter{mock: &_m.Mock}
}
// GetCollectionTopology provides a mock function with given fields: ctx, collectionID
func (_m *MockCollectionTopologyQuerier) GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error) {
ret := _m.Called(ctx, collectionID)
if len(ret) == 0 {
panic("no return value specified for GetCollectionTopology")
}
var r0 *CollectionTopology
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64) (*CollectionTopology, error)); ok {
return rf(ctx, collectionID)
}
if rf, ok := ret.Get(0).(func(context.Context, int64) *CollectionTopology); ok {
r0 = rf(ctx, collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*CollectionTopology)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
r1 = rf(ctx, collectionID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockCollectionTopologyQuerier_GetCollectionTopology_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionTopology'
type MockCollectionTopologyQuerier_GetCollectionTopology_Call struct {
*mock.Call
}
// GetCollectionTopology is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
func (_e *MockCollectionTopologyQuerier_Expecter) GetCollectionTopology(ctx interface{}, collectionID interface{}) *MockCollectionTopologyQuerier_GetCollectionTopology_Call {
return &MockCollectionTopologyQuerier_GetCollectionTopology_Call{Call: _e.mock.On("GetCollectionTopology", ctx, collectionID)}
}
func (_c *MockCollectionTopologyQuerier_GetCollectionTopology_Call) Run(run func(ctx context.Context, collectionID int64)) *MockCollectionTopologyQuerier_GetCollectionTopology_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
})
return _c
}
func (_c *MockCollectionTopologyQuerier_GetCollectionTopology_Call) Return(_a0 *CollectionTopology, _a1 error) *MockCollectionTopologyQuerier_GetCollectionTopology_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCollectionTopologyQuerier_GetCollectionTopology_Call) RunAndReturn(run func(context.Context, int64) (*CollectionTopology, error)) *MockCollectionTopologyQuerier_GetCollectionTopology_Call {
_c.Call.Return(run)
return _c
}
// NewMockCollectionTopologyQuerier creates a new instance of MockCollectionTopologyQuerier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockCollectionTopologyQuerier(t interface {
mock.TestingT
Cleanup(func())
}) *MockCollectionTopologyQuerier {
mock := &MockCollectionTopologyQuerier{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
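// Hedged usage sketch (not part of the mockery-generated code above): how a test
// might program this mock. The collection ID 100 is an illustrative assumption,
// and the empty CollectionTopology literal stands in for real topology data.
func sketchMockTopologyQuerierUsage(t interface {
	mock.TestingT
	Cleanup(func())
}) {
	querier := NewMockCollectionTopologyQuerier(t)
	querier.EXPECT().
		GetCollectionTopology(mock.Anything, int64(100)).
		Return(&CollectionTopology{}, nil)

	topo, err := querier.GetCollectionTopology(context.Background(), 100)
	_, _ = topo, err // topo is the stubbed &CollectionTopology{}, err is nil
}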

View File

@ -5,7 +5,12 @@ package datacoord
import (
context "context"
session "github.com/milvus-io/milvus/internal/datacoord/session"
mock "github.com/stretchr/testify/mock"
sessionutil "github.com/milvus-io/milvus/internal/util/sessionutil"
types "github.com/milvus-io/milvus/internal/types"
)
// MockTriggerManager is an autogenerated mock type for the TriggerManager type
@ -119,9 +124,44 @@ func (_c *MockTriggerManager_GetResumeCompactionChan_Call) RunAndReturn(run func
return _c
}
// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction, l0Compaction
func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (int64, error) {
ret := _m.Called(ctx, collectionID, clusteringCompaction, l0Compaction)
// InitForceMergeMemoryQuerier provides a mock function with given fields: nodeManager, mixCoord, _a2
func (_m *MockTriggerManager) InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, _a2 sessionutil.SessionInterface) {
_m.Called(nodeManager, mixCoord, _a2)
}
// MockTriggerManager_InitForceMergeMemoryQuerier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InitForceMergeMemoryQuerier'
type MockTriggerManager_InitForceMergeMemoryQuerier_Call struct {
*mock.Call
}
// InitForceMergeMemoryQuerier is a helper method to define mock.On call
// - nodeManager session.NodeManager
// - mixCoord types.MixCoord
// - _a2 sessionutil.SessionInterface
func (_e *MockTriggerManager_Expecter) InitForceMergeMemoryQuerier(nodeManager interface{}, mixCoord interface{}, _a2 interface{}) *MockTriggerManager_InitForceMergeMemoryQuerier_Call {
return &MockTriggerManager_InitForceMergeMemoryQuerier_Call{Call: _e.mock.On("InitForceMergeMemoryQuerier", nodeManager, mixCoord, _a2)}
}
func (_c *MockTriggerManager_InitForceMergeMemoryQuerier_Call) Run(run func(nodeManager session.NodeManager, mixCoord types.MixCoord, _a2 sessionutil.SessionInterface)) *MockTriggerManager_InitForceMergeMemoryQuerier_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(session.NodeManager), args[1].(types.MixCoord), args[2].(sessionutil.SessionInterface))
})
return _c
}
func (_c *MockTriggerManager_InitForceMergeMemoryQuerier_Call) Return() *MockTriggerManager_InitForceMergeMemoryQuerier_Call {
_c.Call.Return()
return _c
}
func (_c *MockTriggerManager_InitForceMergeMemoryQuerier_Call) RunAndReturn(run func(session.NodeManager, types.MixCoord, sessionutil.SessionInterface)) *MockTriggerManager_InitForceMergeMemoryQuerier_Call {
_c.Run(run)
return _c
}
// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction, l0Compaction, targetSize
func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64) (int64, error) {
ret := _m.Called(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize)
if len(ret) == 0 {
panic("no return value specified for ManualTrigger")
@ -129,17 +169,17 @@ func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID in
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool) (int64, error)); ok {
return rf(ctx, collectionID, clusteringCompaction, l0Compaction)
if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool, int64) (int64, error)); ok {
return rf(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool) int64); ok {
r0 = rf(ctx, collectionID, clusteringCompaction, l0Compaction)
if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool, int64) int64); ok {
r0 = rf(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, int64, bool, bool) error); ok {
r1 = rf(ctx, collectionID, clusteringCompaction, l0Compaction)
if rf, ok := ret.Get(1).(func(context.Context, int64, bool, bool, int64) error); ok {
r1 = rf(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize)
} else {
r1 = ret.Error(1)
}
@ -157,13 +197,14 @@ type MockTriggerManager_ManualTrigger_Call struct {
// - collectionID int64
// - clusteringCompaction bool
// - l0Compaction bool
func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}, l0Compaction interface{}) *MockTriggerManager_ManualTrigger_Call {
return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction, l0Compaction)}
// - targetSize int64
func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}, l0Compaction interface{}, targetSize interface{}) *MockTriggerManager_ManualTrigger_Call {
return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction, l0Compaction, targetSize)}
}
func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool)) *MockTriggerManager_ManualTrigger_Call {
func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64)) *MockTriggerManager_ManualTrigger_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(bool), args[3].(bool))
run(args[0].(context.Context), args[1].(int64), args[2].(bool), args[3].(bool), args[4].(int64))
})
return _c
}
@ -173,7 +214,7 @@ func (_c *MockTriggerManager_ManualTrigger_Call) Return(_a0 int64, _a1 error) *M
return _c
}
func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool, bool) (int64, error)) *MockTriggerManager_ManualTrigger_Call {
func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool, bool, int64) (int64, error)) *MockTriggerManager_ManualTrigger_Call {
_c.Call.Return(run)
return _c
}

View File

@ -697,6 +697,7 @@ func (s *Server) initCompaction() {
cph.loadMeta()
s.compactionInspector = cph
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionInspector, s.meta, s.importMeta)
s.compactionTriggerManager.InitForceMergeMemoryQuerier(s.nodeManager, s.mixCoord, s.session)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionInspector, s.allocator, s.handler, s.indexEngineVersionManager)
}

View File

@ -1700,7 +1700,7 @@ func TestManualCompaction(t *testing.T) {
svr.stateCode.Store(commonpb.StateCode_Healthy)
mockTriggerManager := NewMockTriggerManager(t)
svr.compactionTriggerManager = mockTriggerManager
mockTriggerManager.EXPECT().ManualTrigger(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1, nil)
mockTriggerManager.EXPECT().ManualTrigger(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1, nil)
mockHandler := NewMockCompactionInspector(t)
mockHandler.EXPECT().getCompactionTasksNumBySignalID(mock.Anything).Return(1)

View File

@ -1260,7 +1260,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
log.Info("received manual compaction")
log.Info("received manual compaction", zap.Any("request", req))
resp := &milvuspb.ManualCompactionResponse{
Status: merr.Success(),
@ -1279,8 +1279,8 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
var id int64
var err error
if req.GetMajorCompaction() || req.GetL0Compaction() {
id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction(), req.GetL0Compaction())
if req.GetMajorCompaction() || req.GetL0Compaction() || req.GetTargetSize() != 0 {
id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction(), req.GetL0Compaction(), req.GetTargetSize())
} else {
id, err = s.compactionTrigger.TriggerCompaction(ctx, NewCompactionSignal().
WithIsForce(true).
@ -1306,7 +1306,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
}
log.Info("success to trigger manual compaction", zap.Bool("isL0Compaction", req.GetL0Compaction()),
zap.Bool("isMajorCompaction", req.GetMajorCompaction()), zap.Int64("compactionID", id), zap.Int("taskNum", taskCnt))
zap.Bool("isMajorCompaction", req.GetMajorCompaction()), zap.Int64("targetSize", req.GetTargetSize()), zap.Int64("compactionID", id), zap.Int("taskNum", taskCnt))
return resp, nil
}
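// Hedged caller-side sketch (not code from this PR): triggering a force merge
// through the same RPC by setting TargetSize. The collection ID and the 5GB
// target below are illustrative assumptions; real callers should also inspect
// resp.GetStatus().
func sketchTriggerForceMerge(ctx context.Context, s *Server) (int64, error) {
	resp, err := s.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
		CollectionID: 1,
		TargetSize:   5 << 30, // non-zero TargetSize routes the request to ManualTrigger (force merge)
	})
	if err != nil {
		return 0, err
	}
	return resp.GetCompactionID(), nil
}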

View File

@ -160,10 +160,11 @@ func (suite *SegmentSuite) TestResourceUsageEstimate() {
usage := suite.growing.ResourceUsageEstimate()
suite.Zero(usage.MemorySize)
suite.Zero(usage.DiskSize)
// growing segment has no resource usage
// sealed segment has resource usage
usage = suite.sealed.ResourceUsageEstimate()
suite.NotZero(usage.MemorySize)
suite.Zero(usage.DiskSize)
// mmap is on
suite.Zero(usage.MemorySize)
suite.NotZero(usage.DiskSize)
suite.Zero(usage.MmapFieldCount)
}

View File

@ -3990,7 +3990,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin
p.MmapVectorIndex = ParamItem{
Key: "queryNode.mmap.vectorIndex",
Version: "2.4.7",
DefaultValue: "false",
DefaultValue: "true",
Formatter: func(originValue string) string {
if p.MmapEnabled.GetAsBool() {
return "true"
@ -4005,7 +4005,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin
p.MmapScalarField = ParamItem{
Key: "queryNode.mmap.scalarField",
Version: "2.4.7",
DefaultValue: "false",
DefaultValue: "true",
Formatter: func(originValue string) string {
if p.MmapEnabled.GetAsBool() {
return "true"
@ -4020,7 +4020,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin
p.MmapScalarIndex = ParamItem{
Key: "queryNode.mmap.scalarIndex",
Version: "2.4.7",
DefaultValue: "false",
DefaultValue: "true",
Formatter: func(originValue string) string {
if p.MmapEnabled.GetAsBool() {
return "true"
@ -4045,7 +4045,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin
p.GrowingMmapEnabled = ParamItem{
Key: "queryNode.mmap.growingMmapEnabled",
Version: "2.4.6",
DefaultValue: "false",
DefaultValue: "true",
FallbackKeys: []string{"queryNode.growingMmapEnabled"},
Doc: `Enable memory mapping (mmap) to optimize the handling of growing raw data.
By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
@ -4605,22 +4605,25 @@ type dataCoordConfig struct {
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`
CompactionPreAllocateIDExpansionFactor ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParallelTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"` // deprecated
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` // deprecated
CompactionScheduleInterval ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
CompactionExpiryTolerance ParamItem `refreshable:"true"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParallelTasks ParamItem `refreshable:"true"`
CompactionMaxFullSegmentThreshold ParamItem `refreshable:"true"`
CompactionForceMergeDataNodeMemoryFactor ParamItem `refreshable:"true"`
CompactionForceMergeQueryNodeMemoryFactor ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"` // deprecated
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` // deprecated
CompactionScheduleInterval ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
CompactionExpiryTolerance ParamItem `refreshable:"true"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
@ -5070,6 +5073,47 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.CompactionScheduleInterval.Init(base.mgr)
p.CompactionMaxFullSegmentThreshold = ParamItem{
Key: "dataCoord.compaction.maxFullSegmentThreshold",
Version: "2.6.8",
DefaultValue: "100",
Doc: "Maximum number of segments to use maxFull algorithm (O(n³) complexity) for optimal full segment count. For larger counts, uses faster larger algorithm (O(n)).",
Export: false,
}
p.CompactionMaxFullSegmentThreshold.Init(base.mgr)
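// Hedged sketch of how the datacoord side might consult this threshold when
// choosing a grouping algorithm (the wrapper shape and the groups variable are
// assumptions; maxFullSegmentsGrouping and largerGroupingSegments are the
// functions benchmarked in the compaction policy tests of this PR):
//
//	threshold := paramtable.Get().DataCoordCfg.CompactionMaxFullSegmentThreshold.GetAsInt()
//	if len(segments) <= threshold {
//		groups = maxFullSegmentsGrouping(segments, targetSize) // O(n³), maximizes full segments
//	} else {
//		groups = largerGroupingSegments(segments, targetSize) // O(n), greedy fallback
//	}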
p.CompactionForceMergeDataNodeMemoryFactor = ParamItem{
Key: "dataCoord.compaction.forceMerge.dataNodeMemoryFactor",
Version: "2.6.8",
DefaultValue: "3.0",
Doc: "Memory safety factor for DataNode during force merge compaction. Max segment size = DataNode memory / factor. Default 3.0 means segments can use up to 1/3 of DataNode memory. Must be >= 1.0.",
Export: false,
Formatter: func(value string) string {
factor, err := strconv.ParseFloat(value, 64)
if err != nil || factor < 1.0 {
factor = 1.0
}
return strconv.FormatFloat(factor, 'f', -1, 64)
},
}
p.CompactionForceMergeDataNodeMemoryFactor.Init(base.mgr)
p.CompactionForceMergeQueryNodeMemoryFactor = ParamItem{
Key: "dataCoord.compaction.forceMerge.queryNodeMemoryFactor",
Version: "2.6.8",
DefaultValue: "3.0",
Doc: "Memory safety factor for QueryNode when loading segments after force merge. Max segment size = QueryNode memory / factor. Default 3.0 means segments can use up to 40% of QueryNode memory. Must be >= 1.0.",
Export: false,
Formatter: func(value string) string {
factor, err := strconv.ParseFloat(value, 64)
if err != nil || factor < 1.0 {
factor = 1.0
}
return strconv.FormatFloat(factor, 'f', -1, 64)
},
}
p.CompactionForceMergeQueryNodeMemoryFactor.Init(base.mgr)
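// Hedged arithmetic sketch of how the two force-merge factors might jointly cap
// a requested target size (the min-of-both combination is an assumption; 32GB
// DataNode and 16GB QueryNode are illustrative figures):
//
//	dnCap := 32.0 * 1024 * 1024 * 1024 / 3.0 // ≈ 10.7GB from the DataNode factor
//	qnCap := 16.0 * 1024 * 1024 * 1024 / 3.0 // ≈ 5.3GB from the QueryNode factor
//	maxSize := math.Min(dnCap, qnCap)        // ≈ 5.3GB effective cap
//	target := math.Min(requestedTargetSize, maxSize)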
p.SingleCompactionRatioThreshold = ParamItem{
Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0",