From ca2057c57dc6c2446b60a57b154e28fb27339482 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 25 Oct 2024 14:35:45 +0800 Subject: [PATCH] enhance: Tidy import options (#37077) (#37078) 1. Tidy import options. 2. Tidy common import util functions. issue: https://github.com/milvus-io/milvus/issues/34150 pr: https://github.com/milvus-io/milvus/pull/37077 --------- Signed-off-by: bigsheeper --- internal/datacoord/services.go | 17 ++--- internal/proxy/impl.go | 1 + internal/util/importutilv2/common/util.go | 11 +++ .../util/importutilv2/common/util_test.go | 68 +++++++++++++++++++ internal/util/importutilv2/json/reader.go | 15 +--- internal/util/importutilv2/json/row_parser.go | 7 +- internal/util/importutilv2/numpy/reader.go | 2 +- internal/util/importutilv2/numpy/util.go | 10 --- internal/util/importutilv2/option.go | 41 +++++++++-- internal/util/importutilv2/option_test.go | 53 +++++++++++++++ internal/util/importutilv2/parquet/reader.go | 2 +- internal/util/importutilv2/parquet/util.go | 11 --- 12 files changed, 180 insertions(+), 58 deletions(-) create mode 100644 internal/util/importutilv2/common/util_test.go create mode 100644 internal/util/importutilv2/option_test.go diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index dc80b6d3bd..b8920fc428 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1636,19 +1636,12 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter zap.Int64("collection", in.GetCollectionID()), zap.Int64s("partitions", in.GetPartitionIDs()), zap.Strings("channels", in.GetChannelNames())) - log.Info("receive import request", zap.Any("files", in.GetFiles())) + log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions())) - var timeoutTs uint64 = math.MaxUint64 - timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions()) - if err == nil { - // Specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m". 
- dur, err := time.ParseDuration(timeoutStr) - if err != nil { - resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse import timeout failed, err=%w", err))) - return resp, nil - } - curTs := tsoutil.GetCurrentTime() - timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur) + timeoutTs, err := importutilv2.GetTimeoutTs(in.GetOptions()) + if err != nil { + resp.Status = merr.Status(merr.WrapErrImportFailed(err.Error())) + return resp, nil } files := in.GetFiles() diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 527f858010..29c158be55 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6163,6 +6163,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) zap.String("partition name", req.GetPartitionName()), zap.Any("files", req.GetFiles()), zap.String("role", typeutil.ProxyRole), + zap.Any("options", req.GetOptions()), ) resp := &internalpb.ImportResponse{ diff --git a/internal/util/importutilv2/common/util.go b/internal/util/importutilv2/common/util.go index 62f18491b9..ba26bd5f91 100644 --- a/internal/util/importutilv2/common/util.go +++ b/internal/util/importutilv2/common/util.go @@ -78,3 +78,14 @@ func CheckArrayCapacity(arrLength int, maxCapacity int64) error { } return nil } + +func EstimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { + sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) + if err != nil { + return 0, err + } + if 1000*sizePerRecord <= bufferSize { + return 1000, nil + } + return int64(bufferSize) / int64(sizePerRecord), nil +} diff --git a/internal/util/importutilv2/common/util_test.go b/internal/util/importutilv2/common/util_test.go new file mode 100644 index 0000000000..efbb32cbdb --- /dev/null +++ b/internal/util/importutilv2/common/util_test.go @@ -0,0 +1,68 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" +) + +func TestUtil_EstimateReadCountPerBatch(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, + Name: "vec", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } + count, err := EstimateReadCountPerBatch(16*1024*1024, schema) + assert.NoError(t, err) + assert.Equal(t, int64(1000), count) + + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + FieldID: 102, + Name: "vec2", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "invalidDim", + }, + }, + }) + _, err = EstimateReadCountPerBatch(16*1024*1024, schema) + assert.Error(t, err) +} diff --git a/internal/util/importutilv2/json/reader.go b/internal/util/importutilv2/json/reader.go index 49c84ee8b8..7606600914 100644 --- a/internal/util/importutilv2/json/reader.go +++ b/internal/util/importutilv2/json/reader.go @@ -27,8 +27,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -58,7 +58,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("read json file failed, path=%s, err=%s", path, err.Error())) } - count, err := estimateReadCountPerBatch(bufferSize, schema) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } @@ -181,14 +181,3 @@ func (j *reader) Size() (int64, error) { } func (j *reader) Close() {} - -func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { - sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) - if err != nil { - return 0, err - } - if 1000*sizePerRecord <= bufferSize { - return 1000, nil - } - return int64(bufferSize) / int64(sizePerRecord), nil -} diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 634c464eb2..623384f22a 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -396,7 +396,9 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { } case schemapb.DataType_Array: arr, ok := obj.([]interface{}) - + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } maxCapacity, err := parameterutil.GetMaxCapacity(r.id2Field[fieldID]) if err != nil { return nil, err @@ -404,9 +406,6 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { if err = common.CheckArrayCapacity(len(arr), maxCapacity); err != nil { return nil, err } - if !ok { - return nil, r.wrapTypeError(obj, fieldID) - } scalarFieldData, err := r.arrayToFieldData(arr, r.id2Field[fieldID].GetElementType()) if err != nil { return nil, err diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go index acd69e05c2..708cb35270 100644 --- 
a/internal/util/importutilv2/numpy/reader.go +++ b/internal/util/importutilv2/numpy/reader.go @@ -48,7 +48,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) - count, err := calcRowCount(bufferSize, schema) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go index 612596b375..e55392f0b4 100644 --- a/internal/util/importutilv2/numpy/util.go +++ b/internal/util/importutilv2/numpy/util.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) var ( @@ -241,12 +240,3 @@ func validateHeader(npyReader *npy.Reader, field *schemapb.FieldSchema, dim int) } return nil } - -func calcRowCount(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { - sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) - if err != nil { - return 0, err - } - rowCount := int64(bufferSize) / int64(sizePerRecord) - return rowCount, nil -} diff --git a/internal/util/importutilv2/option.go b/internal/util/importutilv2/option.go index b8e958c19f..078e5d8425 100644 --- a/internal/util/importutilv2/option.go +++ b/internal/util/importutilv2/option.go @@ -21,6 +21,7 @@ import ( "math" "strconv" "strings" + "time" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -29,17 +30,45 @@ import ( ) const ( - StartTs = "start_ts" - StartTs2 = "startTs" - EndTs = "end_ts" - EndTs2 = "endTs" + // Timeout specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m". + Timeout = "timeout" + + // SkipDQC indicates whether to bypass the disk quota check, default to false. + SkipDQC = "skip_disk_quota_check" +) + +// Options for backup-restore mode. +const ( + // BackupFlag indicates whether the import is in backup-restore mode, default to false. BackupFlag = "backup" - L0Import = "l0_import" - SkipDQC = "skip_disk_quota_check" + + // L0Import indicates whether to import l0 segments only. + L0Import = "l0_import" + + // StartTs StartTs2 EndTs EndTs2 are used to filter data during backup-restore import. + StartTs = "start_ts" + StartTs2 = "startTs" + EndTs = "end_ts" + EndTs2 = "endTs" ) type Options []*commonpb.KeyValuePair +func GetTimeoutTs(options Options) (uint64, error) { + var timeoutTs uint64 = math.MaxUint64 + timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV(Timeout, options) + if err == nil { + var dur time.Duration + dur, err = time.ParseDuration(timeoutStr) + if err != nil { + return 0, fmt.Errorf("parse timeout failed, err=%w", err) + } + curTs := tsoutil.GetCurrentTime() + timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur) + } + return timeoutTs, nil +} + func ParseTimeRange(options Options) (uint64, uint64, error) { importOptions := funcutil.KeyValuePair2Map(options) getTimestamp := func(defaultValue uint64, targetKeys ...string) (uint64, error) { diff --git a/internal/util/importutilv2/option_test.go b/internal/util/importutilv2/option_test.go new file mode 100644 index 0000000000..0a5deedad9 --- /dev/null +++ b/internal/util/importutilv2/option_test.go @@ -0,0 +1,53 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importutilv2 + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +func TestOption_GetTimeout(t *testing.T) { + const delta = 3 * time.Second + + options := []*commonpb.KeyValuePair{{Key: Timeout, Value: "300s"}} + ts, err := GetTimeoutTs(options) + assert.NoError(t, err) + pt := tsoutil.PhysicalTime(ts) + assert.WithinDuration(t, time.Now().Add(300*time.Second), pt, delta) + + options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "1.5h"}} + ts, err = GetTimeoutTs(options) + assert.NoError(t, err) + pt = tsoutil.PhysicalTime(ts) + assert.WithinDuration(t, time.Now().Add(90*time.Minute), pt, delta) + + options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "1h45m"}} + ts, err = GetTimeoutTs(options) + assert.NoError(t, err) + pt = tsoutil.PhysicalTime(ts) + assert.WithinDuration(t, time.Now().Add(105*time.Minute), pt, delta) + + options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "invalidTime"}} + _, err = GetTimeoutTs(options) + assert.Error(t, err) +} diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index 4a29e344dd..4ac0325cd4 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -74,7 +74,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, err } - count, err := estimateReadCountPerBatch(bufferSize, schema) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index d74b293474..0306eef7ee 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -250,14 +250,3 @@ func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) e } return nil } - -func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { - sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) - if err != nil { - return 0, err - } - if 1000*sizePerRecord <= bufferSize { - return 1000, nil - } - return int64(bufferSize) / int64(sizePerRecord), nil -}
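
Note (not part of the patch): a minimal usage sketch of the two shared helpers this change introduces, importutilv2.GetTimeoutTs and common.EstimateReadCountPerBatch. It assumes the snippet lives inside the milvus module (both packages are internal); the option values, the sample schema, and the 16 MiB buffer size are illustrative only.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/util/importutilv2"
	importcommon "github.com/milvus-io/milvus/internal/util/importutilv2/common"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func main() {
	// Import options travel as key/value pairs on the import request,
	// exactly what DataCoord receives in ImportRequestInternal.Options.
	options := []*commonpb.KeyValuePair{
		{Key: importutilv2.Timeout, Value: "1h45m"},
		{Key: importutilv2.SkipDQC, Value: "true"},
	}

	// GetTimeoutTs turns the "timeout" option into an absolute timestamp;
	// when the option is absent it returns math.MaxUint64 (no deadline),
	// and a malformed duration (e.g. "invalidTime") yields an error.
	timeoutTs, err := importutilv2.GetTimeoutTs(options)
	if err != nil {
		panic(err)
	}
	fmt.Println("import deadline:", tsoutil.PhysicalTime(timeoutTs))

	// EstimateReadCountPerBatch caps a read batch at 1000 rows, or fewer
	// when 1000 rows of this schema would not fit into the buffer.
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
			{
				FieldID:    101,
				Name:       "vec",
				DataType:   schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}},
			},
		},
	}
	count, err := importcommon.EstimateReadCountPerBatch(16*1024*1024, schema)
	if err != nil {
		panic(err)
	}
	fmt.Println("rows per batch:", count)
}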