From 0507db20156a3d91689ee8a3e99e0b40b5ab12be Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 19 Dec 2025 18:03:18 +0800 Subject: [PATCH] feat: Add force merge (#45556) See also: #46043 --------- Signed-off-by: yangxuan --- configs/milvus.yaml | 8 +- internal/datacoord/.mockery.yaml | 1 + .../datacoord/compaction_policy_forcemerge.go | 274 ++++++++ .../compaction_policy_forcemerge_test.go | 320 +++++++++ internal/datacoord/compaction_trigger.go | 7 +- internal/datacoord/compaction_trigger_test.go | 2 +- internal/datacoord/compaction_trigger_v2.go | 159 ++++- .../datacoord/compaction_trigger_v2_test.go | 4 +- .../datacoord/compaction_view_forcemerge.go | 309 +++++++++ .../compaction_view_forcemerge_test.go | 631 ++++++++++++++++++ internal/datacoord/import_util.go | 2 +- .../mock_collection_topology_querier.go | 95 +++ internal/datacoord/mock_trigger_manager.go | 69 +- internal/datacoord/server.go | 1 + internal/datacoord/server_test.go | 2 +- internal/datacoord/services.go | 8 +- internal/querynodev2/segments/segment_test.go | 7 +- pkg/util/paramtable/component_param.go | 84 ++- 18 files changed, 1898 insertions(+), 85 deletions(-) create mode 100644 internal/datacoord/compaction_policy_forcemerge.go create mode 100644 internal/datacoord/compaction_policy_forcemerge_test.go create mode 100644 internal/datacoord/compaction_view_forcemerge.go create mode 100644 internal/datacoord/compaction_view_forcemerge_test.go create mode 100644 internal/datacoord/mock_collection_topology_querier.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index aefe5815c1..08a845c11d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -520,14 +520,14 @@ queryNode: readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed` mmap: vectorField: true # Enable mmap for loading vector data - vectorIndex: false # Enable mmap for loading vector index - scalarField: false # Enable mmap for loading scalar data - scalarIndex: false # Enable mmap for loading scalar index + vectorIndex: true # Enable mmap for loading vector index + scalarField: true # Enable mmap for loading scalar data + scalarIndex: true # Enable mmap for loading scalar index jsonShredding: true # Enable mmap for loading json stats # Enable memory mapping (mmap) to optimize the handling of growing raw data. # By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized. # However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments. 
- growingMmapEnabled: false + growingMmapEnabled: true fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager lazyload: diff --git a/internal/datacoord/.mockery.yaml b/internal/datacoord/.mockery.yaml index 958024928d..1fcd08b22a 100644 --- a/internal/datacoord/.mockery.yaml +++ b/internal/datacoord/.mockery.yaml @@ -33,6 +33,7 @@ packages: SubCluster: StatsJobManager: ImportMeta: + CollectionTopologyQuerier: github.com/milvus-io/milvus/internal/datacoord/allocator: interfaces: Allocator: diff --git a/internal/datacoord/compaction_policy_forcemerge.go b/internal/datacoord/compaction_policy_forcemerge.go new file mode 100644 index 0000000000..2d7563f423 --- /dev/null +++ b/internal/datacoord/compaction_policy_forcemerge.go @@ -0,0 +1,274 @@ +package datacoord + +import ( + "context" + "fmt" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/datacoord/allocator" + "github.com/milvus-io/milvus/internal/datacoord/session" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +const ( + // Fallback memory for pooling DataNode (returns 0 from GetMetrics) + defaultPoolingDataNodeMemory = 32 * 1024 * 1024 * 1024 // 32GB +) + +// CollectionTopology captures memory constraints for a collection +type CollectionTopology struct { + CollectionID int64 + NumReplicas int + IsStandaloneMode bool + IsPooling bool + + QueryNodeMemory map[int64]uint64 + DataNodeMemory map[int64]uint64 +} + +// CollectionTopologyQuerier queries collection topology including replicas and memory info +type CollectionTopologyQuerier interface { + GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error) +} + +type forceMergeCompactionPolicy struct { + meta *meta + allocator allocator.Allocator + handler Handler + topologyQuerier CollectionTopologyQuerier +} + +func newForceMergeCompactionPolicy(meta *meta, allocator allocator.Allocator, handler Handler) *forceMergeCompactionPolicy { + return &forceMergeCompactionPolicy{ + meta: meta, + allocator: allocator, + handler: handler, + topologyQuerier: nil, + } +} + +func (policy *forceMergeCompactionPolicy) SetTopologyQuerier(querier CollectionTopologyQuerier) { + policy.topologyQuerier = querier +} + +func (policy *forceMergeCompactionPolicy) triggerOneCollection( + ctx context.Context, + collectionID int64, + targetSize int64, +) ([]CompactionView, int64, error) { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", collectionID), + zap.Int64("targetSize", targetSize)) + collection, err := policy.handler.GetCollection(ctx, collectionID) + if err != nil { + return nil, 0, err + } + triggerID, err := policy.allocator.AllocID(ctx) + if err != nil { + return nil, 0, err + } + + collectionTTL, err := getCollectionTTL(collection.Properties) + if err != nil { + log.Warn("failed to get collection ttl, use default", zap.Error(err)) + collectionTTL = 0 + } + + configMaxSize := getExpectedSegmentSize(policy.meta, collectionID, collection.Schema) + + segments := policy.meta.SelectSegments(ctx, 
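+		// keep only candidates that are safe to force-merge: healthy, flushed,
+		// not already compacting or importing, and not L0 delta segments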
WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
+			return isSegmentHealthy(segment) &&
+				isFlushed(segment) &&
+				!segment.isCompacting &&
+				!segment.GetIsImporting() &&
+				segment.GetLevel() != datapb.SegmentLevel_L0
+		}))
+
+	if len(segments) == 0 {
+		log.Info("no eligible segments for force merge")
+		return nil, 0, nil
+	}
+
+	if policy.topologyQuerier == nil {
+		return nil, 0, merr.WrapErrServiceInternal("force merge topology querier is not initialized")
+	}
+
+	topology, err := policy.topologyQuerier.GetCollectionTopology(ctx, collectionID)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	views := []CompactionView{}
+	for label, groups := range groupByPartitionChannel(GetViewsByInfo(segments...)) {
+		view := &ForceMergeSegmentView{
+			label:         label,
+			segments:      groups,
+			triggerID:     triggerID,
+			collectionTTL: collectionTTL,
+
+			configMaxSize: float64(configMaxSize),
+			topology:      topology,
+		}
+		views = append(views, view)
+	}
+
+	log.Info("force merge triggered", zap.Int("viewCount", len(views)))
+	return views, triggerID, nil
+}
+
+func groupByPartitionChannel(segments []*SegmentView) map[*CompactionGroupLabel][]*SegmentView {
+	result := make(map[*CompactionGroupLabel][]*SegmentView)
+
+	for _, seg := range segments {
+		label := seg.label
+		key := label.Key()
+
+		var foundLabel *CompactionGroupLabel
+		for l := range result {
+			if l.Key() == key {
+				foundLabel = l
+				break
+			}
+		}
+		if foundLabel == nil {
+			foundLabel = label
+		}
+
+		result[foundLabel] = append(result[foundLabel], seg)
+	}
+
+	return result
+}
+
+type metricsNodeMemoryQuerier struct {
+	nodeManager session.NodeManager
+	mixCoord    types.MixCoord
+	session     sessionutil.SessionInterface
+}
+
+func newMetricsNodeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface) *metricsNodeMemoryQuerier {
+	return &metricsNodeMemoryQuerier{
+		nodeManager: nodeManager,
+		mixCoord:    mixCoord,
+		session:     session,
+	}
+}
+
+var _ CollectionTopologyQuerier = (*metricsNodeMemoryQuerier)(nil)
+
+func (q *metricsNodeMemoryQuerier) GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error) {
+	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
+	if q.mixCoord == nil {
+		return nil, fmt.Errorf("mixCoord not available for topology query")
+	}
+
+	// 1. Get replica information
+	replicasResp, err := q.mixCoord.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+		CollectionID: collectionID,
+	})
+	if err != nil {
+		return nil, err
+	}
+	numReplicas := len(replicasResp.GetReplicas())
+
+	// 2.
Get QueryNode metrics for memory info + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + if err != nil { + return nil, err + } + + // Get QueryNode sessions from etcd to filter out embedded nodes + sessions, _, err := q.session.GetSessions(ctx, typeutil.QueryNodeRole) + if err != nil { + log.Warn("failed to get QueryNode sessions", zap.Error(err)) + return nil, err + } + + // Build set of embedded QueryNode IDs to exclude + embeddedNodeIDs := make(map[int64]struct{}) + for _, sess := range sessions { + // Check if this is an embedded QueryNode in streaming node + if labels := sess.ServerLabels; labels != nil { + if labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" { + embeddedNodeIDs[sess.ServerID] = struct{}{} + } + } + } + + log.Info("excluding embedded QueryNode", zap.Int64s("nodeIDs", lo.Keys(embeddedNodeIDs))) + rsp, err := q.mixCoord.GetQcMetrics(ctx, req) + if err = merr.CheckRPCCall(rsp, err); err != nil { + return nil, err + } + topology := &metricsinfo.QueryCoordTopology{} + if err := metricsinfo.UnmarshalTopology(rsp.GetResponse(), topology); err != nil { + return nil, err + } + + // Build QueryNode memory map: nodeID → memory size (exclude embedded nodes) + queryNodeMemory := make(map[int64]uint64) + for _, node := range topology.Cluster.ConnectedNodes { + if _, ok := embeddedNodeIDs[node.ID]; ok { + continue + } + queryNodeMemory[node.ID] = node.HardwareInfos.Memory + } + + // 3. Get DataNode memory info + dataNodeMemory := make(map[int64]uint64) + isPooling := false + nodes := q.nodeManager.GetClientIDs() + for _, nodeID := range nodes { + cli, err := q.nodeManager.GetClient(nodeID) + if err != nil { + continue + } + + resp, err := cli.GetMetrics(ctx, req) + if err != nil { + continue + } + + var infos metricsinfo.DataNodeInfos + if err := metricsinfo.UnmarshalComponentInfos(resp.GetResponse(), &infos); err != nil { + continue + } + + if infos.HardwareInfos.Memory > 0 { + dataNodeMemory[nodeID] = infos.HardwareInfos.Memory + } else { + // Pooling DataNode returns 0 from GetMetrics + // Use default fallback: 32GB + isPooling = true + log.Warn("DataNode returned 0 memory (pooling mode?), using default", + zap.Int64("nodeID", nodeID), + zap.Uint64("defaultMemory", defaultPoolingDataNodeMemory)) + dataNodeMemory[nodeID] = defaultPoolingDataNodeMemory + } + } + + isStandaloneMode := paramtable.GetRole() == typeutil.StandaloneRole + log.Info("Collection topology", + zap.Int64("collectionID", collectionID), + zap.Int("numReplicas", numReplicas), + zap.Any("querynodes", queryNodeMemory), + zap.Any("datanodes", dataNodeMemory), + zap.Bool("isStandaloneMode", isStandaloneMode), + zap.Bool("isPooling", isPooling)) + + return &CollectionTopology{ + CollectionID: collectionID, + NumReplicas: numReplicas, + QueryNodeMemory: queryNodeMemory, + DataNodeMemory: dataNodeMemory, + IsStandaloneMode: isStandaloneMode, + IsPooling: isPooling, + }, nil +} diff --git a/internal/datacoord/compaction_policy_forcemerge_test.go b/internal/datacoord/compaction_policy_forcemerge_test.go new file mode 100644 index 0000000000..836c43ffd6 --- /dev/null +++ b/internal/datacoord/compaction_policy_forcemerge_test.go @@ -0,0 +1,320 @@ +package datacoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/allocator" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + 
"github.com/milvus-io/milvus/pkg/v2/util/merr" +) + +func TestForceMergeCompactionPolicySuite(t *testing.T) { + suite.Run(t, new(ForceMergeCompactionPolicySuite)) +} + +type ForceMergeCompactionPolicySuite struct { + suite.Suite + + mockAlloc *allocator.MockAllocator + mockHandler *NMockHandler + mockQuerier *MockCollectionTopologyQuerier + testLabel *CompactionGroupLabel + + policy *forceMergeCompactionPolicy +} + +func (s *ForceMergeCompactionPolicySuite) SetupTest() { + s.testLabel = &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + segments := genSegmentsForMeta(s.testLabel) + meta, err := newMemoryMeta(s.T()) + s.Require().NoError(err) + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + + s.mockAlloc = allocator.NewMockAllocator(s.T()) + s.mockHandler = NewNMockHandler(s.T()) + s.mockQuerier = NewMockCollectionTopologyQuerier(s.T()) + + s.policy = newForceMergeCompactionPolicy(meta, s.mockAlloc, s.mockHandler) + s.policy.SetTopologyQuerier(s.mockQuerier) +} + +func (s *ForceMergeCompactionPolicySuite) TestNewForceMergeCompactionPolicy() { + meta, err := newMemoryMeta(s.T()) + s.Require().NoError(err) + policy := newForceMergeCompactionPolicy(meta, s.mockAlloc, s.mockHandler) + + s.NotNil(policy) + s.Equal(meta, policy.meta) + s.Equal(s.mockAlloc, policy.allocator) + s.Equal(s.mockHandler, policy.handler) + s.Nil(policy.topologyQuerier) +} + +func (s *ForceMergeCompactionPolicySuite) TestSetTopologyQuerier() { + policy := newForceMergeCompactionPolicy(s.policy.meta, s.mockAlloc, s.mockHandler) + s.Nil(policy.topologyQuerier) + + policy.SetTopologyQuerier(s.mockQuerier) + s.Equal(s.mockQuerier, policy.topologyQuerier) +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_Success() { + ctx := context.Background() + collectionID := int64(1) + targetSize := int64(1024 * 1024 * 512) + triggerID := int64(100) + + coll := &collectionInfo{ + ID: collectionID, + Schema: newTestSchema(), + Properties: nil, + } + + topology := &CollectionTopology{ + CollectionID: collectionID, + NumReplicas: 1, + } + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil) + s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(topology, nil) + + views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.NoError(err) + s.Equal(triggerID, gotTriggerID) + s.NotNil(views) + s.Greater(len(views), 0) + + for _, view := range views { + s.NotNil(view) + s.NotNil(view.GetGroupLabel()) + } +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_GetCollectionError() { + ctx := context.Background() + collectionID := int64(999) + targetSize := int64(1024 * 1024 * 512) + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(nil, merr.ErrCollectionNotFound) + + views, triggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.Error(err) + s.Nil(views) + s.Equal(int64(0), triggerID) +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_AllocIDError() { + ctx := context.Background() + collectionID := int64(1) + targetSize := int64(1024 * 1024 * 512) + + coll := &collectionInfo{ + ID: collectionID, + Schema: newTestSchema(), + Properties: nil, + } + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(0), 
merr.ErrServiceUnavailable) + + views, triggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.Error(err) + s.Nil(views) + s.Equal(int64(0), triggerID) +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_NoEligibleSegments() { + ctx := context.Background() + collectionID := int64(999) + targetSize := int64(1024 * 1024 * 512) + triggerID := int64(100) + + coll := &collectionInfo{ + ID: collectionID, + Schema: newTestSchema(), + Properties: nil, + } + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil) + + views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.NoError(err) + s.Equal(int64(0), gotTriggerID) + s.Nil(views) +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_TopologyError() { + ctx := context.Background() + collectionID := int64(1) + targetSize := int64(1024 * 1024 * 512) + triggerID := int64(100) + + coll := &collectionInfo{ + ID: collectionID, + Schema: newTestSchema(), + Properties: nil, + } + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil) + s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(nil, merr.ErrServiceUnavailable) + + views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.Error(err) + s.Nil(views) + s.Equal(int64(0), gotTriggerID) +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_WithCollectionProperties() { + ctx := context.Background() + collectionID := int64(1) + targetSize := int64(1024 * 1024 * 512) + triggerID := int64(100) + + coll := &collectionInfo{ + ID: collectionID, + Schema: newTestSchema(), + Properties: map[string]string{ + "collection.ttl.seconds": "86400", + }, + } + + topology := &CollectionTopology{ + CollectionID: collectionID, + NumReplicas: 2, + } + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil) + s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(topology, nil) + + views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.NoError(err) + s.Equal(triggerID, gotTriggerID) + s.NotNil(views) +} + +func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_EmptySegments() { + segments := []*SegmentView{} + result := groupByPartitionChannel(segments) + + s.NotNil(result) + s.Equal(0, len(result)) +} + +func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_SingleGroup() { + segmentInfo := genTestSegmentInfo(s.testLabel, 100, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + segments := GetViewsByInfo(segmentInfo) + + result := groupByPartitionChannel(segments) + + s.NotNil(result) + s.Equal(1, len(result)) + + for label, segs := range result { + s.Equal(s.testLabel.CollectionID, label.CollectionID) + s.Equal(s.testLabel.PartitionID, label.PartitionID) + s.Equal(s.testLabel.Channel, label.Channel) + s.Equal(1, len(segs)) + } +} + +func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_MultipleGroups() { + label1 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + label2 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 11, + Channel: "ch-1", + } + label3 := 
&CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-2", + } + + seg1 := genTestSegmentInfo(label1, 100, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + seg2 := genTestSegmentInfo(label2, 101, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + seg3 := genTestSegmentInfo(label3, 102, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + + segments := GetViewsByInfo(seg1, seg2, seg3) + result := groupByPartitionChannel(segments) + + s.NotNil(result) + s.Equal(3, len(result)) +} + +func (s *ForceMergeCompactionPolicySuite) TestGroupByPartitionChannel_SameGroupMultipleSegments() { + seg1 := genTestSegmentInfo(s.testLabel, 100, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + seg2 := genTestSegmentInfo(s.testLabel, 101, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + seg3 := genTestSegmentInfo(s.testLabel, 102, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed) + + segments := GetViewsByInfo(seg1, seg2, seg3) + result := groupByPartitionChannel(segments) + + s.NotNil(result) + s.Equal(1, len(result)) + + for label, segs := range result { + s.Equal(s.testLabel.Key(), label.Key()) + s.Equal(3, len(segs)) + } +} + +func (s *ForceMergeCompactionPolicySuite) TestTriggerOneCollection_FilterSegments() { + ctx := context.Background() + collectionID := int64(1) + targetSize := int64(1024 * 1024 * 512) + triggerID := int64(100) + + coll := &collectionInfo{ + ID: collectionID, + Schema: newTestSchema(), + Properties: nil, + } + + topology := &CollectionTopology{ + CollectionID: collectionID, + NumReplicas: 1, + } + + s.mockHandler.EXPECT().GetCollection(mock.Anything, collectionID).Return(coll, nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(triggerID, nil) + s.mockQuerier.EXPECT().GetCollectionTopology(mock.Anything, collectionID).Return(topology, nil) + + views, gotTriggerID, err := s.policy.triggerOneCollection(ctx, collectionID, targetSize) + + s.NoError(err) + s.Equal(triggerID, gotTriggerID) + s.NotNil(views) + + for _, view := range views { + for _, seg := range view.GetSegmentsView() { + s.NotEqual(datapb.SegmentLevel_L0, seg.Level) + s.Equal(commonpb.SegmentState_Flushed, seg.State) + } + } +} diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 888a44da77..ae783cfd63 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -407,7 +407,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error { ResultSegments: []int64{}, TotalRows: totalRows, Schema: coll.Schema, - MaxSize: getExpandedSize(expectedSize), + MaxSize: expectedSize, PreAllocatedSegmentIDs: &datapb.IDRange{ Begin: startID + 1, End: endID, @@ -425,6 +425,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error { log.Info("time cost of generating compaction", zap.Int64("planID", task.GetPlanID()), zap.Int64("time cost", time.Since(start).Milliseconds()), + zap.Int64("target size", task.GetMaxSize()), zap.Int64s("inputSegments", inputSegmentIDs)) } } @@ -809,10 +810,6 @@ func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, return small } -func getExpandedSize(size int64) int64 { - return int64(float64(size) * Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()) -} - func canTriggerSortCompaction(segment *SegmentInfo, isPartitionIsolationEnabled bool) bool { return segment.GetState() == commonpb.SegmentState_Flushed && segment.GetLevel() != datapb.SegmentLevel_L0 && diff --git 
a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 16879f9db2..91af1553e8 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -631,7 +631,7 @@ func Test_compactionTrigger_force(t *testing.T) { Schema: schema, PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200}, PreAllocatedLogIDs: &datapb.IDRange{Begin: 100, End: 200}, - MaxSize: 1342177280, + MaxSize: 1073741824, SlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(), JsonParams: params, }, diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 677cbba0ce..d3c4b5feb6 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -26,6 +26,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/allocator" + "github.com/milvus-io/milvus/internal/datacoord/session" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" @@ -45,8 +48,28 @@ const ( TriggerTypeSort TriggerTypePartitionKeySort TriggerTypeClusteringPartitionKeySort + TriggerTypeForceMerge ) +func (t CompactionTriggerType) GetCompactionType() datapb.CompactionType { + switch t { + case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE, TriggerTypeLevelZeroViewManual: + return datapb.CompactionType_Level0DeleteCompaction + case TriggerTypeSegmentSizeViewChange, TriggerTypeSingle, TriggerTypeForceMerge: + return datapb.CompactionType_MixCompaction + case TriggerTypeClustering: + return datapb.CompactionType_ClusteringCompaction + case TriggerTypeSort: + return datapb.CompactionType_SortCompaction + case TriggerTypePartitionKeySort: + return datapb.CompactionType_PartitionKeySortCompaction + case TriggerTypeClusteringPartitionKeySort: + return datapb.CompactionType_ClusteringPartitionKeySortCompaction + default: + return datapb.CompactionType_MixCompaction + } +} + func (t CompactionTriggerType) String() string { switch t { case TriggerTypeLevelZeroViewChange: @@ -67,6 +90,8 @@ func (t CompactionTriggerType) String() string { return "PartitionKeySort" case TriggerTypeClusteringPartitionKeySort: return "ClusteringPartitionKeySort" + case TriggerTypeForceMerge: + return "ForceMerge" default: return "" } @@ -76,9 +101,10 @@ type TriggerManager interface { Start() Stop() OnCollectionUpdate(collectionID int64) - ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (UniqueID, error) + ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64) (UniqueID, error) GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{} GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{} + InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface) } var _ TriggerManager = (*CompactionTriggerManager)(nil) @@ -86,13 +112,6 @@ var _ TriggerManager = (*CompactionTriggerManager)(nil) // CompactionTriggerManager registers Triggers to TriggerType // so that when the certain TriggerType happens, the corresponding triggers can // trigger the correct compaction plans. -// Trigger types: -// 1. 
Change of Views -// - LevelZeroViewTrigger -// - SegmentSizeViewTrigger -// -// 2. SystemIDLE & schedulerIDLE -// 3. Manual Compaction type CompactionTriggerManager struct { inspector CompactionInspector handler Handler @@ -103,6 +122,7 @@ type CompactionTriggerManager struct { l0Policy *l0CompactionPolicy clusteringPolicy *clusteringCompactionPolicy singlePolicy *singleCompactionPolicy + forceMergePolicy *forceMergeCompactionPolicy cancel context.CancelFunc closeWg sync.WaitGroup @@ -131,9 +151,18 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, ins m.l0Policy = newL0CompactionPolicy(meta, alloc) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler) m.singlePolicy = newSingleCompactionPolicy(meta, m.allocator, m.handler) + m.forceMergePolicy = newForceMergeCompactionPolicy(meta, m.allocator, m.handler) return m } +// InitForceMergeMemoryQuerier initializes the topology querier for force merge auto calculation +func (m *CompactionTriggerManager) InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface) { + if m.forceMergePolicy != nil { + querier := newMetricsNodeMemoryQuerier(nodeManager, mixCoord, session) + m.forceMergePolicy.SetTopologyQuerier(querier) + } +} + // OnCollectionUpdate notifies L0Policy about latest collection's L0 segment changes // This tells the l0 triggers about which collections are active func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) { @@ -310,21 +339,27 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) { } } -func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (UniqueID, error) { - log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID), - zap.Bool("clusteringCompaction", clusteringCompaction), zap.Bool("l0Compaction", l0Compaction)) +func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, isClustering bool, isL0 bool, targetSize int64) (UniqueID, error) { + log.Ctx(ctx).Info("receive manual trigger", + zap.Int64("collectionID", collectionID), + zap.Bool("is clustering", isClustering), + zap.Bool("is l0", isL0), + zap.Int64("targetSize", targetSize)) var triggerID UniqueID var err error var views []CompactionView events := make(map[CompactionTriggerType][]CompactionView, 0) - if l0Compaction { + if targetSize != 0 { + views, triggerID, err = m.forceMergePolicy.triggerOneCollection(ctx, collectionID, targetSize) + events[TriggerTypeForceMerge] = views + } else if isL0 { m.setL0Triggering(true) defer m.setL0Triggering(false) views, triggerID, err = m.l0Policy.triggerOneCollection(ctx, collectionID) events[TriggerTypeLevelZeroViewManual] = views - } else if clusteringCompaction { + } else if isClustering { views, triggerID, err = m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true) events[TriggerTypeClustering] = views } @@ -342,14 +377,18 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection func (m *CompactionTriggerManager) triggerViewForCompaction(ctx context.Context, eventType CompactionTriggerType, view CompactionView, ) ([]CompactionView, string) { - if eventType == TriggerTypeLevelZeroViewIDLE { + switch eventType { + case TriggerTypeLevelZeroViewIDLE: view, reason := view.ForceTrigger() return []CompactionView{view}, reason - } else if eventType == TriggerTypeLevelZeroViewManual { + + case 
TriggerTypeLevelZeroViewManual, TriggerTypeForceMerge:
 		return view.ForceTriggerAll()
+
+	default:
+		outView, reason := view.Trigger()
+		return []CompactionView{outView}, reason
 	}
-	outView, reason := view.Trigger()
-	return []CompactionView{outView}, reason
 }
 
 func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) {
@@ -370,14 +409,10 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact
 			m.SubmitL0ViewToScheduler(ctx, outView)
 		case TriggerTypeClustering:
 			m.SubmitClusteringViewToScheduler(ctx, outView)
-		case TriggerTypeSingle:
-			m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_MixCompaction)
-		case TriggerTypeSort:
-			m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_SortCompaction)
-		case TriggerTypePartitionKeySort:
-			m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_PartitionKeySortCompaction)
-		case TriggerTypeClusteringPartitionKeySort:
-			m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_ClusteringPartitionKeySortCompaction)
+		case TriggerTypeSingle, TriggerTypeSort, TriggerTypePartitionKeySort, TriggerTypeClusteringPartitionKeySort:
+			m.SubmitSingleViewToScheduler(ctx, outView, eventType)
+		case TriggerTypeForceMerge:
+			m.SubmitForceMergeViewToScheduler(ctx, outView)
 		}
 	}
 }
@@ -591,14 +626,15 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
 	)
 }
 
-func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Context, view CompactionView, compactionType datapb.CompactionType) {
-	log := log.Ctx(ctx).With(zap.String("view", view.String()))
+func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Context, view CompactionView, triggerType CompactionTriggerType) {
+	// a single view maps one-to-one onto a compaction task
+	log := log.Ctx(ctx).With(zap.String("trigger type", triggerType.String()), zap.String("view", view.String()))
 	// TODO[GOOSE], 11 = 1 planID + 10 segmentID, this is a hack that needs to be removed.
 	// Any plan that outputs more than 10 segments will be marked as invalid for now.
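+	// Worked example (with a hypothetical expansion factor of 10): AllocN reserves
+	// n = 11 * 10 = 110 IDs in one call; per the TODO above, each unit of 11 covers
+	// 1 plan ID plus up to 10 result segment IDs, which is why plans producing more
+	// than 10 output segments are currently rejected as invalid.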
n := 11 * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64() startID, endID, err := m.allocator.AllocN(n) if err != nil { - log.Warn("fFailed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) + log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) return } @@ -619,7 +655,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte State: datapb.CompactionTaskState_pipelining, StartTime: time.Now().Unix(), CollectionTtl: view.(*MixSegmentView).collectionTTL.Nanoseconds(), - Type: compactionType, // todo: use SingleCompaction + Type: triggerType.GetCompactionType(), CollectionID: view.GetGroupLabel().CollectionID, PartitionID: view.GetGroupLabel().PartitionID, Channel: view.GetGroupLabel().Channel, @@ -628,7 +664,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte ResultSegments: []int64{}, TotalRows: totalRows, LastStateStartTime: time.Now().Unix(), - MaxSize: getExpandedSize(expectedSize), + MaxSize: expectedSize, PreAllocatedSegmentIDs: &datapb.IDRange{ Begin: startID + 1, End: endID, @@ -641,11 +677,74 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte zap.Int64("planID", task.GetPlanID()), zap.Int64s("segmentIDs", task.GetInputSegments()), zap.Error(err)) + return } log.Info("Finish to submit a single compaction task", zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), + zap.Int64("targetSize", task.GetMaxSize()), + ) +} + +func (m *CompactionTriggerManager) SubmitForceMergeViewToScheduler(ctx context.Context, view CompactionView) { + log := log.Ctx(ctx).With(zap.String("view", view.String())) + + taskID, err := m.allocator.AllocID(ctx) + if err != nil { + log.Warn("Failed to allocate task ID", zap.Error(err)) + return + } + + collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID) + if err != nil { + log.Warn("Failed to get collection", zap.Error(err)) + return + } + + totalRows := lo.SumBy(view.GetSegmentsView(), func(v *SegmentView) int64 { return v.NumOfRows }) + + targetCount := view.(*ForceMergeSegmentView).targetCount + n := targetCount * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64() + startID, endID, err := m.allocator.AllocN(n) + if err != nil { + log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.Error(err)) + return + } + + task := &datapb.CompactionTask{ + PlanID: taskID, + TriggerID: view.GetTriggerID(), + State: datapb.CompactionTaskState_pipelining, + StartTime: time.Now().Unix(), + CollectionTtl: view.(*ForceMergeSegmentView).collectionTTL.Nanoseconds(), + Type: datapb.CompactionType_MixCompaction, + CollectionID: view.GetGroupLabel().CollectionID, + PartitionID: view.GetGroupLabel().PartitionID, + Channel: view.GetGroupLabel().Channel, + Schema: collection.Schema, + InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }), + ResultSegments: []int64{}, + TotalRows: totalRows, + LastStateStartTime: time.Now().Unix(), + MaxSize: int64(view.(*ForceMergeSegmentView).targetSize), + PreAllocatedSegmentIDs: &datapb.IDRange{ + Begin: startID + 1, + End: endID, + }, + } + + err = m.inspector.enqueueCompaction(task) + if err != nil { + log.Warn("Failed to enqueue task", zap.Error(err)) + return + } + + log.Info("Finish to submit force merge task", + 
zap.Int64("planID", taskID), + zap.Int64("triggerID", task.GetTriggerID()), + zap.Int64("collectionID", task.GetCollectionID()), + zap.Int64("targetSize", task.GetMaxSize()), ) } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 7089ac511b..3bd16ac175 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -432,14 +432,14 @@ func (s *CompactionTriggerManagerSuite) TestManualTriggerL0Compaction() { }).Return(nil).Once() // Test L0 manual trigger - triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, true) + triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, true, 0) s.NoError(err) s.Equal(int64(12345), triggerID) } func (s *CompactionTriggerManagerSuite) TestManualTriggerInvalidParams() { // Test with both clustering and L0 compaction false - triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, false) + triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, false, 0) s.NoError(err) s.Equal(int64(0), triggerID) } diff --git a/internal/datacoord/compaction_view_forcemerge.go b/internal/datacoord/compaction_view_forcemerge.go new file mode 100644 index 0000000000..01c76068b6 --- /dev/null +++ b/internal/datacoord/compaction_view_forcemerge.go @@ -0,0 +1,309 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "fmt" + "math" + "time" + + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/samber/lo" + "go.uber.org/zap" +) + +const ( + defaultToleranceMB = 0.05 +) + +// static segment view, only algothrims here, no IO +type ForceMergeSegmentView struct { + label *CompactionGroupLabel + segments []*SegmentView + triggerID int64 + collectionTTL time.Duration + + configMaxSize float64 + topology *CollectionTopology + + targetSize float64 + targetCount int64 +} + +func (v *ForceMergeSegmentView) GetGroupLabel() *CompactionGroupLabel { + return v.label +} + +func (v *ForceMergeSegmentView) GetSegmentsView() []*SegmentView { + return v.segments +} + +func (v *ForceMergeSegmentView) Append(segments ...*SegmentView) { + v.segments = append(v.segments, segments...) 
+} + +func (v *ForceMergeSegmentView) String() string { + return fmt.Sprintf("ForceMerge: %s, segments=%d, triggerID=%d", + v.label, len(v.segments), v.triggerID) +} + +func (v *ForceMergeSegmentView) Trigger() (CompactionView, string) { + panic("implement me") +} + +func (v *ForceMergeSegmentView) ForceTrigger() (CompactionView, string) { + panic("implement me") +} + +func (v *ForceMergeSegmentView) GetTriggerID() int64 { + return v.triggerID +} + +func (v *ForceMergeSegmentView) calculateTargetSizeCount() (maxSafeSize float64, targetCount int64) { + log := log.With(zap.Int64("triggerID", v.triggerID), zap.String("label", v.label.String())) + maxSafeSize = v.calculateMaxSafeSize() + if maxSafeSize < v.configMaxSize { + log.Info("maxSafeSize is less than configMaxSize, set to configMaxSize", + zap.Float64("targetSize", maxSafeSize), + zap.Float64("configMaxSize", v.configMaxSize)) + maxSafeSize = v.configMaxSize + } + + targetCount = max(1, int64(sumSegmentSize(v.segments)/maxSafeSize)) + log.Info("topology-aware force merge calculation", + zap.Int64("targetSegmentCount", targetCount), + zap.Float64("maxSafeSize", maxSafeSize)) + return maxSafeSize, targetCount +} + +func (v *ForceMergeSegmentView) ForceTriggerAll() ([]CompactionView, string) { + targetSizePerSegment, targetCount := v.calculateTargetSizeCount() + groups := adaptiveGroupSegments(v.segments, float64(targetSizePerSegment)) + + results := make([]CompactionView, 0, len(groups)) + for _, group := range groups { + results = append(results, &ForceMergeSegmentView{ + label: v.label, + segments: group, + triggerID: v.triggerID, + collectionTTL: v.collectionTTL, + configMaxSize: v.configMaxSize, + targetSize: targetSizePerSegment, + targetCount: targetCount, + topology: v.topology, + }) + } + return results, "force merge trigger" +} + +// adaptiveGroupSegments automatically selects the best grouping algorithm based on segment count +// For small segment counts (≤ threshold), use maxFull for optimal full segment count +// For large segment counts, use larger for better performance +func adaptiveGroupSegments(segments []*SegmentView, targetSize float64) [][]*SegmentView { + if len(segments) == 0 { + return nil + } + + n := len(segments) + + // Get threshold from config, fallback to default if not available + threshold := paramtable.Get().DataCoordCfg.CompactionMaxFullSegmentThreshold.GetAsInt() + + // Use maxFull for small segment counts to maximize full segments + // Use larger for large segment counts for O(n) performance + if n <= threshold { + return maxFullSegmentsGrouping(segments, targetSize) + } + + return largerGroupingSegments(segments, targetSize) +} + +// largerGroupingSegments groups segments to minimize number of tasks +// Strategy: Create larger groups that produce multiple full target-sized segments +// This approach favors fewer compaction tasks with larger batches +func largerGroupingSegments(segments []*SegmentView, targetSize float64) [][]*SegmentView { + if len(segments) == 0 { + return nil + } + + n := len(segments) + // Pre-allocate with estimated capacity to reduce allocations + estimatedGroups := max(1, n/10) + groups := make([][]*SegmentView, 0, estimatedGroups) + + i := 0 + for i < n { + groupStart := i + groupSize := 0.0 + + // Accumulate segments to form multiple target-sized outputs + for i < n { + groupSize += segments[i].Size + i++ + + // Check if we should stop + if i < n { + nextSize := groupSize + segments[i].Size + currentFull := int(groupSize / targetSize) + nextFull := int(nextSize / targetSize) + 
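+				// Worked example (hypothetical sizes): with targetSize=3GB and an
+				// accumulated groupSize=3.1GB, currentFull=1; adding a 1GB segment
+				// keeps nextFull=1 (no extra full output), and the 0.1GB remainder is
+				// under the 5% tolerance (0.15GB), so the group is closed below.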
+ // Stop if we have full segments and next addition won't give another full segment + if currentFull > 0 && nextFull == currentFull { + currentRemainder := math.Mod(groupSize, targetSize) + if currentRemainder < targetSize*defaultToleranceMB { + break + } + } + } + } + + groups = append(groups, segments[groupStart:i]) + } + + return groups +} + +// maxFullSegmentsGrouping groups segments to maximize number of full target-sized outputs +// Strategy: Use dynamic programming to find partitioning that produces most full segments +// This approach minimizes tail segments and achieves best space utilization +func maxFullSegmentsGrouping(segments []*SegmentView, targetSize float64) [][]*SegmentView { + if len(segments) == 0 { + return nil + } + + n := len(segments) + + // Pre-compute prefix sums to avoid repeated summation + prefixSum := make([]float64, n+1) + for i := 0; i < n; i++ { + prefixSum[i+1] = prefixSum[i] + segments[i].Size + } + + // dp[i] = best result for segments[0:i] + type dpState struct { + fullSegments int + tailSegments int + numGroups int + groupIndices []int + } + + dp := make([]dpState, n+1) + dp[0] = dpState{fullSegments: 0, tailSegments: 0, numGroups: 0, groupIndices: make([]int, 0, n/10)} + + for i := 1; i <= n; i++ { + dp[i] = dpState{fullSegments: -1, tailSegments: math.MaxInt32, numGroups: 0, groupIndices: nil} + + // Try different starting positions for the last group + for j := 0; j < i; j++ { + // Calculate group size from j to i-1 using prefix sums (O(1) instead of O(n)) + groupSize := prefixSum[i] - prefixSum[j] + + numFull := int(groupSize / targetSize) + remainder := math.Mod(groupSize, targetSize) + hasTail := 0 + if remainder > 0.01 { + hasTail = 1 + } + + newFull := dp[j].fullSegments + numFull + newTails := dp[j].tailSegments + hasTail + newGroups := dp[j].numGroups + 1 + + // Prioritize: more full segments, then fewer tails, then more groups (for parallelism) + isBetter := false + if newFull > dp[i].fullSegments { + isBetter = true + } else if newFull == dp[i].fullSegments && newTails < dp[i].tailSegments { + isBetter = true + } else if newFull == dp[i].fullSegments && newTails == dp[i].tailSegments && newGroups > dp[i].numGroups { + isBetter = true + } + + if isBetter { + // Pre-allocate with exact capacity to avoid reallocation + newIndices := make([]int, 0, len(dp[j].groupIndices)+1) + newIndices = append(newIndices, dp[j].groupIndices...) 
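+					// record j, the start index of the final group in this candidate partitioning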
+ newIndices = append(newIndices, j) + dp[i] = dpState{ + fullSegments: newFull, + tailSegments: newTails, + numGroups: newGroups, + groupIndices: newIndices, + } + } + } + } + + // Reconstruct groups from indices + if dp[n].groupIndices == nil { + return [][]*SegmentView{segments} + } + + groupStarts := append(dp[n].groupIndices, n) + groups := make([][]*SegmentView, 0, len(groupStarts)) + for i := 0; i < len(groupStarts); i++ { + start := groupStarts[i] + end := n + if i+1 < len(groupStarts) { + end = groupStarts[i+1] + } + if start < end { + groups = append(groups, segments[start:end]) + } + } + + return groups +} + +func (v *ForceMergeSegmentView) calculateMaxSafeSize() float64 { + log := log.With(zap.Int64("triggerID", v.triggerID), zap.String("label", v.label.String())) + if len(v.topology.QueryNodeMemory) == 0 || len(v.topology.DataNodeMemory) == 0 { + log.Warn("No querynodes or datanodes in topology, using config size") + return v.configMaxSize + } + + // QueryNode constraint: use global minimum memory + querynodeMemoryFactor := paramtable.Get().DataCoordCfg.CompactionForceMergeQueryNodeMemoryFactor.GetAsFloat() + qnMaxSafeSize := float64(lo.Min(lo.Values(v.topology.QueryNodeMemory))) / querynodeMemoryFactor + + // DataNode constraint: segments must fit in smallest DataNode + datanodeMemoryFactor := paramtable.Get().DataCoordCfg.CompactionForceMergeDataNodeMemoryFactor.GetAsFloat() + dnMaxSafeSize := float64(lo.Min(lo.Values(v.topology.DataNodeMemory))) / datanodeMemoryFactor + + maxSafeSize := min(qnMaxSafeSize, dnMaxSafeSize) + if v.topology.IsStandaloneMode && !v.topology.IsPooling { + log.Info("force merge on standalone not pooling mode, half the max size", + zap.Float64("qnMaxSafeSize", qnMaxSafeSize), + zap.Float64("dnMaxSafeSize", dnMaxSafeSize), + zap.Float64("maxSafeSize/2", maxSafeSize/2), + zap.Float64("configMaxSize", v.configMaxSize)) + // dn and qn are co-located, half the min + return maxSafeSize * 0.5 + } + + log.Info("force merge on cluster/pooling mode", + zap.Float64("qnMaxSafeSize", qnMaxSafeSize), + zap.Float64("dnMaxSafeSize", dnMaxSafeSize), + zap.Float64("maxSafeSize", maxSafeSize), + zap.Float64("configMaxSize", v.configMaxSize)) + return maxSafeSize +} + +func sumSegmentSize(views []*SegmentView) float64 { + return lo.SumBy(views, func(v *SegmentView) float64 { return v.Size }) +} diff --git a/internal/datacoord/compaction_view_forcemerge_test.go b/internal/datacoord/compaction_view_forcemerge_test.go new file mode 100644 index 0000000000..be6523ae5b --- /dev/null +++ b/internal/datacoord/compaction_view_forcemerge_test.go @@ -0,0 +1,631 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package datacoord + +import ( + "fmt" + "testing" + "time" + + "github.com/samber/lo" + + "github.com/stretchr/testify/assert" +) + +func TestForceMergeSegmentView_GetGroupLabel(t *testing.T) { + label := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch1", + } + + view := &ForceMergeSegmentView{ + label: label, + } + + assert.Equal(t, label, view.GetGroupLabel()) +} + +func TestForceMergeSegmentView_GetSegmentsView(t *testing.T) { + segments := []*SegmentView{ + {ID: 1, Size: 1024}, + {ID: 2, Size: 2048}, + } + + view := &ForceMergeSegmentView{ + segments: segments, + } + + assert.Equal(t, segments, view.GetSegmentsView()) + assert.Len(t, view.GetSegmentsView(), 2) +} + +func TestForceMergeSegmentView_Append(t *testing.T) { + view := &ForceMergeSegmentView{ + segments: []*SegmentView{ + {ID: 1, Size: 1024}, + }, + } + + newSegments := []*SegmentView{ + {ID: 2, Size: 2048}, + {ID: 3, Size: 3072}, + } + + view.Append(newSegments...) + + assert.Len(t, view.segments, 3) + assert.Equal(t, int64(1), view.segments[0].ID) + assert.Equal(t, int64(2), view.segments[1].ID) + assert.Equal(t, int64(3), view.segments[2].ID) +} + +func TestForceMergeSegmentView_String(t *testing.T) { + label := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch1", + } + + view := &ForceMergeSegmentView{ + label: label, + segments: []*SegmentView{ + {ID: 1}, + {ID: 2}, + }, + triggerID: 12345, + } + + str := view.String() + assert.Contains(t, str, "ForceMerge") + assert.Contains(t, str, "segments=2") + assert.Contains(t, str, "triggerID=12345") +} + +func TestForceMergeSegmentView_Trigger(t *testing.T) { + view := &ForceMergeSegmentView{ + triggerID: 100, + } + + assert.Panics(t, func() { + view.Trigger() + }) +} + +func TestForceMergeSegmentView_ForceTrigger(t *testing.T) { + view := &ForceMergeSegmentView{ + triggerID: 100, + } + + assert.Panics(t, func() { + view.ForceTrigger() + }) +} + +func TestForceMergeSegmentView_GetTriggerID(t *testing.T) { + view := &ForceMergeSegmentView{ + triggerID: 12345, + } + + assert.Equal(t, int64(12345), view.GetTriggerID()) +} + +func TestForceMergeSegmentView_Complete(t *testing.T) { + label := &CompactionGroupLabel{ + CollectionID: 100, + PartitionID: 200, + Channel: "test-channel", + } + + segments := []*SegmentView{ + {ID: 1, Size: 1024 * 1024 * 1024}, + {ID: 2, Size: 512 * 1024 * 1024}, + } + + topology := &CollectionTopology{ + CollectionID: 100, + NumReplicas: 1, + IsStandaloneMode: false, + QueryNodeMemory: map[int64]uint64{1: 8 * 1024 * 1024 * 1024}, + DataNodeMemory: map[int64]uint64{1: 8 * 1024 * 1024 * 1024}, + } + + view := &ForceMergeSegmentView{ + label: label, + segments: segments, + triggerID: 99999, + collectionTTL: 24 * time.Hour, + targetSize: 2048 * 1024 * 1024, + topology: topology, + } + + // Test String output + str := view.String() + assert.Contains(t, str, "ForceMerge") + + views, r3 := view.ForceTriggerAll() + assert.Len(t, views, 1) + assert.NotEmpty(t, r3) +} + +func TestGroupingAlgorithmsComparison(t *testing.T) { + type testCase struct { + name string + segments []float64 + targetSize float64 + } + + testCases := []testCase{ + { + name: "perfect fit - 5x2GB to 5GB", + segments: []float64{2, 2, 2, 2, 2}, + targetSize: 5, + }, + { + name: "varying sizes - example from discussion", + segments: []float64{1.2, 1.3, 1.4, 1.8, 1.8, 1.8, 1.8, 1.8}, + targetSize: 3, + }, + { + name: "small segments", + segments: []float64{0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5}, + targetSize: 2, + }, + { + name: "large 
segments", + segments: []float64{3, 3, 3, 3}, + targetSize: 5, + }, + { + name: "mixed sizes", + segments: []float64{0.5, 1, 1.5, 2, 2.5, 3}, + targetSize: 4, + }, + { + name: "many small segments", + segments: []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + targetSize: 3, + }, + { + name: "uneven distribution", + segments: []float64{0.3, 0.4, 2.5, 0.3, 2.8, 0.5, 2.2}, + targetSize: 3, + }, + { + name: "single segment", + segments: []float64{5}, + targetSize: 3, + }, + { + name: "two segments perfect", + segments: []float64{2.5, 2.5}, + targetSize: 5, + }, + { + name: "fibonacci-like sizes", + segments: []float64{1, 1, 2, 3, 5, 8}, + targetSize: 10, + }, + { + name: "near-perfect split - tests greedy vs optimal", + segments: []float64{1.5, 1.5, 1.5, 1.5, 1.5, 1.5}, + targetSize: 3, + }, + { + name: "strategic grouping - [2.8,0.3] vs [2.8,0.2,0.1]", + segments: []float64{2.8, 0.2, 0.1, 2.8, 0.3}, + targetSize: 3, + }, + { + name: "tail optimization - many small + one large", + segments: []float64{0.5, 0.5, 0.5, 0.5, 0.5, 2.5}, + targetSize: 3, + }, + { + name: "alternating sizes for different strategies", + segments: []float64{1.0, 2.5, 1.0, 2.5, 1.0, 2.5}, + targetSize: 4, + }, + { + name: "edge case - slightly over target creates decision point", + segments: []float64{2.1, 2.1, 2.1, 2.1, 2.1}, + targetSize: 4, + }, + { + name: "optimal vs greedy - can fit 3 full or 2 full + small tail", + segments: []float64{1.8, 1.8, 1.8, 1.8, 1.8, 1.5}, + targetSize: 3, + }, + { + name: "many segments with complex optimal solution", + segments: []float64{0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8}, + targetSize: 2, + }, + { + name: "greedy stops early, optimal continues", + segments: []float64{2.8, 0.2, 0.1, 2.8, 0.3}, + targetSize: 3.0, + }, + { + name: "Balanced vs Larger - distribution vs grouping", + segments: []float64{1.0, 1.0, 1.0, 1.0, 1.0, 1.0}, + targetSize: 2.5, + }, + { + name: "MaxFull achieves theoretical maximum when possible", + // Perfect case: 6x1.5GB segments, target=3GB + // Total=9GB, theoretical max = 3 full segments + segments: []float64{1.5, 1.5, 1.5, 1.5, 1.5, 1.5}, + targetSize: 3.0, + }, + { + name: "Larger creates fewer compaction tasks", + segments: lo.Times(20, func(i int) float64 { return 0.5 }), + targetSize: 2.0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Convert to SegmentView + segments := make([]*SegmentView, len(tc.segments)) + for i, size := range tc.segments { + segments[i] = &SegmentView{ + ID: int64(i + 1), + Size: size * 1024 * 1024 * 1024, + } + } + targetSize := tc.targetSize * 1024 * 1024 * 1024 + totalSize := sumSegmentSize(segments) + theoreticalMaxFull := int(totalSize / targetSize) + + // Test all three algorithms + groupsBalanced := adaptiveGroupSegments(segments, targetSize) + groupsLarger := largerGroupingSegments(segments, targetSize) + groupsMax := maxFullSegmentsGrouping(segments, targetSize) + + // Helper to count full segments and tails + countMetrics := func(groups [][]*SegmentView) (numGroups, numFull, numTails int) { + numGroups = len(groups) + for _, group := range groups { + groupSize := sumSegmentSize(group) + full := int(groupSize / targetSize) + remainder := groupSize - float64(full)*targetSize + numFull += full + if remainder > 0.01 { + numTails++ + } + } + return + } + + // Helper to verify all segments used exactly once + verifyAllSegmentsUsed := func(groups [][]*SegmentView) bool { + seen := make(map[int64]int) + for _, group := range groups { + for _, seg := range group { + 
seen[seg.ID]++ + } + } + if len(seen) != len(segments) { + return false + } + for _, count := range seen { + if count != 1 { + return false + } + } + return true + } + + // Verify all algorithms use each segment exactly once + assert.True(t, verifyAllSegmentsUsed(groupsBalanced), "adaptiveGroupSegments: all segments must be used exactly once") + assert.True(t, verifyAllSegmentsUsed(groupsLarger), "largerGroupingSegments: all segments must be used exactly once") + assert.True(t, verifyAllSegmentsUsed(groupsMax), "maxFullSegmentsGrouping: all segments must be used exactly once") + + // Get metrics + adaptiveGroups, adaptiveFull, adaptiveTails := countMetrics(groupsBalanced) + largerGroups, largerFull, largerTails := countMetrics(groupsLarger) + maxGroups, maxFull, maxTails := countMetrics(groupsMax) + + t.Logf("Total size: %.1f GB, Target: %.1f GB, Theoretical max full: %d", + totalSize/(1024*1024*1024), targetSize/(1024*1024*1024), theoreticalMaxFull) + t.Logf("Adaptive: %d groups, %d full, %d tails", adaptiveGroups, adaptiveFull, adaptiveTails) + t.Logf("Larger: %d groups, %d full, %d tails", largerGroups, largerFull, largerTails) + t.Logf("MaxFull: %d groups, %d full, %d tails", maxGroups, maxFull, maxTails) + + // Assertions + // 1. maxFullSegmentsGrouping should produce most full segments + assert.GreaterOrEqual(t, maxFull, largerFull, "maxFullSegmentsGrouping should produce >= full segments than largerGroupingSegments") + + // 2. maxFullSegmentsGrouping should not exceed theoretical maximum + assert.LessOrEqual(t, maxFull, theoreticalMaxFull, "cannot exceed theoretical maximum") + + // 3. All algorithms should process all segments + for _, groups := range [][][]*SegmentView{groupsBalanced, groupsLarger, groupsMax} { + totalProcessed := 0 + for _, group := range groups { + totalProcessed += len(group) + } + assert.Equal(t, len(segments), totalProcessed) + } + }) + } +} + +func TestAdaptiveGroupSegments(t *testing.T) { + t.Run("empty segments", func(t *testing.T) { + groups := adaptiveGroupSegments(nil, 5*1024*1024*1024) + assert.Nil(t, groups) + }) + + t.Run("uses maxFull for small segment count", func(t *testing.T) { + segments := []*SegmentView{ + {ID: 1, Size: 1.5 * 1024 * 1024 * 1024}, + {ID: 2, Size: 1.5 * 1024 * 1024 * 1024}, + {ID: 3, Size: 1.5 * 1024 * 1024 * 1024}, + {ID: 4, Size: 1.5 * 1024 * 1024 * 1024}, + } + groups := adaptiveGroupSegments(segments, 3*1024*1024*1024) + // Should produce 2 groups with 2 full segments + assert.Equal(t, 2, len(groups)) + }) + + t.Run("uses larger for large segment count", func(t *testing.T) { + // Create 200 segments (> defaultMaxFullSegmentThreshold) + segments := make([]*SegmentView, 200) + for i := 0; i < 200; i++ { + segments[i] = &SegmentView{ + ID: int64(i), + Size: 1 * 1024 * 1024 * 1024, + } + } + groups := adaptiveGroupSegments(segments, 3*1024*1024*1024) + // Should use larger algorithm + assert.NotNil(t, groups) + assert.Greater(t, len(groups), 0) + }) +} + +func TestLargerGroupingSegments(t *testing.T) { + t.Run("empty segments", func(t *testing.T) { + groups := largerGroupingSegments(nil, 5*1024*1024*1024) + assert.Nil(t, groups) + }) + + t.Run("single segment", func(t *testing.T) { + segments := []*SegmentView{ + {ID: 1, Size: 3 * 1024 * 1024 * 1024}, + } + groups := largerGroupingSegments(segments, 5*1024*1024*1024) + assert.Equal(t, 1, len(groups)) + assert.Equal(t, 1, len(groups[0])) + }) +} + +func TestMaxFullSegmentsGrouping(t *testing.T) { + t.Run("empty segments", func(t *testing.T) { + groups := 
maxFullSegmentsGrouping(nil, 5*1024*1024*1024) + assert.Nil(t, groups) + }) + + t.Run("single segment", func(t *testing.T) { + segments := []*SegmentView{ + {ID: 1, Size: 3 * 1024 * 1024 * 1024}, + } + groups := maxFullSegmentsGrouping(segments, 5*1024*1024*1024) + assert.Equal(t, 1, len(groups)) + assert.Equal(t, 1, len(groups[0])) + }) + + t.Run("perfect fit achieves theoretical maximum", func(t *testing.T) { + segments := []*SegmentView{ + {ID: 1, Size: 2.5 * 1024 * 1024 * 1024}, + {ID: 2, Size: 2.5 * 1024 * 1024 * 1024}, + {ID: 3, Size: 2.5 * 1024 * 1024 * 1024}, + {ID: 4, Size: 2.5 * 1024 * 1024 * 1024}, + } + targetSize := 5.0 * 1024 * 1024 * 1024 + + groups := maxFullSegmentsGrouping(segments, targetSize) + + totalFull := 0 + for _, group := range groups { + groupSize := sumSegmentSize(group) + totalFull += int(groupSize / targetSize) + } + + // Total is 10GB, should produce exactly 2 full 5GB segments + assert.Equal(t, 2, totalFull) + }) +} + +func TestSumSegmentSize(t *testing.T) { + segments := []*SegmentView{ + {ID: 1, Size: 1024 * 1024 * 1024}, + {ID: 2, Size: 512 * 1024 * 1024}, + } + + totalSize := sumSegmentSize(segments) + expected := 1.5 * 1024 * 1024 * 1024 + assert.InDelta(t, expected, totalSize, 1) +} + +func TestGroupByPartitionChannel(t *testing.T) { + label1 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch1", + } + label2 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 20, + Channel: "ch1", + } + + segments := []*SegmentView{ + {ID: 1, label: label1}, + {ID: 2, label: label1}, + {ID: 3, label: label2}, + } + + groups := groupByPartitionChannel(segments) + assert.Equal(t, 2, len(groups)) + + var count1, count2 int + for _, segs := range groups { + if len(segs) == 2 { + count1++ + } else if len(segs) == 1 { + count2++ + } + } + assert.Equal(t, 1, count1) + assert.Equal(t, 1, count2) +} + +func TestGroupByPartitionChannel_EmptySegments(t *testing.T) { + groups := groupByPartitionChannel([]*SegmentView{}) + assert.Empty(t, groups) +} + +func TestGroupByPartitionChannel_SameLabel(t *testing.T) { + label := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch1", + } + + segments := []*SegmentView{ + {ID: 1, label: label}, + {ID: 2, label: label}, + {ID: 3, label: label}, + } + + groups := groupByPartitionChannel(segments) + assert.Equal(t, 1, len(groups)) + for _, segs := range groups { + assert.Equal(t, 3, len(segs)) + } +} + +// Benchmark tests +func BenchmarkLargerGroupingSegments(b *testing.B) { + sizes := []int{10, 50, 100, 500} + + for _, n := range sizes { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + segments := make([]*SegmentView, n) + for i := 0; i < n; i++ { + segments[i] = &SegmentView{ + ID: int64(i), + Size: float64((i%10+1)*100*1024*1024 + i*1024*1024), + } + } + targetSize := float64(3 * 1024 * 1024 * 1024) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + largerGroupingSegments(segments, targetSize) + } + }) + } +} + +func BenchmarkMaxFullSegmentsGrouping(b *testing.B) { + sizes := []int{10, 50, 100} + + for _, n := range sizes { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + segments := make([]*SegmentView, n) + for i := 0; i < n; i++ { + segments[i] = &SegmentView{ + ID: int64(i), + Size: float64((i%10+1)*100*1024*1024 + i*1024*1024), + } + } + targetSize := float64(3 * 1024 * 1024 * 1024) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + maxFullSegmentsGrouping(segments, targetSize) + } + }) + } +} + +func BenchmarkGroupingAlgorithmsComparison(b *testing.B) { + sizes := 
[]int{10, 50, 100, 200, 500} + targetSize := float64(3 * 1024 * 1024 * 1024) + + for _, n := range sizes { + segments := make([]*SegmentView, n) + for i := 0; i < n; i++ { + segments[i] = &SegmentView{ + ID: int64(i), + Size: float64((i%10+1)*100*1024*1024 + i*1024*1024), + } + } + + b.Run(fmt.Sprintf("adaptive/n=%d", n), func(b *testing.B) { + for i := 0; i < b.N; i++ { + adaptiveGroupSegments(segments, targetSize) + } + }) + + b.Run(fmt.Sprintf("larger/n=%d", n), func(b *testing.B) { + for i := 0; i < b.N; i++ { + largerGroupingSegments(segments, targetSize) + } + }) + + // Only test maxFull with smaller sizes due to O(n³) complexity + if n <= 200 { + b.Run(fmt.Sprintf("maxFull/n=%d", n), func(b *testing.B) { + for i := 0; i < b.N; i++ { + maxFullSegmentsGrouping(segments, targetSize) + } + }) + } + } +} + +func BenchmarkGroupByPartitionChannel(b *testing.B) { + sizes := []int{10, 100, 1000} + + for _, n := range sizes { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + segments := make([]*SegmentView, n) + for i := 0; i < n; i++ { + label := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: int64(i % 5), + Channel: fmt.Sprintf("ch%d", i%3), + } + segments[i] = &SegmentView{ + ID: int64(i), + label: label, + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = groupByPartitionChannel(segments) + } + }) + } +} diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index dccc5b9300..bd8e38a8f9 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -903,7 +903,7 @@ func createSortCompactionTask(ctx context.Context, ResultSegments: []int64{}, TotalRows: originSegment.GetNumOfRows(), LastStateStartTime: time.Now().Unix(), - MaxSize: getExpandedSize(expectedSize), + MaxSize: expectedSize, PreAllocatedSegmentIDs: &datapb.IDRange{ Begin: targetSegmentID, End: targetSegmentID + 1, diff --git a/internal/datacoord/mock_collection_topology_querier.go b/internal/datacoord/mock_collection_topology_querier.go new file mode 100644 index 0000000000..fce617bbe6 --- /dev/null +++ b/internal/datacoord/mock_collection_topology_querier.go @@ -0,0 +1,95 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. 
+ +package datacoord + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockCollectionTopologyQuerier is an autogenerated mock type for the CollectionTopologyQuerier type +type MockCollectionTopologyQuerier struct { + mock.Mock +} + +type MockCollectionTopologyQuerier_Expecter struct { + mock *mock.Mock +} + +func (_m *MockCollectionTopologyQuerier) EXPECT() *MockCollectionTopologyQuerier_Expecter { + return &MockCollectionTopologyQuerier_Expecter{mock: &_m.Mock} +} + +// GetCollectionTopology provides a mock function with given fields: ctx, collectionID +func (_m *MockCollectionTopologyQuerier) GetCollectionTopology(ctx context.Context, collectionID int64) (*CollectionTopology, error) { + ret := _m.Called(ctx, collectionID) + + if len(ret) == 0 { + panic("no return value specified for GetCollectionTopology") + } + + var r0 *CollectionTopology + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*CollectionTopology, error)); ok { + return rf(ctx, collectionID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *CollectionTopology); ok { + r0 = rf(ctx, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*CollectionTopology) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, collectionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCollectionTopologyQuerier_GetCollectionTopology_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionTopology' +type MockCollectionTopologyQuerier_GetCollectionTopology_Call struct { + *mock.Call +} + +// GetCollectionTopology is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +func (_e *MockCollectionTopologyQuerier_Expecter) GetCollectionTopology(ctx interface{}, collectionID interface{}) *MockCollectionTopologyQuerier_GetCollectionTopology_Call { + return &MockCollectionTopologyQuerier_GetCollectionTopology_Call{Call: _e.mock.On("GetCollectionTopology", ctx, collectionID)} +} + +func (_c *MockCollectionTopologyQuerier_GetCollectionTopology_Call) Run(run func(ctx context.Context, collectionID int64)) *MockCollectionTopologyQuerier_GetCollectionTopology_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockCollectionTopologyQuerier_GetCollectionTopology_Call) Return(_a0 *CollectionTopology, _a1 error) *MockCollectionTopologyQuerier_GetCollectionTopology_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCollectionTopologyQuerier_GetCollectionTopology_Call) RunAndReturn(run func(context.Context, int64) (*CollectionTopology, error)) *MockCollectionTopologyQuerier_GetCollectionTopology_Call { + _c.Call.Return(run) + return _c +} + +// NewMockCollectionTopologyQuerier creates a new instance of MockCollectionTopologyQuerier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
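+// +// Illustrative usage in a test (hypothetical collection ID; mock.Anything matches any context): +// +//	q := NewMockCollectionTopologyQuerier(t) +//	q.EXPECT().GetCollectionTopology(mock.Anything, int64(1)).Return(&CollectionTopology{}, nil)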
+func NewMockCollectionTopologyQuerier(t interface { + mock.TestingT + Cleanup(func()) +}) *MockCollectionTopologyQuerier { + mock := &MockCollectionTopologyQuerier{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/mock_trigger_manager.go b/internal/datacoord/mock_trigger_manager.go index 9dc3c2ce13..6999416b10 100644 --- a/internal/datacoord/mock_trigger_manager.go +++ b/internal/datacoord/mock_trigger_manager.go @@ -5,7 +5,12 @@ package datacoord import ( context "context" + session "github.com/milvus-io/milvus/internal/datacoord/session" mock "github.com/stretchr/testify/mock" + + sessionutil "github.com/milvus-io/milvus/internal/util/sessionutil" + + types "github.com/milvus-io/milvus/internal/types" ) // MockTriggerManager is an autogenerated mock type for the TriggerManager type @@ -119,9 +124,44 @@ func (_c *MockTriggerManager_GetResumeCompactionChan_Call) RunAndReturn(run func return _c } -// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction, l0Compaction -func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (int64, error) { - ret := _m.Called(ctx, collectionID, clusteringCompaction, l0Compaction) +// InitForceMergeMemoryQuerier provides a mock function with given fields: nodeManager, mixCoord, _a2 +func (_m *MockTriggerManager) InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, _a2 sessionutil.SessionInterface) { + _m.Called(nodeManager, mixCoord, _a2) +} + +// MockTriggerManager_InitForceMergeMemoryQuerier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InitForceMergeMemoryQuerier' +type MockTriggerManager_InitForceMergeMemoryQuerier_Call struct { + *mock.Call +} + +// InitForceMergeMemoryQuerier is a helper method to define mock.On call +// - nodeManager session.NodeManager +// - mixCoord types.MixCoord +// - _a2 sessionutil.SessionInterface +func (_e *MockTriggerManager_Expecter) InitForceMergeMemoryQuerier(nodeManager interface{}, mixCoord interface{}, _a2 interface{}) *MockTriggerManager_InitForceMergeMemoryQuerier_Call { + return &MockTriggerManager_InitForceMergeMemoryQuerier_Call{Call: _e.mock.On("InitForceMergeMemoryQuerier", nodeManager, mixCoord, _a2)} +} + +func (_c *MockTriggerManager_InitForceMergeMemoryQuerier_Call) Run(run func(nodeManager session.NodeManager, mixCoord types.MixCoord, _a2 sessionutil.SessionInterface)) *MockTriggerManager_InitForceMergeMemoryQuerier_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(session.NodeManager), args[1].(types.MixCoord), args[2].(sessionutil.SessionInterface)) + }) + return _c +} + +func (_c *MockTriggerManager_InitForceMergeMemoryQuerier_Call) Return() *MockTriggerManager_InitForceMergeMemoryQuerier_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTriggerManager_InitForceMergeMemoryQuerier_Call) RunAndReturn(run func(session.NodeManager, types.MixCoord, sessionutil.SessionInterface)) *MockTriggerManager_InitForceMergeMemoryQuerier_Call { + _c.Run(run) + return _c +} + +// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction, l0Compaction, targetSize +func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64) (int64, error) { + ret := _m.Called(ctx, collectionID, clusteringCompaction, 
l0Compaction, targetSize) if len(ret) == 0 { panic("no return value specified for ManualTrigger") @@ -129,17 +169,17 @@ func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID in var r0 int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool) (int64, error)); ok { - return rf(ctx, collectionID, clusteringCompaction, l0Compaction) + if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool, int64) (int64, error)); ok { + return rf(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize) } - if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool) int64); ok { - r0 = rf(ctx, collectionID, clusteringCompaction, l0Compaction) + if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool, int64) int64); ok { + r0 = rf(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize) } else { r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(context.Context, int64, bool, bool) error); ok { - r1 = rf(ctx, collectionID, clusteringCompaction, l0Compaction) + if rf, ok := ret.Get(1).(func(context.Context, int64, bool, bool, int64) error); ok { + r1 = rf(ctx, collectionID, clusteringCompaction, l0Compaction, targetSize) } else { r1 = ret.Error(1) } @@ -157,13 +197,14 @@ type MockTriggerManager_ManualTrigger_Call struct { // - collectionID int64 // - clusteringCompaction bool // - l0Compaction bool -func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}, l0Compaction interface{}) *MockTriggerManager_ManualTrigger_Call { - return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction, l0Compaction)} +// - targetSize int64 +func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}, l0Compaction interface{}, targetSize interface{}) *MockTriggerManager_ManualTrigger_Call { + return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction, l0Compaction, targetSize)} } -func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool)) *MockTriggerManager_ManualTrigger_Call { +func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64)) *MockTriggerManager_ManualTrigger_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(bool), args[3].(bool)) + run(args[0].(context.Context), args[1].(int64), args[2].(bool), args[3].(bool), args[4].(int64)) }) return _c } @@ -173,7 +214,7 @@ func (_c *MockTriggerManager_ManualTrigger_Call) Return(_a0 int64, _a1 error) *M return _c } -func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool, bool) (int64, error)) *MockTriggerManager_ManualTrigger_Call { +func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool, bool, int64) (int64, error)) *MockTriggerManager_ManualTrigger_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ce60ba4411..d87903d27b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -697,6 +697,7 @@ func (s *Server) initCompaction() { cph.loadMeta() s.compactionInspector = cph 
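+	// The trigger manager below is wired with the node manager, mixcoord and session so that force merge
+	// can query per-node memory when bounding merged segment size; e.g. a DataNode with 32GB of memory and
+	// the default dataNodeMemoryFactor of 3.0 caps merged segments at roughly 10.7GB
+	// (see dataCoord.compaction.forceMerge.dataNodeMemoryFactor / queryNodeMemoryFactor).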
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionInspector, s.meta, s.importMeta) + s.compactionTriggerManager.InitForceMergeMemoryQuerier(s.nodeManager, s.mixCoord, s.session) s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionInspector, s.allocator, s.handler, s.indexEngineVersionManager) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9015cbd4c0..49ad9f1a1e 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1700,7 +1700,7 @@ func TestManualCompaction(t *testing.T) { svr.stateCode.Store(commonpb.StateCode_Healthy) mockTriggerManager := NewMockTriggerManager(t) svr.compactionTriggerManager = mockTriggerManager - mockTriggerManager.EXPECT().ManualTrigger(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1, nil) + mockTriggerManager.EXPECT().ManualTrigger(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1, nil) mockHandler := NewMockCompactionInspector(t) mockHandler.EXPECT().getCompactionTasksNumBySignalID(mock.Anything).Return(1) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index d960144380..98233a9cab 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1260,7 +1260,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) - log.Info("received manual compaction") + log.Info("received manual compaction", zap.Any("request", req)) resp := &milvuspb.ManualCompactionResponse{ Status: merr.Success(), @@ -1279,8 +1279,8 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa var id int64 var err error - if req.GetMajorCompaction() || req.GetL0Compaction() { - id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction(), req.GetL0Compaction()) + if req.GetMajorCompaction() || req.GetL0Compaction() || req.GetTargetSize() != 0 { + id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction(), req.GetL0Compaction(), req.GetTargetSize()) } else { id, err = s.compactionTrigger.TriggerCompaction(ctx, NewCompactionSignal(). WithIsForce(true). 
@@ -1306,7 +1306,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa } log.Info("success to trigger manual compaction", zap.Bool("isL0Compaction", req.GetL0Compaction()), - zap.Bool("isMajorCompaction", req.GetMajorCompaction()), zap.Int64("compactionID", id), zap.Int("taskNum", taskCnt)) + zap.Bool("isMajorCompaction", req.GetMajorCompaction()), zap.Int64("targetSize", req.GetTargetSize()), zap.Int64("compactionID", id), zap.Int("taskNum", taskCnt)) return resp, nil } diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index f3a2745b54..4676e38dc2 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -160,10 +160,11 @@ func (suite *SegmentSuite) TestResourceUsageEstimate() { usage := suite.growing.ResourceUsageEstimate() suite.Zero(usage.MemorySize) suite.Zero(usage.DiskSize) - // growing segment has no resource usage + // sealed segment has resource usage usage = suite.sealed.ResourceUsageEstimate() - suite.NotZero(usage.MemorySize) - suite.Zero(usage.DiskSize) + // mmap is on + suite.Zero(usage.MemorySize) + suite.NotZero(usage.DiskSize) suite.Zero(usage.MmapFieldCount) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 1217082fc8..2942ad9e1f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3990,7 +3990,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin p.MmapVectorIndex = ParamItem{ Key: "queryNode.mmap.vectorIndex", Version: "2.4.7", - DefaultValue: "false", + DefaultValue: "true", Formatter: func(originValue string) string { if p.MmapEnabled.GetAsBool() { return "true" @@ -4005,7 +4005,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin p.MmapScalarField = ParamItem{ Key: "queryNode.mmap.scalarField", Version: "2.4.7", - DefaultValue: "false", + DefaultValue: "true", Formatter: func(originValue string) string { if p.MmapEnabled.GetAsBool() { return "true" @@ -4020,7 +4020,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin p.MmapScalarIndex = ParamItem{ Key: "queryNode.mmap.scalarIndex", Version: "2.4.7", - DefaultValue: "false", + DefaultValue: "true", Formatter: func(originValue string) string { if p.MmapEnabled.GetAsBool() { return "true" @@ -4045,7 +4045,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin p.GrowingMmapEnabled = ParamItem{ Key: "queryNode.mmap.growingMmapEnabled", Version: "2.4.6", - DefaultValue: "false", + DefaultValue: "true", FallbackKeys: []string{"queryNode.growingMmapEnabled"}, Doc: `Enable memory mapping (mmap) to optimize the handling of growing raw data. By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized. 
@@ -4605,22 +4605,25 @@ type dataCoordConfig struct { CompactionTaskQueueCapacity ParamItem `refreshable:"false"` CompactionPreAllocateIDExpansionFactor ParamItem `refreshable:"false"` - CompactionRPCTimeout ParamItem `refreshable:"true"` - CompactionMaxParallelTasks ParamItem `refreshable:"true"` - CompactionWorkerParallelTasks ParamItem `refreshable:"true"` - MinSegmentToMerge ParamItem `refreshable:"true"` - SegmentSmallProportion ParamItem `refreshable:"true"` - SegmentCompactableProportion ParamItem `refreshable:"true"` - SegmentExpansionRate ParamItem `refreshable:"true"` - CompactionTimeoutInSeconds ParamItem `refreshable:"true"` // deprecated - CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` - CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` // deprecated - CompactionScheduleInterval ParamItem `refreshable:"false"` - MixCompactionTriggerInterval ParamItem `refreshable:"false"` - L0CompactionTriggerInterval ParamItem `refreshable:"false"` - GlobalCompactionInterval ParamItem `refreshable:"false"` - CompactionExpiryTolerance ParamItem `refreshable:"true"` + CompactionRPCTimeout ParamItem `refreshable:"true"` + CompactionMaxParallelTasks ParamItem `refreshable:"true"` + CompactionWorkerParallelTasks ParamItem `refreshable:"true"` + CompactionMaxFullSegmentThreshold ParamItem `refreshable:"true"` + CompactionForceMergeDataNodeMemoryFactor ParamItem `refreshable:"true"` + CompactionForceMergeQueryNodeMemoryFactor ParamItem `refreshable:"true"` + MinSegmentToMerge ParamItem `refreshable:"true"` + SegmentSmallProportion ParamItem `refreshable:"true"` + SegmentCompactableProportion ParamItem `refreshable:"true"` + SegmentExpansionRate ParamItem `refreshable:"true"` + CompactionTimeoutInSeconds ParamItem `refreshable:"true"` // deprecated + CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` // deprecated + CompactionScheduleInterval ParamItem `refreshable:"false"` + MixCompactionTriggerInterval ParamItem `refreshable:"false"` + L0CompactionTriggerInterval ParamItem `refreshable:"false"` + GlobalCompactionInterval ParamItem `refreshable:"false"` + CompactionExpiryTolerance ParamItem `refreshable:"true"` SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` @@ -5070,6 +5073,47 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.CompactionScheduleInterval.Init(base.mgr) + p.CompactionMaxFullSegmentThreshold = ParamItem{ + Key: "dataCoord.compaction.maxFullSegmentThreshold", + Version: "2.6.8", + DefaultValue: "100", + Doc: "Maximum number of segments to use maxFull algorithm (O(n³) complexity) for optimal full segment count. For larger counts, uses faster larger algorithm (O(n)).", + Export: false, + } + p.CompactionMaxFullSegmentThreshold.Init(base.mgr) + + p.CompactionForceMergeDataNodeMemoryFactor = ParamItem{ + Key: "dataCoord.compaction.forceMerge.dataNodeMemoryFactor", + Version: "2.6.8", + DefaultValue: "3.0", + Doc: "Memory safety factor for DataNode during force merge compaction. Max segment size = DataNode memory / factor. Default 3.0 means segments can use up to 1/3 of DataNode memory. 
Must be >= 1.0.", + Export: false, + Formatter: func(value string) string { + factor, err := strconv.ParseFloat(value, 64) + if err != nil || factor < 1.0 { + factor = 1.0 + } + return strconv.FormatFloat(factor, 'f', -1, 64) + }, + } + p.CompactionForceMergeDataNodeMemoryFactor.Init(base.mgr) + + p.CompactionForceMergeQueryNodeMemoryFactor = ParamItem{ + Key: "dataCoord.compaction.forceMerge.queryNodeMemoryFactor", + Version: "2.6.8", + DefaultValue: "3.0", + Doc: "Memory safety factor for QueryNode when loading segments after force merge. Max segment size = QueryNode memory / factor. Default 3.0 means segments can use up to 1/3 of QueryNode memory. Must be >= 1.0.", + Export: false, + Formatter: func(value string) string { + factor, err := strconv.ParseFloat(value, 64) + if err != nil || factor < 1.0 { + factor = 1.0 + } + return strconv.FormatFloat(factor, 'f', -1, 64) + }, + } + p.CompactionForceMergeQueryNodeMemoryFactor.Init(base.mgr) + p.SingleCompactionRatioThreshold = ParamItem{ Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0",