enhance: support pause GC at collection level (#45943)

Add collection-level granularity to the garbage collector pause/resume
mechanism. Previously, GC pause affected all collections globally. Now
operators can pause GC for specific collections while allowing other
collections to continue normal GC operations.

Changes:
- Add `pausedCollection` concurrent map to track per-collection pause state
- Extend `Pause()` and `Resume()` methods with `collectionID` and `ticket` parameters
- Track each pause request as a ticketed record so concurrent pauses can be resumed independently
- Add `collectionGCPaused()` helper to check collection pause status
- Skip dropped segment recycling when collection GC is paused
- Update management API to accept an optional `collection_id` query parameter and return a resume ticket (see the usage sketch below)
- Add `GetInt64Value()` utility function for parsing int64 from KV pairs
- Maintain backward compatibility: collectionID <= 0 triggers a global pause

This gives operators finer-grained control over the Milvus data lifecycle.
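For illustration, a hedged sketch of how an operator might drive the new endpoints from Go. The route paths and port are assumptions (typical values for the proxy management listener and the `RouteGcPause`/`RouteGcResume` constants) and may differ per deployment:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	base := "http://localhost:9091" // assumed proxy management listen address

	// Pause GC for collection 100 for 60 seconds. The response carries a
	// ticket identifying this particular pause request.
	resp, err := http.Get(base + "/management/datacoord/garbage_collection/pause?pause_seconds=60&collection_id=100")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body)) // e.g. {"msg": "OK", "ticket": "eyJ..."}

	// Resume by presenting the ticket returned from the pause call.
	ticket := "eyJ..." // placeholder: use the ticket from the pause response
	resp, err = http.Get(base + "/management/datacoord/garbage_collection/resume?ticket=" + url.QueryEscape(ticket))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
}
```

Pausing the same collection twice keeps the later deadline; each pause must be resumed with its own ticket.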

issue: #45941

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2025-12-08 19:33:15 +08:00 committed by GitHub
parent 4fe41ff14d
commit a042a6e1e8
6 changed files with 477 additions and 93 deletions

View File

@ -69,19 +69,114 @@ type garbageCollector struct {
meta *meta
handler Handler
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
cmdCh chan gcCmd
pauseUntil atomic.Time
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
cmdCh chan gcCmd
pauseUntil *gcPauseRecords
pausedCollection *typeutil.ConcurrentMap[int64, *gcPauseRecords]
controlChannels map[string]chan gcCmd
systemMetricsListener *hardware.SystemMetricsListener
}
type gcCmd struct {
cmdType datapb.GcCommand
duration time.Duration
done chan struct{}
cmdType datapb.GcCommand
duration time.Duration
collectionID int64
ticket string
done chan error
timeout <-chan struct{}
}
type gcPauseRecord struct {
ticket string
pauseUntil time.Time
}
type gcPauseRecords struct {
mut sync.RWMutex
maxLen int
records typeutil.Heap[gcPauseRecord]
}
func (gc *gcPauseRecords) PauseUntil() time.Time {
// nil protection
if gc == nil {
return time.Time{}
}
gc.mut.RLock()
defer gc.mut.RUnlock()
// no pause records, return zero value
if gc.records.Len() == 0 {
return time.Time{}
}
return gc.records.Peek().pauseUntil
}
func (gc *gcPauseRecords) Insert(ticket string, pauseUntil time.Time) error {
gc.mut.Lock()
defer gc.mut.Unlock()
// heap small enough, short path
if gc.records.Len() < gc.maxLen {
gc.records.Push(gcPauseRecord{
ticket: ticket,
pauseUntil: pauseUntil,
})
return nil
}
// heap full: evict expired records, rebuild the heap, then retry the insert
records := make([]gcPauseRecord, 0, gc.records.Len())
now := time.Now()
for gc.records.Len() > 0 {
record := gc.records.Pop()
if record.pauseUntil.After(now) {
records = append(records, record)
}
}
gc.records = typeutil.NewObjectArrayBasedMaximumHeap(records, func(r gcPauseRecord) int64 {
return r.pauseUntil.UnixNano()
})
if gc.records.Len() < gc.maxLen {
gc.records.Push(gcPauseRecord{
ticket: ticket,
pauseUntil: pauseUntil,
})
return nil
}
// still too many live pause records after eviction
return merr.WrapErrTooManyRequests(64, "too many pause records")
}
func (gc *gcPauseRecords) Delete(ticket string) {
gc.mut.Lock()
defer gc.mut.Unlock()
now := time.Now()
records := make([]gcPauseRecord, 0, gc.records.Len())
for gc.records.Len() > 0 {
record := gc.records.Pop()
if now.Before(record.pauseUntil) && record.ticket != ticket {
records = append(records, record)
}
}
gc.records = typeutil.NewObjectArrayBasedMaximumHeap(records, func(r gcPauseRecord) int64 {
return r.pauseUntil.UnixNano()
})
}
func (gc *gcPauseRecords) Len() int {
gc.mut.RLock()
defer gc.mut.RUnlock()
return gc.records.Len()
}
func NewGCPauseRecords() *gcPauseRecords {
return &gcPauseRecords{
records: typeutil.NewObjectArrayBasedMaximumHeap[gcPauseRecord, int64]([]gcPauseRecord{}, func(r gcPauseRecord) int64 {
return r.pauseUntil.UnixNano()
}),
maxLen: 64,
}
}
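To make the record semantics concrete, here is a minimal standalone sketch of the rule the heap encodes; it replaces the `typeutil` maximum heap with a plain slice scan for brevity:

```go
package main

import (
	"fmt"
	"time"
)

type record struct {
	ticket     string
	pauseUntil time.Time
}

// effectivePauseUntil mirrors PauseUntil(): the latest deadline among
// live records wins, which the maximum heap yields in O(1) via Peek.
func effectivePauseUntil(records []record) time.Time {
	var latest time.Time
	for _, r := range records {
		if r.pauseUntil.After(latest) {
			latest = r.pauseUntil
		}
	}
	return latest
}

func main() {
	now := time.Now()
	records := []record{
		{ticket: "ticket-a", pauseUntil: now.Add(time.Minute)},
		{ticket: "ticket-b", pauseUntil: now.Add(30 * time.Second)},
	}
	// GC stays paused until ticket-a's deadline, the later of the two.
	fmt.Println(effectivePauseUntil(records).Sub(now).Round(time.Second)) // 1m0s
}
```

Because the effective deadline is the maximum over live tickets, a second, shorter pause never shortens an existing one, and resuming a ticket only removes that ticket's contribution.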
// newSystemMetricsListener creates a system metrics listener for garbage collector.
@ -120,6 +215,13 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl
zap.Duration("dropTolerance", opt.dropTolerance))
opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
ctx, cancel := context.WithCancel(context.Background())
metaSignal := make(chan gcCmd)
orphanSignal := make(chan gcCmd)
// control signal channels
controlChannels := map[string]chan gcCmd{
"meta": metaSignal,
"orphan": orphanSignal,
}
return &garbageCollector{
ctx: ctx,
cancel: cancel,
@ -128,6 +230,9 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl
option: opt,
cmdCh: make(chan gcCmd),
systemMetricsListener: newSystemMetricsListener(&opt),
pauseUntil: NewGCPauseRecords(),
pausedCollection: typeutil.NewConcurrentMap[int64, *gcPauseRecords](),
controlChannels: controlChannels,
}
}
@ -152,7 +257,7 @@ type GcStatus struct {
// GetStatus returns the current status of the garbage collector.
func (gc *garbageCollector) GetStatus() GcStatus {
pauseUntil := gc.pauseUntil.Load()
pauseUntil := gc.pauseUntil.PauseUntil()
now := time.Now()
if now.Before(pauseUntil) {
@ -168,35 +273,40 @@ func (gc *garbageCollector) GetStatus() GcStatus {
}
}
func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
func (gc *garbageCollector) Pause(ctx context.Context, collectionID int64, ticket string, pauseDuration time.Duration) error {
if !gc.option.enabled {
log.Info("garbage collection not enabled")
return nil
}
done := make(chan struct{})
done := make(chan error, 1)
select {
case gc.cmdCh <- gcCmd{
cmdType: datapb.GcCommand_Pause,
duration: pauseDuration,
done: done,
cmdType: datapb.GcCommand_Pause,
duration: pauseDuration,
collectionID: collectionID,
ticket: ticket,
done: done,
timeout: ctx.Done(),
}:
<-done
return nil
return <-done
case <-ctx.Done():
return ctx.Err()
}
}
func (gc *garbageCollector) Resume(ctx context.Context) error {
func (gc *garbageCollector) Resume(ctx context.Context, collectionID int64, ticket string) error {
if !gc.option.enabled {
log.Warn("garbage collection not enabled, cannot resume")
return merr.WrapErrServiceUnavailable("garbage collection not enabled")
}
done := make(chan struct{})
done := make(chan error)
select {
case gc.cmdCh <- gcCmd{
cmdType: datapb.GcCommand_Resume,
done: done,
cmdType: datapb.GcCommand_Resume,
done: done,
collectionID: collectionID,
ticket: ticket,
timeout: ctx.Done(),
}:
<-done
return nil
@ -212,20 +322,21 @@ func (gc *garbageCollector) work(ctx context.Context) {
gc.wg.Add(3)
go func() {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
gc.recycleDroppedSegments(ctx)
gc.recycleChannelCPMeta(ctx)
gc.recycleUnusedIndexes(ctx)
gc.recycleUnusedSegIndexes(ctx)
gc.recycleUnusedAnalyzeFiles(ctx)
gc.recycleUnusedTextIndexFiles(ctx)
gc.recycleUnusedJSONIndexFiles(ctx)
gc.recycleUnusedJSONStatsFiles(ctx)
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context, signal <-chan gcCmd) {
gc.recycleDroppedSegments(ctx, signal)
gc.recycleChannelCPMeta(ctx, signal)
gc.recycleUnusedIndexes(ctx, signal)
gc.recycleUnusedSegIndexes(ctx, signal)
gc.recycleUnusedAnalyzeFiles(ctx, signal)
gc.recycleUnusedTextIndexFiles(ctx, signal)
gc.recycleUnusedJSONIndexFiles(ctx, signal)
gc.recycleUnusedJSONStatsFiles(ctx, signal)
})
}()
go func() {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context) {
gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context, signal <-chan gcCmd) {
// orphan files are not controlled by collection-level pause for now
gc.recycleUnusedBinlogFiles(ctx)
gc.recycleUnusedIndexFiles(ctx)
})
@ -236,6 +347,16 @@ func (gc *garbageCollector) work(ctx context.Context) {
}()
}
// ackSignal acknowledges one pending control command, if any, without blocking.
func (gc *garbageCollector) ackSignal(signal <-chan gcCmd) {
select {
case cmd := <-signal:
if cmd.done != nil {
close(cmd.done)
}
default:
}
}
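The pause path relies on a simple channel handshake: the control loop forwards a command on the worker's signal channel and waits for the worker to close `done`, so `Pause()` returns only after the recycle loop has actually observed the pause. A minimal standalone sketch of that handshake (the `worker`/`cmd` names are illustrative, not from the codebase):

```go
package main

import (
	"fmt"
	"time"
)

type cmd struct{ done chan error }

func worker(signal <-chan cmd) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case c := <-signal:
			close(c.done) // ack: the worker has observed the command
		case <-ticker.C:
			// periodic recycle work would run here
		}
	}
}

func main() {
	signal := make(chan cmd)
	go worker(signal)

	c := cmd{done: make(chan error)}
	signal <- c // hand the command to the worker
	<-c.done    // block until the worker acks
	fmt.Println("worker acknowledged the pause")
}
```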
// startControlLoop starts the control loop for garbageCollector.
func (gc *garbageCollector) startControlLoop(_ context.Context) {
hardware.RegisterSystemMetricsListener(gc.systemMetricsListener)
@ -246,17 +367,10 @@ func (gc *garbageCollector) startControlLoop(_ context.Context) {
case cmd := <-gc.cmdCh:
switch cmd.cmdType {
case datapb.GcCommand_Pause:
pauseUntil := time.Now().Add(cmd.duration)
if pauseUntil.After(gc.pauseUntil.Load()) {
log.Info("garbage collection paused", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil))
gc.pauseUntil.Store(pauseUntil)
} else {
log.Info("new pause until before current value", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil), zap.Time("oldPauseUntil", gc.pauseUntil.Load()))
}
err := gc.pause(cmd)
cmd.done <- err
case datapb.GcCommand_Resume:
// reset to zero value
gc.pauseUntil.Store(time.Time{})
log.Info("garbage collection resumed")
gc.resume(cmd)
}
close(cmd.done)
case <-gc.ctx.Done():
@ -266,29 +380,102 @@ func (gc *garbageCollector) startControlLoop(_ context.Context) {
}
}
func (gc *garbageCollector) pause(cmd gcCmd) error {
log := log.With(
zap.Int64("collectionID", cmd.collectionID),
zap.String("ticket", cmd.ticket),
)
reqPauseUntil := time.Now().Add(cmd.duration)
log = log.With(
zap.Time("pauseUntil", reqPauseUntil),
zap.Duration("duration", cmd.duration),
)
var err error
if cmd.collectionID <= 0 { // legacy pause all
err = gc.pauseUntil.Insert(cmd.ticket, reqPauseUntil)
log.Info("global pause ticket recorded")
} else {
curr, has := gc.pausedCollection.Get(cmd.collectionID)
if !has {
curr = NewGCPauseRecords()
gc.pausedCollection.Insert(cmd.collectionID, curr)
}
err = curr.Insert(cmd.ticket, reqPauseUntil)
log.Info("collection new pause ticket recorded")
}
if err != nil {
return err
}
signalCh := gc.controlChannels["meta"]
// send signal to worker
// make sure the worker acks the pause command before returning
signal := gcCmd{
done: make(chan error),
timeout: cmd.timeout,
}
select {
case signalCh <- signal:
<-signal.done
case <-cmd.timeout:
// context finished before the worker acked; roll back the pause record
gc.resume(cmd)
}
return nil
}
func (gc *garbageCollector) resume(cmd gcCmd) {
// drop this ticket's record and recompute the remaining pause deadline
var afterResume time.Time
if cmd.collectionID <= 0 {
gc.pauseUntil.Delete(cmd.ticket)
afterResume = gc.pauseUntil.PauseUntil()
} else {
curr, has := gc.pausedCollection.Get(cmd.collectionID)
if has {
curr.Delete(cmd.ticket)
afterResume = curr.PauseUntil()
if curr.Len() == 0 || time.Now().After(afterResume) {
gc.pausedCollection.Remove(cmd.collectionID)
}
}
}
stillPaused := time.Now().Before(afterResume)
log.Info("garbage collection resumed", zap.Bool("stillPaused", stillPaused))
}
// runRecycleTaskWithPauser runs a recycle task periodically, honoring pause state and commands from its control channel
func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context)) {
func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context, signal <-chan gcCmd)) {
logger := log.With(zap.String("gcType", name)).With(zap.Duration("interval", interval))
timer := time.NewTicker(interval)
defer timer.Stop()
// get signal channel; nil is fine and means no control for this task (receive from a nil channel blocks forever)
signal := gc.controlChannels[name]
for {
select {
case <-ctx.Done():
return
case cmd := <-signal:
// notify signal received
close(cmd.done)
case <-timer.C:
if time.Now().Before(gc.pauseUntil.Load()) {
logger.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
globalPauseUntil := gc.pauseUntil.PauseUntil()
if time.Now().Before(globalPauseUntil) {
logger.Info("garbage collector paused", zap.Time("until", globalPauseUntil))
continue
}
logger.Info("garbage collector recycle task start...")
start := time.Now()
task(ctx)
task(ctx, signal)
logger.Info("garbage collector recycle task done", zap.Duration("timeCost", time.Since(start)))
}
}
}
// collectionGCPaused reports whether GC is currently paused for the given collection.
func (gc *garbageCollector) collectionGCPaused(collectionID int64) bool {
collPauseUntil, has := gc.pausedCollection.Get(collectionID)
return has && time.Now().Before(collPauseUntil.PauseUntil())
}
// close stop the garbage collector.
func (gc *garbageCollector) close() {
gc.stopOnce.Do(func() {
@ -474,7 +661,7 @@ func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
}
// recycleDroppedSegments scans all segments and removes dropped segments from meta and OSS.
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context, signal <-chan gcCmd) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start))
log.Info("start clear dropped segments...")
@ -535,6 +722,13 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
return
}
gc.ackSignal(signal)
if gc.collectionGCPaused(segment.GetCollectionID()) {
log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", segmentID), zap.Int64("collectionID", segment.GetCollectionID()))
continue
}
log := log.With(zap.Int64("segmentID", segmentID))
segInsertChannel := segment.GetInsertChannel()
if loadedSegments.Contain(segmentID) {
@ -579,7 +773,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
}
}
func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context, signal <-chan gcCmd) {
log := log.Ctx(ctx)
channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
if err != nil {
@ -593,6 +787,11 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs)))
for vChannel := range channelCPs {
collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)
if gc.collectionGCPaused(collectionID) {
continue
}
gc.ackSignal(signal)
// !!! Skip GC if the vChannel format is illegal; note that skipping leaks meta in this case
if collectionID == -1 {
@ -718,7 +917,7 @@ func (gc *garbageCollector) removeObjectFiles(ctx context.Context, filePaths map
}
// recycleUnusedIndexes deletes indexes whose collection has been dropped.
func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context, signal <-chan gcCmd) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedIndexes"), zap.Time("startAt", start))
log.Info("start recycleUnusedIndexes...")
@ -730,6 +929,10 @@ func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
// process canceled.
return
}
if gc.collectionGCPaused(index.CollectionID) {
continue
}
gc.ackSignal(signal)
log := log.With(zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID))
if err := gc.meta.indexMeta.RemoveIndex(ctx, index.CollectionID, index.IndexID); err != nil {
@ -741,7 +944,7 @@ func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
}
// recycleUnusedSegIndexes removes a segment's index if the index or the segment itself has been deleted.
func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context, signal <-chan gcCmd) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedSegIndexes"), zap.Time("startAt", start))
log.Info("start recycleUnusedSegIndexes...")
@ -753,6 +956,10 @@ func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
// process canceled.
return
}
if gc.collectionGCPaused(segIdx.CollectionID) {
continue
}
gc.ackSignal(signal)
// 1. the segment it belongs to is deleted.
// 2. the index is deleted.
@ -883,7 +1090,7 @@ func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentI
}
// recycleUnusedAnalyzeFiles is used to delete those analyze stats files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context, signal <-chan gcCmd) {
log := log.Ctx(ctx)
log.Info("start recycleUnusedAnalyzeFiles")
startTs := time.Now()
@ -904,6 +1111,8 @@ func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
// process canceled
return
}
// collection-level GC pause does not affect analyze files for now
gc.ackSignal(signal)
log.Debug("analyze keys", zap.String("key", key))
taskID, err := parseBuildIDFromFilePath(key)
@ -955,7 +1164,7 @@ func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
// recycleUnusedTextIndexFiles loads meta file info and compares OSS keys;
// if missing entries are found, it performs GC cleanup
func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context, signal <-chan gcCmd) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedTextIndexFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedTextIndexFiles...")
@ -968,6 +1177,15 @@ func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
deletedFilesNum := atomic.NewInt32(0)
for _, seg := range hasTextIndexSegments {
if ctx.Err() != nil {
// process canceled, stop.
return
}
if gc.collectionGCPaused(seg.GetCollectionID()) {
log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", seg.GetID()), zap.Int64("collectionID", seg.GetCollectionID()))
continue
}
gc.ackSignal(signal)
for _, fieldStats := range seg.GetTextStatsLogs() {
log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
// clear low version task
@ -1016,7 +1234,7 @@ func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
// recycleUnusedJSONStatsFiles loads meta file info and compares OSS keys;
// if missing entries are found, it performs GC cleanup
func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context) {
func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context, signal <-chan gcCmd) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedJSONStatsFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedJSONStatsFiles...")
@ -1029,6 +1247,15 @@ func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context) {
deletedFilesNum := atomic.NewInt32(0)
for _, seg := range hasJSONStatsSegments {
if ctx.Err() != nil {
// process canceled, stop.
return
}
if gc.collectionGCPaused(seg.GetCollectionID()) {
log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", seg.GetID()), zap.Int64("collectionID", seg.GetCollectionID()))
continue
}
gc.ackSignal(signal)
for _, fieldStats := range seg.GetJsonKeyStats() {
log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
// clear low version task
@ -1114,7 +1341,7 @@ func (gc *garbageCollector) recycleUnusedJSONStatsFiles(ctx context.Context) {
}
// recycleUnusedJSONIndexFiles loads meta file info and compares OSS keys
func (gc *garbageCollector) recycleUnusedJSONIndexFiles(ctx context.Context) {
func (gc *garbageCollector) recycleUnusedJSONIndexFiles(ctx context.Context, signal <-chan gcCmd) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedJSONIndexFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedJSONIndexFiles...")
@ -1127,6 +1354,15 @@ func (gc *garbageCollector) recycleUnusedJSONIndexFiles(ctx context.Context) {
deletedFilesNum := atomic.NewInt32(0)
for _, seg := range hasJSONIndexSegments {
if ctx.Err() != nil {
// process canceled, stop.
return
}
if gc.collectionGCPaused(seg.GetCollectionID()) {
log.Info("skip GC segment since collection is paused", zap.Int64("segmentID", seg.GetID()), zap.Int64("collectionID", seg.GetCollectionID()))
continue
}
gc.ackSignal(signal)
for _, fieldStats := range seg.GetJsonKeyStats() {
log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
// clear low version task

View File

@ -29,6 +29,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/google/uuid"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/stretchr/testify/assert"
@ -205,7 +206,8 @@ func Test_garbageCollector_scan(t *testing.T) {
missingTolerance: time.Hour * 24,
dropTolerance: 0,
})
gc.recycleDroppedSegments(context.TODO())
signal := make(chan gcCmd)
gc.recycleDroppedSegments(context.TODO(), signal)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:])
@ -224,7 +226,8 @@ func Test_garbageCollector_scan(t *testing.T) {
})
gc.start()
gc.recycleUnusedBinlogFiles(context.TODO())
gc.recycleDroppedSegments(context.TODO())
signal := make(chan gcCmd)
gc.recycleDroppedSegments(context.TODO(), signal)
// bad paths shall remain since datacoord cannot determine whether a file is garbage when its path is not valid
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:2])
@ -431,7 +434,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) {
mock.Anything,
).Return(nil)
gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{})
gc.recycleUnusedIndexes(context.TODO())
gc.recycleUnusedIndexes(context.TODO(), nil)
})
t.Run("fail", func(t *testing.T) {
@ -442,7 +445,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) {
mock.Anything,
).Return(errors.New("fail"))
gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{})
gc.recycleUnusedIndexes(context.TODO())
gc.recycleUnusedIndexes(context.TODO(), nil)
})
}
@ -588,7 +591,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{
cli: mockChunkManager,
})
gc.recycleUnusedSegIndexes(context.TODO())
gc.recycleUnusedSegIndexes(context.TODO(), nil)
})
t.Run("fail", func(t *testing.T) {
@ -606,7 +609,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{
cli: mockChunkManager,
})
gc.recycleUnusedSegIndexes(context.TODO())
gc.recycleUnusedSegIndexes(context.TODO(), nil)
})
}
@ -1386,6 +1389,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
cm := &mocks.ChunkManager{}
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
signal := make(chan gcCmd)
gc := newGarbageCollector(
m,
newMockHandlerWithMeta(m),
@ -1393,7 +1397,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
cli: cm,
dropTolerance: 1,
})
gc.recycleDroppedSegments(context.TODO())
gc.recycleDroppedSegments(context.TODO(), signal)
/*
A B
@ -1451,7 +1455,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
})
assert.NoError(t, err)
gc.recycleDroppedSegments(context.TODO())
gc.recycleDroppedSegments(context.TODO(), signal)
/*
A: processed prior to C, C is not GCed yet and C is not indexed, A is not GCed in this turn
@ -1467,7 +1471,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
segD = gc.meta.GetSegment(context.TODO(), segID+3)
assert.Nil(t, segD)
gc.recycleDroppedSegments(context.TODO())
gc.recycleDroppedSegments(context.TODO(), signal)
/*
A: compacted became false because C is GCed already, A should be GCed since dropTolerance is met
B: compacted became false because C is GCed already, B should be GCed since dropTolerance is met
@ -1499,7 +1503,7 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
t.Run("list channel cp fail", func(t *testing.T) {
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once()
gc.recycleChannelCPMeta(context.TODO())
gc.recycleChannelCPMeta(context.TODO(), nil)
assert.Equal(t, 3, len(m.channelCPs.checkpoints))
})
@ -1520,20 +1524,20 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
}).Maybe()
t.Run("skip drop channel due to collection is available", func(t *testing.T) {
gc.recycleChannelCPMeta(context.TODO())
gc.recycleChannelCPMeta(context.TODO(), nil)
assert.Equal(t, 3, len(m.channelCPs.checkpoints))
})
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil).Times(4)
t.Run("drop channel cp fail", func(t *testing.T) {
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Twice()
gc.recycleChannelCPMeta(context.TODO())
gc.recycleChannelCPMeta(context.TODO(), nil)
assert.Equal(t, 3, len(m.channelCPs.checkpoints))
})
t.Run("channel cp gc ok", func(t *testing.T) {
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Twice()
gc.recycleChannelCPMeta(context.TODO())
gc.recycleChannelCPMeta(context.TODO(), nil)
assert.Equal(t, 1, len(m.channelCPs.checkpoints))
})
}
@ -1629,10 +1633,10 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := gc.Pause(ctx, time.Second)
err := gc.Pause(ctx, -1, "", time.Second)
s.NoError(err)
err = gc.Resume(ctx)
err = gc.Resume(ctx, -1, "")
s.Error(err)
})
@ -1650,15 +1654,15 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
defer gc.close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := gc.Pause(ctx, time.Minute)
err := gc.Pause(ctx, -1, "", time.Minute)
s.NoError(err)
s.NotZero(gc.pauseUntil.Load())
s.NotZero(gc.pauseUntil.PauseUntil())
err = gc.Resume(ctx)
err = gc.Resume(ctx, -1, "")
s.NoError(err)
s.Zero(gc.pauseUntil.Load())
s.Zero(gc.pauseUntil.PauseUntil())
})
s.Run("pause_before_until", func() {
@ -1675,16 +1679,16 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
defer gc.close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := gc.Pause(ctx, time.Minute)
err := gc.Pause(ctx, -1, "", time.Minute)
s.NoError(err)
until := gc.pauseUntil.Load()
until := gc.pauseUntil.PauseUntil()
s.NotZero(until)
err = gc.Pause(ctx, time.Second)
err = gc.Pause(ctx, -1, "", time.Second)
s.NoError(err)
second := gc.pauseUntil.Load()
second := gc.pauseUntil.PauseUntil()
s.Equal(until, second)
})
@ -1701,15 +1705,62 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
err := gc.Pause(ctx, time.Minute)
err := gc.Pause(ctx, -1, "", time.Minute)
s.Error(err)
s.Zero(gc.pauseUntil.Load())
s.Zero(gc.pauseUntil.PauseUntil())
err = gc.Resume(ctx)
err = gc.Resume(ctx, -1, "")
s.Error(err)
s.Zero(gc.pauseUntil.Load())
s.Zero(gc.pauseUntil.PauseUntil())
})
s.Run("pause_collection", func() {
gc := newGarbageCollector(s.meta, newMockHandler(), GcOption{
cli: s.cli,
enabled: true,
checkInterval: time.Millisecond * 10,
scanInterval: time.Hour * 7 * 24,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
})
gc.start()
defer gc.close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticket := uuid.NewString()
err := gc.Pause(ctx, 100, ticket, time.Minute)
s.NoError(err)
until, has := gc.pausedCollection.Get(100)
firstPauseUntil := until.PauseUntil()
s.True(has)
s.NotZero(firstPauseUntil)
ticket2 := uuid.NewString()
err = gc.Pause(ctx, 100, ticket2, time.Second*30)
s.NoError(err)
second, has := gc.pausedCollection.Get(100)
secondPauseUntil := second.PauseUntil()
s.True(has)
s.Equal(firstPauseUntil, secondPauseUntil)
err = gc.Resume(ctx, 100, ticket2)
s.NoError(err)
afterResume, has := gc.pausedCollection.Get(100)
s.True(has)
afterUntil := afterResume.PauseUntil()
s.Equal(firstPauseUntil, afterUntil)
err = gc.Resume(ctx, 100, ticket)
s.NoError(err)
_, has = gc.pausedCollection.Get(100)
s.False(has)
})
}
@ -1727,7 +1778,7 @@ func (s *GarbageCollectorSuite) TestRunRecycleTaskWithPauser() {
defer cancel()
cnt := 0
gc.runRecycleTaskWithPauser(ctx, "test", time.Second, func(ctx context.Context) {
gc.runRecycleTaskWithPauser(ctx, "test", time.Second, func(ctx context.Context, signal <-chan gcCmd) {
cnt++
})
s.Equal(cnt, 2)
@ -1753,7 +1804,8 @@ func (s *GarbageCollectorSuite) TestAvoidGCLoadedSegments() {
},
})
gc.recycleDroppedSegments(context.TODO())
signal := make(chan gcCmd)
gc.recycleDroppedSegments(context.TODO(), signal)
seg := s.meta.GetSegment(context.TODO(), 1)
s.NotNil(seg)
}

View File

@ -1804,13 +1804,25 @@ func (s *Server) GcControl(ctx context.Context, request *datapb.GcControlRequest
status.Reason = fmt.Sprintf("pause duration not valid, %s", err.Error())
return status, nil
}
if err := s.garbageCollector.Pause(ctx, time.Duration(pauseSeconds)*time.Second); err != nil {
collectionID, err, _ := common.GetInt64Value(request.GetParams(), "collection_id")
if err != nil {
return merr.Status(err), nil
}
ticket, _ := common.GetStringValue(request.GetParams(), "ticket")
if err := s.garbageCollector.Pause(ctx, collectionID, ticket, time.Duration(pauseSeconds)*time.Second); err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = fmt.Sprintf("failed to pause gc, %s", err.Error())
return status, nil
}
case datapb.GcCommand_Resume:
if err := s.garbageCollector.Resume(ctx); err != nil {
collectionID, err, _ := common.GetInt64Value(request.GetParams(), "collection_id")
if err != nil {
return merr.Status(err), nil
}
ticket, _ := common.GetStringValue(request.GetParams(), "ticket")
if err := s.garbageCollector.Resume(ctx, collectionID, ticket); err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = fmt.Sprintf("failed to pause gc, %s", err.Error())
return status, nil

View File

@ -17,11 +17,13 @@
package proxy
import (
"encoding/base64"
"fmt"
"net/http"
"strconv"
"sync"
"github.com/google/uuid"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -89,15 +91,54 @@ func RegisterMgrRoute(proxy *Proxy) {
})
}
// EncodeTicket encodes the ticket with token and collectionID
func EncodeTicket(token string, collectionID string) string {
if collectionID == "" {
collectionID = "-1"
}
m := map[string]string{
"token": token,
"collection_id": collectionID,
}
bytes, _ := json.Marshal(m)
ticket := base64.StdEncoding.EncodeToString(bytes)
return ticket
}
// DecodeTicket decodes the ticket to get token and collectionID
func DecodeTicket(ticket string) (string, string, error) {
bytes, err := base64.StdEncoding.DecodeString(ticket)
if err != nil {
return "", "", err
}
m := make(map[string]string)
err = json.Unmarshal(bytes, &m)
if err != nil {
return "", "", err
}
return m["token"], m["collection_id"], nil
}
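A quick round-trip of the ticket encoding, shown as a self-contained sketch with local copies of the helpers above:

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// standalone copies of EncodeTicket/DecodeTicket, for illustration only
func encodeTicket(token, collectionID string) string {
	if collectionID == "" {
		collectionID = "-1"
	}
	bytes, _ := json.Marshal(map[string]string{
		"token":         token,
		"collection_id": collectionID,
	})
	return base64.StdEncoding.EncodeToString(bytes)
}

func decodeTicket(ticket string) (token, collectionID string, err error) {
	bytes, err := base64.StdEncoding.DecodeString(ticket)
	if err != nil {
		return "", "", err
	}
	m := make(map[string]string)
	if err := json.Unmarshal(bytes, &m); err != nil {
		return "", "", err
	}
	return m["token"], m["collection_id"], nil
}

func main() {
	ticket := encodeTicket("a1b2c3", "100")
	token, collectionID, err := decodeTicket(ticket)
	fmt.Println(token, collectionID, err) // a1b2c3 100 <nil>
}
```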
func (node *Proxy) PauseDatacoordGC(w http.ResponseWriter, req *http.Request) {
pauseSeconds := req.URL.Query().Get("pause_seconds")
// generate ticket for request
token := uuid.New().String()
ticket := EncodeTicket(token, req.URL.Query().Get("collection_id"))
params := []*commonpb.KeyValuePair{
{Key: "duration", Value: pauseSeconds},
{Key: "ticket", Value: ticket},
}
if req.URL.Query().Has("collection_id") {
params = append(params, &commonpb.KeyValuePair{
Key: "collection_id",
Value: req.URL.Query().Get("collection_id"),
})
}
resp, err := node.mixCoord.GcControl(req.Context(), &datapb.GcControlRequest{
Base: commonpbutil.NewMsgBase(),
Command: datapb.GcCommand_Pause,
Params: []*commonpb.KeyValuePair{
{Key: "duration", Value: pauseSeconds},
},
Params: params,
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -110,22 +151,40 @@ func (node *Proxy) PauseDatacoordGC(w http.ResponseWriter, req *http.Request) {
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"msg": "OK"}`))
fmt.Fprintf(w, `{"msg": "OK", "ticket": "%s"}`, ticket)
}
func (node *Proxy) ResumeDatacoordGC(w http.ResponseWriter, req *http.Request) {
ticket := req.URL.Query().Get("ticket")
var collectionID string
var err error
// allow empty ticket for backward compatibility
if ticket != "" {
_, collectionID, err = DecodeTicket(ticket)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to decode ticket, %s"}`, err.Error())))
return
}
}
params := []*commonpb.KeyValuePair{
{Key: "ticket", Value: ticket},
}
// only forward collection_id when the ticket carried one; an empty value
// would fail int64 parsing on the datacoord side
if collectionID != "" {
params = append(params, &commonpb.KeyValuePair{
Key: "collection_id",
Value: collectionID,
})
}
resp, err := node.mixCoord.GcControl(req.Context(), &datapb.GcControlRequest{
Base: commonpbutil.NewMsgBase(),
Command: datapb.GcCommand_Resume,
Params: params,
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, err.Error())))
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to resume garbage collection, %s"}`, err.Error())))
return
}
if resp.GetErrorCode() != commonpb.ErrorCode_Success {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, resp.GetReason())))
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to resume garbage collection, %s"}`, resp.GetReason())))
return
}
w.WriteHeader(http.StatusOK)

View File

@ -64,7 +64,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() {
return &commonpb.Status{}, nil
})
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60&collection_id=100", nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -80,7 +80,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() {
return &commonpb.Status{}, errors.New("mock")
})
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60&collection_id=100", nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -118,7 +118,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() {
return &commonpb.Status{}, nil
})
req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcResume+"?collection_id=100", nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()

View File

@ -606,6 +606,31 @@ func IsAllowInsertAutoID(kvs ...*commonpb.KeyValuePair) (bool, bool) {
return false, false
}
// GetInt64Value returns the int64 value for key in kvs; exist reports whether
// the key is present, and parseErr is non-nil when the value cannot be parsed.
func GetInt64Value(kvs []*commonpb.KeyValuePair, key string) (result int64, parseErr error, exist bool) {
kv := lo.FindOrElse(kvs, nil, func(kv *commonpb.KeyValuePair) bool {
return kv.GetKey() == key
})
if kv == nil {
return 0, nil, false
}
result, err := strconv.ParseInt(kv.GetValue(), 10, 64)
if err != nil {
return 0, err, true
}
return result, nil, true
}
// GetStringValue returns the string value for key in kvs and whether the key exists.
func GetStringValue(kvs []*commonpb.KeyValuePair, key string) (result string, exist bool) {
kv := lo.FindOrElse(kvs, nil, func(kv *commonpb.KeyValuePair) bool {
return kv.GetKey() == key
})
if kv == nil {
return "", false
}
return kv.GetValue(), true
}
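A hedged usage sketch for these helpers, written as a test in the same package; it assumes `commonpb` from milvus-proto and testify's `assert`, both already used in this repo:

```go
func TestGetValueHelpers(t *testing.T) {
	params := []*commonpb.KeyValuePair{
		{Key: "collection_id", Value: "100"},
	}

	// present and well-formed: value parsed, existence reported
	collectionID, parseErr, exist := GetInt64Value(params, "collection_id")
	assert.NoError(t, parseErr)
	assert.True(t, exist)
	assert.EqualValues(t, 100, collectionID)

	// absent key: exist is false and no parse error is reported
	_, parseErr, exist = GetInt64Value(params, "missing")
	assert.NoError(t, parseErr)
	assert.False(t, exist)

	// present but malformed: exist is true and parseErr is set
	bad := []*commonpb.KeyValuePair{{Key: "collection_id", Value: "abc"}}
	_, parseErr, exist = GetInt64Value(bad, "collection_id")
	assert.Error(t, parseErr)
	assert.True(t, exist)

	// string helper: returns the raw value and whether the key exists
	ticket, ok := GetStringValue(params, "ticket")
	assert.False(t, ok)
	assert.Empty(t, ticket)
}
```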
func CheckNamespace(schema *schemapb.CollectionSchema, namespace *string) error {
enabled, _, err := ParseNamespaceProp(schema.Properties...)
if err != nil {