From 747934eb2bf9f70846f7d30f4a6f9cf5ada18347 Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Wed, 20 Dec 2023 14:14:05 +0800 Subject: [PATCH] enhance: make consistency level used in delete configurable (#29280) (#29285) pr: #29280 issue: #29279 --------- Signed-off-by: longjiquan --- configs/milvus.yaml | 1 + internal/proxy/task_delete.go | 2 +- internal/proxy/util.go | 14 ++++++++++++++ internal/proxy/util_test.go | 14 ++++++++++++++ pkg/util/paramtable/component_param.go | 12 ++++++++++++ pkg/util/paramtable/component_param_test.go | 3 +++ 6 files changed, 45 insertions(+), 1 deletion(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8da26b5fdd..99e68255bc 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -498,6 +498,7 @@ common: SearchCacheBudgetGBRatio: 0.1 LoadNumThreadRatio: 8 BeamWidthRatio: 4 + consistencyLevelUsedInDelete: "Bounded" gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency. gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time. storageType: minio # please adjust in embedded Milvus: local diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 56bc2589d5..8e372f30c6 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -298,7 +298,7 @@ func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream, PartitionIDs: partationIDs, SerializedExprPlan: serializedPlan, OutputFieldsId: outputFieldIDs, - GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, commonpb.ConsistencyLevel_Bounded), + GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, getConsistencyLevelFromConfig()), }, DmlChannels: channelIDs, Scope: querypb.DataScope_All, diff --git a/internal/proxy/util.go b/internal/proxy/util.go index d2285f536d..fb75d55e45 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -767,6 +768,19 @@ func ReplaceID2Name(oldStr string, id int64, name string) string { return strings.ReplaceAll(oldStr, strconv.FormatInt(id, 10), name) } +func getConsistencyLevelFromConfig() commonpb.ConsistencyLevel { + value := Params.CommonCfg.ConsistencyLevelUsedInDelete.GetValue() + trimed := strings.TrimSpace(value) + lowered := strings.ToLower(trimed) + for consistencyLevel := range commonpb.ConsistencyLevel_value { + if lowered == strings.ToLower(consistencyLevel) { + return commonpb.ConsistencyLevel(commonpb.ConsistencyLevel_value[consistencyLevel]) + } + } + // not found, use default. + return paramtable.DefaultConsistencyLevelUsedInDelete +} + func parseGuaranteeTsFromConsistency(ts, tMax typeutil.Timestamp, consistency commonpb.ConsistencyLevel) typeutil.Timestamp { switch consistency { case commonpb.ConsistencyLevel_Strong: diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 1bbc147f4d..73e7a13fda 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -2087,3 +2087,17 @@ func TestSendReplicateMessagePack(t *testing.T) { SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropIndexRequest{}) }) } + +func Test_getConsistencyLevelFromConfig(t *testing.T) { + paramtable.Init() + original := Params.CommonCfg.ConsistencyLevelUsedInDelete.GetValue() + defer func() { + Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue(original) + }() + for consistencyLevel := range commonpb.ConsistencyLevel_value { + Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue(consistencyLevel) + assert.Equal(t, commonpb.ConsistencyLevel(commonpb.ConsistencyLevel_value[consistencyLevel]), getConsistencyLevelFromConfig()) + } + Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue("invalid") + assert.Equal(t, paramtable.DefaultConsistencyLevelUsedInDelete, getConsistencyLevelFromConfig()) +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f634b92c3c..5bba1845fe 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -22,6 +22,7 @@ import ( "github.com/shirou/gopsutil/v3/disk" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -31,6 +32,7 @@ import ( const ( // DefaultIndexSliceSize defines the default slice size of index file when serializing. DefaultIndexSliceSize = 16 + DefaultConsistencyLevelUsedInDelete = commonpb.ConsistencyLevel_Bounded DefaultGracefulTime = 5000 // ms DefaultGracefulStopTimeout = 1800 // s DefaultHighPriorityThreadCoreCoefficient = 10 @@ -189,6 +191,7 @@ type commonConfig struct { SearchCacheBudgetGBRatio ParamItem `refreshable:"true"` LoadNumThreadRatio ParamItem `refreshable:"true"` BeamWidthRatio ParamItem `refreshable:"true"` + ConsistencyLevelUsedInDelete ParamItem `refreshable:"true"` GracefulTime ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` @@ -459,6 +462,15 @@ This configuration is only used by querynode and indexnode, it selects CPU instr } p.BeamWidthRatio.Init(base.mgr) + p.ConsistencyLevelUsedInDelete = ParamItem{ + Key: "common.consistencyLevelUsedInDelete", + Version: "2.0.0", + DefaultValue: DefaultConsistencyLevelUsedInDelete.String(), + Doc: "Consistency level used in delete by expression", + Export: true, + } + p.ConsistencyLevelUsedInDelete.Init(base.mgr) + p.GracefulTime = ParamItem{ Key: "common.gracefulTime", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 343e38dc98..03aa1bef59 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -53,6 +53,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.IndexSliceSize.GetAsInt64(), int64(DefaultIndexSliceSize)) t.Logf("knowhere index slice size = %d", Params.IndexSliceSize.GetAsInt64()) + assert.Equal(t, Params.ConsistencyLevelUsedInDelete.GetValue(), DefaultConsistencyLevelUsedInDelete.String()) + t.Logf("default ConsistencyLevelUsedInDelete = %s", Params.ConsistencyLevelUsedInDelete.GetValue()) + assert.Equal(t, Params.GracefulTime.GetAsInt64(), int64(DefaultGracefulTime)) t.Logf("default grafeful time = %d", Params.GracefulTime.GetAsInt64())