diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index c4fd30efa4..b4ad650f7c 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -194,10 +194,6 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi
 	preimports := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType), WithJob(job.GetJobID()))
 	lacks := make(map[int64]*datapb.ImportFileStats, 0)
 	for _, t := range preimports {
-		if t.GetState() != datapb.ImportTaskStateV2_Completed {
-			// Preimport tasks are not fully completed, thus generating imports should not be triggered.
-			return nil
-		}
 		for _, stat := range t.GetFileStats() {
 			lacks[stat.GetImportFile().GetId()] = stat
 		}
@@ -245,6 +241,37 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
 
 func (c *importChecker) checkPreImportingJob(job ImportJob) {
 	log := log.With(zap.Int64("jobID", job.GetJobID()))
+
+	preimports := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType), WithJob(job.GetJobID()))
+	totalRows := int64(0)
+	for _, t := range preimports {
+		if t.GetState() != datapb.ImportTaskStateV2_Completed {
+			// Preimport tasks are not fully completed, thus generating imports should not be triggered.
+			return
+		}
+		totalRows += lo.SumBy(t.GetFileStats(), func(stat *datapb.ImportFileStats) int64 {
+			return stat.GetTotalRows()
+		})
+	}
+
+	updateJobState := func(state internalpb.ImportJobState, actions ...UpdateJobAction) {
+		actions = append(actions, UpdateJobState(state))
+		err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), actions...)
+		if err != nil {
+			log.Warn("failed to update import job state", zap.String("state", state.String()), zap.Error(err))
+			return
+		}
+		preImportDuration := job.GetTR().RecordSpan()
+		metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds()))
+		log.Info("import job preimport done", zap.String("state", state.String()), zap.Duration("jobTimeCost/preimport", preImportDuration))
+	}
+
+	if totalRows == 0 {
+		log.Info("no data to import, skip the subsequent stages and update job state to Completed")
+		updateJobState(internalpb.ImportJobState_Completed)
+		return
+	}
+
 	lacks := c.getLackFilesForImports(job)
 	if len(lacks) == 0 {
 		return
@@ -253,10 +280,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
 	requestSize, err := CheckDiskQuota(c.ctx, job, c.meta, c.importMeta)
 	if err != nil {
 		log.Warn("import failed, disk quota exceeded", zap.Error(err))
-		err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
-		if err != nil {
-			log.Warn("failed to update job state to Failed", zap.Error(err))
-		}
+		updateJobState(internalpb.ImportJobState_Failed, UpdateJobReason(err.Error()))
 		return
 	}
 
@@ -271,23 +295,13 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
 		err = c.importMeta.AddTask(c.ctx, t)
 		if err != nil {
 			log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
-			updateErr := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
-			if updateErr != nil {
-				log.Warn("failed to update job state to Failed", zap.Error(updateErr))
-			}
+			updateJobState(internalpb.ImportJobState_Failed, UpdateJobReason(err.Error()))
 			return
 		}
 		log.Info("add new import task", WrapTaskLog(t)...)
 	}
-	err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
-	if err != nil {
-		log.Warn("failed to update job state to Importing", zap.Error(err))
-		return
-	}
-	preImportDuration := job.GetTR().RecordSpan()
-	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds()))
-	log.Info("import job preimport done", zap.Duration("jobTimeCost/preimport", preImportDuration))
+	updateJobState(internalpb.ImportJobState_Importing, UpdateRequestedDiskSize(requestSize))
 }
 
 func (c *importChecker) checkImportingJob(job ImportJob) {
@@ -318,7 +332,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
 		}
 		statsDuration := job.GetTR().RecordSpan()
 		metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageStats).Observe(float64(statsDuration.Milliseconds()))
-		log.Info("import job stats done", zap.Duration("jobTimeCost/stats", statsDuration))
+		log.Info("import job stats done", zap.String("state", state.String()), zap.Duration("jobTimeCost/stats", statsDuration))
 	}
 
 	// Skip stats stage if not enable stats or is l0 import.
diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go
index cb587caee3..ec8ef5fce9 100644
--- a/internal/datacoord/import_checker_test.go
+++ b/internal/datacoord/import_checker_test.go
@@ -186,9 +186,15 @@ func (s *ImportCheckerSuite) TestCheckJob() {
 	s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
 
 	// test checkPreImportingJob
+	fileStats := []*datapb.ImportFileStats{
+		{
+			TotalRows: 100,
+		},
+	}
 	catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
 	for _, t := range preimportTasks {
-		err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
+		err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(),
+			UpdateState(datapb.ImportTaskStateV2_Completed), UpdateFileStats(fileStats))
 		s.NoError(err)
 	}
 
@@ -303,8 +309,14 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
 	s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
 
 	// test checkPreImportingJob
+	fileStats := []*datapb.ImportFileStats{
+		{
+			TotalRows: 100,
+		},
+	}
 	for _, t := range preimportTasks {
-		err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
+		err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(),
+			UpdateState(datapb.ImportTaskStateV2_Completed), UpdateFileStats(fileStats))
 		s.NoError(err)
 	}
 
@@ -688,8 +700,14 @@ func TestImportCheckerCompaction(t *testing.T) {
 	catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil).Twice()
 	catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once()
 	preimportTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
+	fileStats := []*datapb.ImportFileStats{
+		{
+			TotalRows: 100,
+		},
+	}
 	for _, pt := range preimportTasks {
-		err := importMeta.UpdateTask(context.TODO(), pt.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
+		err := importMeta.UpdateTask(context.TODO(), pt.GetTaskID(),
+			UpdateState(datapb.ImportTaskStateV2_Completed), UpdateFileStats(fileStats))
 		assert.NoError(t, err)
 	}
 	assert.Eventually(t, func() bool {
diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go
index b59fbf74d4..800ece70c1 100644
--- a/internal/datanode/importv2/task_import.go
+++ b/internal/datanode/importv2/task_import.go
@@ -18,6 +18,7 @@ package importv2
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"time"
 
@@ -139,7 +140,7 @@ func (t *ImportTask) Clone() Task {
 }
 
 func (t *ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt()
 	log.Info("start to import", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Any("schema", t.GetSchema()))...)
 
@@ -151,7 +152,8 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
 		reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), bufferSize)
 		if err != nil {
 			log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
-			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
+			reason := fmt.Sprintf("error: %v, file: %s", err, file.String())
+			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(reason))
 			return err
 		}
 		defer reader.Close()
@@ -159,7 +161,8 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
 		err = t.importFile(reader)
 		if err != nil {
 			log.Warn("do import failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
-			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
+			reason := fmt.Sprintf("error: %v, file: %s", err, file.String())
+			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(reason))
 			return err
 		}
 		log.Info("import file done", WrapLogFields(t, zap.Strings("files", file.GetPaths()),
diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go
index 30b639404e..6f5c081b50 100644
--- a/internal/datanode/importv2/task_l0_import.go
+++ b/internal/datanode/importv2/task_l0_import.go
@@ -127,7 +127,7 @@ func (t *L0ImportTask) Clone() Task {
 }
 
 func (t *L0ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
 	log.Info("start to import l0", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Any("schema", t.GetSchema()))...)
 
@@ -136,8 +136,12 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] {
 	fn := func() (err error) {
 		defer func() {
 			if err != nil {
-				log.Warn("l0 import task execute failed", WrapLogFields(t, zap.Error(err))...)
-				t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
+				reason := err.Error()
+				if len(t.req.GetFiles()) == 1 {
+					reason = fmt.Sprintf("error: %v, file: %s", err, t.req.GetFiles()[0].String())
+				}
+				log.Warn("l0 import task execute failed", WrapLogFields(t, zap.Any("file", t.req.GetFiles()), zap.String("err", reason))...)
+				t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(reason))
 			}
 		}()
diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go
index 688830a457..0e1b47e9e9 100644
--- a/internal/datanode/importv2/task_l0_preimport.go
+++ b/internal/datanode/importv2/task_l0_preimport.go
@@ -115,7 +115,7 @@ func (t *L0PreImportTask) Clone() Task {
 }
 
 func (t *L0PreImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
 	log.Info("start to preimport l0", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Any("schema", t.GetSchema()))...)
 
@@ -124,8 +124,12 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] {
 	fn := func() (err error) {
 		defer func() {
 			if err != nil {
-				log.Warn("l0 import task execute failed", WrapLogFields(t, zap.Error(err))...)
-				t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
+				reason := err.Error()
+				if len(t.GetFileStats()) == 1 {
+					reason = fmt.Sprintf("error: %v, file: %s", err, t.GetFileStats()[0].GetImportFile().String())
+				}
+				log.Warn("l0 import task execute failed", WrapLogFields(t, zap.String("err", reason))...)
+				t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(reason))
 			}
 		}()
diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go
index f43c099423..5bb2a8ab43 100644
--- a/internal/datanode/importv2/task_preimport.go
+++ b/internal/datanode/importv2/task_preimport.go
@@ -124,7 +124,7 @@ func (t *PreImportTask) Clone() Task {
 }
 
 func (t *PreImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt()
 	log.Info("start to preimport", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Any("schema", t.GetSchema()))...)
 
@@ -138,7 +138,8 @@ func (t *PreImportTask) Execute() []*conc.Future[any] {
 		reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, t.options, bufferSize)
 		if err != nil {
 			log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
-			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
+			reason := fmt.Sprintf("error: %v, file: %s", err, file.String())
+			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(reason))
 			return err
 		}
 		defer reader.Close()
@@ -146,7 +147,8 @@ func (t *PreImportTask) Execute() []*conc.Future[any] {
 		err = t.readFileStat(reader, i)
 		if err != nil {
 			log.Warn("preimport failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
-			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
+			reason := fmt.Sprintf("error: %v, file: %s", err, file.String())
+			t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(reason))
 			return err
 		}
 		log.Info("read file stat done", WrapLogFields(t, zap.Strings("files", file.GetPaths()),
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 09e7b29878..8bba74a051 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -5477,6 +5477,10 @@ if this parameter <= 0, will set it as 10`,
 		Version: "2.4.0",
 		Doc: "The insert buffer size (in MB) during import.",
 		DefaultValue: "64",
+		Formatter: func(v string) string {
+			bufferSize := getAsFloat(v)
+			return fmt.Sprintf("%d", int(megaBytes2Bytes(bufferSize)))
+		},
 		PanicIfEmpty: false,
 		Export: true,
 	}
@@ -5487,6 +5491,10 @@ if this parameter <= 0, will set it as 10`,
 		Version: "2.5.14",
 		Doc: "The delete buffer size (in MB) during import.",
 		DefaultValue: "16",
+		Formatter: func(v string) string {
+			bufferSize := getAsFloat(v)
+			return fmt.Sprintf("%d", int(megaBytes2Bytes(bufferSize)))
+		},
 		PanicIfEmpty: false,
 		Export: true,
 	}
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 02c1386541..3fb549a6be 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -611,8 +611,8 @@ func TestComponentParam(t *testing.T) {
 		t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
 		assert.Equal(t, 16, maxConcurrentImportTaskNum)
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
-		assert.Equal(t, 64, Params.ImportInsertBufferSize.GetAsInt())
-		assert.Equal(t, 16, Params.ImportDeleteBufferSize.GetAsInt())
+		assert.Equal(t, 64*1024*1024, Params.ImportInsertBufferSize.GetAsInt())
+		assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
 		assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
diff --git a/tests/integration/import/auto_id_test.go b/tests/integration/import/auto_id_test.go
new file mode 100644
index 0000000000..dd773d9201
--- /dev/null
+++ b/tests/integration/import/auto_id_test.go
@@ -0,0 +1,195 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importv2
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/pkg/v2/common"
+	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
+	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/v2/util/merr"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+	"github.com/milvus-io/milvus/tests/integration"
+)
+
+func (s *BulkInsertSuite) runTestAutoID() {
+	const (
+		rowCount = 10
+		fileNum  = 10
+	)
+
+	c := s.Cluster
+	ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second)
+	defer cancel()
+
+	collectionName := "TestBulkInsert" + funcutil.GenRandomStr()
+
+	var schema *schemapb.CollectionSchema
+	fieldSchema1 := &schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: true}
+	fieldSchema2 := &schemapb.FieldSchema{FieldID: 101, Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}}
+	fieldSchema3 := &schemapb.FieldSchema{FieldID: 102, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}}
+	schema = integration.ConstructSchema(collectionName, dim, true, fieldSchema1, fieldSchema2, fieldSchema3)
+
+	marshaledSchema, err := proto.Marshal(schema)
+	s.NoError(err)
+
+	createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
+	})
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, createCollectionStatus.GetErrorCode())
+
+	err = os.MkdirAll(c.ChunkManager.RootPath(), os.ModePerm)
+	s.NoError(err)
+
+	wg := &sync.WaitGroup{}
+	importReqs := make([]*internalpb.ImportRequest, fileNum)
+	for i := 0; i < fileNum; i++ {
+		wg.Add(1)
+		i := i
+		go func() {
+			defer wg.Done()
+			rowBasedFile := fmt.Sprintf("%s/test_%d_%d.json", c.ChunkManager.RootPath(), i, rand.Int())
+			GenerateJSONFile(s.T(), rowBasedFile, schema, rowCount)
+			files := []*internalpb.ImportFile{
+				{
+					Paths: []string{
+						rowBasedFile,
+					},
+				},
+			}
+			importReqs[i] = &internalpb.ImportRequest{
+				CollectionName: collectionName,
+				Files:          files,
+				Options:        []*commonpb.KeyValuePair{},
+			}
+		}()
+	}
+	defer func() {
+		for _, req := range importReqs {
+			os.Remove(req.GetFiles()[0].GetPaths()[0])
+		}
+	}()
+	wg.Wait()
+
+	for i := 0; i < fileNum; i++ {
+		wg.Add(1)
+		i := i
+		go func() {
+			defer wg.Done()
+			importResp, err := c.Proxy.ImportV2(ctx, importReqs[i])
+			s.NoError(err)
+			s.Equal(int32(0), importResp.GetStatus().GetCode())
+			log.Info("Import result", zap.Any("importResp", importResp))
+			err = WaitForImportDone(ctx, c, importResp.GetJobID())
+			s.NoError(err)
+		}()
+	}
+	wg.Wait()
+
+	segments, err := c.MetaWatcher.ShowSegments()
+	s.NoError(err)
+	s.NotEmpty(segments)
+	for _, segment := range segments {
+		s.True(len(segment.GetBinlogs()) > 0)
+		s.NoError(CheckLogID(segment.GetBinlogs()))
+		s.True(len(segment.GetDeltalogs()) == 0)
+		s.True(len(segment.GetStatslogs()) > 0)
+		s.NoError(CheckLogID(segment.GetStatslogs()))
+	}
+
+	// create index
+	createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      "embeddings",
+		IndexName:      "_default",
+		ExtraParams:    integration.ConstructIndexParam(dim, s.indexType, s.metricType),
+	})
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
+	s.WaitForIndexBuilt(ctx, collectionName, "embeddings")
+
+	// load
+	loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
+	s.WaitForLoad(ctx, collectionName)
+
+	// search
+	expr := ""
+	const (
+		nq           = 2
+		topk         = 2
+		roundDecimal = -1
+	)
+	params := integration.GetSearchParams(s.indexType, s.metricType)
+	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+		"embeddings", s.vecType, nil, s.metricType, params, nq, dim, topk, roundDecimal)
+	searchReq.ConsistencyLevel = commonpb.ConsistencyLevel_Eventually
+	searchResult, err := c.Proxy.Search(ctx, searchReq)
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, searchResult.GetStatus().GetErrorCode())
+	s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+
+	// verify no duplicate autoID
+	expr = "id >= 0"
+	if s.pkType == schemapb.DataType_VarChar {
+		expr = `id >= "0"`
+	}
+	queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
+		CollectionName:   collectionName,
+		Expr:             expr,
+		OutputFields:     []string{"id"},
+		ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
+	})
+	err = merr.CheckRPCCall(queryResult, err)
+	s.NoError(err)
+	count := len(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData())
+	if s.pkType == schemapb.DataType_VarChar {
+		count = len(queryResult.GetFieldsData()[0].GetScalars().GetStringData().GetData())
+	}
+	s.Equal(rowCount*fileNum, count)
+}
+
+func (s *BulkInsertSuite) TestAutoID() {
+	// make buffer size small to trigger multiple sync
+	paramtable.Get().Save(paramtable.Get().DataNodeCfg.ImportInsertBufferSize.Key, "0.000001")
+	defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.ImportInsertBufferSize.Key)
+
+	s.pkType = schemapb.DataType_Int64
+	s.runTestAutoID()
+
+	s.pkType = schemapb.DataType_VarChar
+	s.runTestAutoID()
+}
diff --git a/tests/integration/import/binlog_test.go b/tests/integration/import/binlog_test.go
index 4bed5c81cb..2bff6c0506 100644
--- a/tests/integration/import/binlog_test.go
+++ b/tests/integration/import/binlog_test.go
@@ -19,6 +19,7 @@ package importv2
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/samber/lo"
@@ -39,7 +40,12 @@ import (
 	"github.com/milvus-io/milvus/tests/integration"
 )
 
-func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int) (int64, int64, *schemapb.IDs) {
+type DMLGroup struct {
+	insertRowNums []int
+	deleteRowNums []int
+}
+
+func (s *BulkInsertSuite) PrepareCollectionA(dim int, dmlGroup *DMLGroup) (int64, int64, *schemapb.IDs) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
 	defer cancel()
 	c := s.Cluster
@@ -51,10 +57,9 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
 	s.NoError(err)
 
 	createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
-		CollectionName:   collectionName,
-		Schema:           marshaledSchema,
-		ShardsNum:        common.DefaultShardsNum,
-		ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
 	})
 	s.NoError(merr.CheckRPCCall(createCollectionStatus, err))
 
@@ -76,7 +81,6 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
 		ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2),
 	})
 	s.NoError(merr.CheckRPCCall(createIndexStatus, err))
-
 	s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
 
 	// load
@@ -86,67 +90,85 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
 	s.NoError(merr.CheckRPCCall(loadStatus, err))
 	s.WaitForLoad(ctx, collectionName)
 
-	fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
-	hashKeys := integration.GenerateHashKeys(rowNum)
-	insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
-		CollectionName: collectionName,
-		FieldsData:     []*schemapb.FieldData{fVecColumn},
-		HashKeys:       hashKeys,
-		NumRows:        uint32(rowNum),
-	})
-	s.NoError(merr.CheckRPCCall(insertResult, err))
-	insertedIDs := insertResult.GetIDs()
+	const delBatch = 2
+	var (
+		totalInsertRowNum = 0
+		totalDeleteRowNum = 0
+		totalInsertedIDs  = &schemapb.IDs{
+			IdField: &schemapb.IDs_IntId{
+				IntId: &schemapb.LongArray{
+					Data: make([]int64, 0),
+				},
+			},
+		}
+	)
 
-	// flush
-	flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
-		CollectionNames: []string{collectionName},
-	})
-	s.NoError(merr.CheckRPCCall(flushResp, err))
-	segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
-	ids := segmentIDs.GetData()
-	s.Require().NotEmpty(segmentIDs)
-	s.Require().True(has)
-	flushTs, has := flushResp.GetCollFlushTs()[collectionName]
-	s.True(has)
+	for i := range dmlGroup.insertRowNums {
+		insRow := dmlGroup.insertRowNums[i]
+		delRow := dmlGroup.deleteRowNums[i]
+		totalInsertRowNum += insRow
+		totalDeleteRowNum += delRow
 
-	s.WaitForFlush(ctx, ids, flushTs, "", collectionName)
-	segments, err := c.MetaWatcher.ShowSegments()
-	s.NoError(err)
-	s.NotEmpty(segments)
-	for _, segment := range segments {
-		log.Info("ShowSegments result", zap.String("segment", segment.String()))
-	}
-
-	// delete
-	beginIndex := 0
-	for i := 0; i < delBatch; i++ {
-		delCnt := delNum / delBatch
-		idBegin := insertedIDs.GetIntId().GetData()[beginIndex]
-		idEnd := insertedIDs.GetIntId().GetData()[beginIndex+delCnt]
-		deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
+		fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, insRow, dim)
+		hashKeys := integration.GenerateHashKeys(insRow)
+		insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
 			CollectionName: collectionName,
-			Expr:           fmt.Sprintf("%d <= %s < %d", idBegin, integration.Int64Field, idEnd),
+			FieldsData:     []*schemapb.FieldData{fVecColumn},
+			HashKeys:       hashKeys,
+			NumRows:        uint32(insRow),
 		})
-		s.NoError(merr.CheckRPCCall(deleteResult, err))
-		beginIndex += delCnt
+		s.NoError(merr.CheckRPCCall(insertResult, err))
+		insertedIDs := insertResult.GetIDs()
+		totalInsertedIDs.IdField.(*schemapb.IDs_IntId).IntId.Data = append(
+			totalInsertedIDs.IdField.(*schemapb.IDs_IntId).IntId.Data, insertedIDs.IdField.(*schemapb.IDs_IntId).IntId.Data...)
 
-		flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+		// delete
+		beginIndex := 0
+		for j := 0; j < delBatch; j++ {
+			if delRow == 0 {
+				continue
+			}
+			delCnt := delRow / delBatch
+			idBegin := insertedIDs.GetIntId().GetData()[beginIndex]
+			idEnd := insertedIDs.GetIntId().GetData()[beginIndex+delCnt-1]
+			deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
+				CollectionName: collectionName,
+				Expr:           fmt.Sprintf("%d <= %s <= %d", idBegin, integration.Int64Field, idEnd),
+			})
+			s.NoError(merr.CheckRPCCall(deleteResult, err))
+			beginIndex += delCnt
+		}
+
+		// flush
+		flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
 			CollectionNames: []string{collectionName},
 		})
 		s.NoError(merr.CheckRPCCall(flushResp, err))
-		flushTs, has = flushResp.GetCollFlushTs()[collectionName]
+		segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+		ids := segmentIDs.GetData()
+		s.Require().NotEmpty(segmentIDs)
+		s.Require().True(has)
+		flushTs, has := flushResp.GetCollFlushTs()[collectionName]
 		s.True(has)
-		s.WaitForFlush(ctx, nil, flushTs, "", collectionName)
+		s.WaitForFlush(ctx, ids, flushTs, "", collectionName)
+		segments, err := c.MetaWatcher.ShowSegments()
+		s.NoError(err)
+		s.NotEmpty(segments)
+		for _, segment := range segments {
+			log.Info("ShowSegments result", zap.String("segment", segment.String()))
+		}
 	}
 
 	// check l0 segments
-	segments, err = c.MetaWatcher.ShowSegments()
-	s.NoError(err)
-	s.NotEmpty(segments)
-	l0Segments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
-		return segment.GetLevel() == datapb.SegmentLevel_L0
-	})
-	s.Equal(delBatch, len(l0Segments))
+	if totalDeleteRowNum > 0 {
+		segments, err := c.MetaWatcher.ShowSegments()
+		s.NoError(err)
+		s.NotEmpty(segments)
+		l0Segments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
+			return segment.GetLevel() == datapb.SegmentLevel_L0
+		})
+		s.True(len(l0Segments) > 0)
+	}
 
 	// search
 	expr := fmt.Sprintf("%s > 0", integration.Int64Field)
@@ -162,7 +184,11 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
 	err = merr.CheckRPCCall(searchResult, err)
 	s.NoError(err)
-	s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+	expectResult := nq * topk
+	if expectResult > totalInsertRowNum-totalDeleteRowNum {
+		expectResult = totalInsertRowNum - totalDeleteRowNum
+	}
+	s.Equal(expectResult, len(searchResult.GetResults().GetScores()))
 
 	// query
 	expr = fmt.Sprintf("%s >= 0", integration.Int64Field)
@@ -174,10 +200,10 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)
 	count := int(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
-	s.Equal(rowNum-delNum, count)
+	s.Equal(totalInsertRowNum-totalDeleteRowNum, count)
 
 	// query 2
-	expr = fmt.Sprintf("%s < %d", integration.Int64Field, insertedIDs.GetIntId().GetData()[10])
+	expr = fmt.Sprintf("%s < %d", integration.Int64Field, totalInsertedIDs.GetIntId().GetData()[10])
 	queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
 		CollectionName: collectionName,
 		Expr:           expr,
@@ -186,28 +212,34 @@ func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int)
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)
 	count = len(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData())
-	s.Equal(0, count)
+	expectCount := 10
+	if dmlGroup.deleteRowNums[0] >= 10 {
+		expectCount = 0
+	}
+	s.Equal(expectCount, count)
 
 	// get collectionID and partitionID
 	collectionID := showCollectionsResp.GetCollectionIds()[0]
 	partitionID := showPartitionsResp.GetPartitionIDs()[0]
-	return collectionID, partitionID, insertedIDs
+	return collectionID, partitionID, totalInsertedIDs
 }
 
-func (s *BulkInsertSuite) TestBinlogImport() {
-	const (
-		dim      = 128
-		rowNum   = 50000
-		delNum   = 30000
-		delBatch = 10
-	)
+func (s *BulkInsertSuite) runBinlogTest(dmlGroup *DMLGroup) {
+	const dim = 128
 
-	collectionID, partitionID, insertedIDs := s.PrepareCollectionA(dim, rowNum, delNum, delBatch)
+	collectionID, partitionID, insertedIDs := s.PrepareCollectionA(dim, dmlGroup)
 
 	c := s.Cluster
 	ctx := c.GetContext()
 
+	totalInsertRowNum := lo.SumBy(dmlGroup.insertRowNums, func(num int) int {
+		return num
+	})
+	totalDeleteRowNum := lo.SumBy(dmlGroup.deleteRowNums, func(num int) int {
+		return num
+	})
+
 	collectionName := "TestBinlogImport_B_" + funcutil.GenRandomStr()
 
 	schema := integration.ConstructSchema(collectionName, dim, true)
@@ -246,16 +278,12 @@ func (s *BulkInsertSuite) TestBinlogImport() {
 	s.NoError(merr.CheckRPCCall(flushedSegmentsResp, err))
 	flushedSegments := flushedSegmentsResp.GetSegments()
 	log.Info("flushed segments", zap.Int64s("segments", flushedSegments))
-	segmentBinlogPrefixes := make([]string, 0)
-	for _, segmentID := range flushedSegments {
-		segmentBinlogPrefixes = append(segmentBinlogPrefixes,
-			fmt.Sprintf("/tmp/%s/insert_log/%d/%d/%d", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, partitionID, segmentID))
-	}
+
 	// binlog import
-	files := []*internalpb.ImportFile{
-		{
-			Paths: segmentBinlogPrefixes,
-		},
+	files := make([]*internalpb.ImportFile, 0)
+	for _, segmentID := range flushedSegments {
+		files = append(files, &internalpb.ImportFile{Paths: []string{fmt.Sprintf("/tmp/%s/insert_log/%d/%d/%d",
+			paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, partitionID, segmentID)}})
 	}
 	importResp, err := c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
 		CollectionName: collectionName,
@@ -293,45 +321,48 @@ func (s *BulkInsertSuite) TestBinlogImport() {
 	s.NoError(CheckLogID(segment.GetStatslogs()))
 
 	// l0 import
-	files = []*internalpb.ImportFile{
-		{
-			Paths: []string{
-				fmt.Sprintf("/tmp/%s/delta_log/%d/%d/", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, common.AllPartitionsID),
+	if totalDeleteRowNum > 0 {
+		files = []*internalpb.ImportFile{
+			{
+				Paths: []string{
+					fmt.Sprintf("/tmp/%s/delta_log/%d/%d/",
+						paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, common.AllPartitionsID),
+				},
 			},
-		},
+		}
+		importResp, err = c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
+			CollectionName: collectionName,
+			Files:          files,
+			Options: []*commonpb.KeyValuePair{
+				{Key: "l0_import", Value: "true"},
+			},
+		})
+		s.NoError(merr.CheckRPCCall(importResp, err))
+		log.Info("Import result", zap.Any("importResp", importResp))
+
+		jobID = importResp.GetJobID()
+		err = WaitForImportDone(ctx, c, jobID)
+		s.NoError(err)
+
+		segments, err = c.MetaWatcher.ShowSegments()
+		s.NoError(err)
+		s.NotEmpty(segments)
+		segments = lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
+			return segment.GetCollectionID() == newCollectionID
+		})
+		log.Info("Show segments", zap.Any("segments", segments))
+		l0Segments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
+			return segment.GetCollectionID() == newCollectionID && segment.GetLevel() == datapb.SegmentLevel_L0
+		})
+		s.Equal(1, len(l0Segments))
+		segment = l0Segments[0]
+		s.Equal(commonpb.SegmentState_Flushed, segment.GetState())
+		s.Equal(common.AllPartitionsID, segment.GetPartitionID())
+		s.True(len(segment.GetBinlogs()) == 0)
+		s.True(len(segment.GetDeltalogs()) > 0)
+		s.NoError(CheckLogID(segment.GetDeltalogs()))
+		s.True(len(segment.GetStatslogs()) == 0)
 	}
-	importResp, err = c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
-		CollectionName: collectionName,
-		Files:          files,
-		Options: []*commonpb.KeyValuePair{
-			{Key: "l0_import", Value: "true"},
-		},
-	})
-	s.NoError(merr.CheckRPCCall(importResp, err))
-	log.Info("Import result", zap.Any("importResp", importResp))
-
-	jobID = importResp.GetJobID()
-	err = WaitForImportDone(ctx, c, jobID)
-	s.NoError(err)
-
-	segments, err = c.MetaWatcher.ShowSegments()
-	s.NoError(err)
-	s.NotEmpty(segments)
-	segments = lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
-		return segment.GetCollectionID() == newCollectionID
-	})
-	log.Info("Show segments", zap.Any("segments", segments))
-	l0Segments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
-		return segment.GetCollectionID() == newCollectionID && segment.GetLevel() == datapb.SegmentLevel_L0
-	})
-	s.Equal(1, len(l0Segments))
-	segment = l0Segments[0]
-	s.Equal(commonpb.SegmentState_Flushed, segment.GetState())
-	s.Equal(common.AllPartitionsID, segment.GetPartitionID())
-	s.True(len(segment.GetBinlogs()) == 0)
-	s.True(len(segment.GetDeltalogs()) > 0)
-	s.NoError(CheckLogID(segment.GetDeltalogs()))
-	s.True(len(segment.GetStatslogs()) == 0)
 
 	// load
 	loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
@@ -349,12 +380,17 @@ func (s *BulkInsertSuite) TestBinlogImport() {
 	params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.L2)
 	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
 		integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim, topk, roundDecimal)
+	searchReq.ConsistencyLevel = commonpb.ConsistencyLevel_Eventually
 
 	searchResult, err := c.Proxy.Search(ctx, searchReq)
 
 	err = merr.CheckRPCCall(searchResult, err)
 	s.NoError(err)
-	s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+	expectResult := nq * topk
+	if expectResult > totalInsertRowNum-totalDeleteRowNum {
+		expectResult = totalInsertRowNum - totalDeleteRowNum
+	}
+	s.Equal(expectResult, len(searchResult.GetResults().GetScores()))
 
 	// check ids from collectionA, because during binlog import, even if the primary key's autoID is set to true,
 	// the primary key from the binlog should be used instead of being reassigned.
 	insertedIDsMap := lo.SliceToMap(insertedIDs.GetIntId().GetData(), func(id int64) (int64, struct{}) {
@@ -368,24 +404,104 @@ func (s *BulkInsertSuite) TestBinlogImport() {
 	// query
 	expr = fmt.Sprintf("%s >= 0", integration.Int64Field)
 	queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-		CollectionName: collectionName,
-		Expr:           expr,
-		OutputFields:   []string{"count(*)"},
+		CollectionName:   collectionName,
+		Expr:             expr,
+		OutputFields:     []string{"count(*)"},
+		ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
 	})
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)
 	count := int(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
-	s.Equal(rowNum-delNum, count)
+	s.Equal(totalInsertRowNum-totalDeleteRowNum, count)
 
 	// query 2
 	expr = fmt.Sprintf("%s < %d", integration.Int64Field, insertedIDs.GetIntId().GetData()[10])
 	queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-		CollectionName: collectionName,
-		Expr:           expr,
-		OutputFields:   []string{},
+		CollectionName:   collectionName,
+		Expr:             expr,
+		OutputFields:     []string{},
+		ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
 	})
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)
 	count = len(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData())
-	s.Equal(0, count)
+	expectCount := 10
+	if dmlGroup.deleteRowNums[0] >= 10 {
+		expectCount = 0
+	}
+	s.Equal(expectCount, count)
+}
+
+func (s *BulkInsertSuite) TestInvalidInput() {
+	const dim = 128
+	c := s.Cluster
+	ctx := c.GetContext()
+
+	collectionName := "TestBinlogImport_InvalidInput_" + funcutil.GenRandomStr()
+	schema := integration.ConstructSchema(collectionName, dim, true)
+	marshaledSchema, err := proto.Marshal(schema)
+	s.NoError(err)
+
+	createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
+	})
+	s.NoError(merr.CheckRPCCall(createCollectionStatus, err))
+
+	describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
+		CollectionName: collectionName,
+	})
+	s.NoError(merr.CheckRPCCall(describeCollectionResp, err))
+
+	// binlog import
+	files := []*internalpb.ImportFile{
+		{
+			Paths: []string{"invalid-path", "invalid-path", "invalid-path"},
+		},
+	}
+	importResp, err := c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
+		CollectionName: collectionName,
+		PartitionName:  paramtable.Get().CommonCfg.DefaultPartitionName.GetValue(),
+		Files:          files,
+		Options: []*commonpb.KeyValuePair{
+			{Key: "backup", Value: "true"},
+		},
+	})
+	err = merr.CheckRPCCall(importResp, err)
+	s.Error(err)
+	s.True(strings.Contains(err.Error(), "too many input paths for binlog import"))
+	log.Info("Import result", zap.Any("importResp", importResp))
+}
+
+func (s *BulkInsertSuite) TestBinlogImport() {
+	dmlGroup := &DMLGroup{
+		insertRowNums: []int{500, 500, 500},
+		deleteRowNums: []int{300, 300, 300},
+	}
+	s.runBinlogTest(dmlGroup)
+}
+
+func (s *BulkInsertSuite) TestBinlogImport_NoDelete() {
+	dmlGroup := &DMLGroup{
+		insertRowNums: []int{500, 500, 500},
+		deleteRowNums: []int{0, 0, 0},
+	}
+	s.runBinlogTest(dmlGroup)
+}
+
+func (s *BulkInsertSuite) TestBinlogImport_Partial_0_Rows_Segment() {
+	dmlGroup := &DMLGroup{
+		insertRowNums: []int{500, 500, 500},
+		deleteRowNums: []int{500, 300, 0},
+	}
+	s.runBinlogTest(dmlGroup)
+}
+
+func (s *BulkInsertSuite) TestBinlogImport_All_0_Rows_Segment() {
+	dmlGroup := &DMLGroup{
+		insertRowNums: []int{500, 500, 500},
+		deleteRowNums: []int{500, 500, 500},
+	}
+	s.runBinlogTest(dmlGroup)
 }
diff --git a/tests/integration/import/dynamic_field_test.go b/tests/integration/import/dynamic_field_test.go
index 7a53a781f2..34cb92e1e8 100644
--- a/tests/integration/import/dynamic_field_test.go
+++ b/tests/integration/import/dynamic_field_test.go
@@ -43,7 +43,7 @@ import (
 func (s *BulkInsertSuite) testImportDynamicField() {
 	const (
-		rowCount = 10000
+		rowCount = 100
 	)
 
 	c := s.Cluster
@@ -191,6 +191,7 @@ func (s *BulkInsertSuite) testImportDynamicField() {
 	params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.L2)
 	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
 		integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim, topk, roundDecimal)
+	searchReq.ConsistencyLevel = commonpb.ConsistencyLevel_Eventually
 
 	searchResult, err := c.Proxy.Search(ctx, searchReq)
diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go
index 12e032406f..c1a61dea23 100644
--- a/tests/integration/import/import_test.go
+++ b/tests/integration/import/import_test.go
@@ -49,7 +49,6 @@ type BulkInsertSuite struct {
 	failedReason string
 
 	pkType   schemapb.DataType
-	autoID   bool
 	fileType importutilv2.FileType
 
 	vecType schemapb.DataType
@@ -63,7 +62,6 @@ func (s *BulkInsertSuite) SetupTest() {
 	s.failed = false
 	s.fileType = importutilv2.Parquet
 	s.pkType = schemapb.DataType_Int64
-	s.autoID = false
 	s.vecType = schemapb.DataType_FloatVector
 	s.indexType = "HNSW"
@@ -82,14 +80,14 @@ func (s *BulkInsertSuite) run() {
 	collectionName := "TestBulkInsert" + funcutil.GenRandomStr()
 
 	var schema *schemapb.CollectionSchema
-	fieldSchema1 := &schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: s.autoID}
+	fieldSchema1 := &schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: false}
 	fieldSchema2 := &schemapb.FieldSchema{FieldID: 101, Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}}
 	fieldSchema3 := &schemapb.FieldSchema{FieldID: 102, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}}
 	fieldSchema4 := &schemapb.FieldSchema{FieldID: 103, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{}}
 	if s.vecType != schemapb.DataType_SparseFloatVector {
-		schema = integration.ConstructSchema(collectionName, dim, s.autoID, fieldSchema1, fieldSchema2, fieldSchema3)
+		schema = integration.ConstructSchema(collectionName, dim, false, fieldSchema1, fieldSchema2, fieldSchema3)
 	} else {
-		schema = integration.ConstructSchema(collectionName, dim, s.autoID, fieldSchema1, fieldSchema2, fieldSchema4)
+		schema = integration.ConstructSchema(collectionName, dim, false, fieldSchema1, fieldSchema2, fieldSchema4)
 	}
 
 	marshaledSchema, err := proto.Marshal(schema)
@@ -211,6 +209,7 @@ func (s *BulkInsertSuite) run() {
 	params := integration.GetSearchParams(s.indexType, s.metricType)
 	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
 		"embeddings", s.vecType, nil, s.metricType, params, nq, dim, topk, roundDecimal)
+	searchReq.ConsistencyLevel = commonpb.ConsistencyLevel_Eventually
 
 	searchResult, err := c.Proxy.Search(ctx, searchReq)
 	s.NoError(err)
@@ -259,16 +258,6 @@ func (s *BulkInsertSuite) TestMultiFileTypes() {
 	}
 }
 
-func (s *BulkInsertSuite) TestAutoID() {
-	s.pkType = schemapb.DataType_Int64
-	s.autoID = true
-	s.run()
-
-	s.pkType = schemapb.DataType_VarChar
-	s.autoID = true
-	s.run()
-}
-
 func (s *BulkInsertSuite) TestPK() {
 	s.pkType = schemapb.DataType_Int64
 	s.run()
diff --git a/tests/integration/import/multi_vector_test.go b/tests/integration/import/multi_vector_test.go
index 168b54657d..936ff6f3b1 100644
--- a/tests/integration/import/multi_vector_test.go
+++ b/tests/integration/import/multi_vector_test.go
@@ -43,7 +43,7 @@ import (
 func (s *BulkInsertSuite) testMultipleVectorFields() {
 	const (
-		rowCount = 10000
+		rowCount = 100
 		dim1     = 64
 		dim2     = 32
 	)
@@ -215,6 +215,7 @@ func (s *BulkInsertSuite) testMultipleVectorFields() {
 	params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.L2)
 	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
 		integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim1, topk, roundDecimal)
+	searchReq.ConsistencyLevel = commonpb.ConsistencyLevel_Eventually
 
 	searchResult, err := c.Proxy.Search(ctx, searchReq)
@@ -225,6 +226,7 @@ func (s *BulkInsertSuite) testMultipleVectorFields() {
 	// search vec 2
 	searchReq = integration.ConstructSearchRequest("", collectionName, expr,
 		integration.BFloat16VecField, schemapb.DataType_BFloat16Vector, nil, metric.L2, params, nq, dim2, topk, roundDecimal)
+	searchReq.ConsistencyLevel = commonpb.ConsistencyLevel_Eventually
 
 	searchResult, err = c.Proxy.Search(ctx, searchReq)
diff --git a/tests/integration/import/partition_key_test.go b/tests/integration/import/partition_key_test.go
index ecc68c1236..5d7ce63b77 100644
--- a/tests/integration/import/partition_key_test.go
+++ b/tests/integration/import/partition_key_test.go
@@ -42,7 +42,7 @@ func (s *BulkInsertSuite) TestImportWithPartitionKey() {
 	const (
-		rowCount = 10000
+		rowCount = 100
 	)
 
 	c := s.Cluster
@@ -338,9 +338,10 @@ func (s *BulkInsertSuite) TestImportWithAFewRows() {
 	str := strings.Join(strs, `,`)
 	expr := fmt.Sprintf("%s in [%v]", integration.VarCharField, str)
 	queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-		CollectionName: collectionName,
-		Expr:           expr,
-		OutputFields:   []string{integration.VarCharField},
+		CollectionName:   collectionName,
+		Expr:             expr,
+		OutputFields:     []string{integration.VarCharField},
+		ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
 	})
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)
@@ -355,9 +356,10 @@ func (s *BulkInsertSuite) TestImportWithAFewRows() {
 	// query partition key, CmpOp 1
 	expr = fmt.Sprintf("%s >= 0", integration.Int64Field)
 	queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-		CollectionName: collectionName,
-		Expr:           expr,
-		OutputFields:   []string{integration.VarCharField},
+		CollectionName:   collectionName,
+		Expr:             expr,
+		OutputFields:     []string{integration.VarCharField},
+		ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
 	})
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)
@@ -373,9 +375,10 @@ func (s *BulkInsertSuite) TestImportWithAFewRows() {
 	target := partitionKeyData[rand.Intn(rowCount)]
 	expr = fmt.Sprintf("%s == \"%s\"", integration.VarCharField, target)
 	queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
-		CollectionName: collectionName,
-		Expr:           expr,
-		OutputFields:   []string{integration.VarCharField},
+		CollectionName:   collectionName,
+		Expr:             expr,
+		OutputFields:     []string{integration.VarCharField},
+		ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
 	})
 	err = merr.CheckRPCCall(queryResult, err)
 	s.NoError(err)