Mirror of https://gitee.com/milvus-io/milvus.git, synced 2025-12-06 17:18:35 +08:00
This reverts commit fa86de530d31840dd3dd154f4e9b3cd7c3a6662f.

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent 2c038612ec
commit dc5abe086c
@@ -19,7 +19,6 @@ package datacoord
 import (
 	"context"
 	"path"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -44,8 +43,6 @@ const (
 	deltaLogPrefix = `delta_log`
 )
 
-type collectionValidator func(int64) bool
-
 // GcOption garbage collection options
 type GcOption struct {
 	cli              storage.ChunkManager // client
@@ -53,7 +50,6 @@ type GcOption struct {
 	checkInterval    time.Duration       // each interval
 	missingTolerance time.Duration       // key missing in meta tolerance time
 	dropTolerance    time.Duration       // dropped segment related key tolerance time
-	collValidator    collectionValidator // validates collection id
 }
 
 // garbageCollector handles garbage files in object storage
@@ -115,24 +111,6 @@ func (gc *garbageCollector) work() {
 	}
 }
 
-func (gc *garbageCollector) isCollectionPrefixValid(p string, prefix string) bool {
-	if gc.option.collValidator == nil {
-		return true
-	}
-
-	if !strings.HasPrefix(p, prefix) {
-		return false
-	}
-
-	p = strings.Trim(p[len(prefix):], "/")
-	collectionID, err := strconv.ParseInt(p, 10, 64)
-	if err != nil {
-		return false
-	}
-
-	return gc.option.collValidator(collectionID)
-}
-
 func (gc *garbageCollector) close() {
 	gc.stopOnce.Do(func() {
 		close(gc.closeCh)
@@ -170,64 +148,46 @@ func (gc *garbageCollector) scan() {
 	var removedKeys []string
 
 	for _, prefix := range prefixes {
-		// list first level prefix, then perform collection id validation
-		collectionPrefixes, _, err := gc.option.cli.ListWithPrefix(ctx, prefix+"/", false)
-		if err != nil {
-			log.Warn("failed to list collection prefix",
-				zap.String("prefix", prefix),
-				zap.Error(err),
-			)
-		}
-		for _, collPrefix := range collectionPrefixes {
-			if !gc.isCollectionPrefixValid(collPrefix, prefix) {
-				log.Warn("garbage collector meet invalid collection prefix, ignore it",
-					zap.String("collPrefix", collPrefix),
-					zap.String("prefix", prefix),
-				)
-				continue
-			}
-			infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, collPrefix, true)
-			if err != nil {
-				log.Error("failed to list files with collPrefix",
-					zap.String("collPrefix", collPrefix),
-					zap.String("error", err.Error()),
-				)
-				continue
-			}
-			for i, infoKey := range infoKeys {
-				total++
-				_, has := filesMap[infoKey]
-				if has {
-					valid++
-					continue
-				}
-
-				segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
-				if err != nil {
-					missing++
-					log.Warn("parse segment id error",
-						zap.String("infoKey", infoKey),
-						zap.Error(err))
-					continue
-				}
-
-				if strings.Contains(prefix, statsLogPrefix) &&
-					segmentMap.Contain(segmentID) {
-					valid++
-					continue
-				}
-
-				// not found in meta, check last modified time exceeds tolerance duration
-				if time.Since(modTimes[i]) > gc.option.missingTolerance {
-					// ignore error since it could be cleaned up next time
-					removedKeys = append(removedKeys, infoKey)
-					err = gc.option.cli.Remove(ctx, infoKey)
-					if err != nil {
-						missing++
-						log.Error("failed to remove object",
-							zap.String("infoKey", infoKey),
-							zap.Error(err))
-					}
+		infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
+		if err != nil {
+			log.Error("failed to list files with prefix",
+				zap.String("prefix", prefix),
+				zap.String("error", err.Error()),
+			)
+		}
+		for i, infoKey := range infoKeys {
+			total++
+			_, has := filesMap[infoKey]
+			if has {
+				valid++
+				continue
+			}
+			segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
+			if err != nil {
+				missing++
+				log.Warn("parse segment id error",
+					zap.String("infoKey", infoKey),
+					zap.Error(err))
+				continue
+			}
+
+			if strings.Contains(prefix, statsLogPrefix) &&
+				segmentMap.Contain(segmentID) {
+				valid++
+				continue
+			}
+
+			// not found in meta, check last modified time exceeds tolerance duration
+			if time.Since(modTimes[i]) > gc.option.missingTolerance {
+				// ignore error since it could be cleaned up next time
+				removedKeys = append(removedKeys, infoKey)
+				err = gc.option.cli.Remove(ctx, infoKey)
+				if err != nil {
+					missing++
+					log.Error("failed to remove object",
+						zap.String("infoKey", infoKey),
+						zap.Error(err))
+				}
 			}
 		}
 	}
@@ -29,11 +29,9 @@ import (
 	"github.com/cockroachdb/errors"
 	minio "github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
-	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/msgpb"
@@ -50,149 +48,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 )
 
-type GarbageCollectorSuite struct {
-	suite.Suite
-
-	mockChunkManager *mocks.ChunkManager
-	gc               *garbageCollector
-}
-
-func (s *GarbageCollectorSuite) SetupTest() {
-	meta, err := newMemoryMeta()
-	s.Require().NoError(err)
-	s.mockChunkManager = &mocks.ChunkManager{}
-	s.gc = newGarbageCollector(
-		meta, newMockHandler(), GcOption{
-			cli:              s.mockChunkManager,
-			enabled:          true,
-			checkInterval:    time.Millisecond * 10,
-			missingTolerance: time.Hour * 24,
-			dropTolerance:    time.Hour * 24,
-		},
-	)
-}
-
-func (s *GarbageCollectorSuite) TearDownTest() {
-	s.mockChunkManager = nil
-	s.gc.close()
-	s.gc = nil
-}
-
-func (s *GarbageCollectorSuite) TestBasicOperation() {
-	s.Run("normal_gc", func() {
-		gc := s.gc
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
-			Return([]string{}, []time.Time{}, nil)
-		gc.start()
-		// make ticker run at least once
-		time.Sleep(time.Millisecond * 20)
-
-		s.NotPanics(func() {
-			gc.close()
-		})
-	})
-
-	s.Run("nil_client", func() {
-		// initial a new garbageCollector here
-		gc := newGarbageCollector(nil, newMockHandler(), GcOption{
-			cli:     nil,
-			enabled: true,
-		})
-
-		s.NotPanics(func() {
-			gc.start()
-		})
-
-		s.NotPanics(func() {
-			gc.close()
-		})
-	})
-}
-
-func (s *GarbageCollectorSuite) TestScan() {
-	s.Run("listCollectionPrefix_fails", func() {
-		s.mockChunkManager.ExpectedCalls = nil
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
-			Return(nil, nil, errors.New("mocked"))
-
-		s.gc.scan()
-		s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
-	})
-
-	s.Run("collectionPrefix_invalid", func() {
-		s.mockChunkManager.ExpectedCalls = nil
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		/*
-			s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
-				Return([]string{"files/insert_log/1/", "files/bad_prefix", "files/insert_log/string/"}, lo.RepeatBy(3, func(_ int) time.Time {
-					return time.Now().Add(-time.Hour)
-				}), nil)*/
-
-		logTypes := []string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}
-		for _, logType := range logTypes {
-			validSubPath := "1/2/3/100/2000"
-			if logType == "files/delta_log/" {
-				validSubPath = "1/2/3/2000"
-			}
-			s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, logType, false).
-				Return([]string{path.Join(logType, "1") + "/", path.Join(logType, "2") + "/", path.Join(logType, "string") + "/", "files/badprefix/"}, lo.RepeatBy(4, func(_ int) time.Time { return time.Now() }), nil)
-			s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, path.Join(logType, "1")+"/", true).
-				Return([]string{path.Join(logType, validSubPath)}, []time.Time{time.Now().Add(time.Hour * -48)}, nil)
-			s.mockChunkManager.EXPECT().Remove(mock.Anything, path.Join(logType, validSubPath)).Return(nil)
-		}
-
-		s.gc.option.collValidator = func(collID int64) bool {
-			return collID == 1
-		}
-
-		s.gc.scan()
-		//s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
-		s.mockChunkManager.AssertExpectations(s.T())
-	})
-
-	s.Run("fileScan_fails", func() {
-		s.mockChunkManager.ExpectedCalls = nil
-		s.mockChunkManager.Calls = nil
-		s.mockChunkManager.EXPECT().RootPath().Return("files")
-		isCollPrefix := func(prefix string) bool {
-			return lo.Contains([]string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}, prefix)
-		}
-		s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).Call.Return(
-			func(_ context.Context, prefix string, recursive bool) []string {
-				if isCollPrefix(prefix) {
-					return []string{path.Join(prefix, "1")}
-				}
-				return nil
-			},
-			func(_ context.Context, prefix string, recursive bool) []time.Time {
-				if isCollPrefix(prefix) {
-					return []time.Time{time.Now()}
-				}
-				return nil
-			},
-			func(_ context.Context, prefix string, recursive bool) error {
-				if isCollPrefix(prefix) {
-					return nil
-				}
-				return errors.New("mocked")
-			},
-		)
-		s.gc.option.collValidator = func(collID int64) bool {
-			return true
-		}
-
-		s.gc.scan()
-		s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
-	})
-}
-
-func TestGarbageCollectorSuite(t *testing.T) {
-	suite.Run(t, new(GarbageCollectorSuite))
-}
-
-/*
 func Test_garbageCollector_basic(t *testing.T) {
 	bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
 	rootPath := `gc` + funcutil.RandomString(8)
@@ -236,7 +91,7 @@ func Test_garbageCollector_basic(t *testing.T) {
 		})
 	})
 
-}*/
+}
 
 func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName string, prefix string, elements []string) {
 	var current []string
@@ -455,18 +455,6 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
 		checkInterval:    Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second),
 		missingTolerance: Params.DataCoordCfg.GCMissingTolerance.GetAsDuration(time.Second),
 		dropTolerance:    Params.DataCoordCfg.GCDropTolerance.GetAsDuration(time.Second),
-		collValidator: func(collID int64) bool {
-			resp, err := s.rootCoordClient.DescribeCollectionInternal(context.Background(), &milvuspb.DescribeCollectionRequest{
-				Base: commonpbutil.NewMsgBase(
-					commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
-				),
-				CollectionID: collID,
-			})
-			if err != nil {
-				log.Warn("failed to check collection id", zap.Int64("collID", collID), zap.Error(err))
-			}
-			return resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success
-		},
 	})
 }
 