From ddf19de7f7236b9bf9140ccfff8e1059249a79d1 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 24 Dec 2025 10:01:19 +0800 Subject: [PATCH] enhance: [2.5] add config for meta batch (#46514) ### **User description** add a new config to control the meta batch size, so that a single etcd transaction does not become too large fix: https://github.com/milvus-io/milvus/issues/44569 pr: https://github.com/milvus-io/milvus/pull/44645 ___ ### **PR Type** Enhancement ___ ### **Description** - Replace hardcoded `MaxEtcdTxnNum` constant with configurable parameter - Add new `maxEtcdTxnNum` configuration to `MetaStoreConfig` with default value 64 - Update all metastore catalog implementations to use dynamic config value - Remove hardcoded constant from `util/constant.go` and update test expectations ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Hardcoded MaxEtcdTxnNum
constant 128"] -->|Replace with| B["MetaStoreConfig
maxEtcdTxnNum param"] B -->|Default value| C["64 operations
per transaction"] B -->|Used by| D["DataCoord
Catalog"] B -->|Used by| E["RootCoord
Catalog"] B -->|Used by| F["StreamingCoord
Catalog"] B -->|Used by| G["StreamingNode
Catalog"] B -->|Used by| H["SuffixSnapshot
Catalog"] ```

File Walkthrough

Relevant files
Enhancement
5 files
kv_catalog.go
Use configurable MaxEtcdTxnNum in batch operations             
+4/-2     
kv_catalog.go
Replace hardcoded constant with dynamic config parameter 
+6/-6     
suffix_snapshot.go
Use dynamic config for etcd transaction batch limits         
+4/-3     
kv_catalog.go
Replace util constant with paramtable configuration           
+3/-2     
kv_catalog.go
Use configurable MaxEtcdTxnNum for batch operations           
+4/-3     
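Tests
4 files
kv_catalog_test.go
Update test expectations for batch operation counts
+2/-2
kv_catalog_test.go
Update tests to use configurable MaxEtcdTxnNum parameter
+8/-3
kv_catalog_test.go
Initialize paramtable before running streamingcoord catalog tests
+5/-0
kv_catalog_test.go
Initialize paramtable before running streamingnode catalog tests
+5/-0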
Configuration changes
4 files
constant.go
Remove hardcoded MaxEtcdTxnNum constant definition
+1/-2
component_param.go
Set version of common.storage.readRetryAttempts to 2.5.25
+1/-1
service_param.go
Add MaxEtcdTxnNum parameter to MetaStoreConfig
+10/-0
milvus.yaml
Add maxEtcdTxnNum configuration with default value
+1/-0
___ --------- Signed-off-by: bigsheeper Co-authored-by: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Co-authored-by: xiaofanluan --- configs/milvus.yaml | 1 + internal/metastore/kv/datacoord/kv_catalog.go | 6 ++++-- internal/metastore/kv/datacoord/kv_catalog_test.go | 4 ++-- internal/metastore/kv/rootcoord/kv_catalog.go | 12 ++++++------ internal/metastore/kv/rootcoord/kv_catalog_test.go | 11 ++++++++--- internal/metastore/kv/rootcoord/suffix_snapshot.go | 7 ++++--- internal/metastore/kv/streamingcoord/kv_catalog.go | 5 +++-- .../metastore/kv/streamingcoord/kv_catalog_test.go | 5 +++++ internal/metastore/kv/streamingnode/kv_catalog.go | 7 ++++--- .../metastore/kv/streamingnode/kv_catalog_test.go | 5 +++++ pkg/util/constant.go | 3 +-- pkg/util/paramtable/component_param.go | 2 +- pkg/util/paramtable/service_param.go | 10 ++++++++++ 13 files changed, 54 insertions(+), 24 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 33abf6b060..727abbc41e 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -67,6 +67,7 @@ metastore: snapshot: ttl: 86400 # snapshot ttl in seconds reserveTime: 3600 # snapshot reserve time in seconds + maxEtcdTxnNum: 64 # maximum number of operations in a single etcd transaction # Related configuration of tikv, used to store Milvus metadata. # Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery. 
diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index e9bc357c8c..01ea86172d 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -338,7 +338,8 @@ func (kc *Catalog) SaveByBatch(ctx context.Context, kvs map[string]string) error saveFn := func(partialKvs map[string]string) error { return kc.MetaKv.MultiSave(ctx, partialKvs) } - err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn) + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + err := etcd.SaveByBatchWithLimit(kvs, maxTxnNum, saveFn) if err != nil { log.Ctx(ctx).Error("failed to save by batch", zap.Error(err)) return err @@ -390,7 +391,8 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d saveFn := func(partialKvs map[string]string) error { return kc.MetaKv.MultiSave(ctx, partialKvs) } - if err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn); err != nil { + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + if err := etcd.SaveByBatchWithLimit(kvs, maxTxnNum, saveFn); err != nil { return err } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 853f77b145..f8996132db 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -455,7 +455,7 @@ func Test_AlterSegments(t *testing.T) { }) assert.NoError(t, err) assert.Equal(t, 255+3, len(savedKvs)) - assert.Equal(t, 3, opGroupCount) + assert.Equal(t, 5, opGroupCount) adjustedSeg, err := catalog.LoadFromSegmentPath(context.TODO(), segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID) assert.NoError(t, err) @@ -611,7 +611,7 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) { kvSize = 0 err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2) assert.NoError(t, err) - assert.Equal(t, 2, count) 
+ assert.Equal(t, 3, count) assert.Equal(t, 129, kvSize) } } diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 037c19f802..b2b5a0158c 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -104,7 +104,6 @@ func BuildAliasPrefixWithDB(dbID int64) string { } // since SnapshotKV may save both snapshot key and the original key if the original key is newest -// MaxEtcdTxnNum need to divided by 2 func batchMultiSaveAndRemove(ctx context.Context, snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error { saveFn := func(partialKvs map[string]string) error { return snapshot.MultiSave(ctx, partialKvs, ts) @@ -223,8 +222,8 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, // Though batchSave is not atomic enough, we can promise the atomicity outside. // Recovering from failure, if we found collection is creating, we should remove all these related meta. // since SnapshotKV may save both snapshot key and the original key if the original key is newest - // MaxEtcdTxnNum need to divided by 2 - return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error { + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error { return kc.Snapshot.MultiSave(ctx, partialKvs, ts) }) } @@ -620,8 +619,8 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col // If we found collection under dropping state, we'll know that gc is not completely on this collection. // However, if we remove collection first, we cannot remove other metas. 
// since SnapshotKV may save both snapshot key and the original key if the original key is newest - // MaxEtcdTxnNum need to divided by 2 - if err := batchMultiSaveAndRemove(ctx, kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil { + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + if err := batchMultiSaveAndRemove(ctx, kc.Snapshot, maxTxnNum, nil, delMetakeysSnap, ts); err != nil { return err } @@ -671,7 +670,8 @@ func (kc *Catalog) alterModifyCollection(ctx context.Context, oldColl *model.Col saves[k] = string(v) } } - return etcd.SaveByBatchWithLimit(saves, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error { + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + return etcd.SaveByBatchWithLimit(saves, maxTxnNum, func(partialKvs map[string]string) error { return kc.Snapshot.MultiSave(ctx, partialKvs, ts) }) } diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index eb6021d46c..48a22466fd 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -35,9 +35,14 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +func init() { + paramtable.Init() +} + var ( indexName = "idx" IndexID = 1 @@ -975,7 +980,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) { return errors.New("error mock MultiSave") } saves := map[string]string{"k": "v"} - err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0) + err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, []string{}, 0) assert.Error(t, err) }) t.Run("failed to remove", func(t *testing.T) { @@ -988,7 +993,7 
@@ func Test_batchMultiSaveAndRemove(t *testing.T) { } saves := map[string]string{"k": "v"} removals := []string{"prefix1", "prefix2"} - err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) + err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, removals, 0) assert.Error(t, err) }) t.Run("normal case", func(t *testing.T) { @@ -1009,7 +1014,7 @@ func Test_batchMultiSaveAndRemove(t *testing.T) { saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i) removals = append(removals, fmt.Sprintf("k%d", i)) } - err := batchMultiSaveAndRemove(context.TODO(), snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) + err := batchMultiSaveAndRemove(context.TODO(), snapshot, paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt(), saves, removals, 0) assert.NoError(t, err) }) } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 3cecf653d7..324d665b2c 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -547,7 +546,8 @@ func (ss *SuffixSnapshot) MultiSaveAndRemove(ctx context.Context, saves map[stri updateList = append(updateList, removal) } - err = etcd.SaveByBatchWithLimit(execute, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error { + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + err = etcd.SaveByBatchWithLimit(execute, maxTxnNum, func(partialKvs map[string]string) error { return ss.MetaKv.MultiSave(ctx, partialKvs) }) if err == nil { @@ -658,7 +658,8 @@ func (ss 
*SuffixSnapshot) batchRemoveExpiredKvs(ctx context.Context, keyGroup [] removeFn := func(partialKeys []string) error { return ss.MetaKv.MultiRemove(ctx, partialKeys) } - return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn) + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + return etcd.RemoveByBatchWithLimit(keyGroup, maxTxnNum, removeFn) } // removeExpiredKvs removes expired key-value pairs from the snapshot diff --git a/internal/metastore/kv/streamingcoord/kv_catalog.go b/internal/metastore/kv/streamingcoord/kv_catalog.go index c7c2f3b013..c469756930 100644 --- a/internal/metastore/kv/streamingcoord/kv_catalog.go +++ b/internal/metastore/kv/streamingcoord/kv_catalog.go @@ -10,8 +10,8 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" - "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/etcd" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) // NewCataLog creates a new catalog instance @@ -64,7 +64,8 @@ func (c *catalog) SavePChannels(ctx context.Context, infos []*streamingpb.PChann } kvs[key] = string(v) } - return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error { + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() + return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error { return c.metaKV.MultiSave(ctx, partialKvs) }) } diff --git a/internal/metastore/kv/streamingcoord/kv_catalog_test.go b/internal/metastore/kv/streamingcoord/kv_catalog_test.go index 9d23c2d24c..163682014a 100644 --- a/internal/metastore/kv/streamingcoord/kv_catalog_test.go +++ b/internal/metastore/kv/streamingcoord/kv_catalog_test.go @@ -11,8 +11,13 @@ import ( "github.com/milvus-io/milvus/pkg/v2/mocks/mock_kv" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" + 
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) +func init() { + paramtable.Init() +} + func TestCatalog(t *testing.T) { kv := mock_kv.NewMockMetaKv(t) diff --git a/internal/metastore/kv/streamingnode/kv_catalog.go b/internal/metastore/kv/streamingnode/kv_catalog.go index dba3217814..00bc8a8882 100644 --- a/internal/metastore/kv/streamingnode/kv_catalog.go +++ b/internal/metastore/kv/streamingnode/kv_catalog.go @@ -11,8 +11,8 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" - "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/etcd" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) // NewCataLog creates a new streaming-node catalog instance. @@ -80,8 +80,9 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin kvs[key] = string(data) } + maxTxnNum := paramtable.Get().MetaStoreCfg.MaxEtcdTxnNum.GetAsInt() if len(removes) > 0 { - if err := etcd.RemoveByBatchWithLimit(removes, util.MaxEtcdTxnNum, func(partialRemoves []string) error { + if err := etcd.RemoveByBatchWithLimit(removes, maxTxnNum, func(partialRemoves []string) error { return c.metaKV.MultiRemove(ctx, partialRemoves) }); err != nil { return err @@ -89,7 +90,7 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin } if len(kvs) > 0 { - return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, func(partialKvs map[string]string) error { + return etcd.SaveByBatchWithLimit(kvs, maxTxnNum, func(partialKvs map[string]string) error { return c.metaKV.MultiSave(ctx, partialKvs) }) } diff --git a/internal/metastore/kv/streamingnode/kv_catalog_test.go b/internal/metastore/kv/streamingnode/kv_catalog_test.go index 53ddb0d294..d812a77dd8 100644 --- a/internal/metastore/kv/streamingnode/kv_catalog_test.go +++ b/internal/metastore/kv/streamingnode/kv_catalog_test.go @@ -10,8 +10,13 @@ import ( 
"github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) +func init() { + paramtable.Init() +} + func TestCatalog(t *testing.T) { kv := mocks.NewMetaKv(t) k := "p1" diff --git a/pkg/util/constant.go b/pkg/util/constant.go index 96dc612ae1..acb9c3a46a 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -73,8 +73,7 @@ const ( RoleConfigDBName = "db_name" RoleConfigPrivilege = "privilege" - MaxEtcdTxnNum = 128 - GB = 1024 * 1024 * 1024 + GB = 1024 * 1024 * 1024 ) const ( diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 49818a408c..6c90f10002 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -947,7 +947,7 @@ Large numeric passwords require double quotes to avoid yaml parsing precision is p.StorageReadRetryAttempts = ParamItem{ Key: "common.storage.readRetryAttempts", - Version: "2.6.8", + Version: "2.5.25", DefaultValue: "10", Doc: "The number of retry attempts for reading from object storage; only retryable errors will trigger a retry.", Export: false, diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 046f0ec990..c989b97f59 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -462,6 +462,7 @@ type MetaStoreConfig struct { SnapshotReserveTimeSeconds ParamItem `refreshable:"true"` PaginationSize ParamItem `refreshable:"true"` ReadConcurrency ParamItem `refreshable:"true"` + MaxEtcdTxnNum ParamItem `refreshable:"true"` } func (p *MetaStoreConfig) Init(base *BaseTable) { @@ -508,6 +509,15 @@ func (p *MetaStoreConfig) Init(base *BaseTable) { } p.ReadConcurrency.Init(base.mgr) + p.MaxEtcdTxnNum = ParamItem{ + Key: "metastore.maxEtcdTxnNum", + Version: "2.5.25", + DefaultValue: "64", + Doc: `maximum number of operations in a single etcd transaction`, + Export: true, + } + 
p.MaxEtcdTxnNum.Init(base.mgr) + // TODO: The initialization operation of metadata storage is called in the initialization phase of every node. // There should be a single initialization operation for meta store, then move the metrics registration to there. metrics.RegisterMetaType(p.MetaStoreType.GetValue())