mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
pr: #44471 https://github.com/milvus-io/milvus/issues/44326 prev: [physical_ts][logical_ts] after [sign_bit][cluster_id][physical_ts][logical_ts] --------- Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
d19d65fb46
commit
8b0bfe4cd8
@ -958,6 +958,7 @@ common:
|
||||
enabledJSONKeyStats: false # Indicates sealedsegment whether to enable JSON key stats
|
||||
enabledGrowingSegmentJSONKeyStats: false # Indicates growingsegment whether to enable JSON key stats
|
||||
enableConfigParamTypeCheck: true # Indicates whether to enable config param type check
|
||||
clusterID: 0 # cluster id
|
||||
|
||||
# QuotaConfig, configurations of Milvus quota and limits.
|
||||
# By default, we enable:
|
||||
|
||||
@ -33,6 +33,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/importutilv2"
|
||||
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
|
||||
@ -298,7 +299,10 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
|
||||
expansionFactor := paramtable.Get().DataCoordCfg.ImportPreAllocIDExpansionFactor.GetAsInt64()
|
||||
preAllocIDNum := (totalRows + 1) * int64(binlogNum) * expansionFactor
|
||||
|
||||
idBegin, idEnd, err := alloc.AllocN(preAllocIDNum)
|
||||
idBegin, idEnd, err := common.AllocAutoID(func(n uint32) (int64, int64, error) {
|
||||
ids, ide, e := alloc.AllocN(int64(n))
|
||||
return int64(ids), int64(ide), e
|
||||
}, uint32(preAllocIDNum), Params.CommonCfg.ClusterID.GetAsUint64())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/pkg/v2/common"
|
||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
|
||||
@ -165,7 +166,8 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
||||
var rowIDBegin UniqueID
|
||||
var rowIDEnd UniqueID
|
||||
tr := timerecord.NewTimeRecorder("applyPK")
|
||||
rowIDBegin, rowIDEnd, _ = it.idAllocator.Alloc(rowNums)
|
||||
clusterID := Params.CommonCfg.ClusterID.GetAsUint64()
|
||||
rowIDBegin, rowIDEnd, _ = common.AllocAutoID(it.idAllocator.Alloc, rowNums, clusterID)
|
||||
metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
it.insertMsg.RowIDs = make([]UniqueID, rowNums)
|
||||
|
||||
@ -156,7 +156,8 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
|
||||
rowNums := uint32(it.upsertMsg.InsertMsg.NRows())
|
||||
// set upsertTask.insertRequest.rowIDs
|
||||
tr := timerecord.NewTimeRecorder("applyPK")
|
||||
rowIDBegin, rowIDEnd, _ := it.idAllocator.Alloc(rowNums)
|
||||
clusterID := Params.CommonCfg.ClusterID.GetAsUint64()
|
||||
rowIDBegin, rowIDEnd, _ := common.AllocAutoID(it.idAllocator.Alloc, rowNums, clusterID)
|
||||
metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
it.upsertMsg.InsertMsg.RowIDs = make([]UniqueID, rowNums)
|
||||
|
||||
@ -19,6 +19,7 @@ package common
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/bits"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@ -458,3 +459,15 @@ func ValidateAutoIndexMmapConfig(autoIndexConfigEnable, isVectorField bool, inde
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func AllocAutoID(allocFunc func(uint32) (int64, int64, error), rowNum uint32, clusterID uint64) (int64, int64, error) {
|
||||
idStart, idEnd, err := allocFunc(rowNum)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
reversed := bits.Reverse64(clusterID)
|
||||
// right shift by 1 to preserve sign bit
|
||||
reversed = reversed >> 1
|
||||
|
||||
return idStart | int64(reversed), idEnd | int64(reversed), nil
|
||||
}
|
||||
|
||||
@ -258,3 +258,12 @@ func TestReplicateProperty(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAllocAutoID(t *testing.T) {
|
||||
start, end, err := AllocAutoID(func(n uint32) (int64, int64, error) {
|
||||
return 100, 110, nil
|
||||
}, 10, 1)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 0b0100, start>>60)
|
||||
assert.EqualValues(t, 0b0100, end>>60)
|
||||
}
|
||||
|
||||
@ -56,6 +56,7 @@ const (
|
||||
DefaultSearchCacheBudgetGBRatio = 0.10
|
||||
DefaultLoadNumThreadRatio = 8.0
|
||||
DefaultBeamWidthRatio = 4.0
|
||||
MaxClusterIDBits = 3
|
||||
)
|
||||
|
||||
// ComponentParam is used to quickly and easily access all components' configurations.
|
||||
@ -315,6 +316,7 @@ type commonConfig struct {
|
||||
EnabledGrowingSegmentJSONKeyStats ParamItem `refreshable:"true"`
|
||||
|
||||
EnableConfigParamTypeCheck ParamItem `refreshable:"true"`
|
||||
ClusterID ParamItem `refreshable:"false"`
|
||||
}
|
||||
|
||||
func (p *commonConfig) init(base *BaseTable) {
|
||||
@ -1153,6 +1155,25 @@ This helps Milvus-CDC synchronize incremental data`,
|
||||
Export: true,
|
||||
}
|
||||
p.EnableConfigParamTypeCheck.Init(base.mgr)
|
||||
p.ClusterID = ParamItem{
|
||||
Key: "common.clusterID",
|
||||
Version: "2.6.3",
|
||||
DefaultValue: "0",
|
||||
Doc: "cluster id",
|
||||
Export: true,
|
||||
PanicIfEmpty: true,
|
||||
Formatter: func(v string) string {
|
||||
if getAsInt(v) < 0 {
|
||||
return ""
|
||||
}
|
||||
maxClusterID := (int64(1) << MaxClusterIDBits) - 1
|
||||
if getAsInt64(v) > maxClusterID {
|
||||
return ""
|
||||
}
|
||||
return v
|
||||
},
|
||||
}
|
||||
p.ClusterID.Init(base.mgr)
|
||||
}
|
||||
|
||||
type gpuConfig struct {
|
||||
|
||||
@ -142,6 +142,13 @@ func TestComponentParam(t *testing.T) {
|
||||
assert.Equal(t, 60*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second))
|
||||
params.Save("common.sync.taskPoolReleaseTimeoutSeconds", "100")
|
||||
assert.Equal(t, 100*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second))
|
||||
|
||||
assert.Equal(t, 0, params.CommonCfg.ClusterID.GetAsInt())
|
||||
params.Save("common.clusterID", "32")
|
||||
assert.Panics(t, func() {
|
||||
params.CommonCfg.ClusterID.GetAsInt()
|
||||
})
|
||||
params.Save("common.clusterID", "0")
|
||||
})
|
||||
|
||||
t.Run("test rootCoordConfig", func(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user