enhance: add config for meta batch(#44569) (#44645)

fix: https://github.com/milvus-io/milvus/issues/44569
add a new config to control meta batch to avoid too large

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2025-09-30 17:31:02 +08:00 committed by GitHub
parent c9f01a73cc
commit 7c00f292bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 43 additions and 27 deletions

View File

@ -67,6 +67,7 @@ metastore:
snapshot:
ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds
maxEtcdTxnNum: 64 # maximum number of operations in a single etcd transaction
# Related configuration of tikv, used to store Milvus metadata.
# Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery.

View File

@ -338,7 +338,8 @@ func (kc *Catalog) SaveByBatch(ctx context.Context, kvs map[string]string) error
saveFn := func(partialKvs map[string]string) error {
return kc.MetaKv.MultiSave(ctx, partialKvs)
}
err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn)
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
err := etcd.SaveByBatchWithLimit(kvs, maxTxnNum, saveFn)
if err != nil {
log.Ctx(ctx).Error("failed to save by batch", zap.Error(err))
return err
@ -390,7 +391,8 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d
saveFn := func(partialKvs map[string]string) error {
return kc.MetaKv.MultiSave(ctx, partialKvs)
}
if err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn); err != nil {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if err := etcd.SaveByBatchWithLimit(kvs, maxTxnNum, saveFn); err != nil {
return err
}

View File

@ -455,7 +455,7 @@ func Test_AlterSegments(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, 255+3, len(savedKvs))
assert.Equal(t, 3, opGroupCount)
assert.Equal(t, 5, opGroupCount)
adjustedSeg, err := catalog.LoadFromSegmentPath(context.TODO(), segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID)
assert.NoError(t, err)
@ -611,7 +611,7 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
kvSize = 0
err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2)
assert.NoError(t, err)
assert.Equal(t, 2, count)
assert.Equal(t, 3, count)
assert.Equal(t, 129, kvSize)
}
}

View File

@ -113,7 +113,6 @@ func BuildAliasPrefixWithDB(dbID int64) string {
}
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
func batchMultiSaveAndRemove(ctx context.Context, snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
saveFn := func(partialKvs map[string]string) error {
return snapshot.MultiSave(ctx, partialKvs, ts)
@ -243,8 +242,8 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
// Though batchSave is not atomic enough, we can promise the atomicity outside.
// Recovering from failure, if we found collection is creating, we should remove all these related meta.
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(ctx, partialKvs, ts)
})
}
@ -667,8 +666,8 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
// If we found collection under dropping state, we'll know that gc is not completely on this collection.
// However, if we remove collection first, we cannot remove other metas.
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
if err := batchMultiSaveAndRemove(ctx, kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if err := batchMultiSaveAndRemove(ctx, kc.Snapshot, maxTxnNum, nil, delMetakeysSnap, ts); err != nil {
return err
}
@ -730,7 +729,8 @@ func (kc *Catalog) alterModifyCollection(ctx context.Context, oldColl *model.Col
}
}
return etcd.SaveByBatchWithLimit(saves, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(saves, maxTxnNum, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(ctx, partialKvs, ts)
})
}

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -987,7 +988,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) {
return errors.New("error mock MultiSave")
}
saves := map[string]string{"k": "v"}
err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, []string{}, 0)
assert.Error(t, err)
})
t.Run("failed to remove", func(t *testing.T) {
@ -1000,7 +1001,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) {
}
saves := map[string]string{"k": "v"}
removals := []string{"prefix1", "prefix2"}
err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, removals, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
@ -1021,7 +1022,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) {
saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i)
removals = append(removals, fmt.Sprintf("k%d", i))
}
err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, removals, 0)
assert.NoError(t, err)
})
}

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -547,7 +546,8 @@ func (ss *SuffixSnapshot) MultiSaveAndRemove(ctx context.Context, saves map[stri
updateList = append(updateList, removal)
}
err = etcd.SaveByBatchWithLimit(execute, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
err = etcd.SaveByBatchWithLimit(execute, maxTxnNum, func(partialKvs map[string]string) error {
return ss.MetaKv.MultiSave(ctx, partialKvs)
})
if err == nil {
@ -658,7 +658,8 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(ctx context.Context, keyGroup []
removeFn := func(partialKeys []string) error {
return ss.MetaKv.MultiRemove(ctx, partialKeys)
}
return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn)
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.RemoveByBatchWithLimit(keyGroup, maxTxnNum, removeFn)
}
// removeExpiredKvs removes expired key-value pairs from the snapshot

View File

@ -11,9 +11,9 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// NewCataLog creates a new catalog instance
@ -134,7 +134,8 @@ func (c *catalog) SavePChannels(ctx context.Context, infos []*streamingpb.PChann
}
kvs[key] = string(v)
}
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return c.metaKV.MultiSave(ctx, partialKvs)
})
}
@ -195,7 +196,8 @@ func (c *catalog) SaveReplicateConfiguration(ctx context.Context, config *stream
}
kvs[key] = string(v)
}
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return c.metaKV.MultiSave(ctx, partialKvs)
})
}

View File

@ -14,9 +14,9 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -131,15 +131,16 @@ func (c *catalog) SaveVChannels(ctx context.Context, pchannelName string, vchann
}
// TODO: We should perform a remove and save as a transaction but current the kv interface doesn't support it.
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if len(removes) > 0 {
if err := etcd.RemoveByBatchWithLimit(removes, util.MaxEtcdTxnNum, func(partialRemoves []string) error {
if err := etcd.RemoveByBatchWithLimit(removes, maxTxnNum, func(partialRemoves []string) error {
return c.metaKV.MultiRemove(ctx, partialRemoves)
}); err != nil {
return err
}
}
if len(kvs) > 0 {
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return c.metaKV.MultiSave(ctx, partialKvs)
})
}
@ -227,8 +228,9 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
kvs[key] = string(data)
}
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if len(removes) > 0 {
if err := etcd.RemoveByBatchWithLimit(removes, util.MaxEtcdTxnNum, func(partialRemoves []string) error {
if err := etcd.RemoveByBatchWithLimit(removes, maxTxnNum, func(partialRemoves []string) error {
return c.metaKV.MultiRemove(ctx, partialRemoves)
}); err != nil {
return err
@ -236,7 +238,7 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
}
if len(kvs) > 0 {
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return c.metaKV.MultiSave(ctx, partialKvs)
})
}

View File

@ -75,9 +75,6 @@ const (
RoleConfigObjectName = "object_name"
RoleConfigDBName = "db_name"
RoleConfigPrivilege = "privilege"
MaxEtcdTxnNum = 128
GB = 1024 * 1024 * 1024
)
var (

View File

@ -479,6 +479,7 @@ type MetaStoreConfig struct {
SnapshotReserveTimeSeconds ParamItem `refreshable:"true"`
PaginationSize ParamItem `refreshable:"true"`
ReadConcurrency ParamItem `refreshable:"true"`
MaxEtcdTxnNum ParamItem `refreshable:"true"`
}
func (p *MetaStoreConfig) Init(base *BaseTable) {
@ -525,6 +526,15 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
}
p.ReadConcurrency.Init(base.mgr)
p.MaxEtcdTxnNum = ParamItem{
Key: "metastore.maxEtcdTxnNum",
Version: "2.6.3",
DefaultValue: "64",
Doc: `maximum number of operations in a single etcd transaction`,
Export: true,
}
p.MaxEtcdTxnNum.Init(base.mgr)
// TODO: The initialization operation of metadata storage is called in the initialization phase of every node.
// There should be a single initialization operation for meta store, then move the metrics registration to there.
metrics.RegisterMetaType(p.MetaStoreType.GetValue())