diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 902fcf19ac..b342911266 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -45,12 +45,11 @@ type ( // queryNodeFlowGraph is a TimeTickedFlowGraph in query node type queryNodeFlowGraph struct { - ctx context.Context - cancel context.CancelFunc collectionID UniqueID vchannel Channel flowGraph *flowgraph.TimeTickedFlowGraph dmlStream msgstream.MsgStream + tSafeReplica TSafeReplicaInterface consumerCnt int } @@ -62,17 +61,14 @@ func newQueryNodeFlowGraph(ctx context.Context, vchannel Channel, factory msgstream.Factory) (*queryNodeFlowGraph, error) { - ctx1, cancel := context.WithCancel(ctx) - q := &queryNodeFlowGraph{ - ctx: ctx1, - cancel: cancel, collectionID: collectionID, vchannel: vchannel, - flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1), + tSafeReplica: tSafeReplica, + flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx), } - dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.InsertLabel) + dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.InsertLabel) if err != nil { return nil, err } @@ -128,17 +124,14 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, vchannel Channel, factory msgstream.Factory) (*queryNodeFlowGraph, error) { - ctx1, cancel := context.WithCancel(ctx) - q := &queryNodeFlowGraph{ - ctx: ctx1, - cancel: cancel, collectionID: collectionID, vchannel: vchannel, - flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1), + tSafeReplica: tSafeReplica, + flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx), } - dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.DeleteLabel) + dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel) if err != nil { return nil, err } @@ -247,6 +240,8 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.M start := time.Now() err := q.dmlStream.Seek([]*internalpb.MsgPosition{position}) + // setup first ts + q.tSafeReplica.setTSafe(q.vchannel, position.GetTimestamp()) ts, _ := tsoutil.ParseTS(position.GetTimestamp()) log.Info("query node flow graph seeks from position", @@ -264,7 +259,6 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.M // close would close queryNodeFlowGraph func (q *queryNodeFlowGraph) close() { - q.cancel() q.flowGraph.Close() if q.dmlStream != nil && q.consumerCnt > 0 { metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt))