diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go
index e293e483af..1b28d2cddb 100644
--- a/internal/datacoord/garbage_collector.go
+++ b/internal/datacoord/garbage_collector.go
@@ -19,7 +19,6 @@ package datacoord
 import (
 	"context"
 	"path"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -42,8 +41,6 @@ const (
 	deltaLogPrefix  = `delta_log`
 )
 
-type collectionValidator func(int64) bool
-
 // GcOption garbage collection options
 type GcOption struct {
 	cli              storage.ChunkManager // client
@@ -51,7 +48,6 @@ type GcOption struct {
 	checkInterval    time.Duration        // each interval
 	missingTolerance time.Duration        // key missing in meta tolerance time
 	dropTolerance    time.Duration        // dropped segment related key tolerance time
-	collValidator    collectionValidator  // validates collection id
 }
 
 // garbageCollector handles garbage files in object storage
@@ -113,24 +109,6 @@ func (gc *garbageCollector) work() {
 	}
 }
 
-func (gc *garbageCollector) isCollectionPrefixValid(p string, prefix string) bool {
-	if gc.option.collValidator == nil {
-		return true
-	}
-
-	if !strings.HasPrefix(p, prefix) {
-		return false
-	}
-
-	p = strings.Trim(p[len(prefix):], "/")
-	collectionID, err := strconv.ParseInt(p, 10, 64)
-	if err != nil {
-		return false
-	}
-
-	return gc.option.collValidator(collectionID)
-}
-
 func (gc *garbageCollector) close() {
 	gc.stopOnce.Do(func() {
 		close(gc.closeCh)
@@ -168,69 +146,51 @@ func (gc *garbageCollector) scan() {
 	var removedKeys []string
 
 	for _, prefix := range prefixes {
-		// list first level prefix, then perform collection id validation
-		collectionPrefixes, _, err := gc.option.cli.ListWithPrefix(ctx, prefix+"/", false)
+		infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
 		if err != nil {
-			log.Warn("failed to list collection prefix",
+			log.Error("failed to list files with prefix",
 				zap.String("prefix", prefix),
-				zap.Error(err),
+				zap.String("error", err.Error()),
 			)
 		}
 
-		for _, collPrefix := range collectionPrefixes {
-			if !gc.isCollectionPrefixValid(collPrefix, prefix) {
-				log.Warn("garbage collector meet invalid collection prefix, ignore it",
-					zap.String("collPrefix", collPrefix),
-					zap.String("prefix", prefix),
-				)
+		for i, infoKey := range infoKeys {
+			total++
+			_, has := filesMap[infoKey]
+			if has {
+				valid++
 				continue
 			}
-			infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, collPrefix, true)
+
+			segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
 			if err != nil {
-				log.Error("failed to list files with collPrefix",
-					zap.String("collPrefix", collPrefix),
-					zap.String("error", err.Error()),
-				)
+				missing++
+				log.Warn("parse segment id error",
+					zap.String("infoKey", infoKey),
+					zap.Error(err))
 				continue
 			}
-			for i, infoKey := range infoKeys {
-				total++
-				_, has := filesMap[infoKey]
-				if has {
-					valid++
-					continue
-				}
+			if gc.segRefer.HasSegmentLock(segmentID) {
+				valid++
+				continue
+			}
 
-				segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
+			if strings.Contains(prefix, statsLogPrefix) &&
+				segmentMap.Contain(segmentID) {
+				valid++
+				continue
+			}
+
+			// not found in meta, check last modified time exceeds tolerance duration
+			if time.Since(modTimes[i]) > gc.option.missingTolerance {
+				// ignore error since it could be cleaned up next time
+				removedKeys = append(removedKeys, infoKey)
+				err = gc.option.cli.Remove(ctx, infoKey)
 				if err != nil {
 					missing++
-					log.Warn("parse segment id error",
+					log.Error("failed to remove object",
 						zap.String("infoKey", infoKey),
 						zap.Error(err))
-					continue
-				}
-				if gc.segRefer.HasSegmentLock(segmentID) {
-					valid++
-					continue
-				}
-
-				if strings.Contains(prefix, statsLogPrefix) &&
-					segmentMap.Contain(segmentID) {
-					valid++
-					continue
-				}
-
-				// not found in meta, check last modified time exceeds tolerance duration
-				if time.Since(modTimes[i]) > gc.option.missingTolerance {
-					// ignore error since it could be cleaned up next time
-					removedKeys = append(removedKeys, infoKey)
-					err = gc.option.cli.Remove(ctx, infoKey)
-					if err != nil {
-						missing++
-						log.Error("failed to remove object",
-							zap.String("infoKey", infoKey),
-							zap.Error(err))
-					}
 				}
 			}
 		}
diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go
index e831aeea7d..d0d93faaa6 100644
--- a/internal/datacoord/garbage_collector_test.go
+++ b/internal/datacoord/garbage_collector_test.go
@@ -19,7 +19,6 @@ package datacoord
 import (
 	"bytes"
 	"context"
-	"errors"
 	"path"
 	"strconv"
 	"strings"
@@ -28,7 +27,6 @@
 
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
-	kvmocks "github.com/milvus-io/milvus/internal/kv/mocks"
 	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
@@ -36,170 +34,10 @@
 	"github.com/milvus-io/milvus/internal/util/funcutil"
 	"github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
-	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"github.com/stretchr/testify/suite"
 )
 
-type GarbageCollectorSuite struct {
-	suite.Suite
-
-	mockChunkManager *mocks.ChunkManager
-	gc               *garbageCollector
-}
-
-func (s *GarbageCollectorSuite) SetupTest() {
-	meta, err := newMemoryMeta()
-	s.Require().NoError(err)
-	s.mockChunkManager = &mocks.ChunkManager{}
-
-	mockKV := &kvmocks.TxnKV{}
-	mockKV.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, nil)
-	segRefer, err := NewSegmentReferenceManager(mockKV, nil)
-
-	s.Require().NoError(err)
-	s.Require().NotNil(segRefer)
-
-	s.gc = newGarbageCollector(
-		meta, newMockHandler(), segRefer, &mocks.MockIndexCoord{}, GcOption{
-			cli:              s.mockChunkManager,
-			enabled:          true,
-			checkInterval:    time.Millisecond * 10,
-			missingTolerance: time.Hour * 24,
-			dropTolerance:    time.Hour * 24,
-		},
-	)
-
-}
-
-func (s *GarbageCollectorSuite) TearDownTest() {
-	s.mockChunkManager = nil
-	s.gc.close()
-	s.gc = nil
-}
-
-func (s *GarbageCollectorSuite) TestBasicOperation() {
-	s.Run("normal_gc", func() {
-		gc := s.gc
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
-			Return([]string{}, []time.Time{}, nil)
-		gc.start()
-		// make ticker run at least once
-		time.Sleep(time.Millisecond * 20)
-
-		s.NotPanics(func() {
-			gc.close()
-		})
-	})
-
-	s.Run("nil_client", func() {
-		// initial a new garbageCollector here
-		mockKV := &kvmocks.TxnKV{}
-		mockKV.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, nil)
-		segRefer, err := NewSegmentReferenceManager(mockKV, nil)
-		s.Require().NoError(err)
-
-		gc := newGarbageCollector(nil, newMockHandler(), segRefer, &mocks.MockIndexCoord{}, GcOption{
-			cli:     nil,
-			enabled: true,
-		})
-
-		s.NotPanics(func() {
-			gc.start()
-		})
-
-		s.NotPanics(func() {
-			gc.close()
-		})
-	})
-}
-
-func (s *GarbageCollectorSuite) TestScan() {
-	s.Run("listCollectionPrefix_fails", func() {
-		s.mockChunkManager.ExpectedCalls = nil
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
-			Return(nil, nil, errors.New("mocked"))
-
-		s.gc.scan()
-		s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
-	})
-
-	s.Run("collectionPrefix_invalid", func() {
-		s.mockChunkManager.ExpectedCalls = nil
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		/*
-			s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
-				Return([]string{"files/insert_log/1/", "files/bad_prefix", "files/insert_log/string/"}, lo.RepeatBy(3, func(_ int) time.Time {
-					return time.Now().Add(-time.Hour)
-				}), nil)*/
-
-		logTypes := []string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}
-		for _, logType := range logTypes {
-			validSubPath := "1/2/3/100/2000"
-			if logType == "files/delta_log/" {
-				validSubPath = "1/2/3/2000"
-			}
-			s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, logType, false).
-				Return([]string{path.Join(logType, "1") + "/", path.Join(logType, "2") + "/", path.Join(logType, "string") + "/", "files/badprefix/"}, lo.RepeatBy(4, func(_ int) time.Time { return time.Now() }), nil)
-			s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, path.Join(logType, "1")+"/", true).
-				Return([]string{path.Join(logType, validSubPath)}, []time.Time{time.Now().Add(time.Hour * -48)}, nil)
-			s.mockChunkManager.EXPECT().Remove(mock.Anything, path.Join(logType, validSubPath)).Return(nil)
-		}
-
-		s.gc.option.collValidator = func(collID int64) bool {
-			return collID == 1
-		}
-
-		s.gc.scan()
-		//s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
-		s.mockChunkManager.AssertExpectations(s.T())
-	})
-
-	s.Run("fileScan_fails", func() {
-		s.mockChunkManager.ExpectedCalls = nil
-		s.mockChunkManager.Calls = nil
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		isCollPrefix := func(prefix string) bool {
-			return lo.Contains([]string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}, prefix)
-		}
-		s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).Call.Return(
-			func(_ context.Context, prefix string, recursive bool) []string {
-				if isCollPrefix(prefix) {
-					return []string{path.Join(prefix, "1")}
-				}
-				return nil
-			},
-			func(_ context.Context, prefix string, recursive bool) []time.Time {
-				if isCollPrefix(prefix) {
-					return []time.Time{time.Now()}
-				}
-				return nil
-			},
-			func(_ context.Context, prefix string, recursive bool) error {
-				if isCollPrefix(prefix) {
-					return nil
-				}
-				return errors.New("mocked")
-			},
-		)
-		s.gc.option.collValidator = func(collID int64) bool {
-			return true
-		}
-
-		s.gc.scan()
-		s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
-	})
-}
-
-func TestGarbageCollectorSuite(t *testing.T) {
-	suite.Run(t, new(GarbageCollectorSuite))
-}
-
-/*
 func Test_garbageCollector_basic(t *testing.T) {
 	bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
 	rootPath := `gc` + funcutil.RandomString(8)
@@ -259,7 +97,7 @@ func Test_garbageCollector_basic(t *testing.T) {
 
 		})
 	})
-}*/
+}
 
 func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName string, prefix string, elements []string) {
 	var current []string
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 16fde525b5..ad10a9bb76 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -416,18 +416,6 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
 		checkInterval:    Params.DataCoordCfg.GCInterval,
 		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
 		dropTolerance:    Params.DataCoordCfg.GCDropTolerance,
-		collValidator: func(collID int64) bool {
-			resp, err := s.rootCoordClient.DescribeCollectionInternal(context.Background(), &milvuspb.DescribeCollectionRequest{
-				Base: commonpbutil.NewMsgBase(
-					commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
-				),
-				CollectionID: collID,
-			})
-			if err != nil {
-				log.Warn("failed to check collection id", zap.Int64("collID", collID), zap.Error(err))
-			}
-			return resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success
-		},
 	})
 }
 
diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go
index 07b54fac09..04ca48894a 100644
--- a/internal/datanode/flow_graph_manager_test.go
+++ b/internal/datanode/flow_graph_manager_test.go
@@ -207,10 +207,10 @@ func TestFlowGraphManager(t *testing.T) {
 	}
 	fm.dropAll()
+	Params.DataNodeCfg.MemoryForceSyncEnable = true
 	const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
 	for _, test := range tests {
 		Params.DataNodeCfg.MemoryWatermark = test.watermark
-		Params.DataNodeCfg.MemoryForceSyncEnable = true
 		for i, memorySize := range test.memorySizes {
 			vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
 			vchan := &datapb.VchannelInfo{