diff --git a/client/go.mod b/client/go.mod index a932d19f4a..195071b53a 100644 --- a/client/go.mod +++ b/client/go.mod @@ -6,7 +6,7 @@ require ( github.com/blang/semver/v4 v4.0.0 github.com/cockroachdb/errors v1.9.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/samber/lo v1.27.0 diff --git a/client/go.sum b/client/go.sum index 38dfe33338..9618cbe0dd 100644 --- a/client/go.sum +++ b/client/go.sum @@ -320,6 +320,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 h1:7ojrhnBHitGaqebExGP00x0wDTioMgPniEBmNdFPiDI= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/go.mod b/go.mod index c0c887c1ef..ad2eab7c2a 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 // indirect github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 diff --git a/go.sum b/go.sum index 5ddf646ca4..b0a7664372 100644 --- a/go.sum +++ b/go.sum @@ -793,6 +793,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 h1:7ojrhnBHitGaqebExGP00x0wDTioMgPniEBmNdFPiDI= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/internal/datacoord/compaction_l0_view.go b/internal/datacoord/compaction_l0_view.go index ea7fadd93a..72695870e5 100644 --- a/internal/datacoord/compaction_l0_view.go +++ b/internal/datacoord/compaction_l0_view.go @@ -14,6 +14,7 @@ type LevelZeroSegmentsView struct { label *CompactionGroupLabel segments []*SegmentView earliestGrowingSegmentPos *msgpb.MsgPosition + triggerID int64 } var _ CompactionView = (*LevelZeroSegmentsView)(nil) @@ -86,12 +87,61 @@ func (v *LevelZeroSegmentsView) ForceTrigger() (CompactionView, string) { label: v.label, segments: targetViews, earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, + triggerID: v.triggerID, }, reason } return nil, "" } +func (v *LevelZeroSegmentsView) ForceTriggerAll() ([]CompactionView, string) { + // Only choose segments with position less than the earliest growing segment position + validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { + return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() + }) + + if len(validSegments) == 0 { + return nil, "" + } + + var resultViews []CompactionView + var lastReason string + remainingSegments := validSegments + + // Multi-round force trigger loop + for len(remainingSegments) > 0 { + targetViews, reason := v.forceTrigger(remainingSegments) + if len(targetViews) == 0 { + // No more segments can be force triggered, break the loop + break + } + + // Create a new LevelZeroSegmentsView for this round's target views + roundView := &LevelZeroSegmentsView{ + label: v.label, + segments: targetViews, + earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, + triggerID: v.triggerID, + } + resultViews = append(resultViews, roundView) + lastReason = reason + + // Remove the target segments from remaining segments for next round + targetSegmentIDs := lo.Map(targetViews, func(view *SegmentView, _ int) int64 { + return view.ID + }) + remainingSegments = lo.Filter(remainingSegments, func(view *SegmentView, _ int) bool { + return !lo.Contains(targetSegmentIDs, view.ID) + }) + } + + return resultViews, lastReason +} + +func (v *LevelZeroSegmentsView) GetTriggerID() int64 { + return v.triggerID +} + // Trigger triggers all qualified LevelZeroSegments according to views func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { // Only choose segments with position less than the earliest growing segment position @@ -105,6 +155,7 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { label: v.label, segments: targetViews, earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, + triggerID: v.triggerID, }, reason } diff --git a/internal/datacoord/compaction_l0_view_test.go b/internal/datacoord/compaction_l0_view_test.go index 13408def93..30e4e7549d 100644 --- a/internal/datacoord/compaction_l0_view_test.go +++ b/internal/datacoord/compaction_l0_view_test.go @@ -45,7 +45,10 @@ func (s *LevelZeroSegmentsViewSuite) SetupTest() { } targetView := &LevelZeroSegmentsView{ - label, segments, &msgpb.MsgPosition{Timestamp: 10000}, + label: label, + segments: segments, + earliestGrowingSegmentPos: &msgpb.MsgPosition{Timestamp: 10000}, + triggerID: 10000, } s.True(label.Equal(targetView.GetGroupLabel())) diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go index a44c90678a..b75f9d0a76 100644 --- a/internal/datacoord/compaction_policy_clustering.go +++ b/internal/datacoord/compaction_policy_clustering.go @@ -306,6 +306,14 @@ func (v *ClusteringSegmentsView) Trigger() (CompactionView, string) { return v, "" } +func (v *ClusteringSegmentsView) GetTriggerID() int64 { + return v.triggerID +} + func (v *ClusteringSegmentsView) ForceTrigger() (CompactionView, string) { panic("implement me") } + +func (v *ClusteringSegmentsView) ForceTriggerAll() ([]CompactionView, string) { + panic("implement me") +} diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 4c46ef3309..85fd471e79 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -1,6 +1,7 @@ package datacoord import ( + "context" "sync" "time" @@ -8,8 +9,10 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -18,16 +21,18 @@ type l0CompactionPolicy struct { meta *meta activeCollections *activeCollections + allocator allocator.Allocator // key: collectionID, value: reference count skipCompactionCollections map[int64]int skipLocker sync.RWMutex } -func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy { +func newL0CompactionPolicy(meta *meta, allocator allocator.Allocator) *l0CompactionPolicy { return &l0CompactionPolicy{ meta: meta, activeCollections: newActiveCollections(), + allocator: allocator, skipCompactionCollections: make(map[int64]int), } } @@ -69,8 +74,7 @@ func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) { policy.activeCollections.Record(collectionID) } -func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) { - events = make(map[CompactionTriggerType][]CompactionView) +func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[CompactionTriggerType][]CompactionView, err error) { latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection() // 1. Get active collections @@ -82,6 +86,12 @@ func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][] idleCollsSet := typeutil.NewUniqueSet(idleColls...) activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{} + newTriggerID, err := policy.allocator.AllocID(ctx) + if err != nil { + log.Warn("fail to allocate triggerID to trigger l0 compaction", zap.Error(err)) + return nil, err + } + events = make(map[CompactionTriggerType][]CompactionView) for collID, segments := range latestCollSegs { if policy.isSkipCollection(collID) { continue @@ -94,8 +104,7 @@ func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][] if len(levelZeroSegments) == 0 { continue } - - labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...)) + labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...), newTriggerID) if idleCollsSet.Contain(collID) { idleL0Views = append(idleL0Views, labelViews...) } else { @@ -113,7 +122,7 @@ func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][] return } -func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView { +func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView, triggerID UniqueID) []CompactionView { partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key for _, view := range levelZeroSegments { key := view.label.Key() @@ -122,6 +131,7 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, label: view.label, segments: []*SegmentView{view}, earliestGrowingSegmentPos: policy.meta.GetEarliestStartPositionOfGrowingSegments(view.label), + triggerID: triggerID, } } else { partChanView[key].Append(view) @@ -133,6 +143,33 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, }) } +func (policy *l0CompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64) ([]CompactionView, int64, error) { + log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) + log.Info("start trigger collection l0 compaction") + if policy.isSkipCollection(collectionID) { + return nil, 0, merr.WrapErrCollectionNotLoaded(collectionID, "the collection being paused by importing cannot do force l0 compaction") + } + allL0Segments := policy.meta.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { + return isSegmentHealthy(segment) && + isFlushed(segment) && + !segment.isCompacting && // not compacting now + !segment.GetIsImporting() && // not importing now + segment.GetLevel() == datapb.SegmentLevel_L0 + })) + + if len(allL0Segments) == 0 { + return nil, 0, nil + } + + newTriggerID, err := policy.allocator.AllocID(ctx) + if err != nil { + log.Warn("fail to allocate triggerID for l0 compaction", zap.Error(err)) + return nil, 0, err + } + views := policy.groupL0ViewsByPartChan(collectionID, GetViewsByInfo(allL0Segments...), newTriggerID) + return views, newTriggerID, nil +} + type activeCollection struct { ID int64 lastRefresh time.Time diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index c99d464711..8031a00b0e 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -16,15 +16,17 @@ package datacoord import ( + "context" "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -58,8 +60,8 @@ func (s *L0CompactionPolicySuite) SetupTest() { for id, segment := range segments { meta.segments.SetSegment(id, segment) } - - s.l0_policy = newL0CompactionPolicy(meta) + s.mockAlloc = allocator.NewMockAllocator(s.T()) + s.l0_policy = newL0CompactionPolicy(meta, s.mockAlloc) } const MB = 1024 * 1024 @@ -68,13 +70,14 @@ func (s *L0CompactionPolicySuite) TestActiveToIdle() { paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) s.l0_policy.OnCollectionUpdate(1) s.Require().EqualValues(1, s.l0_policy.activeCollections.GetActiveCollections()[0]) <-time.After(3 * time.Second) for range 3 { - gotViews, err := s.l0_policy.Trigger() + gotViews, err := s.l0_policy.Trigger(context.Background()) s.NoError(err) s.NotNil(gotViews) s.NotEmpty(gotViews) @@ -83,7 +86,7 @@ func (s *L0CompactionPolicySuite) TestActiveToIdle() { } s.Empty(s.l0_policy.activeCollections.GetActiveCollections()) - gotViews, err := s.l0_policy.Trigger() + gotViews, err := s.l0_policy.Trigger(context.Background()) s.NoError(err) s.NotNil(gotViews) s.NotEmpty(gotViews) @@ -93,8 +96,8 @@ func (s *L0CompactionPolicySuite) TestActiveToIdle() { func (s *L0CompactionPolicySuite) TestTriggerIdle() { s.Require().Empty(s.l0_policy.activeCollections.GetActiveCollections()) - - events, err := s.l0_policy.Trigger() + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + events, err := s.l0_policy.Trigger(context.Background()) s.NoError(err) s.NotEmpty(events) @@ -118,18 +121,18 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() { s.l0_policy.AddSkipCollection(1) s.l0_policy.AddSkipCollection(1) // Test for skip collection - events, err = s.l0_policy.Trigger() + events, err = s.l0_policy.Trigger(context.Background()) s.NoError(err) s.Empty(events) // Test for skip collection with ref count s.l0_policy.RemoveSkipCollection(1) - events, err = s.l0_policy.Trigger() + events, err = s.l0_policy.Trigger(context.Background()) s.NoError(err) s.Empty(events) s.l0_policy.RemoveSkipCollection(1) - events, err = s.l0_policy.Trigger() + events, err = s.l0_policy.Trigger(context.Background()) s.NoError(err) s.Equal(1, len(events)) gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] @@ -166,9 +169,10 @@ func (s *L0CompactionPolicySuite) TestTriggerViewChange() { meta.segments.SetSegment(id, segment) } s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) s.l0_policy.OnCollectionUpdate(s.testLabel.CollectionID) - events, err := s.l0_policy.Trigger() + events, err := s.l0_policy.Trigger(context.Background()) s.NoError(err) s.Equal(1, len(events)) gotViews, ok := events[TriggerTypeLevelZeroViewChange] @@ -180,6 +184,11 @@ func (s *L0CompactionPolicySuite) TestTriggerViewChange() { s.Empty(gotViews) } +func (s *L0CompactionPolicySuite) TestManualTrigger() { + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + s.l0_policy.triggerOneCollection(context.Background(), s.testLabel.CollectionID) +} + func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { segArgs := []struct { ID UniqueID diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go index 1408f79621..02b8406da0 100644 --- a/internal/datacoord/compaction_policy_single.go +++ b/internal/datacoord/compaction_policy_single.go @@ -307,3 +307,11 @@ func (v *MixSegmentView) Trigger() (CompactionView, string) { func (v *MixSegmentView) ForceTrigger() (CompactionView, string) { panic("implement me") } + +func (v *MixSegmentView) ForceTriggerAll() ([]CompactionView, string) { + panic("implement me") +} + +func (v *MixSegmentView) GetTriggerID() int64 { + return v.triggerID +} diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 7fa19144b8..a9806c040b 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -38,6 +38,7 @@ type CompactionTriggerType int8 const ( TriggerTypeLevelZeroViewChange CompactionTriggerType = iota + 1 TriggerTypeLevelZeroViewIDLE + TriggerTypeLevelZeroViewManual TriggerTypeSegmentSizeViewChange TriggerTypeClustering TriggerTypeSingle @@ -50,6 +51,8 @@ func (t CompactionTriggerType) String() string { return "LevelZeroViewChange" case TriggerTypeLevelZeroViewIDLE: return "LevelZeroViewIDLE" + case TriggerTypeLevelZeroViewManual: + return "LevelZeroViewManual" case TriggerTypeSegmentSizeViewChange: return "SegmentSizeViewChange" case TriggerTypeClustering: @@ -67,7 +70,7 @@ type TriggerManager interface { Start() Stop() OnCollectionUpdate(collectionID int64) - ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) + ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (UniqueID, error) GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{} GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{} } @@ -119,7 +122,7 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, ins } m.l0SigLock = &sync.Mutex{} m.l0TickSig = sync.NewCond(m.l0SigLock) - m.l0Policy = newL0CompactionPolicy(meta) + m.l0Policy = newL0CompactionPolicy(meta, alloc) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler) m.singlePolicy = newSingleCompactionPolicy(meta, m.allocator, m.handler) return m @@ -231,7 +234,7 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) { continue } m.setL0Triggering(true) - events, err := m.l0Policy.Trigger() + events, err := m.l0Policy.Trigger(ctx) if err != nil { log.Warn("Fail to trigger L0 policy", zap.Error(err)) m.setL0Triggering(false) @@ -291,14 +294,27 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) { } } -func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) { - log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID)) - views, triggerID, err := m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true) +func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (UniqueID, error) { + log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID), + zap.Bool("clusteringCompaction", clusteringCompaction), zap.Bool("l0Compaction", l0Compaction)) + + var triggerID UniqueID + var err error + var views []CompactionView + + events := make(map[CompactionTriggerType][]CompactionView, 0) + if l0Compaction { + m.setL0Triggering(true) + defer m.setL0Triggering(false) + views, triggerID, err = m.l0Policy.triggerOneCollection(ctx, collectionID) + events[TriggerTypeLevelZeroViewManual] = views + } else if clusteringCompaction { + views, triggerID, err = m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true) + events[TriggerTypeClustering] = views + } if err != nil { return 0, err } - events := make(map[CompactionTriggerType][]CompactionView, 0) - events[TriggerTypeClustering] = views if len(events) > 0 { for triggerType, views := range events { m.notify(ctx, triggerType, views) @@ -307,31 +323,41 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection return triggerID, nil } +func (m *CompactionTriggerManager) triggerViewForCompaction(ctx context.Context, eventType CompactionTriggerType, + view CompactionView) ([]CompactionView, string) { + if eventType == TriggerTypeLevelZeroViewIDLE { + view, reason := view.ForceTrigger() + return []CompactionView{view}, reason + } else if eventType == TriggerTypeLevelZeroViewManual { + return view.ForceTriggerAll() + } + outView, reason := view.Trigger() + return []CompactionView{outView}, reason +} + func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) { log := log.Ctx(ctx) log.Debug("Start to trigger compactions", zap.String("eventType", eventType.String())) for _, view := range views { - outView, reason := view.Trigger() - if outView == nil && eventType == TriggerTypeLevelZeroViewIDLE { - log.Info("Start to force trigger a level zero compaction") - outView, reason = view.ForceTrigger() - } + outViews, reason := m.triggerViewForCompaction(ctx, eventType, view) + for _, outView := range outViews { + if outView != nil { + log.Info("Success to trigger a compaction, try to submit", + zap.String("eventType", eventType.String()), + zap.String("reason", reason), + zap.String("output view", outView.String()), + zap.Int64("triggerID", outView.GetTriggerID())) - if outView != nil { - log.Info("Success to trigger a compaction, try to submit", - zap.String("eventType", eventType.String()), - zap.String("reason", reason), - zap.String("output view", outView.String())) - - switch eventType { - case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE: - m.SubmitL0ViewToScheduler(ctx, outView) - case TriggerTypeClustering: - m.SubmitClusteringViewToScheduler(ctx, outView) - case TriggerTypeSingle: - m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_MixCompaction) - case TriggerTypeSort: - m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_SortCompaction) + switch eventType { + case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE, TriggerTypeLevelZeroViewManual: + m.SubmitL0ViewToScheduler(ctx, outView) + case TriggerTypeClustering: + m.SubmitClusteringViewToScheduler(ctx, outView) + case TriggerTypeSingle: + m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_MixCompaction) + case TriggerTypeSort: + m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_SortCompaction) + } } } } @@ -366,7 +392,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, }) task := &datapb.CompactionTask{ - TriggerID: taskID, // inner trigger, use task id as trigger id + TriggerID: view.GetTriggerID(), PlanID: taskID, Type: datapb.CompactionType_Level0DeleteCompaction, StartTime: time.Now().Unix(), diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 2c81a93cdd..7089ac511b 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -87,7 +87,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { expectedSegID := seg1.ID s.Require().Equal(1, len(latestL0Segments)) - levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) + levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments, 10000) s.Require().Equal(1, len(levelZeroViews)) cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) s.True(ok) @@ -130,7 +130,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { latestL0Segments := GetViewsByInfo(levelZeroSegments...) s.Require().NotEmpty(latestL0Segments) - levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) + levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments, 10000) s.Require().Equal(1, len(levelZeroViews)) cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) s.True(ok) @@ -398,3 +398,48 @@ func TestCompactionAndImport(t *testing.T) { defer triggerManager.Stop() time.Sleep(3 * time.Second) } + +func (s *CompactionTriggerManagerSuite) TestManualTriggerL0Compaction() { + handler := NewNMockHandler(s.T()) + handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil) + s.triggerManager.handler = handler + + collSegs := s.meta.GetCompactableSegmentGroupByCollection() + segments, found := collSegs[1] + s.Require().True(found) + + levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { + return info.GetLevel() == datapb.SegmentLevel_L0 + }) + s.Require().NotEmpty(levelZeroSegments) + + // Mock allocator for trigger ID + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(12345), nil) + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(19530), nil).Maybe() + + // Mock inspector to expect compaction enqueue + s.inspector.EXPECT().enqueueCompaction(mock.Anything). + RunAndReturn(func(task *datapb.CompactionTask) error { + s.EqualValues(19530, task.GetTriggerID()) + s.Equal(s.testLabel.CollectionID, task.GetCollectionID()) + s.Equal(s.testLabel.PartitionID, task.GetPartitionID()) + s.Equal(s.testLabel.Channel, task.GetChannel()) + s.Equal(datapb.CompactionType_Level0DeleteCompaction, task.GetType()) + + expectedSegs := []int64{100, 101, 102} + s.ElementsMatch(expectedSegs, task.GetInputSegments()) + return nil + }).Return(nil).Once() + + // Test L0 manual trigger + triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, true) + s.NoError(err) + s.Equal(int64(12345), triggerID) +} + +func (s *CompactionTriggerManagerSuite) TestManualTriggerInvalidParams() { + // Test with both clustering and L0 compaction false + triggerID, err := s.triggerManager.ManualTrigger(context.Background(), s.testLabel.CollectionID, false, false) + s.NoError(err) + s.Equal(int64(0), triggerID) +} diff --git a/internal/datacoord/compaction_view.go b/internal/datacoord/compaction_view.go index 52a68f381b..3039281db5 100644 --- a/internal/datacoord/compaction_view.go +++ b/internal/datacoord/compaction_view.go @@ -34,6 +34,8 @@ type CompactionView interface { String() string Trigger() (CompactionView, string) ForceTrigger() (CompactionView, string) + ForceTriggerAll() ([]CompactionView, string) + GetTriggerID() int64 } type FullViews struct { diff --git a/internal/datacoord/mock_trigger_manager.go b/internal/datacoord/mock_trigger_manager.go index d319c16d54..9dc3c2ce13 100644 --- a/internal/datacoord/mock_trigger_manager.go +++ b/internal/datacoord/mock_trigger_manager.go @@ -119,9 +119,9 @@ func (_c *MockTriggerManager_GetResumeCompactionChan_Call) RunAndReturn(run func return _c } -// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction -func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (int64, error) { - ret := _m.Called(ctx, collectionID, clusteringCompaction) +// ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction, l0Compaction +func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool) (int64, error) { + ret := _m.Called(ctx, collectionID, clusteringCompaction, l0Compaction) if len(ret) == 0 { panic("no return value specified for ManualTrigger") @@ -129,17 +129,17 @@ func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID in var r0 int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, bool) (int64, error)); ok { - return rf(ctx, collectionID, clusteringCompaction) + if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool) (int64, error)); ok { + return rf(ctx, collectionID, clusteringCompaction, l0Compaction) } - if rf, ok := ret.Get(0).(func(context.Context, int64, bool) int64); ok { - r0 = rf(ctx, collectionID, clusteringCompaction) + if rf, ok := ret.Get(0).(func(context.Context, int64, bool, bool) int64); ok { + r0 = rf(ctx, collectionID, clusteringCompaction, l0Compaction) } else { r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(context.Context, int64, bool) error); ok { - r1 = rf(ctx, collectionID, clusteringCompaction) + if rf, ok := ret.Get(1).(func(context.Context, int64, bool, bool) error); ok { + r1 = rf(ctx, collectionID, clusteringCompaction, l0Compaction) } else { r1 = ret.Error(1) } @@ -156,13 +156,14 @@ type MockTriggerManager_ManualTrigger_Call struct { // - ctx context.Context // - collectionID int64 // - clusteringCompaction bool -func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}) *MockTriggerManager_ManualTrigger_Call { - return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction)} +// - l0Compaction bool +func (_e *MockTriggerManager_Expecter) ManualTrigger(ctx interface{}, collectionID interface{}, clusteringCompaction interface{}, l0Compaction interface{}) *MockTriggerManager_ManualTrigger_Call { + return &MockTriggerManager_ManualTrigger_Call{Call: _e.mock.On("ManualTrigger", ctx, collectionID, clusteringCompaction, l0Compaction)} } -func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool)) *MockTriggerManager_ManualTrigger_Call { +func (_c *MockTriggerManager_ManualTrigger_Call) Run(run func(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool)) *MockTriggerManager_ManualTrigger_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(bool)) + run(args[0].(context.Context), args[1].(int64), args[2].(bool), args[3].(bool)) }) return _c } @@ -172,7 +173,7 @@ func (_c *MockTriggerManager_ManualTrigger_Call) Return(_a0 int64, _a1 error) *M return _c } -func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool) (int64, error)) *MockTriggerManager_ManualTrigger_Call { +func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.Context, int64, bool, bool) (int64, error)) *MockTriggerManager_ManualTrigger_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 338fb9f7cb..60a4019044 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1706,6 +1706,25 @@ func TestManualCompaction(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) + t.Run("test manual l0 compaction successfully", func(t *testing.T) { + svr := &Server{allocator: allocator.NewMockAllocator(t)} + svr.stateCode.Store(commonpb.StateCode_Healthy) + mockTriggerManager := NewMockTriggerManager(t) + svr.compactionTriggerManager = mockTriggerManager + mockTriggerManager.EXPECT().ManualTrigger(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1, nil) + + mockHandler := NewMockCompactionInspector(t) + mockHandler.EXPECT().getCompactionTasksNumBySignalID(mock.Anything).Return(1) + svr.compactionInspector = mockHandler + resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{ + CollectionID: 1, + Timetravel: 1, + L0Compaction: true, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + }) + t.Run("test manual compaction failure", func(t *testing.T) { svr := &Server{allocator: allocator.NewMockAllocator(t)} svr.stateCode.Store(commonpb.StateCode_Healthy) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index c2efba5039..85ae53a6f6 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1346,8 +1346,8 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa var id int64 var err error - if req.MajorCompaction { - id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction()) + if req.GetMajorCompaction() || req.GetL0Compaction() { + id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction(), req.GetL0Compaction()) } else { id, err = s.compactionTrigger.TriggerCompaction(ctx, NewCompactionSignal(). WithIsForce(true). @@ -1372,7 +1372,8 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa resp.CompactionPlanCount = int32(taskCnt) } - log.Info("success to trigger manual compaction", zap.Bool("isMajor", req.GetMajorCompaction()), zap.Int64("compactionID", id), zap.Int("taskNum", taskCnt)) + log.Info("success to trigger manual compaction", zap.Bool("isL0Compaction", req.GetL0Compaction()), + zap.Bool("isMajorCompaction", req.GetMajorCompaction()), zap.Int64("compactionID", id), zap.Int("taskNum", taskCnt)) return resp, nil } diff --git a/pkg/go.mod b/pkg/go.mod index 7af4f98ac8..fff7ee0235 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -21,7 +21,7 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.9 - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 github.com/prometheus/client_golang v1.20.5 diff --git a/pkg/go.sum b/pkg/go.sum index 12b9b9871a..7c4ce17a45 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -599,6 +599,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 h1:7ojrhnBHitGaqebExGP00x0wDTioMgPniEBmNdFPiDI= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo= diff --git a/tests/go_client/go.mod b/tests/go_client/go.mod index 72fb43c938..14be7fcdf9 100644 --- a/tests/go_client/go.mod +++ b/tests/go_client/go.mod @@ -51,7 +51,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c // indirect + github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect diff --git a/tests/go_client/go.sum b/tests/go_client/go.sum index 38dfe33338..9618cbe0dd 100644 --- a/tests/go_client/go.sum +++ b/tests/go_client/go.sum @@ -320,6 +320,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c h1:B7zmZ30lWHE4wNjT/g2NPe3q0gcUtw7cA5shMtWAmDc= github.com/milvus-io/milvus-proto/go-api/v2 v2.6.2-0.20250911093549-4cc2bace3f8c/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9 h1:7ojrhnBHitGaqebExGP00x0wDTioMgPniEBmNdFPiDI= +github.com/milvus-io/milvus-proto/go-api/v2 v2.6.3-0.20250918113553-d15826602cc9/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=