diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index f195e57aa7..fd3e76e6f7 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -326,7 +326,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro // ddStreamNode err = dsService.fg.SetEdges(dmStreamNode.Name(), - []string{}, []string{ddNode.Name()}, ) if err != nil { @@ -336,7 +335,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro // ddNode err = dsService.fg.SetEdges(ddNode.Name(), - []string{dmStreamNode.Name()}, []string{insertBufferNode.Name()}, ) if err != nil { @@ -346,7 +344,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro // insertBufferNode err = dsService.fg.SetEdges(insertBufferNode.Name(), - []string{ddNode.Name()}, []string{deleteNode.Name()}, ) if err != nil { @@ -356,7 +353,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro //deleteNode err = dsService.fg.SetEdges(deleteNode.Name(), - []string{insertBufferNode.Name()}, []string{}, ) if err != nil { diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 231abea397..4bccd5ab51 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -82,6 +82,11 @@ func (ddn *ddNode) Name() string { // Operate handles input messages, implementing flowgrpah.Node func (ddn *ddNode) Operate(in []Msg) []Msg { + if in == nil { + log.Debug("type assertion failed for MsgStreamMsg because it's nil") + return []Msg{} + } + if len(in) != 1 { log.Warn("Invalid operate message input in ddNode", zap.Int("input length", len(in))) return []Msg{} @@ -89,11 +94,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { msMsg, ok := in[0].(*MsgStreamMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil") - } else { - log.Warn("type 
assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) - } + log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) return []Msg{} } diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 713d9fd248..303c103593 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -120,20 +120,19 @@ func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) { // Operate implementing flowgraph.Node, performs delete data process func (dn *deleteNode) Operate(in []Msg) []Msg { - //log.Debug("deleteNode Operating") + if in == nil { + log.Debug("type assertion failed for flowGraphMsg because it's nil") + return []Msg{} + } if len(in) != 1 { log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in))) - return nil + return []Msg{} } fgMsg, ok := in[0].(*flowGraphMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for flowGraphMsg because it's nil") - } else { - log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) - } + log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) return []Msg{} } @@ -205,7 +204,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { for _, sp := range spans { sp.Finish() } - return nil + return in } // update delBuf for compacted segments diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 32febb3f9d..1b277fa7a1 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -209,8 +209,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { "Invalid input length == 0"}, {[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}}, "Invalid input length == 3"}, - {[]Msg{&flowGraphMsg{}}, - "Invalid input length == 1 
but input message is not msgStreamMsg"}, {[]Msg{&flowgraph.MsgStreamMsg{}}, "Invalid input length == 1 but input message is not flowGraphMsg"}, } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 7c75404b25..195cf96de5 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -175,7 +175,10 @@ func (ibNode *insertBufferNode) Close() { } func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { - // log.Debug("InsertBufferNode Operating") + if in == nil { + log.Debug("type assertion failed for flowGraphMsg because it's nil") + return []Msg{} + } if len(in) != 1 { log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in))) @@ -184,11 +187,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { fgMsg, ok := in[0].(*flowGraphMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for flowGraphMsg because it's nil") - } else { - log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) - } + log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) return []Msg{} } diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 2db67a1145..f3f34474df 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -52,6 +52,11 @@ func (dNode *deleteNode) Name() string { // Operate handles input messages, do delete operations func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if in == nil { + log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name())) + return []Msg{} + } + if len(in) != 1 { log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)), zap.String("name", dNode.Name())) return []Msg{} @@ -59,11 
+64,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { dMsg, ok := in[0].(*deleteMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name())) - } else { - log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name())) - } + log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name())) return []Msg{} } @@ -73,10 +74,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { deleteOffset: map[UniqueID]int64{}, } - if dMsg == nil { - return []Msg{} - } - var spans []opentracing.Span for _, msg := range dMsg.deleteMessages { sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index deb2d4b666..dcb93da531 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -45,6 +45,11 @@ func (fddNode *filterDeleteNode) Name() string { // Operate handles input messages, to filter invalid delete messages func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if in == nil { + log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name())) + return []Msg{} + } + if len(in) != 1 { log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)), zap.String("name", fddNode.Name())) return []Msg{} @@ -52,15 +57,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msgStreamMsg, ok := in[0].(*MsgStreamMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name())) - } else { - log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", 
reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name())) - } - return []Msg{} - } - - if msgStreamMsg == nil { + log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name())) return []Msg{} } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 9824d9cd70..0e95959a44 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -49,6 +49,10 @@ func (fdmNode *filterDmNode) Name() string { // Operate handles input messages, to filter invalid insert messages func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if in == nil { + log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name())) + return []Msg{} + } if len(in) != 1 { log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)), zap.String("name", fdmNode.Name())) return []Msg{} @@ -56,15 +60,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msgStreamMsg, ok := in[0].(*MsgStreamMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name())) - } else { - log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name())) - } - return []Msg{} - } - - if msgStreamMsg == nil { + log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name())) return []Msg{} } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 8b620156cd..f778c5920a 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -72,6 +72,11 @@ func (iNode *insertNode) Name() string { // 
Operate handles input messages, to execute insert operations func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if in == nil { + log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name())) + return []Msg{} + } + if len(in) != 1 { log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)), zap.String("name", iNode.Name())) return []Msg{} @@ -79,11 +84,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { iMsg, ok := in[0].(*insertMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name())) - } else { - log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name())) - } + log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name())) return []Msg{} } @@ -95,10 +96,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { insertPKs: make(map[UniqueID][]primaryKey), } - if iMsg == nil { - return []Msg{} - } - var spans []opentracing.Span for _, msg := range iMsg.insertMessages { sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index e20d7a6179..49baa59549 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -75,7 +75,6 @@ func newQueryNodeFlowGraph(ctx context.Context, // dmStreamNode err = q.flowGraph.SetEdges(dmStreamNode.Name(), - []string{}, []string{filterDmNode.Name()}, ) if err != nil { @@ -84,7 +83,6 @@ func newQueryNodeFlowGraph(ctx context.Context, // filterDmNode err = q.flowGraph.SetEdges(filterDmNode.Name(), - []string{dmStreamNode.Name()}, []string{insertNode.Name()}, ) if err != nil { @@ -93,7 +91,6 @@ func newQueryNodeFlowGraph(ctx context.Context, 
// insertNode err = q.flowGraph.SetEdges(insertNode.Name(), - []string{filterDmNode.Name()}, []string{serviceTimeNode.Name()}, ) if err != nil { @@ -102,7 +99,6 @@ func newQueryNodeFlowGraph(ctx context.Context, // serviceTimeNode err = q.flowGraph.SetEdges(serviceTimeNode.Name(), - []string{insertNode.Name()}, []string{}, ) if err != nil { @@ -145,7 +141,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, // dmStreamNode err = q.flowGraph.SetEdges(dmStreamNode.Name(), - []string{}, []string{filterDeleteNode.Name()}, ) if err != nil { @@ -154,7 +149,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, // filterDmNode err = q.flowGraph.SetEdges(filterDeleteNode.Name(), - []string{dmStreamNode.Name()}, []string{deleteNode.Name()}, ) if err != nil { @@ -163,7 +157,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, // insertNode err = q.flowGraph.SetEdges(deleteNode.Name(), - []string{filterDeleteNode.Name()}, []string{serviceTimeNode.Name()}, ) if err != nil { @@ -172,7 +165,6 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, // serviceTimeNode err = q.flowGraph.SetEdges(serviceTimeNode.Name(), - []string{deleteNode.Name()}, []string{}, ) if err != nil { diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index c853982cf3..9b263c5d87 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -42,6 +42,11 @@ func (stNode *serviceTimeNode) Name() string { // Operate handles input messages, to execute insert operations func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if in == nil { + log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name())) + return []Msg{} + } + if len(in) != 1 { log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)), zap.String("name", stNode.Name())) return []Msg{} @@ 
-49,15 +54,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { serviceTimeMsg, ok := in[0].(*serviceTimeMsg) if !ok { - if in[0] == nil { - log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name())) - } else { - log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name())) - } - return []Msg{} - } - - if serviceTimeMsg == nil { + log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name())) return []Msg{} } @@ -75,7 +72,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { zap.Any("channel", stNode.vChannel), ) - return []Msg{} + return in } // newServiceTimeNode returns a new serviceTimeNode diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 28e5002daf..c5bab262ac 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -22,6 +22,8 @@ import ( "sync" ) +// The flow graph is no longer a general graph but a simple pipeline; this simplifies the code and speeds up recovery. - xiaofan. 
+ // TimeTickedFlowGraph flowgraph with input from tt msg stream type TimeTickedFlowGraph struct { nodeCtx map[NodeName]*nodeCtx @@ -33,45 +35,37 @@ type TimeTickedFlowGraph struct { // AddNode add Node into flowgraph func (fg *TimeTickedFlowGraph) AddNode(node Node) { nodeCtx := nodeCtx{ - node: node, - downstreamInputChanIdx: make(map[string]int), - closeCh: make(chan struct{}), - closeWg: fg.closeWg, + node: node, + closeCh: make(chan struct{}), + closeWg: fg.closeWg, } fg.nodeCtx[node.Name()] = &nodeCtx } // SetEdges set directed edges from in nodes to out nodes -func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string) error { +func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error { currentNode, ok := fg.nodeCtx[nodeName] if !ok { errMsg := "Cannot find node:" + nodeName return errors.New(errMsg) } - // init current node's downstream - currentNode.downstream = make([]*nodeCtx, len(out)) - - // set in nodes - for i, inNodeName := range in { - inNode, ok := fg.nodeCtx[inNodeName] - if !ok { - errMsg := "Cannot find in node:" + inNodeName - return errors.New(errMsg) - } - inNode.downstreamInputChanIdx[nodeName] = i + if len(out) > 1 { + errMsg := "Flow graph now support only pipeline mode, with only one or zero output:" + nodeName + return errors.New(errMsg) } + // init current node's downstream // set out nodes - for i, n := range out { - outNode, ok := fg.nodeCtx[n] + for _, name := range out { + outNode, ok := fg.nodeCtx[name] if !ok { - errMsg := "Cannot find out node:" + n + errMsg := "Cannot find out node:" + name return errors.New(errMsg) } maxQueueLength := outNode.node.MaxQueueLength() - outNode.inputChannels = append(outNode.inputChannels, make(chan Msg, maxQueueLength)) - currentNode.downstream[i] = outNode + outNode.inputChannel = make(chan []Msg, maxQueueLength) + currentNode.downstream = outNode } return nil diff --git a/internal/util/flowgraph/flow_graph_test.go 
b/internal/util/flowgraph/flow_graph_test.go index 74c3455c85..bb6d28eeed 100644 --- a/internal/util/flowgraph/flow_graph_test.go +++ b/internal/util/flowgraph/flow_graph_test.go @@ -18,7 +18,6 @@ package flowgraph import ( "context" - "log" "math" "math/rand" "testing" @@ -27,11 +26,10 @@ import ( "github.com/stretchr/testify/assert" ) -// Flow graph basic example: count `d = pow(a) + sqrt(a)` +// Flow graph basic example: count `c = pow(a, 2) + 2` // nodeA: receive input value a from input channel // nodeB: count b = pow(a, 2) -// nodeC: count c = sqrt(a) -// nodeD: count d = b + c +// nodeC: count c = b + 2 type nodeA struct { BaseNode @@ -45,11 +43,6 @@ type nodeB struct { } type nodeC struct { - BaseNode - c float64 -} - -type nodeD struct { BaseNode d float64 outputChan chan float64 @@ -73,7 +66,7 @@ func (n *nodeA) Operate(in []Msg) []Msg { var res Msg = &numMsg{ num: a, } - return []Msg{res, res} + return []Msg{res} } func (n *nodeB) Name() string { @@ -81,12 +74,9 @@ func (n *nodeB) Name() string { } func (n *nodeB) Operate(in []Msg) []Msg { - if len(in) != 1 { - panic("illegal in") - } a, ok := in[0].(*numMsg) if !ok { - return []Msg{} + return nil } b := math.Pow(a.num, 2) var res Msg = &numMsg{ @@ -100,43 +90,17 @@ func (n *nodeC) Name() string { } func (n *nodeC) Operate(in []Msg) []Msg { - if len(in) != 1 { - panic("illegal in") - } - a, ok := in[0].(*numMsg) - if !ok { - return []Msg{} - } - c := math.Sqrt(a.num) - var res Msg = &numMsg{ - num: c, - } - return []Msg{res} -} - -func (n *nodeD) Name() string { - return "NodeD" -} - -func (n *nodeD) Operate(in []Msg) []Msg { - if len(in) != 2 { - panic("illegal in") - } b, ok := in[0].(*numMsg) if !ok { return nil } - c, ok := in[1].(*numMsg) - if !ok { - return nil - } - d := b.num + c.num - n.outputChan <- d + c := b.num + 2 + n.outputChan <- c // return nil because nodeD doesn't have any downstream node. 
return nil } -func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64, context.CancelFunc) { +func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64, context.CancelFunc, error) { const MaxQueueLength = 1024 ctx, cancel := context.WithCancel(context.Background()) @@ -160,52 +124,35 @@ func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64, BaseNode: BaseNode{ maxQueueLength: MaxQueueLength, }, - } - var d Node = &nodeD{ - BaseNode: BaseNode{ - maxQueueLength: MaxQueueLength, - }, outputChan: outputChan, } fg.AddNode(a) fg.AddNode(b) fg.AddNode(c) - fg.AddNode(d) var err = fg.SetEdges(a.Name(), - []string{}, - []string{b.Name(), c.Name()}, + []string{b.Name()}, ) if err != nil { - log.Fatal("set edges failed") + return nil, nil, nil, cancel, err } err = fg.SetEdges(b.Name(), - []string{a.Name()}, - []string{d.Name()}, + []string{c.Name()}, ) if err != nil { - log.Fatal("set edges failed") + return nil, nil, nil, cancel, err } err = fg.SetEdges(c.Name(), - []string{a.Name()}, - []string{d.Name()}, - ) - if err != nil { - log.Fatal("set edges failed") - } - - err = fg.SetEdges(d.Name(), - []string{b.Name(), c.Name()}, []string{}, ) if err != nil { - log.Fatal("set edges failed") + return nil, nil, nil, cancel, err } - return fg, inputChan, outputChan, cancel + return fg, inputChan, outputChan, cancel, nil } func TestTimeTickedFlowGraph_AddNode(t *testing.T) { @@ -232,60 +179,9 @@ func TestTimeTickedFlowGraph_AddNode(t *testing.T) { assert.Equal(t, len(fg.nodeCtx), 2) } -func TestTimeTickedFlowGraph_SetEdges(t *testing.T) { - const MaxQueueLength = 1024 - inputChan := make(chan float64, MaxQueueLength) - - fg := NewTimeTickedFlowGraph(context.TODO()) - - var a Node = &nodeA{ - BaseNode: BaseNode{ - maxQueueLength: MaxQueueLength, - }, - inputChan: inputChan, - } - var b Node = &nodeB{ - BaseNode: BaseNode{ - maxQueueLength: MaxQueueLength, - }, - } - var c Node = &nodeC{ - BaseNode: BaseNode{ - 
maxQueueLength: MaxQueueLength, - }, - } - - fg.AddNode(a) - fg.AddNode(b) - fg.AddNode(c) - - var err = fg.SetEdges(a.Name(), - []string{b.Name()}, - []string{c.Name()}, - ) - assert.Nil(t, err) - - err = fg.SetEdges("Invalid", - []string{b.Name()}, - []string{c.Name()}, - ) - assert.Error(t, err) - - err = fg.SetEdges(a.Name(), - []string{"Invalid"}, - []string{c.Name()}, - ) - assert.Error(t, err) - - err = fg.SetEdges(a.Name(), - []string{b.Name()}, - []string{"Invalid"}, - ) - assert.Error(t, err) -} - func TestTimeTickedFlowGraph_Start(t *testing.T) { - fg, inputChan, outputChan, cancel := createExampleFlowGraph() + fg, inputChan, outputChan, cancel, err := createExampleFlowGraph() + assert.NoError(t, err) defer cancel() fg.Start() @@ -297,7 +193,7 @@ func TestTimeTickedFlowGraph_Start(t *testing.T) { // output check d := <-outputChan - res := math.Pow(a, 2) + math.Sqrt(a) + res := math.Pow(a, 2) + 2 assert.Equal(t, d, res) } }() @@ -305,7 +201,8 @@ func TestTimeTickedFlowGraph_Start(t *testing.T) { } func TestTimeTickedFlowGraph_Close(t *testing.T) { - fg, _, _, cancel := createExampleFlowGraph() + fg, _, _, cancel, err := createExampleFlowGraph() + assert.NoError(t, err) defer cancel() fg.Close() } diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 0ad23da735..136ed04e6d 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -17,6 +17,8 @@ package flowgraph import ( + "sync" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/util/trace" @@ -28,9 +30,9 @@ import ( // InputNode is the entry point of flowgragh type InputNode struct { BaseNode - inStream msgstream.MsgStream - name string - closeMsgChan chan struct{} + inStream msgstream.MsgStream + name string + closeOnce sync.Once } // IsInputNode returns whether Node is InputNode @@ -45,15 +47,9 @@ func (inNode *InputNode) Start() { // 
Close implements node func (inNode *InputNode) Close() { - select { - case <-inNode.closeMsgChan: - return - default: - close(inNode.closeMsgChan) - log.Debug("message stream closed", - zap.String("node name", inNode.name), - ) - } + inNode.closeOnce.Do(func() { + inNode.inStream.Close() + }) } // Name returns node name @@ -68,44 +64,40 @@ func (inNode *InputNode) InStream() msgstream.MsgStream { // Operate consume a message pack from msgstream and return func (inNode *InputNode) Operate(in []Msg) []Msg { - select { - case <-inNode.closeMsgChan: - inNode.inStream.Close() + msgPack, ok := <-inNode.inStream.Chan() + if !ok { + log.Warn("MsgStream closed", zap.Any("input node", inNode.Name())) return []Msg{&MsgStreamMsg{ isCloseMsg: true, }} - case msgPack, ok := <-inNode.inStream.Chan(): - if !ok { - log.Warn("MsgStream closed", zap.Any("input node", inNode.Name())) - return []Msg{} - } - - // TODO: add status - if msgPack == nil { - return nil - } - var spans []opentracing.Span - for _, msg := range msgPack.Msgs { - sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) - sp.LogFields(oplog.String("input_node name", inNode.Name())) - spans = append(spans, sp) - msg.SetTraceCtx(ctx) - } - - var msgStreamMsg Msg = &MsgStreamMsg{ - tsMessages: msgPack.Msgs, - timestampMin: msgPack.BeginTs, - timestampMax: msgPack.EndTs, - startPositions: msgPack.StartPositions, - endPositions: msgPack.EndPositions, - } - - for _, span := range spans { - span.Finish() - } - - return []Msg{msgStreamMsg} } + + // TODO: add status + if msgPack == nil { + return []Msg{} + } + var spans []opentracing.Span + for _, msg := range msgPack.Msgs { + sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) + sp.LogFields(oplog.String("input_node name", inNode.Name())) + spans = append(spans, sp) + msg.SetTraceCtx(ctx) + } + + var msgStreamMsg Msg = &MsgStreamMsg{ + tsMessages: msgPack.Msgs, + timestampMin: msgPack.BeginTs, + timestampMax: msgPack.EndTs, + startPositions: msgPack.StartPositions, + 
endPositions: msgPack.EndPositions, + } + + for _, span := range spans { + span.Finish() + } + + // TODO batch operate msg + return []Msg{msgStreamMsg} } // NewInputNode composes an InputNode with provided MsgStream, name and parameters @@ -115,9 +107,8 @@ func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength baseNode.SetMaxParallelism(maxParallelism) return &InputNode{ - BaseNode: baseNode, - inStream: inStream, - name: nodeName, - closeMsgChan: make(chan struct{}), + BaseNode: baseNode, + inStream: inStream, + name: nodeName, } } diff --git a/internal/util/flowgraph/input_node_test.go b/internal/util/flowgraph/input_node_test.go index 7812805a4d..e5ea0213a1 100644 --- a/internal/util/flowgraph/input_node_test.go +++ b/internal/util/flowgraph/input_node_test.go @@ -53,8 +53,11 @@ func TestInputNode(t *testing.T) { stream := inputNode.InStream() assert.NotNil(t, stream) - output := inputNode.Operate([]Msg{}) - assert.Greater(t, len(output), 0) + output := inputNode.Operate(nil) + assert.NotNil(t, output) + msg, ok := output[0].(*MsgStreamMsg) + assert.True(t, ok) + assert.False(t, msg.isCloseMsg) } func Test_NewInputNode(t *testing.T) { diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 997a193c5f..9412445a38 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -52,11 +52,9 @@ type BaseNode struct { // nodeCtx maintains the running context for a Node in flowgragh type nodeCtx struct { - node Node - inputChannels []chan Msg - inputMessages []Msg - downstream []*nodeCtx - downstreamInputChanIdx map[string]int + node Node + inputChannel chan []Msg + downstream *nodeCtx closeCh chan struct{} // notify work to exit closeWg *sync.WaitGroup @@ -100,45 +98,37 @@ func (nodeCtx *nodeCtx) work() { return default: // inputs from inputsMessages for Operate - var inputs, res []Msg + var input, output []Msg if !nodeCtx.node.IsInputNode() { - nodeCtx.collectInputMessages() - inputs = 
nodeCtx.inputMessages + input = <-nodeCtx.inputChannel } + // the input message decides whether the operate method is executed - if isCloseMsg(inputs) { - res = inputs + if isCloseMsg(input) { + output = input } - if len(res) == 0 { + if len(output) == 0 { n := nodeCtx.node - res = n.Operate(inputs) + output = n.Operate(input) } - // the res decide whether the node should be closed. - if isCloseMsg(res) { + // the output decide whether the node should be closed. + if isCloseMsg(output) { close(nodeCtx.closeCh) nodeCtx.closeWg.Done() nodeCtx.node.Close() + if nodeCtx.inputChannel != nil { + close(nodeCtx.inputChannel) + } } if enableTtChecker { checker.Check(name) } - downstreamLength := len(nodeCtx.downstreamInputChanIdx) - if len(nodeCtx.downstream) < downstreamLength { - log.Warn("", zap.Any("nodeCtx.downstream length", len(nodeCtx.downstream))) + // deliver to all following flow graph node. + if nodeCtx.downstream != nil { + nodeCtx.downstream.inputChannel <- output } - if len(res) < downstreamLength { - // log.Println("node result length = ", len(res)) - break - } - - w := sync.WaitGroup{} - for i := 0; i < downstreamLength; i++ { - w.Add(1) - go nodeCtx.downstream[i].deliverMsg(&w, res[i], nodeCtx.downstreamInputChanIdx[nodeCtx.downstream[i].node.Name()]) - } - w.Wait() } } } @@ -150,72 +140,6 @@ func (nodeCtx *nodeCtx) Close() { } } -// deliverMsg tries to put the Msg to specified downstream channel -func (nodeCtx *nodeCtx) deliverMsg(wg *sync.WaitGroup, msg Msg, inputChanIdx int) { - defer wg.Done() - defer func() { - err := recover() - if err != nil { - log.Warn(fmt.Sprintln(err)) - } - }() - nodeCtx.inputChannels[inputChanIdx] <- msg -} - -func (nodeCtx *nodeCtx) collectInputMessages() { - inputsNum := len(nodeCtx.inputChannels) - nodeCtx.inputMessages = make([]Msg, inputsNum) - - // init inputMessages, - // receive messages from inputChannels, - // and move them to inputMessages. 
- for i := 0; i < inputsNum; i++ { - channel := nodeCtx.inputChannels[i] - msg, ok := <-channel - if !ok { - // TODO: add status - log.Warn("input channel closed") - return - } - nodeCtx.inputMessages[i] = msg - } - - // timeTick alignment check - if len(nodeCtx.inputMessages) > 1 { - t := nodeCtx.inputMessages[0].TimeTick() - latestTime := t - for i := 1; i < len(nodeCtx.inputMessages); i++ { - if latestTime < nodeCtx.inputMessages[i].TimeTick() { - latestTime = nodeCtx.inputMessages[i].TimeTick() - } - } - - // wait for time tick - sign := make(chan struct{}) - go func() { - for i := 0; i < len(nodeCtx.inputMessages); i++ { - for nodeCtx.inputMessages[i].TimeTick() != latestTime { - log.Debug("Try to align timestamp", zap.Uint64("t1", latestTime), zap.Uint64("t2", nodeCtx.inputMessages[i].TimeTick())) - channel := nodeCtx.inputChannels[i] - msg, ok := <-channel - if !ok { - log.Warn("input channel closed") - return - } - nodeCtx.inputMessages[i] = msg - } - } - sign <- struct{}{} - }() - - select { - case <-time.After(10 * time.Second): - panic("Fatal, misaligned time tick, please restart pulsar") - case <-sign: - } - } -} - // MaxQueueLength returns the maximal queue length func (node *BaseNode) MaxQueueLength() int32 { return node.maxQueueLength diff --git a/internal/util/flowgraph/node_test.go b/internal/util/flowgraph/node_test.go index 8cbec47f72..7b1c91cdfe 100644 --- a/internal/util/flowgraph/node_test.go +++ b/internal/util/flowgraph/node_test.go @@ -77,16 +77,12 @@ func TestNodeCtx_Start(t *testing.T) { inputNode := NewInputNode(msgStream, nodeName, 100, 100) node := &nodeCtx{ - node: inputNode, - inputChannels: make([]chan Msg, 2), - downstreamInputChanIdx: make(map[string]int), - closeCh: make(chan struct{}), - closeWg: &sync.WaitGroup{}, + node: inputNode, + closeCh: make(chan struct{}), + closeWg: &sync.WaitGroup{}, } - for i := 0; i < len(node.inputChannels); i++ { - node.inputChannels[i] = make(chan Msg) - } + node.inputChannel = make(chan []Msg) 
assert.NotPanics(t, func() { node.Start()