diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 249bd3abcb..ae5db437be 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -958,6 +958,7 @@ common: enabledJSONKeyStats: false # Indicates sealedsegment whether to enable JSON key stats enabledGrowingSegmentJSONKeyStats: false # Indicates growingsegment whether to enable JSON key stats enableConfigParamTypeCheck: true # Indicates whether to enable config param type check + clusterID: 0 # cluster id # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 24494516f4..f4278ea62d 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" @@ -298,7 +299,10 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all expansionFactor := paramtable.Get().DataCoordCfg.ImportPreAllocIDExpansionFactor.GetAsInt64() preAllocIDNum := (totalRows + 1) * int64(binlogNum) * expansionFactor - idBegin, idEnd, err := alloc.AllocN(preAllocIDNum) + idBegin, idEnd, err := common.AllocAutoID(func(n uint32) (int64, int64, error) { + ids, ide, e := alloc.AllocN(int64(n)) + return int64(ids), int64(ide), e + }, uint32(preAllocIDNum), Params.CommonCfg.ClusterID.GetAsUint64()) if err != nil { return nil, err } diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 5c101b36bc..44217e5554 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" 
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" @@ -165,7 +166,8 @@ func (it *insertTask) PreExecute(ctx context.Context) error { var rowIDBegin UniqueID var rowIDEnd UniqueID tr := timerecord.NewTimeRecorder("applyPK") - rowIDBegin, rowIDEnd, _ = it.idAllocator.Alloc(rowNums) + clusterID := Params.CommonCfg.ClusterID.GetAsUint64() + rowIDBegin, rowIDEnd, _ = common.AllocAutoID(it.idAllocator.Alloc, rowNums, clusterID) metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) it.insertMsg.RowIDs = make([]UniqueID, rowNums) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 44149ee586..895a486bd2 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -156,7 +156,8 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { rowNums := uint32(it.upsertMsg.InsertMsg.NRows()) // set upsertTask.insertRequest.rowIDs tr := timerecord.NewTimeRecorder("applyPK") - rowIDBegin, rowIDEnd, _ := it.idAllocator.Alloc(rowNums) + clusterID := Params.CommonCfg.ClusterID.GetAsUint64() + rowIDBegin, rowIDEnd, _ := common.AllocAutoID(it.idAllocator.Alloc, rowNums, clusterID) metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) it.upsertMsg.InsertMsg.RowIDs = make([]UniqueID, rowNums) diff --git a/pkg/common/common.go b/pkg/common/common.go index 9a4b536f6c..67f492a0b3 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -19,6 +19,7 @@ package common import ( "encoding/binary" "fmt" + "math/bits" "strconv" "strings" @@ -458,3 +459,15 @@ func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, 
// AllocAutoID allocates rowNum IDs through allocFunc and stamps the cluster
// identity into the high bits of the returned range.
//
// The bits of clusterID are mirrored with bits.Reverse64 and shifted right by
// one, so clusterID bit 0 lands on bit 62, bit 1 on bit 61, and so on, while
// bit 63 (the int64 sign bit) is always left clear. The resulting prefix is
// OR-ed into both values returned by allocFunc. A clusterID of 0 yields an
// empty prefix and the allocated range is returned untouched.
//
// Zero IDs and a non-nil error are returned when:
//   - clusterID has its top bit set: that bit would be shifted out and the
//     prefix would silently alias a different cluster;
//   - allocFunc fails (its error is returned as is);
//   - the allocated range already uses one of the prefix bits: OR-ing would
//     then produce IDs that can no longer be told apart by cluster.
func AllocAutoID(allocFunc func(uint32) (int64, int64, error), rowNum uint32, clusterID uint64) (int64, int64, error) {
	// Reject an unrepresentable cluster ID before consuming any IDs.
	if clusterID>>63 != 0 {
		return 0, 0, fmt.Errorf("cluster id %d does not fit in the 63 usable bits of an auto id", clusterID)
	}

	idStart, idEnd, err := allocFunc(rowNum)
	if err != nil {
		return 0, 0, err
	}

	// Mirror the cluster ID into the most significant bits, then shift right
	// once so the sign bit stays clear and the stamped IDs stay non-negative.
	prefix := int64(bits.Reverse64(clusterID) >> 1)
	if (idStart|idEnd)&prefix != 0 {
		return 0, 0, fmt.Errorf("allocated ids %d and %d overlap cluster id prefix %#x", idStart, idEnd, prefix)
	}

	return idStart | prefix, idEnd | prefix, nil
}
@@ -315,6 +316,7 @@ type commonConfig struct { EnabledGrowingSegmentJSONKeyStats ParamItem `refreshable:"true"` EnableConfigParamTypeCheck ParamItem `refreshable:"true"` + ClusterID ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -1153,6 +1155,25 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnableConfigParamTypeCheck.Init(base.mgr) + p.ClusterID = ParamItem{ + Key: "common.clusterID", + Version: "2.6.3", + DefaultValue: "0", + Doc: "cluster id", + Export: true, + PanicIfEmpty: true, + Formatter: func(v string) string { + if getAsInt(v) < 0 { + return "" + } + maxClusterID := (int64(1) << MaxClusterIDBits) - 1 + if getAsInt64(v) > maxClusterID { + return "" + } + return v + }, + } + p.ClusterID.Init(base.mgr) } type gpuConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 49bf2aad6f..51692c14e1 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -142,6 +142,13 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 60*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)) params.Save("common.sync.taskPoolReleaseTimeoutSeconds", "100") assert.Equal(t, 100*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)) + + assert.Equal(t, 0, params.CommonCfg.ClusterID.GetAsInt()) + params.Save("common.clusterID", "32") + assert.Panics(t, func() { + params.CommonCfg.ClusterID.GetAsInt() + }) + params.Save("common.clusterID", "0") }) t.Run("test rootCoordConfig", func(t *testing.T) {