From 60cd548bf552b686fccc850c2c321db0e2037be4 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 28 Feb 2023 14:19:47 +0800 Subject: [PATCH] Rename concurrency package to conc (#22453) Signed-off-by: yah01 --- internal/datanode/data_sync_service.go | 6 ++--- internal/datanode/io_pool.go | 8 +++---- internal/datanode/io_pool_test.go | 6 ++--- internal/querynode/impl_test.go | 4 ++-- internal/querynode/mock_test.go | 4 ++-- internal/querynode/query_node.go | 6 ++--- internal/querynode/segment_loader.go | 22 +++++++++---------- internal/util/{concurrency => conc}/future.go | 2 +- .../util/{concurrency => conc}/future_test.go | 2 +- internal/util/{concurrency => conc}/pool.go | 2 +- .../util/{concurrency => conc}/pool_test.go | 2 +- 11 files changed, 32 insertions(+), 32 deletions(-) rename internal/util/{concurrency => conc}/future.go (99%) rename internal/util/{concurrency => conc}/future_test.go (98%) rename internal/util/{concurrency => conc}/pool.go (99%) rename internal/util/{concurrency => conc}/pool_test.go (98%) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index b872afa10a..3398069bcc 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -35,7 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -203,7 +203,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick return err } - futures := make([]*concurrency.Future[any], 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos)) + futures := make([]*conc.Future[any], 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos)) for _, us := range unflushedSegmentInfos { if us.CollectionID != dsService.collectionID || @@ -283,7 +283,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick tickler.watch() defer tickler.stop() - err = concurrency.AwaitAll(futures...) + err = conc.AwaitAll(futures...) if err != nil { return err } diff --git a/internal/datanode/io_pool.go b/internal/datanode/io_pool.go index 26c523cec8..b97f5eb2a4 100644 --- a/internal/datanode/io_pool.go +++ b/internal/datanode/io_pool.go @@ -3,10 +3,10 @@ package datanode import ( "sync" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" ) -var ioPool *concurrency.Pool +var ioPool *conc.Pool var ioPoolInitOnce sync.Once func initIOPool() { @@ -15,10 +15,10 @@ func initIOPool() { capacity = 32 } // error only happens with negative expiry duration or with negative pre-alloc size. - ioPool = concurrency.NewPool(capacity) + ioPool = conc.NewPool(capacity) } -func getOrCreateIOPool() *concurrency.Pool { +func getOrCreateIOPool() *conc.Pool { ioPoolInitOnce.Do(initIOPool) return ioPool } diff --git a/internal/datanode/io_pool_test.go b/internal/datanode/io_pool_test.go index abab9aa4ee..e41a3cf915 100644 --- a/internal/datanode/io_pool_test.go +++ b/internal/datanode/io_pool_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -23,14 +23,14 @@ func Test_getOrCreateIOPool(t *testing.T) { go func() { defer wg.Done() p := getOrCreateIOPool() - futures := make([]*concurrency.Future[any], 0, nTask) + futures := make([]*conc.Future[any], 0, nTask) for j := 0; j < nTask; j++ { future := p.Submit(func() (interface{}, error) { return nil, nil }) futures = append(futures, future) } - err := concurrency.AwaitAll(futures...) + err := conc.AwaitAll(futures...) assert.NoError(t, err) }() } diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index bd880f9bd9..db63c20e82 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -30,7 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -116,7 +116,7 @@ func TestImpl_WatchDmChannels(t *testing.T) { defer func() { node.taskPool = originPool }() - node.taskPool = concurrency.NewDefaultPool() + node.taskPool = conc.NewDefaultPool() node.taskPool.Release() status, err = node.WatchDmChannels(ctx, req) assert.NoError(t, err) diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 8b3ec09b3d..82115148d9 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -46,7 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -1696,7 +1696,7 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory node.etcdCli = etcdCli node.initSession() - node.taskPool = concurrency.NewPool(2, ants.WithPreAlloc(true)) + node.taskPool = conc.NewPool(2, ants.WithPreAlloc(true)) etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) node.etcdKV = etcdKV diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index b3c3df75cf..b05bc79054 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -46,7 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/gc" "github.com/milvus-io/milvus/internal/util/hardware" @@ -124,7 +124,7 @@ type QueryNode struct { queryShardService *queryShardService // pool for load/release channel - taskPool *concurrency.Pool + taskPool *conc.Pool IsStandAlone bool } @@ -271,7 +271,7 @@ func (node *QueryNode) Init() error { node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath)) - node.taskPool = concurrency.NewDefaultPool() + node.taskPool = conc.NewDefaultPool() node.metaReplica = newCollectionReplica() node.loader = newSegmentLoader( node.metaReplica, diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 52ea66284f..27f022ed05 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -45,7 +45,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/conc" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/hardware" "github.com/milvus-io/milvus/internal/util/indexparamcheck" @@ -73,8 +73,8 @@ type segmentLoader struct { cm storage.ChunkManager // minio cm etcdKV *etcdkv.EtcdKV - ioPool *concurrency.Pool - cpuPool *concurrency.Pool + ioPool *conc.Pool + cpuPool *conc.Pool factory msgstream.Factory } @@ -319,7 +319,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(ctx context.Context, segme iCodec := storage.InsertCodec{} // change all field bin log loading into concurrent - loadFutures := make([]*concurrency.Future[any], 0, len(fieldBinlogs)) + loadFutures := make([]*conc.Future[any], 0, len(fieldBinlogs)) for _, fieldBinlog := range fieldBinlogs { futures := loader.loadFieldBinlogsAsync(ctx, fieldBinlog) loadFutures = append(loadFutures, futures...) @@ -402,7 +402,7 @@ func (loader *segmentLoader) loadSealedField(ctx context.Context, segment *Segme // acquire a CPU worker before load field binlogs futures := loader.loadFieldBinlogsAsync(ctx, field) - err := concurrency.AwaitAll(futures...) + err := conc.AwaitAll(futures...) if err != nil { return err } @@ -427,8 +427,8 @@ func (loader *segmentLoader) loadSealedField(ctx context.Context, segment *Segme } // Load binlogs concurrently into memory from KV storage asyncly -func (loader *segmentLoader) loadFieldBinlogsAsync(ctx context.Context, field *datapb.FieldBinlog) []*concurrency.Future[any] { - futures := make([]*concurrency.Future[any], 0, len(field.Binlogs)) +func (loader *segmentLoader) loadFieldBinlogsAsync(ctx context.Context, field *datapb.FieldBinlog) []*conc.Future[any] { + futures := make([]*conc.Future[any], 0, len(field.Binlogs)) for i := range field.Binlogs { path := field.Binlogs[i].GetLogPath() future := loader.ioPool.Submit(func() (interface{}, error) { @@ -473,7 +473,7 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se log := log.With(zap.Int64("segment", segment.ID())) indexBuffer := make([][]byte, 0, len(indexInfo.IndexFilePaths)) filteredPaths := make([]string, 0, len(indexInfo.IndexFilePaths)) - futures := make([]*concurrency.Future[any], 0, len(indexInfo.IndexFilePaths)) + futures := make([]*conc.Future[any], 0, len(indexInfo.IndexFilePaths)) indexCodec := storage.NewIndexFileBinlogCodec() // TODO, remove the load index info froam @@ -552,7 +552,7 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se futures = append(futures, indexFuture) } - err = concurrency.AwaitAll(futures...) + err = conc.AwaitAll(futures...) if err != nil { return err } @@ -985,8 +985,8 @@ func newSegmentLoader( if ioPoolSize > 256 { ioPoolSize = 256 } - ioPool := concurrency.NewPool(ioPoolSize, ants.WithPreAlloc(true)) - cpuPool := concurrency.NewPool(cpuNum, ants.WithPreAlloc(true)) + ioPool := conc.NewPool(ioPoolSize, ants.WithPreAlloc(true)) + cpuPool := conc.NewPool(cpuNum, ants.WithPreAlloc(true)) log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize), diff --git a/internal/util/concurrency/future.go b/internal/util/conc/future.go similarity index 99% rename from internal/util/concurrency/future.go rename to internal/util/conc/future.go index 1187f72c84..94c974317e 100644 --- a/internal/util/concurrency/future.go +++ b/internal/util/conc/future.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package concurrency +package conc type future interface { wait() diff --git a/internal/util/concurrency/future_test.go b/internal/util/conc/future_test.go similarity index 98% rename from internal/util/concurrency/future_test.go rename to internal/util/conc/future_test.go index 31ed80b0f2..eb4f72b5b0 100644 --- a/internal/util/concurrency/future_test.go +++ b/internal/util/conc/future_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package concurrency +package conc import ( "testing" diff --git a/internal/util/concurrency/pool.go b/internal/util/conc/pool.go similarity index 99% rename from internal/util/concurrency/pool.go rename to internal/util/conc/pool.go index 77728b86c0..c088fe3853 100644 --- a/internal/util/concurrency/pool.go +++ b/internal/util/conc/pool.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package concurrency +package conc import ( "runtime" diff --git a/internal/util/concurrency/pool_test.go b/internal/util/conc/pool_test.go similarity index 98% rename from internal/util/concurrency/pool_test.go rename to internal/util/conc/pool_test.go index d440810fb6..747da36d77 100644 --- a/internal/util/concurrency/pool_test.go +++ b/internal/util/conc/pool_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package concurrency +package conc import ( "testing"