From 9258f7059d890b8eeea8a37746eb9d16376853cc Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Thu, 19 Jan 2023 21:31:44 +0800 Subject: [PATCH] Fix Bulkload ut print too much result (#21821) Signed-off-by: xiaofan-luan --- internal/rootcoord/import_manager.go | 5 ++--- internal/rootcoord/import_manager_test.go | 7 +++++-- internal/rootcoord/scheduler_test.go | 10 +++++++--- internal/util/paramtable/component_param.go | 8 ++++++++ 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index c7c4164b99..eb33501ea7 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -40,8 +40,7 @@ import ( ) const ( - MaxPendingCount = 65536 // TODO: Make this configurable. - delimiter = "/" + delimiter = "/" ) // checkPendingTasksInterval is the default interval to check and send out pending tasks, @@ -94,7 +93,7 @@ func newImportManager(ctx context.Context, client kv.TxnKV, mgr := &importManager{ ctx: ctx, taskStore: client, - pendingTasks: make([]*datapb.ImportTaskInfo, 0, MaxPendingCount), // currently task queue max size is 32 + pendingTasks: make([]*datapb.ImportTaskInfo, 0, Params.RootCoordCfg.ImportMaxPendingTaskCount.GetAsInt()), // currently task queue max size is 32 workingTasks: make(map[int64]*datapb.ImportTaskInfo), busyNodes: make(map[int64]int64), pendingLock: sync.RWMutex{}, diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go index 543217767b..6ec84f8320 100644 --- a/internal/rootcoord/import_manager_test.go +++ b/internal/rootcoord/import_manager_test.go @@ -542,7 +542,10 @@ func TestImportManager_ImportJob(t *testing.T) { globalCount++ return globalCount, 0, nil } + paramtable.Get().Save(Params.RootCoordCfg.ImportTaskSubPath.Key, "test_import_task") + paramtable.Get().Save(Params.RootCoordCfg.ImportMaxPendingTaskCount.Key, "16") + defer paramtable.Get().Remove(Params.RootCoordCfg.ImportMaxPendingTaskCount.Key) colID := int64(100) mockKv := memkv.NewMemoryKV() callMarkSegmentsDropped := func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) { @@ -654,9 +657,9 @@ func TestImportManager_ImportJob(t *testing.T) { // the pending list already has one task // once task count exceeds MaxPendingCount, return error - for i := 0; i <= MaxPendingCount; i++ { + for i := 0; i <= Params.RootCoordCfg.ImportMaxPendingTaskCount.GetAsInt(); i++ { resp = mgr.importJob(context.TODO(), rowReq, colID, 0) - if i < MaxPendingCount-1 { + if i < Params.RootCoordCfg.ImportMaxPendingTaskCount.GetAsInt()-1 { assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) } else { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) diff --git a/internal/rootcoord/scheduler_test.go b/internal/rootcoord/scheduler_test.go index f92c8bfd98..f05f4cc4fc 100644 --- a/internal/rootcoord/scheduler_test.go +++ b/internal/rootcoord/scheduler_test.go @@ -190,9 +190,13 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) { paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1") s.Start() - time.Sleep(time.Millisecond * 4) - - assert.Greater(t, s.GetMinDdlTs(), Timestamp(100)) + for i := 0; i < 100; i++ { + if s.GetMinDdlTs() > Timestamp(100) { + break + } + assert.True(t, i < 100) + time.Sleep(time.Millisecond) + } // add task to queue. n := 10 diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 516f4ff62f..baf91f5620 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -579,6 +579,7 @@ type rootCoordConfig struct { MinSegmentSizeToEnableIndex ParamItem `refreshable:"true"` ImportTaskExpiration ParamItem `refreshable:"true"` ImportTaskRetention ParamItem `refreshable:"true"` + ImportMaxPendingTaskCount ParamItem `refreshable:"true"` ImportTaskSubPath ParamItem `refreshable:"true"` EnableActiveStandby ParamItem `refreshable:"false"` } @@ -626,6 +627,13 @@ func (p *rootCoordConfig) init(base *BaseTable) { } p.ImportTaskSubPath.Init(base.mgr) + p.ImportMaxPendingTaskCount = ParamItem{ + Key: "rootCoord.importMaxPendingTaskCount", + Version: "2.2.2", + DefaultValue: strconv.Itoa(65535), + } + p.ImportMaxPendingTaskCount.Init(base.mgr) + p.EnableActiveStandby = ParamItem{ Key: "rootCoord.enableActiveStandby", Version: "2.2.0",