milvus/internal/querynodev2/tasks/query_stream_task.go
Bingyi Sun fecd9c21ba
feat: LRU cache implementation (#32567)
issue: https://github.com/milvus-io/milvus/issues/32783
This pr is the implementation of lru cache on branch lru-dev.

Signed-off-by: sunby <sunbingyi1992@gmail.com>
Co-authored-by: chyezh <chyezh@outlook.com>
Co-authored-by: MrPresent-Han <chun.han@zilliz.com>
Co-authored-by: Ted Xu <ted.xu@zilliz.com>
Co-authored-by: jaime <yun.zhang@zilliz.com>
Co-authored-by: wayblink <anyang.wang@zilliz.com>
2024-05-06 20:29:30 +08:00

94 lines
2.0 KiB
Go

package tasks
import (
"context"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/streamrpc"
)
var _ Task = &QueryStreamTask{}
func NewQueryStreamTask(ctx context.Context,
collection *segments.Collection,
manager *segments.Manager,
req *querypb.QueryRequest,
srv streamrpc.QueryStreamServer,
) *QueryStreamTask {
return &QueryStreamTask{
ctx: ctx,
collection: collection,
segmentManager: manager,
req: req,
srv: srv,
notifier: make(chan error, 1),
}
}
type QueryStreamTask struct {
ctx context.Context
collection *segments.Collection
segmentManager *segments.Manager
req *querypb.QueryRequest
srv streamrpc.QueryStreamServer
notifier chan error
}
// Return the username which task is belong to.
// Return "" if the task do not contain any user info.
func (t *QueryStreamTask) Username() string {
return t.req.Req.GetUsername()
}
func (t *QueryStreamTask) IsGpuIndex() bool {
return false
}
// PreExecute the task, only call once.
func (t *QueryStreamTask) PreExecute() error {
return nil
}
func (t *QueryStreamTask) Execute() error {
retrievePlan, err := segments.NewRetrievePlan(
t.ctx,
t.collection,
t.req.Req.GetSerializedExprPlan(),
t.req.Req.GetMvccTimestamp(),
t.req.Req.Base.GetMsgID(),
)
if err != nil {
return err
}
defer retrievePlan.Delete()
segments, err := segments.RetrieveStream(t.ctx, t.segmentManager, retrievePlan, t.req, t.srv)
defer t.segmentManager.Segment.Unpin(segments)
if err != nil {
return err
}
return nil
}
func (t *QueryStreamTask) Done(err error) {
t.notifier <- err
}
func (t *QueryStreamTask) Canceled() error {
return t.ctx.Err()
}
func (t *QueryStreamTask) Wait() error {
return <-t.notifier
}
func (t *QueryStreamTask) NQ() int64 {
return 1
}
func (t *QueryStreamTask) SearchResult() *internalpb.SearchResults {
return nil
}