From a042a6e1e8ec8d22834b6c326df2bdb8074c041f Mon Sep 17 00:00:00 2001
From: congqixia
Date: Mon, 8 Dec 2025 19:33:15 +0800
Subject: [PATCH] enhance: support pause GC at collection level (#45943)

Add collection-level granularity to the garbage collector pause/resume
mechanism. Previously, GC pause affected all collections globally. Now
operators can pause GC for specific collections while other collections
continue normal GC operations.

Changes:
- Add `pausedCollection` concurrent map to track per-collection pause state
- Extend `Pause()` and `Resume()` methods with `collectionID` and `ticket`
  parameters
- Add `collectionGCPaused()` helper to check collection pause status
- Skip dropped segment recycling when collection GC is paused
- Update management API to accept an optional `collection_id` query parameter
  and return a base64 ticket, so a resume only cancels its own pause
- Add `GetInt64Value()` and `GetStringValue()` utility functions for parsing
  values from KV pairs
- Maintain backward compatibility: collectionID <= 0 triggers global pause

This gives operators finer-grained control over the Milvus data lifecycle.

issue: #45941

---------

Signed-off-by: Congqi Xia
---
 internal/datacoord/garbage_collector.go      | 342 ++++++++++++++++---
 internal/datacoord/garbage_collector_test.go | 110 ++++--
 internal/datacoord/services.go               |  16 +-
 internal/proxy/management.go                 |  71 +++-
 internal/proxy/management_test.go            |   6 +-
 pkg/common/common.go                         |  25 ++
 6 files changed, 477 insertions(+), 93 deletions(-)

diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go
index 0c80a90658..9264b685f3 100644
--- a/internal/datacoord/garbage_collector.go
+++ b/internal/datacoord/garbage_collector.go
@@ -69,19 +69,114 @@ type garbageCollector struct {
 	meta    *meta
 	handler Handler
 
-	startOnce  sync.Once
-	stopOnce   sync.Once
-	wg         sync.WaitGroup
-	cmdCh      chan gcCmd
-	pauseUntil atomic.Time
+	startOnce        sync.Once
+	stopOnce         sync.Once
+	wg               sync.WaitGroup
+	cmdCh            chan gcCmd
+	pauseUntil       *gcPauseRecords
+	pausedCollection *typeutil.ConcurrentMap[int64, *gcPauseRecords]
+
+	controlChannels map[string]chan gcCmd
 
 	systemMetricsListener *hardware.SystemMetricsListener
 }
 
 type gcCmd struct {
-	cmdType  datapb.GcCommand
-	duration time.Duration
-	done     chan struct{}
+	cmdType      datapb.GcCommand
+	duration     time.Duration
+	collectionID int64
+	ticket       string
+	done         chan error
+	timeout      <-chan struct{}
+}
+
+type gcPauseRecord struct {
+	ticket     string
+	pauseUntil time.Time
+}
+
+type gcPauseRecords struct {
+	mut     sync.RWMutex
+	maxLen  int
+	records typeutil.Heap[gcPauseRecord]
+}
+
+func (gc *gcPauseRecords) PauseUntil() time.Time {
+	// nil protection
+	if gc == nil {
+		return time.Time{}
+	}
+	gc.mut.RLock()
+	defer gc.mut.RUnlock()
+	// no pause records, return zero value
+	if gc.records.Len() == 0 {
+		return time.Time{}
+	}
+
+	return gc.records.Peek().pauseUntil
+}
+
+func (gc *gcPauseRecords) Insert(ticket string, pauseUntil time.Time) error {
+	gc.mut.Lock()
+	defer gc.mut.Unlock()
+
+	// heap small enough, short path
+	if gc.records.Len() < gc.maxLen {
+		gc.records.Push(gcPauseRecord{
+			ticket:     ticket,
+			pauseUntil: pauseUntil,
+		})
+		return nil
+	}
+
+	// heap is full, evict expired records and rebuild it
+	records := make([]gcPauseRecord, 0, gc.records.Len())
+	now := time.Now()
+	for gc.records.Len() > 0 {
+		record := gc.records.Pop()
+		if record.pauseUntil.After(now) {
+			records = append(records, record)
+		}
+	}
+	gc.records = typeutil.NewObjectArrayBasedMaximumHeap(records, func(r gcPauseRecord) int64 {
+		return r.pauseUntil.UnixNano()
+	})
+
+	if gc.records.Len() < gc.maxLen {
+		gc.records.Push(gcPauseRecord{
+			ticket:     ticket,
+			pauseUntil: pauseUntil,
+		})
+		return nil
+	}
+
+	// still too many live pause records after eviction
+	return merr.WrapErrTooManyRequests(64, "too many pause records")
+}
+
+func 
(gc *gcPauseRecords) Delete(ticket string) { + gc.mut.Lock() + defer gc.mut.Unlock() + now := time.Now() + records := make([]gcPauseRecord, 0, gc.records.Len()) + for gc.records.Len() > 0 { + record := gc.records.Pop() + if now.Before(record.pauseUntil) && record.ticket != ticket { + records = append(records, record) + } + } + gc.records = typeutil.NewObjectArrayBasedMaximumHeap(records, func(r gcPauseRecord) int64 { + return r.pauseUntil.UnixNano() + }) +} + +func (gc *gcPauseRecords) Len() int { + gc.mut.RLock() + defer gc.mut.RUnlock() + return gc.records.Len() +} + +func NewGCPauseRecords() *gcPauseRecords { + return &gcPauseRecords{ + records: typeutil.NewObjectArrayBasedMaximumHeap[gcPauseRecord, int64]([]gcPauseRecord{}, func(r gcPauseRecord) int64 { + return r.pauseUntil.UnixNano() + }), + maxLen: 64, + } } // newSystemMetricsListener creates a system metrics listener for garbage collector. @@ -120,6 +215,13 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl zap.Duration("dropTolerance", opt.dropTolerance)) opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute)) ctx, cancel := context.WithCancel(context.Background()) + metaSignal := make(chan gcCmd) + orphanSignal := make(chan gcCmd) + // control signal channels + controlChannels := map[string]chan gcCmd{ + "meta": metaSignal, + "orphan": orphanSignal, + } return &garbageCollector{ ctx: ctx, cancel: cancel, @@ -128,6 +230,9 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl option: opt, cmdCh: make(chan gcCmd), systemMetricsListener: newSystemMetricsListener(&opt), + pauseUntil: NewGCPauseRecords(), + pausedCollection: typeutil.NewConcurrentMap[int64, *gcPauseRecords](), + controlChannels: controlChannels, } } @@ -152,7 +257,7 @@ type GcStatus struct { // GetStatus returns the current status of the garbage collector. 
func (gc *garbageCollector) GetStatus() GcStatus {
-	pauseUntil := gc.pauseUntil.Load()
+	pauseUntil := gc.pauseUntil.PauseUntil()
 
 	now := time.Now()
 	if now.Before(pauseUntil) {
@@ -168,35 +273,40 @@ func (gc *garbageCollector) GetStatus() GcStatus {
 	}
 }
 
-func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
+func (gc *garbageCollector) Pause(ctx context.Context, collectionID int64, ticket string, pauseDuration time.Duration) error {
 	if !gc.option.enabled {
 		log.Info("garbage collection not enabled")
 		return nil
 	}
-	done := make(chan struct{})
+	done := make(chan error, 1)
 	select {
 	case gc.cmdCh <- gcCmd{
-		cmdType:  datapb.GcCommand_Pause,
-		duration: pauseDuration,
-		done:     done,
+		cmdType:      datapb.GcCommand_Pause,
+		duration:     pauseDuration,
+		collectionID: collectionID,
+		ticket:       ticket,
+		done:         done,
+		timeout:      ctx.Done(),
 	}:
-		<-done
-		return nil
+		return <-done
 	case <-ctx.Done():
 		return ctx.Err()
 	}
}
 
-func (gc *garbageCollector) Resume(ctx context.Context) error {
+func (gc *garbageCollector) Resume(ctx context.Context, collectionID int64, ticket string) error {
 	if !gc.option.enabled {
 		log.Warn("garbage collection not enabled, cannot resume")
 		return merr.WrapErrServiceUnavailable("garbage collection not enabled")
 	}
-	done := make(chan struct{})
+	done := make(chan error)
 	select {
 	case gc.cmdCh <- gcCmd{
-		cmdType: datapb.GcCommand_Resume,
-		done:    done,
+		cmdType:      datapb.GcCommand_Resume,
+		done:         done,
+		collectionID: collectionID,
+		ticket:       ticket,
+		timeout:      ctx.Done(),
 	}:
 		<-done
 		return nil
@@ -212,20 +322,21 @@ func (gc *garbageCollector) work(ctx context.Context) {
 	gc.wg.Add(3)
 	go func() {
 		defer gc.wg.Done()
-		gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
-			gc.recycleDroppedSegments(ctx)
-			gc.recycleChannelCPMeta(ctx)
-			gc.recycleUnusedIndexes(ctx)
-			gc.recycleUnusedSegIndexes(ctx)
-			gc.recycleUnusedAnalyzeFiles(ctx)
-			gc.recycleUnusedTextIndexFiles(ctx)
-			gc.recycleUnusedJSONIndexFiles(ctx)
-			gc.recycleUnusedJSONStatsFiles(ctx)
+		gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context, signal <-chan gcCmd) {
+			gc.recycleDroppedSegments(ctx, signal)
+			gc.recycleChannelCPMeta(ctx, signal)
+			gc.recycleUnusedIndexes(ctx, signal)
+			gc.recycleUnusedSegIndexes(ctx, signal)
+			gc.recycleUnusedAnalyzeFiles(ctx, signal)
+			gc.recycleUnusedTextIndexFiles(ctx, signal)
+			gc.recycleUnusedJSONIndexFiles(ctx, signal)
+			gc.recycleUnusedJSONStatsFiles(ctx, signal)
 		})
 	}()
 	go func() {
 		defer gc.wg.Done()
-		gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context) {
+		gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context, signal <-chan gcCmd) {
+			// orphan files are not covered by collection-level pause for now
 			gc.recycleUnusedBinlogFiles(ctx)
 			gc.recycleUnusedIndexFiles(ctx)
 		})
@@ -236,6 +347,16 @@ func (gc *garbageCollector) work(ctx context.Context) {
 	}()
 }
 
+func (gc *garbageCollector) ackSignal(signal <-chan gcCmd) {
+	select {
+	case cmd := <-signal:
+		if cmd.done != nil {
+			close(cmd.done)
+		}
+	default:
+	}
+}
+
 // startControlLoop start a control loop for garbageCollector.
func (gc *garbageCollector) startControlLoop(_ context.Context) {
 	hardware.RegisterSystemMetricsListener(gc.systemMetricsListener)
@@ -246,17 +367,10 @@ func (gc *garbageCollector) startControlLoop(_ context.Context) {
 		case cmd := <-gc.cmdCh:
 			switch cmd.cmdType {
 			case datapb.GcCommand_Pause:
-				pauseUntil := time.Now().Add(cmd.duration)
-				if pauseUntil.After(gc.pauseUntil.Load()) {
-					log.Info("garbage collection paused", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil))
-					gc.pauseUntil.Store(pauseUntil)
-				} else {
-					log.Info("new pause until before current value", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil), zap.Time("oldPauseUntil", gc.pauseUntil.Load()))
-				}
+				err := gc.pause(cmd)
+				cmd.done <- err
 			case datapb.GcCommand_Resume:
-				// reset to zero value
-				gc.pauseUntil.Store(time.Time{})
-				log.Info("garbage collection resumed")
+				gc.resume(cmd)
 			}
 			close(cmd.done)
 		case <-gc.ctx.Done():
@@ -266,29 +380,102 @@ func (gc *garbageCollector) startControlLoop(_ context.Context) {
 	}
 }
 
+func (gc *garbageCollector) pause(cmd gcCmd) error {
+	log := log.With(
+		zap.Int64("collectionID", cmd.collectionID),
+		zap.String("ticket", cmd.ticket),
+	)
+	reqPauseUntil := time.Now().Add(cmd.duration)
+	log = log.With(
+		zap.Time("pauseUntil", reqPauseUntil),
+		zap.Duration("duration", cmd.duration),
+	)
+	var err error
+	if cmd.collectionID <= 0 { // legacy pause all
+		err = gc.pauseUntil.Insert(cmd.ticket, reqPauseUntil)
+		log.Info("global pause ticket recorded")
+	} else {
+		curr, has := gc.pausedCollection.Get(cmd.collectionID)
+		if !has {
+			curr = NewGCPauseRecords()
+			gc.pausedCollection.Insert(cmd.collectionID, curr)
+		}
+		err = curr.Insert(cmd.ticket, reqPauseUntil)
+		log.Info("collection new pause ticket recorded")
+	}
+	if err != nil {
+		return err
+	}
+	signalCh := gc.controlChannels["meta"]
+	// signal the meta worker and make sure it acks the pause command before
+	// returning; orphan tasks only honor the global pause window for now
+	signal := gcCmd{
+		done:    make(chan error),
+		timeout: cmd.timeout,
+	}
+	select {
+	case signalCh <- signal:
+		<-signal.done
+	case <-cmd.timeout:
+		// caller context is done before the worker acked, roll back the pause
+		gc.resume(cmd)
+		return merr.WrapErrServiceUnavailable("gc pause not acked before context done")
+	}
+	return nil
+}
+
+func (gc *garbageCollector) resume(cmd gcCmd) {
+	// drop this ticket's pause record, then check whether any pause remains
+	var afterResume time.Time
+	if cmd.collectionID <= 0 {
+		gc.pauseUntil.Delete(cmd.ticket)
+		afterResume = gc.pauseUntil.PauseUntil()
+	} else {
+		curr, has := gc.pausedCollection.Get(cmd.collectionID)
+		if has {
+			curr.Delete(cmd.ticket)
+			afterResume = curr.PauseUntil()
+			if curr.Len() == 0 || time.Now().After(afterResume) {
+				gc.pausedCollection.Remove(cmd.collectionID)
+			}
+		}
+	}
+	stillPaused := time.Now().Before(afterResume)
+	log.Info("garbage collection resumed",
+		zap.Int64("collectionID", cmd.collectionID),
+		zap.String("ticket", cmd.ticket),
+		zap.Bool("stillPaused", stillPaused))
+}
+
 // runRecycleTaskWithPauser is a helper function to create a task with pauser
-func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context)) {
+func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context, signal <-chan gcCmd)) {
 	logger := log.With(zap.String("gcType", name)).With(zap.Duration("interval", interval))
 	timer := time.NewTicker(interval)
 	defer timer.Stop()
-
+	// get the signal channel; a nil channel is fine and means this task is not controlled
+	signal := gc.controlChannels[name]
 	for {
 		select {
 		case <-ctx.Done():
 			return
+		case cmd := <-signal:
+			// ack the received control signal
+			close(cmd.done)
 		case <-timer.C:
-			if 
time.Now().Before(gc.pauseUntil.Load()) { - logger.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load())) + globalPauseUntil := gc.pauseUntil.PauseUntil() + if time.Now().Before(globalPauseUntil) { + logger.Info("garbage collector paused", zap.Time("until", globalPauseUntil)) continue } logger.Info("garbage collector recycle task start...") start := time.Now() - task(ctx) + task(ctx, signal) logger.Info("garbage collector recycle task done", zap.Duration("timeCost", time.Since(start))) } } } +func (gc *garbageCollector) collectionGCPaused(collectionID int64) bool { + collPauseUntil, has := gc.pausedCollection.Get(collectionID) + return has && time.Now().Before(collPauseUntil.PauseUntil()) +} + // close stop the garbage collector. func (gc *garbageCollector) close() { gc.stopOnce.Do(func() { @@ -474,7 +661,7 @@ func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo, } // recycleDroppedSegments scans all segments and remove those dropped segments from meta and oss. -func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { +func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context, signal <-chan gcCmd) { start := time.Now() log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start)) log.Info("start clear dropped segments...") @@ -535,6 +722,13 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { return } + gc.ackSignal(signal) + + if gc.collectionGCPaused(segment.GetCollectionID()) { + log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", segmentID), zap.Int64("collectionID", segment.GetCollectionID())) + continue + } + log := log.With(zap.Int64("segmentID", segmentID)) segInsertChannel := segment.GetInsertChannel() if loadedSegments.Contain(segmentID) { @@ -579,7 +773,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { } } -func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { +func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context, signal <-chan gcCmd) { log := log.Ctx(ctx) channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx) if err != nil { @@ -593,6 +787,11 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs))) for vChannel := range channelCPs { collectionID := funcutil.GetCollectionIDFromVChannel(vChannel) + if gc.collectionGCPaused(collectionID) { + continue + } + + gc.ackSignal(signal) // !!! Skip to GC if vChannel format is illegal, it will lead meta leak in this case if collectionID == -1 { @@ -718,7 +917,7 @@ func (gc *garbageCollector) removeObjectFiles(ctx context.Context, filePaths map } // recycleUnusedIndexes is used to delete those indexes that is deleted by collection. -func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) { +func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context, signal <-chan gcCmd) { start := time.Now() log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedIndexes"), zap.Time("startAt", start)) log.Info("start recycleUnusedIndexes...") @@ -730,6 +929,10 @@ func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) { // process canceled. 
return
 		}
+		if gc.collectionGCPaused(index.CollectionID) {
+			continue
+		}
+		gc.ackSignal(signal)
 
 		log := log.With(zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID))
 		if err := gc.meta.indexMeta.RemoveIndex(ctx, index.CollectionID, index.IndexID); err != nil {
@@ -741,7 +944,7 @@ }
 
 // recycleUnusedSegIndexes remove the index of segment if index is deleted or segment itself is deleted.
-func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
+func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context, signal <-chan gcCmd) {
 	start := time.Now()
 	log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedSegIndexes"), zap.Time("startAt", start))
 	log.Info("start recycleUnusedSegIndexes...")
@@ -753,6 +956,10 @@ 
 			// process canceled.
 			return
 		}
+		if gc.collectionGCPaused(segIdx.CollectionID) {
+			continue
+		}
+		gc.ackSignal(signal)
 
 		// 1. segment belongs to is deleted.
 		// 2. index is deleted.
@@ -883,7 +1090,7 @@ func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentI
 }
 
 // recycleUnusedAnalyzeFiles is used to delete those analyze stats files that no longer exist in the meta.
-func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
+func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context, signal <-chan gcCmd) {
 	log := log.Ctx(ctx)
 	log.Info("start recycleUnusedAnalyzeFiles")
 	startTs := time.Now()
@@ -904,6 +1111,8 @@ 
 			// process canceled
 			return
 		}
+		// collection-level GC pause does not affect analyze files for now
+		gc.ackSignal(signal)
 
 		log.Debug("analyze keys", zap.String("key", key))
 		taskID, err := parseBuildIDFromFilePath(key)
@@ -955,7 +1164,7 @@ 
 
 // recycleUnusedTextIndexFiles load meta file info and compares OSS keys
 // if missing found, performs gc cleanup
-func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
+func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context, signal <-chan gcCmd) {
 	start := time.Now()
 	log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedTextIndexFiles"), zap.Time("startAt", start))
 	log.Info("start recycleUnusedTextIndexFiles...")
@@ -968,6 +1177,15 @@ 
 	deletedFilesNum := atomic.NewInt32(0)
 
 	for _, seg := range hasTextIndexSegments {
+		if ctx.Err() != nil {
+			// process canceled, stop. 
+ return + } + if gc.collectionGCPaused(seg.GetCollectionID()) { + log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", seg.GetID()), zap.Int64("collectionID", seg.GetCollectionID())) + continue + } + gc.ackSignal(signal) for _, fieldStats := range seg.GetTextStatsLogs() { log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID())) // clear low version task @@ -1016,7 +1234,7 @@ func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) { // recycleUnusedJSONStatsFiles load meta file info and compares OSS keys // if missing found, performs gc cleanup -func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context) { +func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context, signal <-chan gcCmd) { start := time.Now() log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedJSONStatsFiles"), zap.Time("startAt", start)) log.Info("start recycleUnusedJSONStatsFiles...") @@ -1029,6 +1247,15 @@ func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context) { deletedFilesNum := atomic.NewInt32(0) for _, seg := range hasJSONStatsSegments { + if ctx.Err() != nil { + // process canceled, stop. + return + } + if gc.collectionGCPaused(seg.GetCollectionID()) { + log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", seg.GetID()), zap.Int64("collectionID", seg.GetCollectionID())) + continue + } + gc.ackSignal(signal) for _, fieldStats := range seg.GetJsonKeyStats() { log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID())) // clear low version task @@ -1114,7 +1341,7 @@ func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context) { } // recycleUnusedJSONIndexFiles load meta file info and compares OSS keys -func (gc *garbageCollector) recycleUnusedJSONIndexFiles(ctx context.Context) { +func (gc *garbageCollector) recycleUnusedJSONIndexFiles(ctx context.Context, signal <-chan gcCmd) { start := time.Now() log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedJSONIndexFiles"), zap.Time("startAt", start)) log.Info("start recycleUnusedJSONIndexFiles...") @@ -1127,6 +1354,15 @@ func (gc *garbageCollector) recycleUnusedJSONIndexFiles(ctx context.Context) { deletedFilesNum := atomic.NewInt32(0) for _, seg := range hasJSONIndexSegments { + if ctx.Err() != nil { + // process canceled, stop. 
+ return + } + if gc.collectionGCPaused(seg.GetCollectionID()) { + log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", seg.GetID()), zap.Int64("collectionID", seg.GetCollectionID())) + continue + } + gc.ackSignal(signal) for _, fieldStats := range seg.GetJsonKeyStats() { log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID())) // clear low version task diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 33cd3a51c2..58e098d3a5 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/google/uuid" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/stretchr/testify/assert" @@ -205,7 +206,8 @@ func Test_garbageCollector_scan(t *testing.T) { missingTolerance: time.Hour * 24, dropTolerance: 0, }) - gc.recycleDroppedSegments(context.TODO()) + signal := make(chan gcCmd) + gc.recycleDroppedSegments(context.TODO(), signal) validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:]) validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:]) validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:]) @@ -224,7 +226,8 @@ func Test_garbageCollector_scan(t *testing.T) { }) gc.start() gc.recycleUnusedBinlogFiles(context.TODO()) - gc.recycleDroppedSegments(context.TODO()) + signal := make(chan gcCmd) + gc.recycleDroppedSegments(context.TODO(), signal) // bad path shall remains since datacoord cannot determine file is garbage or not if path is not valid validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:2]) @@ -431,7 +434,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) { mock.Anything, ).Return(nil) gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{}) - gc.recycleUnusedIndexes(context.TODO()) + gc.recycleUnusedIndexes(context.TODO(), nil) }) t.Run("fail", func(t *testing.T) { @@ -442,7 +445,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) { mock.Anything, ).Return(errors.New("fail")) gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{}) - gc.recycleUnusedIndexes(context.TODO()) + gc.recycleUnusedIndexes(context.TODO(), nil) }) } @@ -588,7 +591,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) { gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{ cli: mockChunkManager, }) - gc.recycleUnusedSegIndexes(context.TODO()) + gc.recycleUnusedSegIndexes(context.TODO(), nil) }) t.Run("fail", func(t *testing.T) { @@ -606,7 +609,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) { gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{ cli: mockChunkManager, }) - gc.recycleUnusedSegIndexes(context.TODO()) + gc.recycleUnusedSegIndexes(context.TODO(), nil) }) } @@ -1386,6 +1389,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { cm := &mocks.ChunkManager{} cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil) + signal := make(chan gcCmd) gc := newGarbageCollector( m, newMockHandlerWithMeta(m), @@ -1393,7 +1397,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { cli: cm, dropTolerance: 1, }) - 
gc.recycleDroppedSegments(context.TODO()) + gc.recycleDroppedSegments(context.TODO(), signal) /* A B @@ -1451,7 +1455,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }) assert.NoError(t, err) - gc.recycleDroppedSegments(context.TODO()) + gc.recycleDroppedSegments(context.TODO(), signal) /* A: processed prior to C, C is not GCed yet and C is not indexed, A is not GCed in this turn @@ -1467,7 +1471,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { segD = gc.meta.GetSegment(context.TODO(), segID+3) assert.Nil(t, segD) - gc.recycleDroppedSegments(context.TODO()) + gc.recycleDroppedSegments(context.TODO(), signal) /* A: compacted became false due to C is GCed already, A should be GCed since dropTolernace is meet B: compacted became false due to C is GCed already, B should be GCed since dropTolerance is meet @@ -1499,7 +1503,7 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) { t.Run("list channel cp fail", func(t *testing.T) { catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once() - gc.recycleChannelCPMeta(context.TODO()) + gc.recycleChannelCPMeta(context.TODO(), nil) assert.Equal(t, 3, len(m.channelCPs.checkpoints)) }) @@ -1520,20 +1524,20 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) { }).Maybe() t.Run("skip drop channel due to collection is available", func(t *testing.T) { - gc.recycleChannelCPMeta(context.TODO()) + gc.recycleChannelCPMeta(context.TODO(), nil) assert.Equal(t, 3, len(m.channelCPs.checkpoints)) }) broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil).Times(4) t.Run("drop channel cp fail", func(t *testing.T) { catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Twice() - gc.recycleChannelCPMeta(context.TODO()) + gc.recycleChannelCPMeta(context.TODO(), nil) assert.Equal(t, 3, len(m.channelCPs.checkpoints)) }) t.Run("channel cp gc ok", func(t *testing.T) { catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Twice() - gc.recycleChannelCPMeta(context.TODO()) + gc.recycleChannelCPMeta(context.TODO(), nil) assert.Equal(t, 1, len(m.channelCPs.checkpoints)) }) } @@ -1629,10 +1633,10 @@ func (s *GarbageCollectorSuite) TestPauseResume() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := gc.Pause(ctx, time.Second) + err := gc.Pause(ctx, -1, "", time.Second) s.NoError(err) - err = gc.Resume(ctx) + err = gc.Resume(ctx, -1, "") s.Error(err) }) @@ -1650,15 +1654,15 @@ func (s *GarbageCollectorSuite) TestPauseResume() { defer gc.close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := gc.Pause(ctx, time.Minute) + err := gc.Pause(ctx, -1, "", time.Minute) s.NoError(err) - s.NotZero(gc.pauseUntil.Load()) + s.NotZero(gc.pauseUntil.PauseUntil()) - err = gc.Resume(ctx) + err = gc.Resume(ctx, -1, "") s.NoError(err) - s.Zero(gc.pauseUntil.Load()) + s.Zero(gc.pauseUntil.PauseUntil()) }) s.Run("pause_before_until", func() { @@ -1675,16 +1679,16 @@ func (s *GarbageCollectorSuite) TestPauseResume() { defer gc.close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := gc.Pause(ctx, time.Minute) + err := gc.Pause(ctx, -1, "", time.Minute) s.NoError(err) - until := gc.pauseUntil.Load() + until := gc.pauseUntil.PauseUntil() s.NotZero(until) - err = gc.Pause(ctx, time.Second) + err = gc.Pause(ctx, -1, "", time.Second) s.NoError(err) - second := gc.pauseUntil.Load() + second := gc.pauseUntil.PauseUntil() s.Equal(until, second) }) @@ 
-1701,15 +1705,62 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 		defer cancel()
 
-		err := gc.Pause(ctx, time.Minute)
+		err := gc.Pause(ctx, -1, "", time.Minute)
 		s.Error(err)
-		s.Zero(gc.pauseUntil.Load())
+		s.Zero(gc.pauseUntil.PauseUntil())
 
-		err = gc.Resume(ctx)
+		err = gc.Resume(ctx, -1, "")
 		s.Error(err)
-		s.Zero(gc.pauseUntil.Load())
+		s.Zero(gc.pauseUntil.PauseUntil())
+	})
+
+	s.Run("pause_collection", func() {
+		gc := newGarbageCollector(s.meta, newMockHandler(), GcOption{
+			cli:              s.cli,
+			enabled:          true,
+			checkInterval:    time.Millisecond * 10,
+			scanInterval:     time.Hour * 7 * 24,
+			missingTolerance: time.Hour * 24,
+			dropTolerance:    time.Hour * 24,
+		})
+
+		gc.start()
+		defer gc.close()
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		ticket := uuid.NewString()
+		err := gc.Pause(ctx, 100, ticket, time.Minute)
+		s.NoError(err)
+
+		until, has := gc.pausedCollection.Get(100)
+		firstPauseUntil := until.PauseUntil()
+		s.True(has)
+		s.NotZero(firstPauseUntil)
+
+		ticket2 := uuid.NewString()
+		err = gc.Pause(ctx, 100, ticket2, time.Second*30)
+		s.NoError(err)
+
+		second, has := gc.pausedCollection.Get(100)
+		secondPauseUntil := second.PauseUntil()
+		s.True(has)
+
+		s.Equal(firstPauseUntil, secondPauseUntil)
+
+		err = gc.Resume(ctx, 100, ticket2)
+		s.NoError(err)
+
+		afterResume, has := gc.pausedCollection.Get(100)
+		s.True(has)
+		afterUntil := afterResume.PauseUntil()
+		s.Equal(firstPauseUntil, afterUntil)
+
+		err = gc.Resume(ctx, 100, ticket)
+		s.NoError(err)
+
+		_, has = gc.pausedCollection.Get(100)
+		s.False(has)
 	})
 }
 
@@ -1727,7 +1778,7 @@ func (s *GarbageCollectorSuite) TestRunRecycleTaskWithPauser() {
 	defer cancel()
 
 	cnt := 0
-	gc.runRecycleTaskWithPauser(ctx, "test", time.Second, func(ctx context.Context) {
+	gc.runRecycleTaskWithPauser(ctx, "test", time.Second, func(ctx context.Context, signal <-chan gcCmd) {
 		cnt++
 	})
 	s.Equal(cnt, 2)
@@ -1753,7 +1804,8 @@ func (s *GarbageCollectorSuite) TestAvoidGCLoadedSegments() {
 		},
 	})
 
-	gc.recycleDroppedSegments(context.TODO())
+	signal := make(chan gcCmd)
+	gc.recycleDroppedSegments(context.TODO(), signal)
 	seg := s.meta.GetSegment(context.TODO(), 1)
 	s.NotNil(seg)
 }
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 6ba8b8681e..5fb2465fcf 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -1804,13 +1804,25 @@ func (s *Server) GcControl(ctx context.Context, request *datapb.GcControlRequest
 			status.Reason = fmt.Sprintf("pause duration not valid, %s", err.Error())
 			return status, nil
 		}
-		if err := s.garbageCollector.Pause(ctx, time.Duration(pauseSeconds)*time.Second); err != nil {
+
+		collectionID, err, _ := common.GetInt64Value(request.GetParams(), "collection_id")
+		if err != nil {
+			return merr.Status(err), nil
+		}
+		ticket, _ := common.GetStringValue(request.GetParams(), "ticket")
+
+		if err := s.garbageCollector.Pause(ctx, collectionID, ticket, time.Duration(pauseSeconds)*time.Second); err != nil {
 			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 			status.Reason = fmt.Sprintf("failed to pause gc, %s", err.Error())
 			return status, nil
 		}
 	case datapb.GcCommand_Resume:
-		if err := s.garbageCollector.Resume(ctx); err != nil {
+		collectionID, err, _ := common.GetInt64Value(request.GetParams(), "collection_id")
+		if err != nil {
+			return merr.Status(err), nil
+		}
+		ticket, _ := common.GetStringValue(request.GetParams(), "ticket")
+		if err := s.garbageCollector.Resume(ctx, collectionID, ticket); err 
!= nil {
 			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			status.Reason = fmt.Sprintf("failed to pause gc, %s", err.Error())
+			status.Reason = fmt.Sprintf("failed to resume gc, %s", err.Error())
 			return status, nil
diff --git a/internal/proxy/management.go b/internal/proxy/management.go
index 12c3e8a706..3d6bd46f4b 100644
--- a/internal/proxy/management.go
+++ b/internal/proxy/management.go
@@ -17,11 +17,13 @@ package proxy
 
 import (
+	"encoding/base64"
 	"fmt"
 	"net/http"
 	"strconv"
 	"sync"
 
+	"github.com/google/uuid"
 	"github.com/samber/lo"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -89,15 +91,54 @@ func RegisterMgrRoute(proxy *Proxy) {
 	})
 }
 
+// EncodeTicket encodes the ticket with token and collectionID
+func EncodeTicket(token string, collectionID string) string {
+	if collectionID == "" {
+		collectionID = "-1"
+	}
+	m := map[string]string{
+		"token":         token,
+		"collection_id": collectionID,
+	}
+	bytes, _ := json.Marshal(m)
+	ticket := base64.StdEncoding.EncodeToString(bytes)
+	return ticket
+}
+
+// DecodeTicket decodes the ticket to get token and collectionID
+func DecodeTicket(ticket string) (string, string, error) {
+	bytes, err := base64.StdEncoding.DecodeString(ticket)
+	if err != nil {
+		return "", "", err
+	}
+	m := make(map[string]string)
+	err = json.Unmarshal(bytes, &m)
+	if err != nil {
+		return "", "", err
+	}
+	return m["token"], m["collection_id"], nil
+}
+
 func (node *Proxy) PauseDatacoordGC(w http.ResponseWriter, req *http.Request) {
 	pauseSeconds := req.URL.Query().Get("pause_seconds")
 
+	// generate ticket for request
+	token := uuid.New().String()
+	ticket := EncodeTicket(token, req.URL.Query().Get("collection_id"))
+	params := []*commonpb.KeyValuePair{
+		{Key: "duration", Value: pauseSeconds},
+		{Key: "ticket", Value: ticket},
+	}
+	if req.URL.Query().Has("collection_id") {
+		params = append(params, &commonpb.KeyValuePair{
+			Key:   "collection_id",
+			Value: req.URL.Query().Get("collection_id"),
+		})
+	}
 	resp, err := node.mixCoord.GcControl(req.Context(), &datapb.GcControlRequest{
 		Base:    commonpbutil.NewMsgBase(),
 		Command: datapb.GcCommand_Pause,
-		Params: []*commonpb.KeyValuePair{
-			{Key: "duration", Value: pauseSeconds},
-		},
+		Params:  params,
 	})
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
@@ -110,22 +151,40 @@ func (node *Proxy) PauseDatacoordGC(w http.ResponseWriter, req *http.Request) {
 		return
 	}
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(`{"msg": "OK"}`))
+	fmt.Fprintf(w, `{"msg": "OK", "ticket": "%s"}`, ticket)
 }
 
 func (node *Proxy) ResumeDatacoordGC(w http.ResponseWriter, req *http.Request) {
+	ticket := req.URL.Query().Get("ticket")
+	var collectionID string
+	var err error
+	// allow empty ticket for backward compatibility
+	if ticket != "" {
+		_, collectionID, err = DecodeTicket(ticket)
+		if err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			w.Write([]byte(fmt.Sprintf(`{"msg": "failed to decode ticket, %s"}`, err.Error())))
+			return
+		}
+	}
+	params := []*commonpb.KeyValuePair{
+		{Key: "ticket", Value: ticket},
+	}
+	// only pass collection_id when the ticket carries one, so that a legacy
+	// resume without a ticket still triggers a global resume
+	if collectionID != "" {
+		params = append(params, &commonpb.KeyValuePair{
+			Key:   "collection_id",
+			Value: collectionID,
+		})
+	}
+
 	resp, err := node.mixCoord.GcControl(req.Context(), &datapb.GcControlRequest{
 		Base:    commonpbutil.NewMsgBase(),
 		Command: datapb.GcCommand_Resume,
+		Params:  params,
 	})
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
-		w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, err.Error())))
+		w.Write([]byte(fmt.Sprintf(`{"msg": "failed to resume garbage collection, %s"}`, err.Error())))
 		return
 	}
 	if resp.GetErrorCode() != commonpb.ErrorCode_Success {
w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, resp.GetReason()))) + w.Write([]byte(fmt.Sprintf(`{"msg": "failed to resume garbage collection, %s"}`, resp.GetReason()))) return } w.WriteHeader(http.StatusOK) diff --git a/internal/proxy/management_test.go b/internal/proxy/management_test.go index ddb7ac7d0a..f326595a02 100644 --- a/internal/proxy/management_test.go +++ b/internal/proxy/management_test.go @@ -64,7 +64,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() { return &commonpb.Status{}, nil }) - req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60&collection_id=100", nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -80,7 +80,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() { return &commonpb.Status{}, errors.New("mock") }) - req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60&collection_id=100", nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -118,7 +118,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() { return &commonpb.Status{}, nil }) - req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcResume+"?collection_id=100", nil) s.Require().NoError(err) recorder := httptest.NewRecorder() diff --git a/pkg/common/common.go b/pkg/common/common.go index 21616dab8c..ca1d937809 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -606,6 +606,31 @@ func IsAllowInsertAutoID(kvs ...*commonpb.KeyValuePair) (bool, bool) { return false, false } +func GetInt64Value(kvs []*commonpb.KeyValuePair, key string) (result int64, parseErr error, exist bool) { + kv := lo.FindOrElse(kvs, nil, func(kv *commonpb.KeyValuePair) bool { + return kv.GetKey() == key + }) + if kv == nil { + return 0, nil, false + } + + result, err := strconv.ParseInt(kv.GetValue(), 10, 64) + if err != nil { + return 0, err, true + } + return result, nil, true +} + +func GetStringValue(kvs []*commonpb.KeyValuePair, key string) (result string, exist bool) { + kv := lo.FindOrElse(kvs, nil, func(kv *commonpb.KeyValuePair) bool { + return kv.GetKey() == key + }) + if kv == nil { + return "", false + } + return kv.GetValue(), true +} + func CheckNamespace(schema *schemapb.CollectionSchema, namespace *string) error { enabled, _, err := ParseNamespaceProp(schema.Properties...) if err != nil {
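
Usage note (editor's sketch, not part of the patch): the program below shows how an
operator might drive the new collection-level pause from outside the cluster. The
management route paths and the proxy listen address are assumptions; in-tree they
come from the management route constants (management.RouteGcPause and
management.RouteGcResume) referenced in the tests above. Pausing returns a base64
ticket that encodes the collection ID; presenting that ticket on resume cancels
only that pause, leaving other operators' pauses intact.

	package main

	import (
		"encoding/json"
		"fmt"
		"net/http"
		"net/url"
	)

	func main() {
		// assumed proxy management address; adjust for your deployment
		base := "http://localhost:9091"

		// pause GC for collection 100 only, for 60 seconds;
		// other collections keep recycling as usual
		resp, err := http.Get(base + "/management/datacoord/garbage_collection/pause?pause_seconds=60&collection_id=100")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		var pauseResult struct {
			Msg    string `json:"msg"`
			Ticket string `json:"ticket"`
		}
		if err := json.NewDecoder(resp.Body).Decode(&pauseResult); err != nil {
			panic(err)
		}
		fmt.Println("pause ticket:", pauseResult.Ticket)

		// resume early by presenting the ticket; the proxy decodes the
		// collection ID from the ticket, so no collection_id param is needed
		resumeURL := base + "/management/datacoord/garbage_collection/resume?ticket=" + url.QueryEscape(pauseResult.Ticket)
		resp2, err := http.Get(resumeURL)
		if err != nil {
			panic(err)
		}
		defer resp2.Body.Close()
		fmt.Println("resume status:", resp2.Status)
	}

Design-wise, gcPauseRecords keeps live tickets in a maximum heap keyed by
pauseUntil, so PauseUntil() is a single Peek(): GC stays paused until the furthest
deadline among unexpired tickets, and overlapping pauses from different operators
compose instead of overwriting each other.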