diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index f3665beb4a..6109ddaa84 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/minio/minio-go/v7" "go.uber.org/zap" ) @@ -112,11 +113,11 @@ func (gc *garbageCollector) close() { // scan load meta file info and compares OSS keys // if missing found, performs gc cleanup func (gc *garbageCollector) scan() { - var v, m int - valid := gc.meta.ListSegmentFiles() - vm := make(map[string]struct{}) - for _, k := range valid { - vm[k.GetLogPath()] = struct{}{} + var total, valid, missing int + segmentFiles := gc.meta.ListSegmentFiles() + filesMap := make(map[string]struct{}) + for _, k := range segmentFiles { + filesMap[k.GetLogPath()] = struct{}{} } // walk only data cluster related prefixes @@ -131,30 +132,36 @@ func (gc *garbageCollector) scan() { Prefix: prefix, Recursive: true, }) { - _, has := vm[info.Key] + total++ + _, has := filesMap[info.Key] if has { - v++ + valid++ continue } - // binlog path should consist of "/files/insertLog/collID/partID/segID/fieldID/fileName" - segmentID, err := parseSegmentIDByBinlog(info.Key) - if err == nil { - if gc.segRefer.HasSegmentLock(segmentID) { - v++ - continue - } + segmentID, err := storage.ParseSegmentIDByBinlog(info.Key) + if err != nil { + log.Error("parse segment id error", zap.String("infoKey", info.Key), zap.Error(err)) + continue } - m++ + if gc.segRefer.HasSegmentLock(segmentID) { + valid++ + continue + } + missing++ // not found in meta, check last modified time exceeds tolerance duration if time.Since(info.LastModified) > gc.option.missingTolerance { // ignore error since it could be cleaned up next time removedKeys = append(removedKeys, info.Key) - _ = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{}) + err = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{}) + if err != nil { + log.Error("failed to remove object", zap.String("infoKey", info.Key), zap.Error(err)) + } } } } - log.Info("scan result", zap.Int("valid", v), zap.Int("missing", m), zap.Strings("removed keys", removedKeys)) + log.Info("scan file to do garbage collection", zap.Int("total", total), + zap.Int("valid", valid), zap.Int("missing", missing), zap.Strings("removed keys", removedKeys)) } func (gc *garbageCollector) clearEtcd() { diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 2ff0cb0ce9..1ebf05b0e0 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -284,9 +284,9 @@ func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []stri content := []byte("test") for i := 0; i < n; i++ { reader := bytes.NewReader(content) - token := path.Join(funcutil.RandomString(8), funcutil.RandomString(8), strconv.Itoa(i), funcutil.RandomString(8), funcutil.RandomString(8)) + token := path.Join(funcutil.RandomString(8), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8), funcutil.RandomString(8)) if i == 1 { - token = path.Join(funcutil.RandomString(8), funcutil.RandomString(8), strconv.Itoa(i), funcutil.RandomString(8)) + token = path.Join(funcutil.RandomString(8), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8)) } // insert filePath := path.Join(root, insertLogPrefix, token) diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index da28caf9f2..feacec62e4 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -19,8 +19,6 @@ package datacoord import ( "context" "errors" - "strconv" - "strings" "time" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -87,9 +85,3 @@ func getCompactTime(ctx context.Context, allocator allocator) (*compactTime, err // no expiration time return &compactTime{ttRetentionLogic, 0}, nil } - -func parseSegmentIDByBinlog(path string) (UniqueID, error) { - // binlog path should consist of "files/insertLog/collID/partID/segID/fieldID/fileName" - keyStr := strings.Split(path, "/") - return strconv.ParseInt(keyStr[len(keyStr)-3], 10, 64) -} diff --git a/internal/storage/binlog_util.go b/internal/storage/binlog_util.go new file mode 100644 index 0000000000..04a24ed063 --- /dev/null +++ b/internal/storage/binlog_util.go @@ -0,0 +1,12 @@ +package storage + +import ( + "strconv" + "strings" +) + +func ParseSegmentIDByBinlog(path string) (UniqueID, error) { + // binlog path should consist of "files/insertLog/collID/partID/segID/fieldID/fileName" + keyStr := strings.Split(path, "/") + return strconv.ParseInt(keyStr[len(keyStr)-3], 10, 64) +}