enhance: Speed up meta recovery (#38285)

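Increase the pagination (batch) size used by WalkWithPrefix operations to 10000 by default, replacing the previous hard-coded values (2000 in the datacoord and querycoord catalogs, 5000 in the suffix snapshot) with the new `metastore.paginationSize` configuration parameter.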

issue: https://github.com/milvus-io/milvus/issues/37630

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-12-12 10:58:43 +08:00 committed by GitHub
parent 304cdc7783
commit a514f839b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 58 additions and 32 deletions

View File

@ -44,19 +44,25 @@ import (
"github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
var paginationSize = 2000
type Catalog struct { type Catalog struct {
MetaKv kv.MetaKv MetaKv kv.MetaKv
paginationSize int
ChunkManagerRootPath string ChunkManagerRootPath string
metaRootpath string metaRootpath string
} }
func NewCatalog(MetaKv kv.MetaKv, chunkManagerRootPath string, metaRootpath string) *Catalog { func NewCatalog(MetaKv kv.MetaKv, chunkManagerRootPath string, metaRootpath string) *Catalog {
return &Catalog{MetaKv: MetaKv, ChunkManagerRootPath: chunkManagerRootPath, metaRootpath: metaRootpath} return &Catalog{
MetaKv: MetaKv,
paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(),
ChunkManagerRootPath: chunkManagerRootPath,
metaRootpath: metaRootpath,
}
} }
func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) {
@ -130,7 +136,7 @@ func (kc *Catalog) listSegments(ctx context.Context) ([]*datapb.SegmentInfo, err
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, SegmentPrefix+"/", paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, SegmentPrefix+"/", kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -215,7 +221,7 @@ func (kc *Catalog) listBinlogs(ctx context.Context, binlogType storage.BinlogTyp
return nil return nil
} }
err = kc.MetaKv.WalkWithPrefix(ctx, logPathPrefix, paginationSize, applyFn) err = kc.MetaKv.WalkWithPrefix(ctx, logPathPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -483,7 +489,7 @@ func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, ChannelCheckpointPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, ChannelCheckpointPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -568,7 +574,7 @@ func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, util.FieldIndexPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, util.FieldIndexPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -644,7 +650,7 @@ func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentInde
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, util.SegmentIndexPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, util.SegmentIndexPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -698,7 +704,7 @@ func (kc *Catalog) ListImportJobs(ctx context.Context) ([]*datapb.ImportJob, err
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, ImportJobPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, ImportJobPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -732,7 +738,7 @@ func (kc *Catalog) ListPreImportTasks(ctx context.Context) ([]*datapb.PreImportT
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, PreImportTaskPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, PreImportTaskPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -766,7 +772,7 @@ func (kc *Catalog) ListImportTasks(ctx context.Context) ([]*datapb.ImportTaskV2,
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, ImportTaskPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, ImportTaskPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -806,7 +812,7 @@ func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.Compaction
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, CompactionTaskPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, CompactionTaskPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -845,7 +851,7 @@ func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, AnalyzeTaskPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, AnalyzeTaskPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -885,7 +891,7 @@ func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.Parti
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, PartitionStatsInfoPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, PartitionStatsInfoPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -948,7 +954,7 @@ func (kc *Catalog) ListStatsTasks(ctx context.Context) ([]*indexpb.StatsTask, er
return nil return nil
} }
err := kc.MetaKv.WalkWithPrefix(ctx, StatsTaskPrefix, paginationSize, applyFn) err := kc.MetaKv.WalkWithPrefix(ctx, StatsTaskPrefix, kc.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -17,10 +17,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/compressor" "github.com/milvus-io/milvus/pkg/util/compressor"
"github.com/milvus-io/milvus/pkg/util/paramtable"
) )
var paginationSize = 2000
var ErrInvalidKey = errors.New("invalid load info key") var ErrInvalidKey = errors.New("invalid load info key")
const ( const (
@ -36,12 +35,14 @@ const (
) )
type Catalog struct { type Catalog struct {
cli kv.MetaKv cli kv.MetaKv
paginationSize int
} }
func NewCatalog(cli kv.MetaKv) Catalog { func NewCatalog(cli kv.MetaKv) Catalog {
return Catalog{ return Catalog{
cli: cli, cli: cli,
paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(),
} }
} }
@ -117,7 +118,7 @@ func (s Catalog) GetCollections(ctx context.Context) ([]*querypb.CollectionLoadI
return nil return nil
} }
err := s.cli.WalkWithPrefix(ctx, CollectionLoadInfoPrefix, paginationSize, applyFn) err := s.cli.WalkWithPrefix(ctx, CollectionLoadInfoPrefix, s.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -136,7 +137,7 @@ func (s Catalog) GetPartitions(ctx context.Context) (map[int64][]*querypb.Partit
return nil return nil
} }
err := s.cli.WalkWithPrefix(ctx, PartitionLoadInfoPrefix, paginationSize, applyFn) err := s.cli.WalkWithPrefix(ctx, PartitionLoadInfoPrefix, s.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -155,7 +156,7 @@ func (s Catalog) GetReplicas(ctx context.Context) ([]*querypb.Replica, error) {
return nil return nil
} }
err := s.cli.WalkWithPrefix(ctx, ReplicaPrefix, paginationSize, applyFn) err := s.cli.WalkWithPrefix(ctx, ReplicaPrefix, s.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -318,7 +319,7 @@ func (s Catalog) GetCollectionTargets(ctx context.Context) (map[int64]*querypb.C
return nil return nil
} }
err := s.cli.WalkWithPrefix(ctx, CollectionTargetPrefix, paginationSize, applyFn) err := s.cli.WalkWithPrefix(ctx, CollectionTargetPrefix, s.paginationSize, applyFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -42,11 +42,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
var ( // SuffixSnapshotTombstone special value for tombstone mark
// SuffixSnapshotTombstone special value for tombstone mark var SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
PaginationSize = 5000
)
// IsTombstone used in migration tool also. // IsTombstone used in migration tool also.
func IsTombstone(value string) bool { func IsTombstone(value string) bool {
@ -84,6 +81,8 @@ type SuffixSnapshot struct {
// snapshotLen pre calculated offset when parsing snapshot key // snapshotLen pre calculated offset when parsing snapshot key
snapshotLen int snapshotLen int
paginationSize int
closeGC chan struct{} closeGC chan struct{}
} }
@ -118,6 +117,7 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna
snapshotLen: snapshotLen, snapshotLen: snapshotLen,
rootPrefix: root, rootPrefix: root,
rootLen: rootLen, rootLen: rootLen,
paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(),
closeGC: make(chan struct{}, 1), closeGC: make(chan struct{}, 1),
} }
go ss.startBackgroundGC(context.TODO()) go ss.startBackgroundGC(context.TODO())
@ -449,7 +449,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(ctx context.Context, key string, ts typ
return nil return nil
} }
err := ss.MetaKv.WalkWithPrefix(ctx, key, PaginationSize, applyFn) err := ss.MetaKv.WalkWithPrefix(ctx, key, ss.paginationSize, applyFn)
return fks, fvs, err return fks, fvs, err
} }
ss.Lock() ss.Lock()
@ -472,7 +472,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(ctx context.Context, key string, ts typ
resultValues = append(resultValues, value) resultValues = append(resultValues, value)
} }
err := ss.MetaKv.WalkWithPrefix(ctx, prefix, PaginationSize, func(k []byte, v []byte) error { err := ss.MetaKv.WalkWithPrefix(ctx, prefix, ss.paginationSize, func(k []byte, v []byte) error {
sKey := string(k) sKey := string(k)
sValue := string(v) sValue := string(v)
@ -693,7 +693,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(ctx context.Context, now time.Time) e
} }
// Walk through all keys with the snapshot prefix // Walk through all keys with the snapshot prefix
err := ss.MetaKv.WalkWithPrefix(ctx, ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error { err := ss.MetaKv.WalkWithPrefix(ctx, ss.snapshotPrefix, ss.paginationSize, func(k []byte, v []byte) error {
key := ss.hideRootPrefix(string(k)) key := ss.hideRootPrefix(string(k))
ts, ok := ss.isTSKey(key) ts, ok := ss.isTSKey(key)
if !ok { if !ok {

View File

@ -458,6 +458,7 @@ type MetaStoreConfig struct {
MetaStoreType ParamItem `refreshable:"false"` MetaStoreType ParamItem `refreshable:"false"`
SnapshotTTLSeconds ParamItem `refreshable:"true"` SnapshotTTLSeconds ParamItem `refreshable:"true"`
SnapshotReserveTimeSeconds ParamItem `refreshable:"true"` SnapshotReserveTimeSeconds ParamItem `refreshable:"true"`
PaginationSize ParamItem `refreshable:"true"`
} }
func (p *MetaStoreConfig) Init(base *BaseTable) { func (p *MetaStoreConfig) Init(base *BaseTable) {
@ -488,6 +489,14 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
} }
p.SnapshotReserveTimeSeconds.Init(base.mgr) p.SnapshotReserveTimeSeconds.Init(base.mgr)
p.PaginationSize = ParamItem{
Key: "metastore.paginationSize",
Version: "2.5.1",
DefaultValue: "10000",
Doc: `limits the number of results to return from metastore.`,
}
p.PaginationSize.Init(base.mgr)
// TODO: The initialization operation of metadata storage is called in the initialization phase of every node. // TODO: The initialization operation of metadata storage is called in the initialization phase of every node.
// There should be a single initialization operation for meta store, then move the metrics registration to there. // There should be a single initialization operation for meta store, then move the metrics registration to there.
metrics.RegisterMetaType(p.MetaStoreType.GetValue()) metrics.RegisterMetaType(p.MetaStoreType.GetValue())

View File

@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/metricsinfo"
) )
@ -210,4 +211,13 @@ func TestServiceParam(t *testing.T) {
t.Logf("Minio rootpath = %s", Params.RootPath.GetValue()) t.Logf("Minio rootpath = %s", Params.RootPath.GetValue())
}) })
t.Run("test metastore config", func(t *testing.T) {
Params := &SParams.MetaStoreCfg
assert.Equal(t, util.MetaStoreTypeEtcd, Params.MetaStoreType.GetValue())
assert.Equal(t, 86400*time.Second, Params.SnapshotTTLSeconds.GetAsDuration(time.Second))
assert.Equal(t, 3600*time.Second, Params.SnapshotReserveTimeSeconds.GetAsDuration(time.Second))
assert.Equal(t, 10000, Params.PaginationSize.GetAsInt())
})
} }