diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 4d8c5a2dec..76ce2fabb0 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -1475,11 +1475,11 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Ge }, SegmentIDs: segments, }) - log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status)) if err != nil { resp.Status.Reason = err.Error() return resp, nil } + log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status)) if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success { resp.Status.Reason = infoResp.Status.Reason return resp, nil diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index a4bafd75b9..159b44cbd8 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -23,11 +23,13 @@ package querynode */ import "C" import ( + "unsafe" + + "go.uber.org/zap" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/schemapb" - "go.uber.org/zap" - "unsafe" ) type Collection struct { @@ -61,6 +63,9 @@ func (c *Collection) removePartitionID(partitionID UniqueID) { } func (c *Collection) addWatchedDmChannels(channels []VChannel) { + log.Debug("add watch dm channels to collection", + zap.Any("channels", channels), + zap.Any("collectionID", c.ID())) c.watchedChannels = append(c.watchedChannels, channels...) } @@ -79,9 +84,10 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co collection := C.NewCollection(cSchemaBlob) var newCollection = &Collection{ - collectionPtr: collection, - id: collectionID, - schema: schema, + collectionPtr: collection, + id: collectionID, + schema: schema, + watchedChannels: make([]VChannel, 0), } C.free(unsafe.Pointer(cSchemaBlob)) diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index ffae382352..d03184b450 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -23,12 +23,11 @@ package querynode */ import "C" import ( + "errors" "fmt" "strconv" "sync" - "errors" - "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 30322ae2c3..0989fa6b7c 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -13,127 +13,193 @@ package querynode import ( "context" + "errors" + "fmt" + "sync" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util/flowgraph" +) + +type flowGraphType = int32 + +const ( + flowGraphTypeCollection = 0 + flowGraphTypePartition = 1 ) type dataSyncService struct { - ctx context.Context - cancel context.CancelFunc + ctx context.Context - collectionID UniqueID - fg *flowgraph.TimeTickedFlowGraph + mu sync.Mutex // guards FlowGraphs + collectionFlowGraphs map[UniqueID][]*queryNodeFlowGraph // map[collectionID]flowGraphs + partitionFlowGraphs map[UniqueID][]*queryNodeFlowGraph // map[partitionID]flowGraphs - dmStream msgstream.MsgStream - msFactory msgstream.Factory - - replica ReplicaInterface - tSafeReplica TSafeReplicaInterface + streamingReplica ReplicaInterface + tSafeReplica TSafeReplicaInterface + msFactory msgstream.Factory } -func 
newDataSyncService(ctx context.Context, - replica ReplicaInterface, - tSafeReplica TSafeReplicaInterface, - factory msgstream.Factory, - collectionID UniqueID) *dataSyncService { +// collection flow graph +func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID, vChannels []string) error { + dsService.mu.Lock() + defer dsService.mu.Unlock() - ctx1, cancel := context.WithCancel(ctx) - - service := &dataSyncService{ - ctx: ctx1, - cancel: cancel, - collectionID: collectionID, - fg: nil, - replica: replica, - tSafeReplica: tSafeReplica, - msFactory: factory, + if _, ok := dsService.collectionFlowGraphs[collectionID]; ok { + return errors.New("collection flow graph has been existed, collectionID = " + fmt.Sprintln(collectionID)) } - - service.initNodes() - return service -} - -func (dsService *dataSyncService) start() { - dsService.fg.Start() -} - -func (dsService *dataSyncService) close() { - dsService.cancel() - if dsService.fg != nil { - dsService.fg.Close() - } - log.Debug("dataSyncService closed", zap.Int64("collectionID", dsService.collectionID)) -} - -func (dsService *dataSyncService) initNodes() { - // TODO: add delete pipeline support - - dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - - var dmStreamNode node = dsService.newDmInputNode(dsService.ctx) - - var filterDmNode node = newFilteredDmNode(dsService.replica, dsService.collectionID) - - var insertNode node = newInsertNode(dsService.replica, dsService.collectionID) - var serviceTimeNode node = newServiceTimeNode(dsService.ctx, - dsService.replica, - dsService.tSafeReplica, - dsService.msFactory, - dsService.collectionID) - - dsService.fg.AddNode(dmStreamNode) - - dsService.fg.AddNode(filterDmNode) - - dsService.fg.AddNode(insertNode) - dsService.fg.AddNode(serviceTimeNode) - - // dmStreamNode - var err = dsService.fg.SetEdges(dmStreamNode.Name(), - []string{}, - []string{filterDmNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name())) - } - - // filterDmNode - err = dsService.fg.SetEdges(filterDmNode.Name(), - []string{dmStreamNode.Name()}, - []string{insertNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node:", zap.String("node name", filterDmNode.Name())) - } - - // insertNode - err = dsService.fg.SetEdges(insertNode.Name(), - []string{filterDmNode.Name()}, - []string{serviceTimeNode.Name()}, - ) - if err != nil { - log.Error("set edges failed in node:", zap.String("node name", insertNode.Name())) - } - - // serviceTimeNode - err = dsService.fg.SetEdges(serviceTimeNode.Name(), - []string{insertNode.Name()}, - []string{}, - ) - if err != nil { - log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name())) - } -} - -func (dsService *dataSyncService) seekSegment(position *internalpb.MsgPosition) error { - err := dsService.dmStream.Seek([]*internalpb.MsgPosition{position}) - if err != nil { - return err + dsService.collectionFlowGraphs[collectionID] = make([]*queryNodeFlowGraph, 0) + for _, vChannel := range vChannels { + // collection flow graph doesn't need partition id + partitionID := UniqueID(0) + newFlowGraph := newQueryNodeFlowGraph(dsService.ctx, + flowGraphTypeCollection, + collectionID, + partitionID, + dsService.streamingReplica, + dsService.tSafeReplica, + vChannel, + dsService.msFactory) + dsService.collectionFlowGraphs[collectionID] = append(dsService.collectionFlowGraphs[collectionID], newFlowGraph) + log.Debug("add collection flow graph", + zap.Any("collectionID", 
collectionID), + zap.Any("channel", vChannel)) } return nil } + +func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID) ([]*queryNodeFlowGraph, error) { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.collectionFlowGraphs[collectionID]; !ok { + return nil, errors.New("collection flow graph doesn't existed, collectionID = " + fmt.Sprintln(collectionID)) + } + return dsService.collectionFlowGraphs[collectionID], nil +} + +func (dsService *dataSyncService) startCollectionFlowGraph(collectionID UniqueID) error { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.collectionFlowGraphs[collectionID]; !ok { + return errors.New("collection flow graph doesn't existed, collectionID = " + fmt.Sprintln(collectionID)) + } + for _, fg := range dsService.collectionFlowGraphs[collectionID] { + // start flow graph + log.Debug("start flow graph", zap.Any("channel", fg.channel)) + go fg.flowGraph.Start() + } + return nil +} + +func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueID) { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.collectionFlowGraphs[collectionID]; ok { + for _, nodeFG := range dsService.collectionFlowGraphs[collectionID] { + // close flow graph + nodeFG.close() + } + dsService.collectionFlowGraphs[collectionID] = nil + } + delete(dsService.collectionFlowGraphs, collectionID) +} + +// partition flow graph +func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, partitionID UniqueID, vChannels []string) error { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.partitionFlowGraphs[partitionID]; ok { + return errors.New("partition flow graph has been existed, partitionID = " + fmt.Sprintln(partitionID)) + } + dsService.partitionFlowGraphs[partitionID] = make([]*queryNodeFlowGraph, 0) + for _, vChannel := range vChannels { + newFlowGraph := newQueryNodeFlowGraph(dsService.ctx, + flowGraphTypePartition, + collectionID, + partitionID, + dsService.streamingReplica, + dsService.tSafeReplica, + vChannel, + dsService.msFactory) + dsService.partitionFlowGraphs[partitionID] = append(dsService.partitionFlowGraphs[partitionID], newFlowGraph) + } + return nil +} + +func (dsService *dataSyncService) getPartitionFlowGraphs(partitionID UniqueID) ([]*queryNodeFlowGraph, error) { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.partitionFlowGraphs[partitionID]; !ok { + return nil, errors.New("partition flow graph doesn't existed, partitionID = " + fmt.Sprintln(partitionID)) + } + return dsService.partitionFlowGraphs[partitionID], nil +} + +func (dsService *dataSyncService) startPartitionFlowGraph(partitionID UniqueID) error { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.partitionFlowGraphs[partitionID]; !ok { + return errors.New("partition flow graph doesn't existed, partitionID = " + fmt.Sprintln(partitionID)) + } + for _, fg := range dsService.partitionFlowGraphs[partitionID] { + // start flow graph + go fg.flowGraph.Start() + } + return nil +} + +func (dsService *dataSyncService) removePartitionFlowGraph(partitionID UniqueID) { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.partitionFlowGraphs[partitionID]; ok { + for _, nodeFG := range dsService.partitionFlowGraphs[partitionID] { + // close flow graph + nodeFG.close() + } + dsService.partitionFlowGraphs[partitionID] = nil + } + delete(dsService.partitionFlowGraphs, partitionID) 
+} + +func newDataSyncService(ctx context.Context, + streamingReplica ReplicaInterface, + tSafeReplica TSafeReplicaInterface, + factory msgstream.Factory) *dataSyncService { + + return &dataSyncService{ + ctx: ctx, + collectionFlowGraphs: make(map[UniqueID][]*queryNodeFlowGraph), + partitionFlowGraphs: make(map[UniqueID][]*queryNodeFlowGraph), + streamingReplica: streamingReplica, + tSafeReplica: tSafeReplica, + msFactory: factory, + } +} + +func (dsService *dataSyncService) close() { + for _, nodeFGs := range dsService.collectionFlowGraphs { + for _, nodeFG := range nodeFGs { + if nodeFG != nil { + nodeFG.flowGraph.Close() + } + } + } + for _, nodeFGs := range dsService.partitionFlowGraphs { + for _, nodeFG := range nodeFGs { + if nodeFG != nil { + nodeFG.flowGraph.Close() + } + } + } + dsService.collectionFlowGraphs = make(map[UniqueID][]*queryNodeFlowGraph) + dsService.partitionFlowGraphs = make(map[UniqueID][]*queryNodeFlowGraph) +} diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 17936a24d5..c87b8772da 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -115,13 +115,11 @@ func TestDataSyncService_Start(t *testing.T) { err := msFactory.SetParams(m) assert.Nil(t, err) - // dataSync - node.streaming.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, - node.streaming.replica, - node.streaming.tSafeReplica, - msFactory, - collectionID) - go node.streaming.dataSyncServices[collectionID].start() + channels := []VChannel{"0"} + err = node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels) + assert.NoError(t, err) + err = node.streaming.dataSyncService.startCollectionFlowGraph(collectionID) + assert.NoError(t, err) <-node.queryNodeLoopCtx.Done() node.Stop() diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index 98d625fe09..749efc0b41 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -13,7 +13,6 @@ package querynode import ( "github.com/golang/protobuf/proto" - "github.com/milvus-io/milvus/internal/util/trace" "github.com/opentracing/opentracing-go" "go.uber.org/zap" @@ -21,6 +20,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/trace" ) type ddNode struct { diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 6895109dad..8be43f15dc 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -12,20 +12,23 @@ package querynode import ( - "fmt" + "errors" + + "github.com/opentracing/opentracing-go" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/trace" - "github.com/opentracing/opentracing-go" - "go.uber.org/zap" ) type filterDmNode struct { baseNode + graphType flowGraphType collectionID UniqueID + partitionID UniqueID replica ReplicaInterface } @@ -72,8 +75,6 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if resMsg != nil { iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } - // case 
commonpb.MsgType_kDelete: - // dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask)) default: log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type()))) } @@ -102,9 +103,14 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg return nil } + // if the flow graph type is partition, check if the partition is target partition + if fdmNode.graphType == flowGraphTypePartition && msg.PartitionID != fdmNode.partitionID { + return nil + } + // check if the segment is in excluded segments excludedSegments, err := fdmNode.replica.getExcludedSegments(fdmNode.collectionID) - log.Debug("excluded segments", zap.String("segmentIDs", fmt.Sprintln(excludedSegments))) + //log.Debug("excluded segments", zap.String("segmentIDs", fmt.Sprintln(excludedSegments))) if err != nil { log.Error(err.Error()) return nil @@ -115,12 +121,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg } } - // TODO: If the last record is drop type, all insert requests are invalid. - //if !records[len(records)-1].createOrDrop { - // return nil - //} - - // Filter insert requests before last record. if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { // TODO: what if the messages are misaligned? Here, we ignore those messages and print error log.Error("Error, misaligned messages detected") @@ -134,7 +134,11 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg return msg } -func newFilteredDmNode(replica ReplicaInterface, collectionID UniqueID) *filterDmNode { +func newFilteredDmNode(replica ReplicaInterface, + graphType flowGraphType, + collectionID UniqueID, + partitionID UniqueID) *filterDmNode { + maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -142,9 +146,16 @@ func newFilteredDmNode(replica ReplicaInterface, collectionID UniqueID) *filterD baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) + if graphType != flowGraphTypeCollection && graphType != flowGraphTypePartition { + err := errors.New("invalid flow graph type") + log.Error(err.Error()) + } + return &filterDmNode{ baseNode: baseNode, + graphType: graphType, collectionID: collectionID, + partitionID: partitionID, replica: replica, } } diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index a7ed51a62d..634978f1dc 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ b/internal/querynode/flow_graph_gc_node.go @@ -12,11 +12,10 @@ package querynode import ( - "github.com/milvus-io/milvus/internal/util/flowgraph" - "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/flowgraph" ) type gcNode struct { diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 0ffdeccfab..f079bb475f 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -15,18 +15,18 @@ import ( "context" "sync" + "github.com/opentracing/opentracing-go" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/trace" - "github.com/opentracing/opentracing-go" - "go.uber.org/zap" ) type insertNode struct { baseNode - collectionID UniqueID - replica ReplicaInterface + replica ReplicaInterface } type 
InsertData struct { @@ -84,15 +84,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } } - segment, err := iNode.replica.getSegmentByID(task.SegmentID) - if err != nil { - log.Error(err.Error()) - continue - } - if segment.enableLoadBinLog { - continue - } - insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...) insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...) insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) @@ -132,20 +123,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for _, sp := range spans { sp.Finish() } + return []Msg{res} } func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID)) var targetSegment, err = iNode.replica.getSegmentByID(segmentID) - log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID), - zap.Any("targetSegment", targetSegment), - zap.Error(err), - zap.Any("SegmentType", targetSegment.segmentType), - zap.Any("enableLoadBinLog", targetSegment.enableLoadBinLog), - ) - - if targetSegment.segmentType != segmentTypeGrowing || targetSegment.enableLoadBinLog { + if targetSegment.segmentType != segmentTypeGrowing { wg.Done() return } @@ -175,12 +160,11 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn } log.Debug("Do insert done", zap.Int("len", len(insertData.insertIDs[segmentID])), - zap.Int64("segmentID", segmentID), - zap.Int64("collectionID", iNode.collectionID)) + zap.Int64("segmentID", segmentID)) wg.Done() } -func newInsertNode(replica ReplicaInterface, collectionID UniqueID) *insertNode { +func newInsertNode(replica ReplicaInterface) *insertNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -189,8 +173,7 @@ func newInsertNode(replica ReplicaInterface, collectionID UniqueID) *insertNode baseNode.SetMaxParallelism(maxParallelism) return &insertNode{ - baseNode: baseNode, - collectionID: collectionID, - replica: replica, + baseNode: baseNode, + replica: replica, } } diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go deleted file mode 100644 index cc0245f800..0000000000 --- a/internal/querynode/flow_graph_msg_stream_input_nodes.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. 
- -package querynode - -import ( - "context" - - "github.com/milvus-io/milvus/internal/util/flowgraph" -) - -func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode { - // query node doesn't need to consume any topic - insertStream, _ := dsService.msFactory.NewTtMsgStream(ctx) - dsService.dmStream = insertStream - - maxQueueLength := Params.FlowGraphMaxQueueLength - maxParallelism := Params.FlowGraphMaxParallelism - - node := flowgraph.NewInputNode(&insertStream, "dmInputNode", maxQueueLength, maxParallelism) - return node -} diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go new file mode 100644 index 0000000000..2dfcd9b6df --- /dev/null +++ b/internal/querynode/flow_graph_query_node.go @@ -0,0 +1,142 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + "errors" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/flowgraph" +) + +type queryNodeFlowGraph struct { + ctx context.Context + cancel context.CancelFunc + collectionID UniqueID + partitionID UniqueID + channel VChannel + flowGraph *flowgraph.TimeTickedFlowGraph + dmlStream msgstream.MsgStream +} + +func newQueryNodeFlowGraph(ctx context.Context, + flowGraphType flowGraphType, + collectionID UniqueID, + partitionID UniqueID, + streamingReplica ReplicaInterface, + tSafeReplica TSafeReplicaInterface, + channel VChannel, + factory msgstream.Factory) *queryNodeFlowGraph { + + ctx1, cancel := context.WithCancel(ctx) + + q := &queryNodeFlowGraph{ + ctx: ctx1, + cancel: cancel, + collectionID: collectionID, + partitionID: partitionID, + channel: channel, + flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1), + } + + var dmStreamNode node = q.newDmInputNode(ctx1, factory) + var filterDmNode node = newFilteredDmNode(streamingReplica, flowGraphType, collectionID, partitionID) + var insertNode node = newInsertNode(streamingReplica) + var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel, factory) + + q.flowGraph.AddNode(dmStreamNode) + q.flowGraph.AddNode(filterDmNode) + q.flowGraph.AddNode(insertNode) + q.flowGraph.AddNode(serviceTimeNode) + + // dmStreamNode + var err = q.flowGraph.SetEdges(dmStreamNode.Name(), + []string{}, + []string{filterDmNode.Name()}, + ) + if err != nil { + log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name())) + } + + // filterDmNode + err = q.flowGraph.SetEdges(filterDmNode.Name(), + []string{dmStreamNode.Name()}, + []string{insertNode.Name()}, + ) + if err != nil { + log.Error("set edges failed in node:", zap.String("node name", filterDmNode.Name())) + } + + // insertNode + err = q.flowGraph.SetEdges(insertNode.Name(), + []string{filterDmNode.Name()}, + []string{serviceTimeNode.Name()}, + ) + if err != 
nil { + log.Error("set edges failed in node:", zap.String("node name", insertNode.Name())) + } + + // serviceTimeNode + err = q.flowGraph.SetEdges(serviceTimeNode.Name(), + []string{insertNode.Name()}, + []string{}, + ) + if err != nil { + log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name())) + } + + return q +} + +func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode { + insertStream, err := factory.NewTtMsgStream(ctx) + if err != nil { + log.Error(err.Error()) + } else { + q.dmlStream = insertStream + } + + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism + + node := flowgraph.NewInputNode(&insertStream, "dmlInputNode", maxQueueLength, maxParallelism) + return node +} + +func (q *queryNodeFlowGraph) consumerFlowGraph(channel VChannel, subName ConsumeSubName) error { + if q.dmlStream == nil { + return errors.New("null dml message stream in flow graph") + } + q.dmlStream.AsConsumer([]string{channel}, subName) + log.Debug("query node flow graph consumes from virtual channel", zap.Any("vChannel", channel)) + return nil +} + +func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error { + err := q.dmlStream.Seek([]*internalpb.MsgPosition{position}) + return err +} + +func (q *queryNodeFlowGraph) close() { + q.cancel() + q.flowGraph.Close() + log.Debug("stop query node flow graph", + zap.Any("collectionID", q.collectionID), + zap.Any("partitionID", q.partitionID), + zap.Any("channel", q.channel), + ) +} diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index d4bf7d9286..ff66aa1f0b 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -13,21 +13,21 @@ package querynode import ( "context" + "strconv" + + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/flowgraph" - "go.uber.org/zap" ) type serviceTimeNode struct { baseNode - collectionID UniqueID - replica ReplicaInterface - tSafeReplica TSafeReplicaInterface - timeTickMsgStream msgstream.MsgStream + collectionID UniqueID + vChannel VChannel + tSafeReplica TSafeReplicaInterface + //timeTickMsgStream msgstream.MsgStream } func (stNode *serviceTimeNode) Name() string { @@ -35,7 +35,7 @@ func (stNode *serviceTimeNode) Name() string { } func (stNode *serviceTimeNode) Close() { - stNode.timeTickMsgStream.Close() + //stNode.timeTickMsgStream.Close() } func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { @@ -57,16 +57,16 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // update service time - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(stNode.collectionID) - stNode.tSafeReplica.setTSafe(vChannel, serviceTimeMsg.timeRange.timestampMax) + channel := stNode.vChannel + strconv.FormatInt(stNode.collectionID, 10) + stNode.tSafeReplica.setTSafe(channel, serviceTimeMsg.timeRange.timestampMax) //log.Debug("update tSafe:", // zap.Int64("tSafe", int64(serviceTimeMsg.timeRange.timestampMax)), - // zap.Int64("collectionID", stNode.collectionID)) + // zap.Any("collectionID", stNode.collectionID), + //) - if err := 
stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil { - log.Error("Error: send time tick into pulsar channel failed", zap.Error(err)) - } + //if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil { + // log.Error("Error: send time tick into pulsar channel failed", zap.Error(err)) + //} var res Msg = &gcMsg{ gcRecord: serviceTimeMsg.gcRecord, @@ -75,32 +75,32 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { return []Msg{res} } -func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error { - msgPack := msgstream.MsgPack{} - timeTickMsg := msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: ts, - EndTimestamp: ts, - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: 0, - Timestamp: ts, - SourceID: Params.QueryNodeID, - }, - }, - } - msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) - return stNode.timeTickMsgStream.Produce(&msgPack) -} +//func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error { +// msgPack := msgstream.MsgPack{} +// timeTickMsg := msgstream.TimeTickMsg{ +// BaseMsg: msgstream.BaseMsg{ +// BeginTimestamp: ts, +// EndTimestamp: ts, +// HashValues: []uint32{0}, +// }, +// TimeTickMsg: internalpb.TimeTickMsg{ +// Base: &commonpb.MsgBase{ +// MsgType: commonpb.MsgType_TimeTick, +// MsgID: 0, +// Timestamp: ts, +// SourceID: Params.QueryNodeID, +// }, +// }, +// } +// msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) +// return stNode.timeTickMsgStream.Produce(&msgPack) +//} func newServiceTimeNode(ctx context.Context, - replica ReplicaInterface, tSafeReplica TSafeReplicaInterface, - factory msgstream.Factory, - collectionID UniqueID) *serviceTimeNode { + collectionID UniqueID, + channel VChannel, + factory msgstream.Factory) *serviceTimeNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -109,15 +109,19 @@ func newServiceTimeNode(ctx context.Context, baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) - timeTimeMsgStream, _ := factory.NewMsgStream(ctx) - timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName}) - log.Debug("querynode AsProducer: " + Params.QueryTimeTickChannelName) + //timeTimeMsgStream, err := factory.NewMsgStream(ctx) + //if err != nil { + // log.Error(err.Error()) + //} else { + // timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName}) + // log.Debug("query node AsProducer: " + Params.QueryTimeTickChannelName) + //} return &serviceTimeNode{ - baseNode: baseNode, - collectionID: collectionID, - replica: replica, - tSafeReplica: tSafeReplica, - timeTickMsgStream: timeTimeMsgStream, + baseNode: baseNode, + collectionID: collectionID, + vChannel: channel, + tSafeReplica: tSafeReplica, + //timeTickMsgStream: timeTimeMsgStream, } } diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go index d00aa89eae..7e642da7ea 100644 --- a/internal/querynode/load_index_info.go +++ b/internal/querynode/load_index_info.go @@ -20,12 +20,11 @@ package querynode */ import "C" import ( + "errors" "path/filepath" "strconv" "unsafe" - "errors" - "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" diff --git a/internal/querynode/plan.go b/internal/querynode/plan.go index 51187d4149..cab4078a4f 100644 --- a/internal/querynode/plan.go +++ b/internal/querynode/plan.go @@ -22,8 +22,9 @@ package querynode import "C" import ( "errors" 
- "github.com/milvus-io/milvus/internal/proto/planpb" "unsafe" + + "github.com/milvus-io/milvus/internal/proto/planpb" ) type Plan struct { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 1ca0625534..4f9f766262 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -122,7 +122,7 @@ func (node *QueryNode) Init() error { node.dataService, node.indexService, node.msFactory) - node.streaming = newStreaming() + node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory) C.SegcoreInit() registerReq := &queryPb.RegisterNodeRequest{ diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index bf3ed33961..5207065e68 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -173,14 +173,17 @@ func newQueryNodeMock() *QueryNode { }() } - msFactory := msgstream.NewPmsFactory() + msFactory, err := newMessageStreamFactory() + if err != nil { + panic(err) + } svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory) - err := svr.SetQueryService(&queryServiceMock{}) + err = svr.SetQueryService(&queryServiceMock{}) if err != nil { panic(err) } svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory) - svr.streaming = newStreaming() + svr.streaming = newStreaming(ctx, msFactory) return svr } diff --git a/internal/querynode/retrieve_collection.go b/internal/querynode/retrieve_collection.go index ce95d56de4..295728339c 100644 --- a/internal/querynode/retrieve_collection.go +++ b/internal/querynode/retrieve_collection.go @@ -15,6 +15,8 @@ import ( "context" "errors" "fmt" + "math" + "reflect" "sync" "github.com/golang/protobuf/proto" @@ -43,8 +45,8 @@ type retrieveCollection struct { unsolvedMsgMu sync.Mutex unsolvedMsg []*msgstream.RetrieveMsg - tSafeMutex sync.Mutex - tSafeWatcher *tSafeWatcher + tSafeWatchers map[VChannel]*tSafeWatcher + watcherSelectCase []reflect.SelectCase serviceableTimeMutex sync.Mutex serviceableTime Timestamp @@ -72,13 +74,15 @@ func newRetrieveCollection(releaseCtx context.Context, streamingReplica: streamingReplica, tSafeReplica: tSafeReplica, + tSafeWatchers: make(map[VChannel]*tSafeWatcher), + msgBuffer: msgBuffer, unsolvedMsg: unsolvedMsg, retrieveResultMsgStream: retrieveResultStream, } - rc.register(collectionID) + rc.register() return rc } @@ -101,13 +105,20 @@ func (rc *retrieveCollection) setServiceableTime(t Timestamp) { } func (rc *retrieveCollection) waitNewTSafe() Timestamp { - // block until dataSyncService updating tSafe - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(rc.collectionID) - // block until dataSyncService updating tSafe - rc.tSafeWatcher.hasUpdate() - ts := rc.tSafeReplica.getTSafe(vChannel) - return ts + // block until any vChannel updating tSafe + _, _, recvOK := reflect.Select(rc.watcherSelectCase) + if !recvOK { + log.Error("tSafe has been closed") + return invalidTimestamp + } + t := Timestamp(math.MaxInt64) + for channel := range rc.tSafeWatchers { + ts := rc.tSafeReplica.getTSafe(channel) + if ts <= t { + t = ts + } + } + return t } func (rc *retrieveCollection) start() { @@ -115,13 +126,24 @@ func (rc *retrieveCollection) start() { go rc.doUnsolvedMsgRetrieve() } -func (rc *retrieveCollection) register(collectionID UniqueID) { - vChannel := collectionIDToChannel(collectionID) - rc.tSafeReplica.addTSafe(vChannel) - rc.tSafeMutex.Lock() - rc.tSafeWatcher = newTSafeWatcher() - rc.tSafeMutex.Unlock() - rc.tSafeReplica.registerTSafeWatcher(vChannel, 
rc.tSafeWatcher) +func (rc *retrieveCollection) register() { + // register tSafe watcher and init watcher select case + collection, err := rc.historicalReplica.getCollectionByID(rc.collectionID) + if err != nil { + log.Error(err.Error()) + return + } + + rc.watcherSelectCase = make([]reflect.SelectCase, 0) + for _, channel := range collection.getWatchedDmChannels() { + rc.tSafeReplica.addTSafe(channel) + rc.tSafeWatchers[channel] = newTSafeWatcher() + rc.tSafeReplica.registerTSafeWatcher(channel, rc.tSafeWatchers[channel]) + rc.watcherSelectCase = append(rc.watcherSelectCase, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(rc.tSafeWatchers[channel].watcherChan()), + }) + } } func (rc *retrieveCollection) addToUnsolvedMsg(msg *msgstream.RetrieveMsg) { diff --git a/internal/querynode/retrieve_service.go b/internal/querynode/retrieve_service.go index 6a8ad10dfb..492ffa8c4a 100644 --- a/internal/querynode/retrieve_service.go +++ b/internal/querynode/retrieve_service.go @@ -16,10 +16,11 @@ import ( "errors" "strconv" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/util/trace" - "go.uber.org/zap" ) type retrieveService struct { diff --git a/internal/querynode/search_collection.go b/internal/querynode/search_collection.go index 2d5aa2c289..0bc7c25243 100644 --- a/internal/querynode/search_collection.go +++ b/internal/querynode/search_collection.go @@ -14,9 +14,12 @@ package querynode import ( "context" "errors" + "math" + "reflect" "sync" "github.com/golang/protobuf/proto" + oplog "github.com/opentracing/opentracing-go/log" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" @@ -26,7 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" - oplog "github.com/opentracing/opentracing-go/log" ) type searchCollection struct { @@ -42,8 +44,8 @@ type searchCollection struct { unsolvedMsgMu sync.Mutex // guards unsolvedMsg unsolvedMsg []*msgstream.SearchMsg - tSafeMutex sync.Mutex - tSafeWatcher *tSafeWatcher + tSafeWatchers map[VChannel]*tSafeWatcher + watcherSelectCase []reflect.SelectCase serviceableTimeMutex sync.Mutex // guards serviceableTime serviceableTime Timestamp @@ -60,6 +62,7 @@ func newSearchCollection(releaseCtx context.Context, streamingReplica ReplicaInterface, tSafeReplica TSafeReplicaInterface, searchResultStream msgstream.MsgStream) *searchCollection { + receiveBufSize := Params.SearchReceiveBufSize msgBuffer := make(chan *msgstream.SearchMsg, receiveBufSize) unsolvedMsg := make([]*msgstream.SearchMsg, 0) @@ -73,13 +76,15 @@ func newSearchCollection(releaseCtx context.Context, streamingReplica: streamingReplica, tSafeReplica: tSafeReplica, + tSafeWatchers: make(map[VChannel]*tSafeWatcher), + msgBuffer: msgBuffer, unsolvedMsg: unsolvedMsg, searchResultMsgStream: searchResultStream, } - sc.register(collectionID) + sc.register() return sc } @@ -88,14 +93,25 @@ func (s *searchCollection) start() { go s.doUnsolvedMsgSearch() } -func (s *searchCollection) register(collectionID UniqueID) { - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(collectionID) - s.tSafeReplica.addTSafe(vChannel) - s.tSafeMutex.Lock() - s.tSafeWatcher = newTSafeWatcher() - s.tSafeMutex.Unlock() - s.tSafeReplica.registerTSafeWatcher(vChannel, s.tSafeWatcher) +func (s *searchCollection) register() { + collection, err := 
s.streamingReplica.getCollectionByID(s.collectionID) + if err != nil { + log.Error(err.Error()) + return + } + + s.watcherSelectCase = make([]reflect.SelectCase, 0) + log.Debug("register tSafe watcher and init watcher select case", + zap.Any("dml channels", collection.getWatchedDmChannels()), + zap.Any("collectionID", collection.ID())) + for _, channel := range collection.getWatchedDmChannels() { + s.tSafeWatchers[channel] = newTSafeWatcher() + s.tSafeReplica.registerTSafeWatcher(channel, s.tSafeWatchers[channel]) + s.watcherSelectCase = append(s.watcherSelectCase, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.tSafeWatchers[channel].watcherChan()), + }) + } } func (s *searchCollection) addToUnsolvedMsg(msg *msgstream.SearchMsg) { @@ -113,12 +129,20 @@ func (s *searchCollection) popAllUnsolvedMsg() []*msgstream.SearchMsg { } func (s *searchCollection) waitNewTSafe() Timestamp { - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(s.collectionID) - // block until dataSyncService updating tSafe - s.tSafeWatcher.hasUpdate() - ts := s.tSafeReplica.getTSafe(vChannel) - return ts + // block until any vChannel updating tSafe + _, _, recvOK := reflect.Select(s.watcherSelectCase) + if !recvOK { + log.Error("tSafe has been closed", zap.Any("collectionID", s.collectionID)) + return invalidTimestamp + } + t := Timestamp(math.MaxInt64) + for channel := range s.tSafeWatchers { + ts := s.tSafeReplica.getTSafe(channel) + if ts <= t { + t = ts + } + } + return t } func (s *searchCollection) getServiceableTime() Timestamp { @@ -129,6 +153,12 @@ func (s *searchCollection) getServiceableTime() Timestamp { func (s *searchCollection) setServiceableTime(t Timestamp) { s.serviceableTimeMutex.Lock() + defer s.serviceableTimeMutex.Unlock() + + if t < s.serviceableTime { + return + } + gracefulTimeInMilliSecond := Params.GracefulTime if gracefulTimeInMilliSecond > 0 { gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0) @@ -136,7 +166,6 @@ func (s *searchCollection) setServiceableTime(t Timestamp) { } else { s.serviceableTime = t } - s.serviceableTimeMutex.Unlock() } func (s *searchCollection) emptySearch(searchMsg *msgstream.SearchMsg) { @@ -174,6 +203,7 @@ func (s *searchCollection) receiveSearchMsg() { zap.Any("serviceTime", st), zap.Any("delta seconds", (sm.BeginTs()-serviceTime)/(1000*1000*1000)), zap.Any("collectionID", s.collectionID), + zap.Any("msgID", sm.ID()), ) s.addToUnsolvedMsg(sm) sp.LogFields( @@ -216,7 +246,7 @@ func (s *searchCollection) doUnsolvedMsgSearch() { default: serviceTime := s.waitNewTSafe() s.setServiceableTime(serviceTime) - log.Debug("querynode::doUnsolvedMsgSearch: setServiceableTime", + log.Debug("query node::doUnsolvedMsgSearch: setServiceableTime", zap.Any("serviceTime", serviceTime), ) log.Debug("get tSafe from flow graph", diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index dd1ecfc1f8..c11401d378 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -17,10 +17,11 @@ import ( "errors" "strconv" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/util/trace" - "go.uber.org/zap" ) type searchService struct { @@ -78,6 +79,7 @@ func newSearchService(ctx context.Context, } func (s *searchService) start() { + log.Debug("start search service") s.searchMsgStream.Start() s.searchResultMsgStream.Start() s.startEmptySearchCollection() @@ -145,6 +147,7 @@ 
func (s *searchService) consumeSearch() { } func (s *searchService) close() { + log.Debug("search service closed") if s.searchMsgStream != nil { s.searchMsgStream.Close() } diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index 70c337def5..a1a3e504cf 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -140,18 +140,6 @@ func TestSearch_Search(t *testing.T) { msFactory, err := newMessageStreamFactory() assert.NoError(t, err) - // start dataSync - newDS := newDataSyncService(node.queryNodeLoopCtx, - node.streaming.replica, - node.streaming.tSafeReplica, - msFactory, - collectionID) - err = node.streaming.addDataSyncService(collectionID, newDS) - assert.NoError(t, err) - ds, err := node.streaming.getDataSyncService(collectionID) - assert.NoError(t, err) - go ds.start() - // start search service node.searchService = newSearchService(node.queryNodeLoopCtx, node.historical.replica, @@ -161,10 +149,6 @@ func TestSearch_Search(t *testing.T) { go node.searchService.start() node.searchService.startSearchCollection(collectionID) - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(collectionID) - node.streaming.tSafeReplica.setTSafe(vChannel, 1000) - // load segment err = node.historical.replica.addSegment(segmentID, defaultPartitionID, collectionID, segmentTypeSealed) assert.NoError(t, err) @@ -196,18 +180,6 @@ func TestSearch_SearchMultiSegments(t *testing.T) { msFactory, err := newMessageStreamFactory() assert.NoError(t, err) - // start dataSync - newDS := newDataSyncService(node.queryNodeLoopCtx, - node.streaming.replica, - node.streaming.tSafeReplica, - msFactory, - collectionID) - err = node.streaming.addDataSyncService(collectionID, newDS) - assert.NoError(t, err) - ds, err := node.streaming.getDataSyncService(collectionID) - assert.NoError(t, err) - go ds.start() - // start search service node.searchService = newSearchService(node.queryNodeLoopCtx, node.streaming.replica, @@ -217,10 +189,6 @@ func TestSearch_SearchMultiSegments(t *testing.T) { go node.searchService.start() node.searchService.startSearchCollection(collectionID) - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(collectionID) - node.streaming.tSafeReplica.setTSafe(vChannel, 1000) - // load segments err = node.historical.replica.addSegment(segmentID1, defaultPartitionID, collectionID, segmentTypeSealed) assert.NoError(t, err) diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 0c356e9c80..06fed2a8a9 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -23,6 +23,7 @@ package querynode */ import "C" import ( + "errors" "fmt" "github.com/milvus-io/milvus/internal/proto/planpb" "strconv" @@ -32,8 +33,6 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" - "errors" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" ) diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index 795258622a..3c59ae0113 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -12,27 +12,30 @@ package querynode import ( - "errors" - "fmt" - "sync" + "context" + + "github.com/milvus-io/milvus/internal/msgstream" ) type streaming struct { + ctx context.Context + replica ReplicaInterface tSafeReplica TSafeReplicaInterface - dsServicesMu sync.Mutex // guards dataSyncServices - dataSyncServices map[UniqueID]*dataSyncService + dataSyncService 
*dataSyncService + msFactory msgstream.Factory } -func newStreaming() *streaming { +func newStreaming(ctx context.Context, factory msgstream.Factory) *streaming { replica := newCollectionReplica() tReplica := newTSafeReplica() - ds := make(map[UniqueID]*dataSyncService) + newDS := newDataSyncService(ctx, replica, tReplica, factory) + return &streaming{ - replica: replica, - tSafeReplica: tReplica, - dataSyncServices: ds, + replica: replica, + tSafeReplica: tReplica, + dataSyncService: newDS, } } @@ -42,37 +45,11 @@ func (s *streaming) start() { func (s *streaming) close() { // TODO: stop stats - for _, ds := range s.dataSyncServices { - ds.close() + + if s.dataSyncService != nil { + s.dataSyncService.close() } - s.dataSyncServices = make(map[UniqueID]*dataSyncService) // free collectionReplica s.replica.freeAll() } - -func (s *streaming) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) { - s.dsServicesMu.Lock() - defer s.dsServicesMu.Unlock() - ds, ok := s.dataSyncServices[collectionID] - if !ok { - return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID)) - } - return ds, nil -} - -func (s *streaming) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error { - s.dsServicesMu.Lock() - defer s.dsServicesMu.Unlock() - if _, ok := s.dataSyncServices[collectionID]; ok { - return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID)) - } - s.dataSyncServices[collectionID] = ds - return nil -} - -func (s *streaming) removeDataSyncService(collectionID UniqueID) { - s.dsServicesMu.Lock() - defer s.dsServicesMu.Unlock() - delete(s.dataSyncServices, collectionID) -} diff --git a/internal/querynode/task.go b/internal/querynode/task.go index b1896811e0..6161ac71e8 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -18,11 +18,11 @@ import ( "math/rand" "strconv" "strings" + "time" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" ) @@ -110,35 +110,35 @@ func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error { func (w *watchDmChannelsTask) Execute(ctx context.Context) error { log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs))) + // TODO: pass load type, col or partition + + // 1. init channels in collection meta collectionID := w.req.CollectionID - ds, err := w.node.streaming.getDataSyncService(collectionID) - if err != nil || ds.dmStream == nil { - errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID) - log.Error(errMsg) - return errors.New(errMsg) + + // TODO: Remove this and use unique vChannel + channelTmp := make([]string, 0) + for _, channel := range w.req.ChannelIDs { + channelTmp = append(channelTmp, channel+strconv.FormatInt(collectionID, 10)) } - switch t := ds.dmStream.(type) { - case *msgstream.MqTtMsgStream: - default: - _ = t - errMsg := "type assertion failed for dm message stream" - log.Error(errMsg) - return errors.New(errMsg) + collection, err := w.node.streaming.replica.getCollectionByID(collectionID) + if err != nil { + log.Error(err.Error()) + return err } + collection.addWatchedDmChannels(channelTmp) + // 2. 
get subscription name getUniqueSubName := func() string { prefixName := Params.MsgChannelSubName return prefixName + "-" + strconv.FormatInt(collectionID, 10) } + consumeSubName := getUniqueSubName() - // add request channel + // 3. group channels by to seeking or consuming consumeChannels := w.req.ChannelIDs toSeekInfo := make([]*internalpb.MsgPosition, 0) toDirSubChannels := make([]string, 0) - - consumeSubName := getUniqueSubName() - for _, info := range w.req.Infos { if len(info.Pos.MsgID) == 0 { toDirSubChannels = append(toDirSubChannels, info.ChannelID) @@ -155,23 +155,61 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } } - ds.dmStream.AsConsumer(toDirSubChannels, consumeSubName) + // 4. add flow graph + err = w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, consumeChannels) + if err != nil { + return err + } + log.Debug("query node add flow graphs, channels = " + strings.Join(consumeChannels, ", ")) + + // 5. channels as consumer + nodeFGs, err := w.node.streaming.dataSyncService.getCollectionFlowGraphs(collectionID) + if err != nil { + return err + } + for _, channel := range toDirSubChannels { + for _, fg := range nodeFGs { + if fg.channel == channel { + err := fg.consumerFlowGraph(channel, consumeSubName) + if err != nil { + errMsg := "msgStream consume error :" + err.Error() + log.Error(errMsg) + return errors.New(errMsg) + } + } + } + } + log.Debug("as consumer channels", zap.Any("channels", consumeChannels)) + + // 6. seek channel for _, pos := range toSeekInfo { - err := ds.dmStream.Seek([]*internalpb.MsgPosition{pos}) - if err != nil { - errMsg := "msgStream seek error :" + err.Error() - log.Error(errMsg) - return errors.New(errMsg) + for _, fg := range nodeFGs { + if fg.channel == pos.ChannelName { + err := fg.seekQueryNodeFlowGraph(pos) + if err != nil { + errMsg := "msgStream seek error :" + err.Error() + log.Error(errMsg) + return errors.New(errMsg) + } + } } } - collection, err := w.node.streaming.replica.getCollectionByID(collectionID) + // add tSafe + for _, channel := range channelTmp { + w.node.streaming.tSafeReplica.addTSafe(channel) + } + + // 7. start search collection + w.node.searchService.startSearchCollection(collectionID) + log.Debug("start search collection", zap.Any("collectionID", collectionID)) + + // 8. 
start flow graphs + err = w.node.streaming.dataSyncService.startCollectionFlowGraph(collectionID) if err != nil { - log.Error(err.Error()) return err } - collection.addWatchedDmChannels(w.req.ChannelIDs) - log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName) + log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs))) return nil } @@ -229,19 +267,6 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { } } l.node.streaming.replica.initExcludedSegments(collectionID) - newDS := newDataSyncService(l.node.queryNodeLoopCtx, - l.node.streaming.replica, - l.node.streaming.tSafeReplica, - l.node.msFactory, - collectionID) - // ignore duplicated dataSyncService error - _ = l.node.streaming.addDataSyncService(collectionID, newDS) - ds, err := l.node.streaming.getDataSyncService(collectionID) - if err != nil { - return err - } - go ds.start() - l.node.searchService.startSearchCollection(collectionID) } if !hasPartitionInHistorical { err := l.node.historical.replica.addPartition(collectionID, partitionID) @@ -302,16 +327,20 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error { } func (r *releaseCollectionTask) Execute(ctx context.Context) error { - ds, err := r.node.streaming.getDataSyncService(r.req.CollectionID) - if err == nil && ds != nil { - ds.close() - r.node.streaming.removeDataSyncService(r.req.CollectionID) - // TODO: remove and use vChannel - vChannel := collectionIDToChannel(r.req.CollectionID) - r.node.streaming.tSafeReplica.removeTSafe(vChannel) - r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID) + log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID)) + r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID) + collection, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) + if err != nil { + log.Error(err.Error()) + } else { + // remove all tSafes of the target collection + for _, channel := range collection.getWatchedDmChannels() { + r.node.streaming.tSafeReplica.removeTSafe(channel) + } } + r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID) + if r.node.searchService.hasSearchCollection(r.req.CollectionID) { r.node.searchService.stopSearchCollection(r.req.CollectionID) } @@ -326,12 +355,15 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID) if hasCollectionInStreaming { - err := r.node.streaming.replica.removePartition(r.req.CollectionID) + err := r.node.streaming.replica.removeCollection(r.req.CollectionID) if err != nil { return err } } + // TODO: for debugging, remove this + time.Sleep(2 * time.Second) + log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) return nil } diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 8cf9c76639..1ec4183b2e 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -31,10 +31,15 @@ func (watcher *tSafeWatcher) notify() { } } +// deprecated func (watcher *tSafeWatcher) hasUpdate() { <-watcher.notifyChan } +func (watcher *tSafeWatcher) watcherChan() <-chan bool { + return watcher.notifyChan +} + type tSafer interface { get() Timestamp set(t Timestamp) diff --git a/internal/querynode/tsafe_replica.go b/internal/querynode/tsafe_replica.go index ce1e97a130..9313331e7b 100644 --- a/internal/querynode/tsafe_replica.go +++ 
b/internal/querynode/tsafe_replica.go @@ -13,11 +13,7 @@ package querynode import ( "errors" - "fmt" - "strconv" "sync" - - "github.com/milvus-io/milvus/internal/log" ) // TSafeReplicaInterface is the interface wrapper of tSafeReplica @@ -39,6 +35,7 @@ func (t *tSafeReplica) getTSafe(vChannel VChannel) Timestamp { defer t.mu.Unlock() safer, err := t.getTSaferPrivate(vChannel) if err != nil { + //log.Error("get tSafe failed", zap.Error(err)) return 0 } return safer.get() @@ -49,6 +46,7 @@ func (t *tSafeReplica) setTSafe(vChannel VChannel, timestamp Timestamp) { defer t.mu.Unlock() safer, err := t.getTSaferPrivate(vChannel) if err != nil { + //log.Error("set tSafe failed", zap.Error(err)) return } safer.set(timestamp) @@ -57,7 +55,7 @@ func (t *tSafeReplica) setTSafe(vChannel VChannel, timestamp Timestamp) { func (t *tSafeReplica) getTSaferPrivate(vChannel VChannel) (tSafer, error) { if _, ok := t.tSafes[vChannel]; !ok { err := errors.New("cannot found tSafer, vChannel = " + vChannel) - log.Error(err.Error()) + //log.Error(err.Error()) return nil, err } return t.tSafes[vChannel], nil @@ -67,6 +65,7 @@ func (t *tSafeReplica) addTSafe(vChannel VChannel) { t.mu.Lock() defer t.mu.Unlock() t.tSafes[vChannel] = newTSafe() + //log.Debug("add tSafe done", zap.Any("channel", vChannel)) } func (t *tSafeReplica) removeTSafe(vChannel VChannel) { @@ -85,25 +84,12 @@ func (t *tSafeReplica) registerTSafeWatcher(vChannel VChannel, watcher *tSafeWat defer t.mu.Unlock() safer, err := t.getTSaferPrivate(vChannel) if err != nil { + //log.Error("register tSafe watcher failed", zap.Error(err)) return } safer.registerTSafeWatcher(watcher) } -// TODO: remove and use real vChannel -func collectionIDToChannel(collectionID UniqueID) VChannel { - return fmt.Sprintln(collectionID) -} - -// TODO: remove and use real vChannel -func channelTOCollectionID(channel VChannel) UniqueID { - collectionID, err := strconv.ParseInt(channel, 10, 64) - if err != nil { - return 0 - } - return collectionID -} - func newTSafeReplica() TSafeReplicaInterface { var replica TSafeReplicaInterface = &tSafeReplica{ tSafes: make(map[string]tSafer), diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index 36bbec6046..1be0f14b12 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -20,6 +20,8 @@ const ( timestampFieldID = 1 ) +const invalidTimestamp = Timestamp(0) + type ( UniqueID = typeutil.UniqueID // Timestamp is timestamp diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 07f18d2cce..e97a52110f 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -58,7 +58,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): wg.Done() - //fmt.Println(nodeCtx.node.Name(), "closed") + fmt.Println(nodeCtx.node.Name(), "closed") return default: // inputs from inputsMessages for Operate
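
Usage sketch (not part of the patch): the wiring below mirrors how watchDmChannelsTask.Execute uses the per-vChannel flow-graph API that this diff adds to data_sync_service.go and flow_graph_query_node.go. It is a minimal illustration assuming it sits inside the querynode package (UniqueID, VChannel, ConsumeSubName, TSafeReplicaInterface and the dataSyncService methods are the package types shown above); the helper name watchCollectionSketch is hypothetical, and the real task additionally seeks each channel from a saved MsgPosition before starting the graphs.

// watchCollectionSketch shows the intended call order of the new API:
// create one flow graph per virtual channel, subscribe each graph to its
// channel, register a tSafe per channel, then start all graphs.
func watchCollectionSketch(dsService *dataSyncService,
	tSafeReplica TSafeReplicaInterface,
	collectionID UniqueID,
	vChannels []VChannel,
	subName ConsumeSubName) error {

	// one queryNodeFlowGraph is created per virtual channel
	channels := make([]string, 0, len(vChannels))
	for _, c := range vChannels {
		channels = append(channels, c)
	}
	if err := dsService.addCollectionFlowGraph(collectionID, channels); err != nil {
		return err
	}

	// attach each graph's dml stream to its channel as a consumer
	nodeFGs, err := dsService.getCollectionFlowGraphs(collectionID)
	if err != nil {
		return err
	}
	for _, fg := range nodeFGs {
		if err := fg.consumerFlowGraph(fg.channel, subName); err != nil {
			return err
		}
	}

	// register a tSafe per channel so searchCollection/retrieveCollection
	// can wait on any of them via their reflect.Select watcher cases
	for _, channel := range vChannels {
		tSafeReplica.addTSafe(channel)
	}

	// finally start every flow graph of this collection
	return dsService.startCollectionFlowGraph(collectionID)
}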