Add garbage collector for DataCoord (#11554)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
parent c5f566357a
commit 3c9610cb1b
internal/datacoord/garbage_collector.go (new file, 150 lines)
@@ -0,0 +1,150 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
    "context"
    "sync"
    "time"

    "github.com/milvus-io/milvus/internal/log"
    "github.com/minio/minio-go/v7"
    "go.uber.org/zap"
)

const (
    // temporary solution: use consts here;
    // garbage collection settings may move into the config files in the future
    defaultGcInterval       = 1 * time.Hour
    defaultMissingTolerance = 24 * time.Hour // 1 day
    defaultDropTolerance    = 24 * time.Hour // 1 day
)

// GcOption garbage collection options
type GcOption struct {
    cli              *minio.Client // OSS client
    enabled          bool          // enable switch
    checkInterval    time.Duration // each interval
    missingTolerance time.Duration // tolerance time for keys missing from meta
    dropTolerance    time.Duration // tolerance time for keys of dropped segments
    bucketName       string
    rootPath         string
}

// garbageCollector handles garbage files in object storage,
// which could be dropped collection remnants or data node failure traces
type garbageCollector struct {
    option GcOption
    meta   *meta

    startOnce sync.Once
    stopOnce  sync.Once
    wg        sync.WaitGroup
    closeCh   chan struct{}
}

// newGarbageCollector creates a garbage collector with meta and option
func newGarbageCollector(meta *meta, opt GcOption) *garbageCollector {
    return &garbageCollector{
        meta:    meta,
        option:  opt,
        closeCh: make(chan struct{}),
    }
}

// start spawns a goroutine that performs a gc check every `checkInterval`
func (gc *garbageCollector) start() {
    if gc.option.enabled {
        if gc.option.cli == nil {
            log.Warn("datacoord gc enabled, but OSS client is not provided")
            return
        }
        gc.startOnce.Do(func() {
            gc.wg.Add(1)
            go gc.work()
        })
    }
}

// work contains the actual looping check logic
func (gc *garbageCollector) work() {
    defer gc.wg.Done()
    ticker := time.Tick(gc.option.checkInterval)
    for {
        select {
        case <-ticker:
            gc.scan()
        case <-gc.closeCh:
            log.Warn("garbage collector quit")
            return
        }
    }
}

func (gc *garbageCollector) close() {
    gc.stopOnce.Do(func() {
        close(gc.closeCh)
        gc.wg.Wait()
    })
}

// scan loads meta file info and compares it against OSS keys;
// if dropped or missing keys are found, it performs gc cleanup
func (gc *garbageCollector) scan() {
    var v, d, m, e int // counters: valid, dropped, missing, removed
    valid, dropped := gc.meta.ListSegmentFiles()
    vm := make(map[string]struct{})
    dm := make(map[string]struct{})
    for _, k := range valid {
        vm[k] = struct{}{}
    }
    for _, k := range dropped {
        dm[k] = struct{}{}
    }

    for info := range gc.option.cli.ListObjects(context.TODO(), gc.option.bucketName, minio.ListObjectsOptions{
        Prefix:    gc.option.rootPath,
        Recursive: true,
    }) {
        log.Warn(info.Key)
        _, has := vm[info.Key]
        if has {
            v++
            continue
        }
        // dropped
        _, has = dm[info.Key]
        if has {
            d++
            // check whether the file's last modified time exceeds the tolerance duration
            if time.Since(info.LastModified) > gc.option.dropTolerance {
                e++
                // ignore error since it could be cleaned up next time
                _ = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{})
            }
            continue
        }
        m++
        // not found in meta, check whether last modified time exceeds the tolerance duration
        if time.Since(info.LastModified) > gc.option.missingTolerance {
            e++
            // ignore error since it could be cleaned up next time
            _ = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{})
        }
    }
    log.Warn("scan result", zap.Int("valid", v), zap.Int("dropped", d), zap.Int("missing", m), zap.Int("removed", e))
}
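For orientation, here is a minimal wiring sketch (not part of this commit) of how a caller inside the datacoord package might construct and run the collector. exampleStartGC, the endpoint, credentials, bucket name and root path are all placeholders; the real wiring happens in Server.initGarbageCollection further down in this commit, and the sketch assumes the minio-go and credentials imports used elsewhere in this diff.

// exampleStartGC is a hypothetical helper, not part of the commit.
// It assumes a *meta instance is already available; endpoint, credentials,
// bucket name and root path below are placeholders.
func exampleStartGC(meta *meta) (*garbageCollector, error) {
    cli, err := minio.New("localhost:9000", &minio.Options{
        Creds:  credentials.NewStaticV4("minioadmin", "minioadmin", ""),
        Secure: false,
    })
    if err != nil {
        return nil, err
    }
    gc := newGarbageCollector(meta, GcOption{
        cli:              cli,
        enabled:          true,
        bucketName:       "a-bucket",
        rootPath:         "files",
        checkInterval:    defaultGcInterval,
        missingTolerance: defaultMissingTolerance,
        dropTolerance:    defaultDropTolerance,
    })
    gc.start()     // spawns the periodic scan goroutine (no-op if disabled or cli is nil)
    return gc, nil // caller is expected to call gc.close() on shutdown
}

Note that close() is safe to call even when the worker goroutine was never started, since the WaitGroup is then empty and Wait returns immediately.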
internal/datacoord/garbage_collector_test.go (new file, 233 lines)
@@ -0,0 +1,233 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
    "bytes"
    "context"
    "path"
    "strings"
    "testing"
    "time"

    "github.com/milvus-io/milvus/internal/proto/commonpb"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/internal/util/funcutil"
    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func Test_garbageCollector_basic(t *testing.T) {
    bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
    rootPath := `gc` + funcutil.RandomString(8)
    //TODO change to Params
    cli, _, err := initUtOSSEnv(bucketName, rootPath, 0)
    require.NoError(t, err)

    mockAllocator := newMockAllocator()
    meta, err := newMemoryMeta(mockAllocator)
    assert.Nil(t, err)

    t.Run("normal gc", func(t *testing.T) {
        gc := newGarbageCollector(meta, GcOption{
            cli:              cli,
            enabled:          true,
            checkInterval:    time.Millisecond * 10,
            missingTolerance: time.Hour * 24,
            dropTolerance:    time.Hour * 24,
            bucketName:       bucketName,
            rootPath:         rootPath,
        })
        gc.start()

        time.Sleep(time.Millisecond * 20)
        assert.NotPanics(t, func() {
            gc.close()
        })
    })

    t.Run("with nil cli", func(t *testing.T) {
        gc := newGarbageCollector(meta, GcOption{
            cli:              nil,
            enabled:          true,
            checkInterval:    time.Millisecond * 10,
            missingTolerance: time.Hour * 24,
            dropTolerance:    time.Hour * 24,
            bucketName:       bucketName,
            rootPath:         rootPath,
        })
        assert.NotPanics(t, func() {
            gc.start()
        })

        assert.NotPanics(t, func() {
            gc.close()
        })
    })
}

func Test_garbageCollector_scan(t *testing.T) {
    bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
    rootPath := `gc` + funcutil.RandomString(8)
    //TODO change to Params
    cli, files, err := initUtOSSEnv(bucketName, rootPath, 3)
    require.NoError(t, err)

    mockAllocator := newMockAllocator()
    meta, err := newMemoryMeta(mockAllocator)
    assert.Nil(t, err)

    t.Run("missing all but save tolerance", func(t *testing.T) {
        gc := newGarbageCollector(meta, GcOption{
            cli:              cli,
            enabled:          true,
            checkInterval:    time.Minute * 30,
            missingTolerance: time.Hour * 24,
            dropTolerance:    time.Hour * 24,
            bucketName:       bucketName,
            rootPath:         rootPath,
        })
        gc.scan()

        current := make([]string, 0, 3)
        for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
            current = append(current, info.Key)
        }
        assert.ElementsMatch(t, files, current)
    })
    t.Run("all hit, no gc", func(t *testing.T) {
        segment := buildSegment(1, 10, 100, "ch")
        segment.State = commonpb.SegmentState_Flushed
        segment.Binlogs = []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []string{files[0]}}}
        segment.Statslogs = []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []string{files[1]}}}
        segment.Deltalogs = []*datapb.DeltaLogInfo{{DeltaLogPath: files[2]}}
        err = meta.AddSegment(segment)
        require.NoError(t, err)

        gc := newGarbageCollector(meta, GcOption{
            cli:              cli,
            enabled:          true,
            checkInterval:    time.Minute * 30,
            missingTolerance: time.Hour * 24,
            dropTolerance:    time.Hour * 24,
            bucketName:       bucketName,
            rootPath:         rootPath,
        })
        gc.start()
        gc.scan()

        current := make([]string, 0, 3)
        for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
            current = append(current, info.Key)
        }
        assert.ElementsMatch(t, files, current)
        gc.close()
    })

    t.Run("dropped gc one", func(t *testing.T) {
        segment := buildSegment(1, 10, 100, "ch")
        segment.State = commonpb.SegmentState_Dropped
        segment.Deltalogs = []*datapb.DeltaLogInfo{{DeltaLogPath: files[2]}}
        err = meta.AddSegment(segment)
        require.NoError(t, err)

        gc := newGarbageCollector(meta, GcOption{
            cli:              cli,
            enabled:          true,
            checkInterval:    time.Minute * 30,
            missingTolerance: time.Hour * 24,
            dropTolerance:    0,
            bucketName:       bucketName,
            rootPath:         rootPath,
        })
        gc.start()
        gc.scan()

        current := make([]string, 0, 3)
        for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
            current = append(current, info.Key)
        }
        assert.ElementsMatch(t, files[:2], current)
        gc.close()
    })
    t.Run("missing gc all", func(t *testing.T) {
        gc := newGarbageCollector(meta, GcOption{
            cli:              cli,
            enabled:          true,
            checkInterval:    time.Minute * 30,
            missingTolerance: 0,
            dropTolerance:    0,
            bucketName:       bucketName,
            rootPath:         rootPath,
        })
        gc.start()
        gc.scan()

        current := make([]string, 0, 3)
        for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
            current = append(current, info.Key)
        }
        assert.Equal(t, 0, len(current))
        gc.close()
    })

    cleanupOSS(cli, bucketName, rootPath)
}

// initUtOSSEnv initializes the unit test OSS env
func initUtOSSEnv(bucket, root string, n int) (*minio.Client, []string, error) {
    Params.Init()
    cli, err := minio.New(Params.MinioAddress, &minio.Options{
        Creds:  credentials.NewStaticV4(Params.MinioAccessKeyID, Params.MinioSecretAccessKey, ""),
        Secure: Params.MinioUseSSL,
    })
    if err != nil {
        return nil, nil, err
    }
    has, err := cli.BucketExists(context.TODO(), bucket)
    if err != nil {
        return nil, nil, err
    }
    if !has {
        err = cli.MakeBucket(context.TODO(), bucket, minio.MakeBucketOptions{})
        if err != nil {
            return nil, nil, err
        }
    }
    keys := make([]string, 0, n)
    content := []byte("test")
    for i := 0; i < n; i++ {
        reader := bytes.NewReader(content)
        token := funcutil.RandomString(8)
        token = path.Join(root, token)
        info, err := cli.PutObject(context.TODO(), bucket, token, reader, int64(len(content)), minio.PutObjectOptions{})
        if err != nil {
            return nil, nil, err
        }
        keys = append(keys, info.Key)
    }
    return cli, keys, nil
}

func cleanupOSS(cli *minio.Client, bucket, root string) {
    ch := cli.ListObjects(context.TODO(), bucket, minio.ListObjectsOptions{Prefix: root, Recursive: true})
    cli.RemoveObjects(context.TODO(), bucket, ch, minio.RemoveObjectsOptions{})
    cli.RemoveBucket(context.TODO(), bucket)
}
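These tests talk to a live object store: initUtOSSEnv builds a MinIO client from whatever endpoint and credentials ParamTable resolves, so running them requires a reachable MinIO instance with those settings. Assuming that is in place, a typical invocation would be:

go test -v -run Test_garbageCollector ./internal/datacoord/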
@@ -334,6 +334,43 @@ func (m *meta) ListSegmentIDs() []UniqueID {
    return infos
}

// ListSegmentFiles lists all segment related file paths in the valid & dropped lists
func (m *meta) ListSegmentFiles() ([]string, []string) {
    m.RLock()
    defer m.RUnlock()

    var valid []string
    var dropped []string

    for _, segment := range m.segments.GetSegments() {
        for _, binlog := range segment.GetBinlogs() {
            if segment.State != commonpb.SegmentState_Dropped {
                valid = append(valid, binlog.Binlogs...)
            } else {
                dropped = append(dropped, binlog.Binlogs...)
            }
        }

        for _, statLog := range segment.GetStatslogs() {
            if segment.State != commonpb.SegmentState_Dropped {
                valid = append(valid, statLog.Binlogs...)
            } else {
                dropped = append(dropped, statLog.Binlogs...)
            }
        }

        for _, deltaLog := range segment.GetDeltalogs() {
            if segment.State != commonpb.SegmentState_Dropped {
                valid = append(valid, deltaLog.GetDeltaLogPath())
            } else {
                dropped = append(dropped, deltaLog.GetDeltaLogPath())
            }
        }
    }
    return valid, dropped
}

// GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh`
func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
    m.RLock()
@@ -17,6 +17,7 @@
package datacoord

import (
    "strconv"
    "strings"
    "sync"
    "time"

@@ -42,6 +43,14 @@ type ParamTable struct {
    CollectionBinlogSubPath string
    ChannelWatchSubPath     string

    // --- MinIO ---
    MinioAddress         string
    MinioAccessKeyID     string
    MinioSecretAccessKey string
    MinioUseSSL          bool
    MinioBucketName      string
    MinioRootPath        string

    // --- Pulsar ---
    PulsarAddress string

@@ -68,6 +77,7 @@ type ParamTable struct {
    UpdatedTime time.Time

    EnableCompaction        bool
    EnableGarbageCollection bool
}

// Params is a package scoped variable of type ParamTable.

@@ -108,6 +118,13 @@ func (p *ParamTable) Init() {
    p.initStatsStreamPosSubPath()

    p.initEnableCompaction()

    p.initMinioAddress()
    p.initMinioAccessKeyID()
    p.initMinioSecretAccessKey()
    p.initMinioUseSSL()
    p.initMinioBucketName()
    p.initMinioRootPath()
}

// InitOnce ensures param table is a singleton
@@ -275,3 +292,56 @@ func (p *ParamTable) initChannelWatchPrefix() {
func (p *ParamTable) initEnableCompaction() {
    p.EnableCompaction = p.ParseBool("datacoord.enableCompaction", false)
}

func (p *ParamTable) initEnableGarbageCollection() {
    p.EnableGarbageCollection = p.ParseBool("datacoord.enableGarbageCollection", false)
}

// --- MinIO ---
func (p *ParamTable) initMinioAddress() {
    endpoint, err := p.Load("_MinioAddress")
    if err != nil {
        panic(err)
    }
    p.MinioAddress = endpoint
}

func (p *ParamTable) initMinioAccessKeyID() {
    keyID, err := p.Load("_MinioAccessKeyID")
    if err != nil {
        panic(err)
    }
    p.MinioAccessKeyID = keyID
}

func (p *ParamTable) initMinioSecretAccessKey() {
    key, err := p.Load("_MinioSecretAccessKey")
    if err != nil {
        panic(err)
    }
    p.MinioSecretAccessKey = key
}

func (p *ParamTable) initMinioUseSSL() {
    usessl, err := p.Load("_MinioUseSSL")
    if err != nil {
        panic(err)
    }
    p.MinioUseSSL, _ = strconv.ParseBool(usessl)
}

func (p *ParamTable) initMinioBucketName() {
    bucketName, err := p.Load("_MinioBucketName")
    if err != nil {
        panic(err)
    }
    p.MinioBucketName = bucketName
}

func (p *ParamTable) initMinioRootPath() {
    rootPath, err := p.Load("minio.rootPath")
    if err != nil {
        panic(err)
    }
    p.MinioRootPath = rootPath
}
@@ -32,6 +32,8 @@ import (
    "github.com/milvus-io/milvus/internal/util/metricsinfo"
    "github.com/milvus-io/milvus/internal/util/mqclient"
    "github.com/milvus-io/milvus/internal/util/tsoutil"
    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
    "go.uber.org/zap"

    etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"

@@ -105,6 +107,8 @@ type Server struct {
    sessionManager   *SessionManager
    channelManager   *ChannelManager
    rootCoordClient  types.RootCoord
    garbageCollector *garbageCollector
    gcOpt            GcOption

    compactionTrigger trigger
    compactionHandler compactionPlanContext

@@ -249,6 +253,10 @@ func (s *Server) Start() error {
        return err
    }

    if err = s.initGarbageCollection(); err != nil {
        return err
    }

    s.startServerLoop()
    Params.CreatedTime = time.Now()
    Params.UpdatedTime = time.Now()
@@ -291,6 +299,42 @@ func (s *Server) stopCompactionTrigger() {
    s.compactionTrigger.stop()
}

func (s *Server) initGarbageCollection() error {
    var cli *minio.Client
    var err error
    if Params.EnableGarbageCollection {
        cli, err = minio.New(Params.MinioAddress, &minio.Options{
            Creds:  credentials.NewStaticV4(Params.MinioAccessKeyID, Params.MinioSecretAccessKey, ""),
            Secure: Params.MinioUseSSL,
        })
        if err != nil {
            return err
        }
        has, err := cli.BucketExists(context.TODO(), Params.MinioBucketName)
        if err != nil {
            return err
        }
        if !has {
            err = cli.MakeBucket(context.TODO(), Params.MinioBucketName, minio.MakeBucketOptions{})
            if err != nil {
                return err
            }
        }
    }

    s.garbageCollector = newGarbageCollector(s.meta, GcOption{
        cli:        cli,
        enabled:    Params.EnableGarbageCollection,
        bucketName: Params.MinioBucketName,
        rootPath:   Params.MinioRootPath,

        checkInterval:    defaultGcInterval,
        missingTolerance: defaultMissingTolerance,
        dropTolerance:    defaultDropTolerance,
    })
    return nil
}

func (s *Server) initServiceDiscovery() error {
    sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
    if err != nil {
@@ -342,6 +386,7 @@ func (s *Server) startServerLoop() {
    s.startDataNodeTtLoop(s.serverLoopCtx)
    s.startWatchService(s.serverLoopCtx)
    s.startFlushLoop(s.serverLoopCtx)
    s.garbageCollector.start()
    go s.session.LivenessCheck(s.serverLoopCtx, func() {
        log.Error("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
        if err := s.Stop(); err != nil {

@@ -651,6 +696,7 @@ func (s *Server) Stop() error {
    }
    log.Debug("dataCoord server shutdown")
    s.cluster.Close()
    s.garbageCollector.close()
    s.stopServerLoop()

    if Params.EnableCompaction {