diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 4898524e2d..7a75a14679 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -39,13 +39,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer" - "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/function" "github.com/milvus-io/milvus/internal/util/reduce" + "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 6e6407df4e..f7ff0da91f 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -50,16 +50,16 @@ import ( grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/delegator" - "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/pipeline" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/registry" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" @@ -113,7 +113,7 @@ type QueryNode struct { loader segments.Loader // Search/Query - scheduler tasks.Scheduler + scheduler scheduler.Scheduler // etcd client etcdCli *clientv3.Client @@ -339,7 +339,7 @@ func (node *QueryNode) Init() error { } schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue() - node.scheduler = tasks.NewScheduler( + node.scheduler = scheduler.NewScheduler( schedulePolicy, ) diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index 30164e0b8b..37070bbae9 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -34,10 +34,10 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 085154f696..e3ac0fb7f7 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -689,7 +690,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe return resp, nil } - var task tasks.Task + var task scheduler.Task if paramtable.Get().QueryNodeCfg.UseStreamComputing.GetAsBool() { task = tasks.NewStreamingSearchTask(searchCtx, collection, node.manager, req, node.serverID) } else { diff --git a/internal/querynodev2/tasks/query_stream_task.go b/internal/querynodev2/tasks/query_stream_task.go index 1626eb76bb..e24c755373 100644 --- a/internal/querynodev2/tasks/query_stream_task.go +++ b/internal/querynodev2/tasks/query_stream_task.go @@ -6,10 +6,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/streamrpc" ) -var _ Task = &QueryStreamTask{} +var _ scheduler.Task = &QueryStreamTask{} func NewQueryStreamTask(ctx context.Context, collection *segments.Collection, diff --git a/internal/querynodev2/tasks/query_task.go b/internal/querynodev2/tasks/query_task.go index 2a655460a8..da4f18f720 100644 --- a/internal/querynodev2/tasks/query_task.go +++ b/internal/querynodev2/tasks/query_task.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -22,7 +23,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var _ Task = &QueryTask{} +var _ scheduler.Task = &QueryTask{} func NewQueryTask(ctx context.Context, collection *segments.Collection, diff --git a/internal/querynodev2/tasks/search_task.go b/internal/querynodev2/tasks/search_task.go index 0a2118a787..39f25f542f 100644 --- a/internal/querynodev2/tasks/search_task.go +++ b/internal/querynodev2/tasks/search_task.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -30,8 +31,8 @@ import ( ) var ( - _ Task = &SearchTask{} - _ MergeTask = &SearchTask{} + _ scheduler.Task = &SearchTask{} + _ scheduler.MergeTask = &SearchTask{} ) type SearchTask struct { @@ -346,7 +347,7 @@ func (t *SearchTask) NQ() int64 { return t.nq } -func (t *SearchTask) MergeWith(other Task) bool { +func (t *SearchTask) MergeWith(other scheduler.Task) bool { switch other := other.(type) { case *SearchTask: return t.Merge(other) @@ -416,7 +417,7 @@ func NewStreamingSearchTask(ctx context.Context, } } -func (t *StreamingSearchTask) MergeWith(other Task) bool { +func (t *StreamingSearchTask) MergeWith(other scheduler.Task) bool { return false } diff --git a/internal/querynodev2/optimizers/mock_query_hook.go b/internal/util/searchutil/optimizers/mock_query_hook.go similarity index 100% rename from internal/querynodev2/optimizers/mock_query_hook.go rename to internal/util/searchutil/optimizers/mock_query_hook.go diff --git a/internal/querynodev2/optimizers/query_hook.go b/internal/util/searchutil/optimizers/query_hook.go similarity index 100% rename from internal/querynodev2/optimizers/query_hook.go rename to internal/util/searchutil/optimizers/query_hook.go diff --git a/internal/querynodev2/optimizers/query_hook_test.go b/internal/util/searchutil/optimizers/query_hook_test.go similarity index 100% rename from internal/querynodev2/optimizers/query_hook_test.go rename to internal/util/searchutil/optimizers/query_hook_test.go diff --git a/internal/querynodev2/tasks/concurrent_safe_scheduler.go b/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go similarity index 99% rename from internal/querynodev2/tasks/concurrent_safe_scheduler.go rename to internal/util/searchutil/scheduler/concurrent_safe_scheduler.go index 0454207361..7d04fe7c18 100644 --- a/internal/querynodev2/tasks/concurrent_safe_scheduler.go +++ b/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt" diff --git a/internal/querynodev2/tasks/concurrent_safe_scheduler_test.go b/internal/util/searchutil/scheduler/concurrent_safe_scheduler_test.go similarity index 99% rename from internal/querynodev2/tasks/concurrent_safe_scheduler_test.go rename to internal/util/searchutil/scheduler/concurrent_safe_scheduler_test.go index 69064f18fa..4017ff88f8 100644 --- a/internal/querynodev2/tasks/concurrent_safe_scheduler_test.go +++ b/internal/util/searchutil/scheduler/concurrent_safe_scheduler_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "context" diff --git a/internal/querynodev2/tasks/fifo_policy.go b/internal/util/searchutil/scheduler/fifo_policy.go similarity index 98% rename from internal/querynodev2/tasks/fifo_policy.go rename to internal/util/searchutil/scheduler/fifo_policy.go index 168f30b6c0..e23b04eaf1 100644 --- a/internal/querynodev2/tasks/fifo_policy.go +++ b/internal/util/searchutil/scheduler/fifo_policy.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "github.com/milvus-io/milvus/pkg/util/paramtable" diff --git a/internal/querynodev2/tasks/mock_task_test.go b/internal/util/searchutil/scheduler/mock_task_test.go similarity index 99% rename from internal/querynodev2/tasks/mock_task_test.go rename to internal/util/searchutil/scheduler/mock_task_test.go index 4705e84ba4..9f334cb242 100644 --- a/internal/querynodev2/tasks/mock_task_test.go +++ b/internal/util/searchutil/scheduler/mock_task_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "context" diff --git a/internal/querynodev2/tasks/policy_test.go b/internal/util/searchutil/scheduler/policy_test.go similarity index 99% rename from internal/querynodev2/tasks/policy_test.go rename to internal/util/searchutil/scheduler/policy_test.go index 03ce1a811f..c88b9972b8 100644 --- a/internal/querynodev2/tasks/policy_test.go +++ b/internal/util/searchutil/scheduler/policy_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt" diff --git a/internal/querynodev2/tasks/queues.go b/internal/util/searchutil/scheduler/queues.go similarity index 99% rename from internal/querynodev2/tasks/queues.go rename to internal/util/searchutil/scheduler/queues.go index 3582d6ce1a..7d0828c96a 100644 --- a/internal/querynodev2/tasks/queues.go +++ b/internal/util/searchutil/scheduler/queues.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "container/ring" diff --git a/internal/querynodev2/tasks/queues_test.go b/internal/util/searchutil/scheduler/queues_test.go similarity index 99% rename from internal/querynodev2/tasks/queues_test.go rename to internal/util/searchutil/scheduler/queues_test.go index ead133a2a1..79441d1f61 100644 --- a/internal/querynodev2/tasks/queues_test.go +++ b/internal/util/searchutil/scheduler/queues_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt" diff --git a/internal/querynodev2/tasks/tasks.go b/internal/util/searchutil/scheduler/tasks.go similarity index 99% rename from internal/querynodev2/tasks/tasks.go rename to internal/util/searchutil/scheduler/tasks.go index 7606642d12..cadb784729 100644 --- a/internal/querynodev2/tasks/tasks.go +++ b/internal/util/searchutil/scheduler/tasks.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import "github.com/milvus-io/milvus/internal/proto/internalpb" diff --git a/internal/querynodev2/tasks/user_task_polling_policy.go b/internal/util/searchutil/scheduler/user_task_polling_policy.go similarity index 99% rename from internal/querynodev2/tasks/user_task_polling_policy.go rename to internal/util/searchutil/scheduler/user_task_polling_policy.go index 8ea88659a4..e09d6c0c58 100644 --- a/internal/querynodev2/tasks/user_task_polling_policy.go +++ b/internal/util/searchutil/scheduler/user_task_polling_policy.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt"