diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index e2ed510722..008cd204ee 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -157,7 +157,9 @@ proxy:
   timeTick:
     bufSize: 512
   maxNameLength: 255 # Maximum length of name for a collection or alias
-  maxFieldNum: 256 # Maximum number of fields in a collection
+  maxFieldNum: 64 # Maximum number of fields in a collection.
+  # As of today (2.2.0 and after) it is strongly DISCOURAGED to set maxFieldNum > 64.
+  # So adjust it at your own risk!
   maxDimension: 32768 # Maximum dimension of a vector
   # It's strongly DISCOURAGED to set `maxShardNum` > 64.
   maxShardNum: 64 # Maximum number of shards in a collection
diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go
index a08f444ca3..43a7263728 100644
--- a/internal/metastore/kv/datacoord/kv_catalog.go
+++ b/internal/metastore/kv/datacoord/kv_catalog.go
@@ -280,6 +280,10 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
 		}
 		maps.Copy(kvsPiece, kvs)
 		currSize += len(kvs)
+		if len(kvs) >= maxEtcdTxnNum {
+			log.Warn("a single segment's Etcd save has over maxEtcdTxnNum operations." +
+				" Please double check your config")
+		}
 	}
 	if currSize > 0 {
 		if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil {
@@ -805,10 +809,12 @@ func buildFieldBinlogPath(collectionID typeutil.UniqueID, partitionID typeutil.U
 	return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentBinlogPathPrefix, collectionID, partitionID, segmentID, fieldID)
 }
 
+// TODO: There's no need to include fieldID in the delta log path key.
 func buildFieldDeltalogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
 	return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentDeltalogPathPrefix, collectionID, partitionID, segmentID, fieldID)
 }
 
+// TODO: There's no need to include fieldID in the stats log path key.
 func buildFieldStatslogPath(collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, segmentID typeutil.UniqueID, fieldID typeutil.UniqueID) string {
 	return fmt.Sprintf("%s/%d/%d/%d/%d", SegmentStatslogPathPrefix, collectionID, partitionID, segmentID, fieldID)
 }
diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go
index 27419df787..9ba1644993 100644
--- a/internal/metastore/kv/datacoord/kv_catalog_test.go
+++ b/internal/metastore/kv/datacoord/kv_catalog_test.go
@@ -436,7 +436,7 @@ func Test_AlterSegments(t *testing.T) {
 	err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL})
 	assert.Nil(t, err)
 	assert.Equal(t, 255+3, len(savedKvs))
-	assert.Equal(t, 5, opGroupCount)
+	assert.Equal(t, 3, opGroupCount)
 
 	adjustedSeg, err := catalog.LoadFromSegmentPath(segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID)
 	assert.NoError(t, err)
@@ -600,8 +600,8 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
 
 	// testing for reaching max operation
 	{
-		segments2 := make([]*datapb.SegmentInfo, 65)
-		for i := 0; i < 65; i++ {
+		segments2 := make([]*datapb.SegmentInfo, 129)
+		for i := 0; i < 129; i++ {
 			segments2[i] = &datapb.SegmentInfo{
 				ID:           int64(i),
 				CollectionID: 1000,
@@ -614,7 +614,7 @@ func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
 
 		err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2)
 		assert.Nil(t, err)
 		assert.Equal(t, 2, count)
-		assert.Equal(t, 65, kvSize)
+		assert.Equal(t, 129, kvSize)
 	}
 }
diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go
index ce1791faf2..fec38d4336 100644
--- a/internal/metastore/kv/rootcoord/kv_catalog.go
+++ b/internal/metastore/kv/rootcoord/kv_catalog.go
@@ -72,7 +72,7 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, sa
 	saveFn := func(partialKvs map[string]string) error {
 		return snapshot.MultiSave(partialKvs, ts)
 	}
-	if err := etcd.SaveByBatch(saves, saveFn); err != nil {
+	if err := etcd.SaveByBatchWithLimit(saves, maxTxnNum/2, saveFn); err != nil {
 		return err
 	}
 
@@ -132,7 +132,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
 
 	// Though batchSave is not atomic enough, we can promise the atomicity outside.
 	// Recovering from failure, if we found collection is creating, we should removing all these related meta.
-	return etcd.SaveByBatch(kvs, func(partialKvs map[string]string) error {
+	return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {
 		return kc.Snapshot.MultiSave(partialKvs, ts)
 	})
 }
diff --git a/internal/util/etcd/etcd_util.go b/internal/util/etcd/etcd_util.go
index 2b8f76e61c..75a0e7538d 100644
--- a/internal/util/etcd/etcd_util.go
+++ b/internal/util/etcd/etcd_util.go
@@ -32,7 +32,7 @@ import (
 )
 
 var (
-	maxTxnNum = 64
+	maxTxnNum = 128
 )
 
 // GetEtcdClient returns etcd client
@@ -110,8 +110,8 @@ func min(a, b int) int {
 	return b
 }
 
-//SaveByBatch there will not guarantee atomicity
-func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) error) error {
+// SaveByBatchWithLimit is SaveByBatch with customized limit.
+func SaveByBatchWithLimit(kvs map[string]string, limit int, op func(partialKvs map[string]string) error) error {
 	if len(kvs) == 0 {
 		return nil
 	}
@@ -124,8 +124,8 @@ func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) er
 		values = append(values, v)
 	}
 
-	for i := 0; i < len(kvs); i = i + maxTxnNum {
-		end := min(i+maxTxnNum, len(keys))
+	for i := 0; i < len(kvs); i = i + limit {
+		end := min(i+limit, len(keys))
 		batch, err := buildKvGroup(keys[i:end], values[i:end])
 		if err != nil {
 			return err
@@ -138,6 +138,11 @@ func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) er
 	return nil
 }
 
+// SaveByBatch saves kvs in batches; it does not guarantee atomicity.
+func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) error) error {
+	return SaveByBatchWithLimit(kvs, maxTxnNum, op)
+}
+
 func RemoveByBatch(removals []string, op func(partialKeys []string) error) error {
 	if len(removals) == 0 {
 		return nil
diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py
index 07ce6c03cc..6edadb4f45 100644
--- a/tests/python_client/common/common_type.py
+++ b/tests/python_client/common/common_type.py
@@ -57,7 +57,7 @@ compact_segment_num_threshold = 3
 compact_delta_ratio_reciprocal = 5  # compact_delta_binlog_ratio is 0.2
 compact_retention_duration = 40  # compaction travel time retention range 20s
 max_compaction_interval = 60  # the max time interval (s) from the last compaction
-max_field_num = 256  # Maximum number of fields in a collection
+max_field_num = 64  # Maximum number of fields in a collection
 default_replica_num = 1
 IMAGE_REPOSITORY_MILVUS = "harbor.milvus.io/dockerhub/milvusdb/milvus"
 NAMESPACE_CHAOS_TESTING = "chaos-testing"