enhance: [2.5] add config for meta batch (#46514)

### **User description**
add a new config to control meta batch to avoid too large

fix: https://github.com/milvus-io/milvus/issues/44569

pr: https://github.com/milvus-io/milvus/pull/44645


___

### **PR Type**
Enhancement


___

### **Description**
- Replace hardcoded `MaxEtcdTxnNum` constant with configurable parameter

- Add new `maxEtcdTxnNum` configuration to `MetaStoreConfig` with
default value 64

- Update all metastore catalog implementations to use dynamic config
value

- Remove hardcoded constant from `util/constant.go` and update test
expectations


___

### Diagram Walkthrough


```mermaid
flowchart LR
  A["Hardcoded MaxEtcdTxnNum<br/>constant 128"] -->|Replace with| B["MetaStoreConfig<br/>maxEtcdTxnNum param"]
  B -->|Default value| C["64 operations<br/>per transaction"]
  B -->|Used by| D["DataCoord<br/>Catalog"]
  B -->|Used by| E["RootCoord<br/>Catalog"]
  B -->|Used by| F["StreamingCoord<br/>Catalog"]
  B -->|Used by| G["StreamingNode<br/>Catalog"]
  B -->|Used by| H["SuffixSnapshot<br/>Catalog"]
```



<details><summary><h3>File Walkthrough</h3></summary>

<table><thead><tr><th></th><th align="left">Relevant
files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><details><summary>5
files</summary><table>
<tr>
<td><strong>kv_catalog.go</strong><dd><code>Use configurable
MaxEtcdTxnNum in batch operations</code>&nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-21f10b97df37f264c572a5bea752c442a1933f1441f658c90c546740d529d536">+4/-2</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

<tr>
<td><strong>kv_catalog.go</strong><dd><code>Replace hardcoded constant
with dynamic config parameter</code>&nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-ca605f818c1903caba7e8fdd022856403889ed63703161028a6cc0005418aa0b">+6/-6</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

<tr>
<td><strong>suffix_snapshot.go</strong><dd><code>Use dynamic config for
etcd transaction batch limits</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
</dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-41ca5f1e7439335fbd3c198a612f93fb12268bd94e3dc988117f669e45fe462a">+4/-3</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

<tr>
<td><strong>kv_catalog.go</strong><dd><code>Replace util constant with
paramtable configuration</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
</dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-5ddd0cfcc47a07c1f0a5246b63928f018e8267176a6d1d5780712fa986f508c4">+3/-2</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

<tr>
<td><strong>kv_catalog.go</strong><dd><code>Use configurable
MaxEtcdTxnNum for batch operations</code>&nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-4cc762324a9c223f7276776ec29d0476bd70e94eca0e194a1b9e6ee67c7c15f5">+4/-3</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

</table></details></td></tr><tr><td><strong>Tests</strong></td><td><details><summary>2
files</summary><table>
<tr>
<td><strong>kv_catalog_test.go</strong><dd><code>Update test
expectations for batch operation counts</code>&nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-52a66a32833546c9f5a39c02bf3ee2bd58099a1027e84179021c3e1e91afd6e6">+2/-2</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

<tr>
<td><strong>kv_catalog_test.go</strong><dd><code>Update tests to use
configurable MaxEtcdTxnNum parameter</code>&nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-bbf08f8d1c8410ed63e07719efe937402b111a27a841f0098552a8a5d8d4574f">+4/-3</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>
</table></details></td></tr><tr><td><strong>Configuration
changes</strong></td><td><details><summary>3 files</summary><table>
<tr>
<td><strong>constant.go</strong><dd><code>Remove hardcoded MaxEtcdTxnNum
constant definition</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-9a2143fe538a654bdd5e1e0967e4e547faea75726e569376a6055bb837c6c683">+0/-3</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>

<tr>
<td><strong>service_param.go</strong><dd><code>Add MaxEtcdTxnNum
parameter to MetaStoreConfig</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-9ef00df6bb7232974dc09d15a0ce2719d977163f41789918e0e4ac7fa4742bf0">+10/-0</a>&nbsp;
&nbsp; </td>

</tr>

<tr>
<td><strong>milvus.yaml</strong><dd><code>Add maxEtcdTxnNum
configuration with default value</code>&nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; </dd></td>
<td><a
href="https://github.com/milvus-io/milvus/pull/46514/files#diff-6e254e06f0f065af33ea15a45c6538bdb785c064a70f2cc9c7c7369d80065a06">+1/-0</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>
</table></details></td></tr></tbody></table>

</details>

___

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Co-authored-by: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Co-authored-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
yihao.dai 2025-12-24 10:01:19 +08:00 committed by GitHub
parent 44d915a43b
commit ddf19de7f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 54 additions and 24 deletions

View File

@ -67,6 +67,7 @@ metastore:
snapshot:
ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds
maxEtcdTxnNum: 64 # maximum number of operations in a single etcd transaction
# Related configuration of tikv, used to store Milvus metadata.
# Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery.

View File

@ -338,7 +338,8 @@ func (kc *Catalog) SaveByBatch(ctx context.Context, kvs map[string]string) error
saveFn := func(partialKvs map[string]string) error {
return kc.MetaKv.MultiSave(ctx, partialKvs)
}
err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn)
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
err := etcd.SaveByBatchWithLimit(kvs, maxTxnNum, saveFn)
if err != nil {
log.Ctx(ctx).Error("failed to save by batch", zap.Error(err))
return err
@ -390,7 +391,8 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d
saveFn := func(partialKvs map[string]string) error {
return kc.MetaKv.MultiSave(ctx, partialKvs)
}
if err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn); err != nil {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if err := etcd.SaveByBatchWithLimit(kvs, maxTxnNum, saveFn); err != nil {
return err
}

View File

@ -455,7 +455,7 @@ func Test_AlterSegments(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, 255+3, len(savedKvs))
assert.Equal(t, 3, opGroupCount)
assert.Equal(t, 5, opGroupCount)
adjustedSeg, err := catalog.LoadFromSegmentPath(context.TODO(), segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID)
assert.NoError(t, err)
@ -611,7 +611,7 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
kvSize = 0
err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2)
assert.NoError(t, err)
assert.Equal(t, 2, count)
assert.Equal(t, 3, count)
assert.Equal(t, 129, kvSize)
}
}

View File

@ -104,7 +104,6 @@ func BuildAliasPrefixWithDB(dbID int64) string {
}
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
func batchMultiSaveAndRemove(ctx context.Context, snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
saveFn := func(partialKvs map[string]string) error {
return snapshot.MultiSave(ctx, partialKvs, ts)
@ -223,8 +222,8 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
// Though batchSave is not atomic enough, we can promise the atomicity outside.
// Recovering from failure, if we found collection is creating, we should remove all these related meta.
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(ctx, partialKvs, ts)
})
}
@ -620,8 +619,8 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
// If we found collection under dropping state, we'll know that gc is not completely on this collection.
// However, if we remove collection first, we cannot remove other metas.
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
if err := batchMultiSaveAndRemove(ctx, kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if err := batchMultiSaveAndRemove(ctx, kc.Snapshot, maxTxnNum, nil, delMetakeysSnap, ts); err != nil {
return err
}
@ -671,7 +670,8 @@ func (kc *Catalog) alterModifyCollection(ctx context.Context, oldColl *model.Col
saves[k] = string(v)
}
}
return etcd.SaveByBatchWithLimit(saves, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(saves, maxTxnNum, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(ctx, partialKvs, ts)
})
}

View File

@ -35,9 +35,14 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func init() {
paramtable.Init()
}
var (
indexName = "idx"
IndexID = 1
@ -975,7 +980,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) {
return errors.New("error mock MultiSave")
}
saves := map[string]string{"k": "v"}
err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, []string{}, 0)
assert.Error(t, err)
})
t.Run("failed to remove", func(t *testing.T) {
@ -988,7 +993,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) {
}
saves := map[string]string{"k": "v"}
removals := []string{"prefix1", "prefix2"}
err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, removals, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
@ -1009,7 +1014,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) {
saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i)
removals = append(removals, fmt.Sprintf("k%d", i))
}
err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, removals, 0)
assert.NoError(t, err)
})
}

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -547,7 +546,8 @@ func (ss *SuffixSnapshot) MultiSaveAndRemove(ctx context.Context, saves map[stri
updateList = append(updateList, removal)
}
err = etcd.SaveByBatchWithLimit(execute, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
err = etcd.SaveByBatchWithLimit(execute, maxTxnNum, func(partialKvs map[string]string) error {
return ss.MetaKv.MultiSave(ctx, partialKvs)
})
if err == nil {
@ -658,7 +658,8 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(ctx context.Context, keyGroup []
removeFn := func(partialKeys []string) error {
return ss.MetaKv.MultiRemove(ctx, partialKeys)
}
return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn)
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.RemoveByBatchWithLimit(keyGroup, maxTxnNum, removeFn)
}
// removeExpiredKvs removes expired key-value pairs from the snapshot

View File

@ -10,8 +10,8 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// NewCataLog creates a new catalog instance
@ -64,7 +64,8 @@ func (c *catalog) SavePChannels(ctx context.Context, infos []*streamingpb.PChann
}
kvs[key] = string(v)
}
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return c.metaKV.MultiSave(ctx, partialKvs)
})
}

View File

@ -11,8 +11,13 @@ import (
"github.com/milvus-io/milvus/pkg/v2/mocks/mock_kv"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func init() {
paramtable.Init()
}
func TestCatalog(t *testing.T) {
kv := mock_kv.NewMockMetaKv(t)

View File

@ -11,8 +11,8 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// NewCataLog creates a new streaming-node catalog instance.
@ -80,8 +80,9 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
kvs[key] = string(data)
}
maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt()
if len(removes) > 0 {
if err := etcd.RemoveByBatchWithLimit(removes, util.MaxEtcdTxnNum, func(partialRemoves []string) error {
if err := etcd.RemoveByBatchWithLimit(removes, maxTxnNum, func(partialRemoves []string) error {
return c.metaKV.MultiRemove(ctx, partialRemoves)
}); err != nil {
return err
@ -89,7 +90,7 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
}
if len(kvs) > 0 {
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error {
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error {
return c.metaKV.MultiSave(ctx, partialKvs)
})
}

View File

@ -10,8 +10,13 @@ import (
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func init() {
paramtable.Init()
}
func TestCatalog(t *testing.T) {
kv := mocks.NewMetaKv(t)
k := "p1"

View File

@ -73,8 +73,7 @@ const (
RoleConfigDBName = "db_name"
RoleConfigPrivilege = "privilege"
MaxEtcdTxnNum = 128
GB = 1024 * 1024 * 1024
GB = 1024 * 1024 * 1024
)
const (

View File

@ -947,7 +947,7 @@ Large numeric passwords require double quotes to avoid yaml parsing precision is
p.StorageReadRetryAttempts = ParamItem{
Key: "common.storage.readRetryAttempts",
Version: "2.6.8",
Version: "2.5.25",
DefaultValue: "10",
Doc: "The number of retry attempts for reading from object storage; only retryable errors will trigger a retry.",
Export: false,

View File

@ -462,6 +462,7 @@ type MetaStoreConfig struct {
SnapshotReserveTimeSeconds ParamItem `refreshable:"true"`
PaginationSize ParamItem `refreshable:"true"`
ReadConcurrency ParamItem `refreshable:"true"`
MaxEtcdTxnNum ParamItem `refreshable:"true"`
}
func (p *MetaStoreConfig) Init(base *BaseTable) {
@ -508,6 +509,15 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
}
p.ReadConcurrency.Init(base.mgr)
p.MaxEtcdTxnNum = ParamItem{
Key: "metastore.maxEtcdTxnNum",
Version: "2.5.25",
DefaultValue: "64",
Doc: `maximum number of operations in a single etcd transaction`,
Export: true,
}
p.MaxEtcdTxnNum.Init(base.mgr)
// TODO: The initialization operation of metadata storage is called in the initialization phase of every node.
// There should be a single initialization operation for meta store, then move the metrics registration to there.
metrics.RegisterMetaType(p.MetaStoreType.GetValue())