From fa86de530d31840dd3dd154f4e9b3cd7c3a6662f Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 2 Mar 2023 11:49:47 +0800 Subject: [PATCH] Restrain gc files by collection (#22504) Signed-off-by: Congqi Xia --- internal/datacoord/garbage_collector.go | 92 ++++++++---- internal/datacoord/garbage_collector_test.go | 147 ++++++++++++++++++- internal/datacoord/server.go | 10 ++ 3 files changed, 222 insertions(+), 27 deletions(-) diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 1ac972fc77..a6f225789a 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -19,6 +19,7 @@ package datacoord import ( "context" "path" + "strconv" "strings" "sync" "time" @@ -43,6 +44,8 @@ const ( deltaLogPrefix = `delta_log` ) +type collectionValidator func(int64) bool + // GcOption garbage collection options type GcOption struct { cli storage.ChunkManager // client @@ -50,6 +53,7 @@ type GcOption struct { checkInterval time.Duration // each interval missingTolerance time.Duration // key missing in meta tolerance time dropTolerance time.Duration // dropped segment related key tolerance time + collValidator collectionValidator // validates collection id } // garbageCollector handles garbage files in object storage @@ -111,6 +115,24 @@ func (gc *garbageCollector) work() { } } +func (gc *garbageCollector) isCollectionPrefixValid(p string, prefix string) bool { + if gc.option.collValidator == nil { + return true + } + + if !strings.HasPrefix(p, prefix) { + return false + } + + p = strings.Trim(p[len(prefix):], "/") + collectionID, err := strconv.ParseInt(p, 10, 64) + if err != nil { + return false + } + + return gc.option.collValidator(collectionID) +} + func (gc *garbageCollector) close() { gc.stopOnce.Do(func() { close(gc.closeCh) @@ -148,46 +170,64 @@ func (gc *garbageCollector) scan() { var removedKeys []string for _, prefix := range prefixes { - infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true) + // list first level prefix, then perform collection id validation + collectionPrefixes, _, err := gc.option.cli.ListWithPrefix(ctx, prefix+"/", false) if err != nil { - log.Error("failed to list files with prefix", + log.Warn("failed to list collection prefix", zap.String("prefix", prefix), - zap.String("error", err.Error()), + zap.Error(err), ) } - for i, infoKey := range infoKeys { - total++ - _, has := filesMap[infoKey] - if has { - valid++ + for _, collPrefix := range collectionPrefixes { + if !gc.isCollectionPrefixValid(collPrefix, prefix) { + log.Warn("garbage collector meet invalid collection prefix, ignore it", + zap.String("collPrefix", collPrefix), + zap.String("prefix", prefix), + ) continue } - - segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey) + infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, collPrefix, true) if err != nil { - missing++ - log.Warn("parse segment id error", - zap.String("infoKey", infoKey), - zap.Error(err)) + log.Error("failed to list files with collPrefix", + zap.String("collPrefix", collPrefix), + zap.String("error", err.Error()), + ) continue } + for i, infoKey := range infoKeys { + total++ + _, has := filesMap[infoKey] + if has { + valid++ + continue + } - if strings.Contains(prefix, statsLogPrefix) && - segmentMap.Contain(segmentID) { - valid++ - continue - } - - // not found in meta, check last modified time exceeds tolerance duration - if time.Since(modTimes[i]) > gc.option.missingTolerance { - // ignore error since it could be cleaned up next time - removedKeys = append(removedKeys, infoKey) - err = gc.option.cli.Remove(ctx, infoKey) + segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey) if err != nil { missing++ - log.Error("failed to remove object", + log.Warn("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err)) + continue + } + + if strings.Contains(prefix, statsLogPrefix) && + segmentMap.Contain(segmentID) { + valid++ + continue + } + + // not found in meta, check last modified time exceeds tolerance duration + if time.Since(modTimes[i]) > gc.option.missingTolerance { + // ignore error since it could be cleaned up next time + removedKeys = append(removedKeys, infoKey) + err = gc.option.cli.Remove(ctx, infoKey) + if err != nil { + missing++ + log.Error("failed to remove object", + zap.String("infoKey", infoKey), + zap.Error(err)) + } } } } diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index f9b1dc397b..a48eb1aa1d 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -27,10 +27,12 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -51,6 +53,149 @@ import ( "github.com/milvus-io/milvus/internal/util/funcutil" ) +type GarbageCollectorSuite struct { + suite.Suite + + mockChunkManager *mocks.ChunkManager + gc *garbageCollector +} + +func (s *GarbageCollectorSuite) SetupTest() { + meta, err := newMemoryMeta() + s.Require().NoError(err) + s.mockChunkManager = &mocks.ChunkManager{} + s.gc = newGarbageCollector( + meta, newMockHandler(), GcOption{ + cli: s.mockChunkManager, + enabled: true, + checkInterval: time.Millisecond * 10, + missingTolerance: time.Hour * 24, + dropTolerance: time.Hour * 24, + }, + ) +} + +func (s *GarbageCollectorSuite) TearDownTest() { + s.mockChunkManager = nil + s.gc.close() + s.gc = nil +} + +func (s *GarbageCollectorSuite) TestBasicOperation() { + s.Run("normal_gc", func() { + gc := s.gc + s.mockChunkManager.EXPECT().RootPath().Return("files") + s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")). + Return([]string{}, []time.Time{}, nil) + gc.start() + // make ticker run at least once + time.Sleep(time.Millisecond * 20) + + s.NotPanics(func() { + gc.close() + }) + }) + + s.Run("nil_client", func() { + // initial a new garbageCollector here + gc := newGarbageCollector(nil, newMockHandler(), GcOption{ + cli: nil, + enabled: true, + }) + + s.NotPanics(func() { + gc.start() + }) + + s.NotPanics(func() { + gc.close() + }) + }) +} + +func (s *GarbageCollectorSuite) TestScan() { + s.Run("listCollectionPrefix_fails", func() { + s.mockChunkManager.ExpectedCalls = nil + s.mockChunkManager.EXPECT().RootPath().Return("files") + s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")). + Return(nil, nil, errors.New("mocked")) + + s.gc.scan() + s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything) + }) + + s.Run("collectionPrefix_invalid", func() { + s.mockChunkManager.ExpectedCalls = nil + s.mockChunkManager.EXPECT().RootPath().Return("files") + /* + s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")). + Return([]string{"files/insert_log/1/", "files/bad_prefix", "files/insert_log/string/"}, lo.RepeatBy(3, func(_ int) time.Time { + return time.Now().Add(-time.Hour) + }), nil)*/ + + logTypes := []string{"files/insert_log/", "files/stats_log/", "files/delta_log/"} + for _, logType := range logTypes { + validSubPath := "1/2/3/100/2000" + if logType == "files/delta_log/" { + validSubPath = "1/2/3/2000" + } + s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, logType, false). + Return([]string{path.Join(logType, "1") + "/", path.Join(logType, "2") + "/", path.Join(logType, "string") + "/", "files/badprefix/"}, lo.RepeatBy(4, func(_ int) time.Time { return time.Now() }), nil) + s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, path.Join(logType, "1")+"/", true). + Return([]string{path.Join(logType, validSubPath)}, []time.Time{time.Now().Add(time.Hour * -48)}, nil) + s.mockChunkManager.EXPECT().Remove(mock.Anything, path.Join(logType, validSubPath)).Return(nil) + } + + s.gc.option.collValidator = func(collID int64) bool { + return collID == 1 + } + + s.gc.scan() + //s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything) + s.mockChunkManager.AssertExpectations(s.T()) + }) + + s.Run("fileScan_fails", func() { + s.mockChunkManager.ExpectedCalls = nil + s.mockChunkManager.Calls = nil + s.mockChunkManager.EXPECT().RootPath().Return("files") + isCollPrefix := func(prefix string) bool { + return lo.Contains([]string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}, prefix) + } + s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).Call.Return( + func(_ context.Context, prefix string, recursive bool) []string { + if isCollPrefix(prefix) { + return []string{path.Join(prefix, "1")} + } + return nil + }, + func(_ context.Context, prefix string, recursive bool) []time.Time { + if isCollPrefix(prefix) { + return []time.Time{time.Now()} + } + return nil + }, + func(_ context.Context, prefix string, recursive bool) error { + if isCollPrefix(prefix) { + return nil + } + return errors.New("mocked") + }, + ) + s.gc.option.collValidator = func(collID int64) bool { + return true + } + + s.gc.scan() + s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything) + }) +} + +func TestGarbageCollectorSuite(t *testing.T) { + suite.Run(t, new(GarbageCollectorSuite)) +} + +/* func Test_garbageCollector_basic(t *testing.T) { bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8)) rootPath := `gc` + funcutil.RandomString(8) @@ -94,7 +239,7 @@ func Test_garbageCollector_basic(t *testing.T) { }) }) -} +}*/ func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName string, prefix string, elements []string) { var current []string diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b9e3c35ecd..1b374cb41d 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -453,6 +453,16 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) { checkInterval: Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second), missingTolerance: Params.DataCoordCfg.GCMissingTolerance.GetAsDuration(time.Second), dropTolerance: Params.DataCoordCfg.GCDropTolerance.GetAsDuration(time.Second), + collValidator: func(collID int64) bool { + resp, err := s.rootCoordClient.DescribeCollectionInternal(context.Background(), &milvuspb.DescribeCollectionRequest{ + Base: commonpbutil.NewMsgBase(), + CollectionID: collID, + }) + if err != nil { + log.Warn("failed to check collection id", zap.Int64("collID", collID), zap.Error(err)) + } + return resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success + }, }) }