From d6fe379143d1079eda201047f93d4b4cef0562ef Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 3 Nov 2020 17:09:51 +0800 Subject: [PATCH] Add insertNode and serviceTimeNode Signed-off-by: bigsheeper --- internal/msgstream/marshaler.go | 10 +-- internal/msgstream/msgstream.go | 2 +- internal/msgstream/msgstream_test.go | 38 +++++------ internal/msgstream/newstream_test.go | 48 +++++++------- internal/msgstream/task.go | 30 ++++----- internal/reader/dm_node.go | 21 +++++- internal/reader/filtered_dm_node.go | 41 +++++++++++- internal/reader/insert_node.go | 86 ++++++++++++++++++++++++- internal/reader/manipulation_service.go | 46 +++++++++++-- internal/reader/message.go | 15 +++-- internal/reader/msg_stream_node.go | 36 ++++++++++- internal/reader/query_node.go | 5 +- internal/reader/query_node_time.go | 22 +++---- internal/reader/query_node_time_test.go | 8 +-- internal/reader/reader.go | 8 +-- internal/reader/search_service.go | 14 ++++ internal/reader/search_test.go | 2 +- internal/reader/segment.go | 70 ++++++++++---------- internal/reader/segment_service.go | 2 +- internal/reader/segment_test.go | 34 +++++----- internal/reader/service_time_node.go | 17 ++++- internal/util/flowgraph/flow_graph.go | 23 ++++++- 22 files changed, 416 insertions(+), 162 deletions(-) create mode 100644 internal/reader/search_service.go diff --git a/internal/msgstream/marshaler.go b/internal/msgstream/marshaler.go index 0357f43d54..83d57e6eea 100644 --- a/internal/msgstream/marshaler.go +++ b/internal/msgstream/marshaler.go @@ -17,23 +17,23 @@ func GetMarshalers(inputMsgType MsgType, outputMsgType MsgType) (*TsMsgMarshaler func GetMarshaler(MsgType MsgType) *TsMsgMarshaler { switch MsgType { - case kInsert: + case KInsert: insertMarshaler := &InsertMarshaler{} var tsMsgMarshaller TsMsgMarshaler = insertMarshaler return &tsMsgMarshaller - case kDelete: + case KDelete: deleteMarshaler := &DeleteMarshaler{} var tsMsgMarshaller TsMsgMarshaler = deleteMarshaler return &tsMsgMarshaller - case kSearch: + case KSearch: searchMarshaler := &SearchMarshaler{} var tsMsgMarshaller TsMsgMarshaler = searchMarshaler return &tsMsgMarshaller - case kSearchResult: + case KSearchResult: searchResultMarshler := &SearchResultMarshaler{} var tsMsgMarshaller TsMsgMarshaler = searchResultMarshler return &tsMsgMarshaller - case kTimeSync: + case KTimeSync: timeSyncMarshaler := &TimeSyncMarshaler{} var tsMsgMarshaller TsMsgMarshaler = timeSyncMarshaler return &tsMsgMarshaller diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 17f3ccdb3e..057f7aeef2 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -273,7 +273,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context, (*ms.consumers[channelIndex]).Ack(pulsarMsg) tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload()) // TODO:: Find the EOF - if (*tsMsg).Type() == kTimeSync { + if (*tsMsg).Type() == KTimeSync { eofMsgMap[channelIndex] = (*tsMsg).EndTs() wg.Done() return diff --git a/internal/msgstream/msgstream_test.go b/internal/msgstream/msgstream_test.go index 6bb4c012c2..856bc5679f 100644 --- a/internal/msgstream/msgstream_test.go +++ b/internal/msgstream/msgstream_test.go @@ -27,7 +27,7 @@ func repackFunc(msgs []*TsMsg, hashKeys [][]int32) map[int32]*MsgPack { func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { var tsMsg TsMsg switch msgType { - case kInsert: + case KInsert: insertRequest := internalPb.InsertRequest{ ReqType: internalPb.ReqType_kInsert, ReqId: reqId, @@ -43,7 +43,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { InsertRequest: insertRequest, } tsMsg = insertMsg - case kDelete: + case KDelete: deleteRequest := internalPb.DeleteRequest{ ReqType: internalPb.ReqType_kDelete, ReqId: reqId, @@ -58,7 +58,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { DeleteRequest: deleteRequest, } tsMsg = deleteMsg - case kSearch: + case KSearch: searchRequest := internalPb.SearchRequest{ ReqType: internalPb.ReqType_kSearch, ReqId: reqId, @@ -71,7 +71,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { SearchRequest: searchRequest, } tsMsg = searchMsg - case kSearchResult: + case KSearchResult: searchResult := internalPb.SearchResult{ Status: &commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}, ReqId: reqId, @@ -166,11 +166,11 @@ func TestStream_Insert(t *testing.T) { consumerSubName := "subInsert" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kInsert, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kInsert, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1)) //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kInsert, kInsert) + initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KInsert, KInsert) } func TestStream_Delete(t *testing.T) { @@ -180,11 +180,11 @@ func TestStream_Delete(t *testing.T) { consumerSubName := "subDelete" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kDelete, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kDelete, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 3, 3)) //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kDelete, kDelete) + initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KDelete, KDelete) } func TestStream_Search(t *testing.T) { @@ -194,11 +194,11 @@ func TestStream_Search(t *testing.T) { consumerSubName := "subSearch" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearch, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearch, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 3, 3)) //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kSearch, kSearch) + initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KSearch, KSearch) } func TestStream_SearchResult(t *testing.T) { @@ -208,11 +208,11 @@ func TestStream_SearchResult(t *testing.T) { consumerSubName := "subSearch" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearchResult, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearchResult, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 3, 3)) //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kSearchResult, kSearchResult) + initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KSearchResult, KSearchResult) } func TestStream_TimeSync(t *testing.T) { @@ -222,11 +222,11 @@ func TestStream_TimeSync(t *testing.T) { consumerSubName := "subSearch" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kTimeSync, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kTimeSync, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeSync, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeSync, 3, 3)) //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kTimeSync, kTimeSync) + initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KTimeSync, KTimeSync) } func TestStream_BroadCast(t *testing.T) { diff --git a/internal/msgstream/newstream_test.go b/internal/msgstream/newstream_test.go index 3437a98aeb..4e0a998e49 100644 --- a/internal/msgstream/newstream_test.go +++ b/internal/msgstream/newstream_test.go @@ -14,14 +14,14 @@ func TestNewStream_Insert(t *testing.T) { consumerSubName := "subInsert" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kInsert, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kInsert, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1)) inputStream := NewInputStream(pulsarAddress, producerChannels, false) outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - (*inputStream).SetMsgMarshaler(GetMarshaler(kInsert), nil) + (*inputStream).SetMsgMarshaler(GetMarshaler(KInsert), nil) (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(kInsert)) + (*outputStream).SetMsgMarshaler(nil, GetMarshaler(KInsert)) (*outputStream).Start() //send msgPack @@ -52,14 +52,14 @@ func TestNewStream_Delete(t *testing.T) { consumerSubName := "subDelete" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kDelete, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kDelete, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 1, 1)) inputStream := NewInputStream(pulsarAddress, producerChannels, false) outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - (*inputStream).SetMsgMarshaler(GetMarshaler(kDelete), nil) + (*inputStream).SetMsgMarshaler(GetMarshaler(KDelete), nil) (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(kDelete)) + (*outputStream).SetMsgMarshaler(nil, GetMarshaler(KDelete)) (*outputStream).Start() //send msgPack @@ -90,14 +90,14 @@ func TestNewStream_Search(t *testing.T) { consumerSubName := "subSearch" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearch, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearch, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 1, 1)) inputStream := NewInputStream(pulsarAddress, producerChannels, false) outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - (*inputStream).SetMsgMarshaler(GetMarshaler(kSearch), nil) + (*inputStream).SetMsgMarshaler(GetMarshaler(KSearch), nil) (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(kSearch)) + (*outputStream).SetMsgMarshaler(nil, GetMarshaler(KSearch)) (*outputStream).Start() //send msgPack @@ -128,14 +128,14 @@ func TestNewStream_SearchResult(t *testing.T) { consumerSubName := "subInsert" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearchResult, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kSearchResult, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 1, 1)) inputStream := NewInputStream(pulsarAddress, producerChannels, false) outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - (*inputStream).SetMsgMarshaler(GetMarshaler(kSearchResult), nil) + (*inputStream).SetMsgMarshaler(GetMarshaler(KSearchResult), nil) (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(kSearchResult)) + (*outputStream).SetMsgMarshaler(nil, GetMarshaler(KSearchResult)) (*outputStream).Start() //send msgPack @@ -166,14 +166,14 @@ func TestNewStream_TimeSync(t *testing.T) { consumerSubName := "subInsert" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kTimeSync, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kTimeSync, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeSync, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeSync, 1, 1)) inputStream := NewInputStream(pulsarAddress, producerChannels, false) outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - (*inputStream).SetMsgMarshaler(GetMarshaler(kTimeSync), nil) + (*inputStream).SetMsgMarshaler(GetMarshaler(KTimeSync), nil) (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(kTimeSync)) + (*outputStream).SetMsgMarshaler(nil, GetMarshaler(KTimeSync)) (*outputStream).Start() //send msgPack @@ -203,8 +203,8 @@ func TestNewStream_Insert_TimeTick(t *testing.T) { consumerSubName := "subInsert" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kInsert, 0, 0)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kInsert, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 0, 0)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1)) insertRequest := internalPb.InsertRequest{ ReqType: internalPb.ReqType_kTimeTick, @@ -226,9 +226,9 @@ func TestNewStream_Insert_TimeTick(t *testing.T) { inputStream := NewInputStream(pulsarAddress, producerChannels, false) outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, true) - (*inputStream).SetMsgMarshaler(GetMarshaler(kInsert), nil) + (*inputStream).SetMsgMarshaler(GetMarshaler(KInsert), nil) (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(kInsert)) + (*outputStream).SetMsgMarshaler(nil, GetMarshaler(KInsert)) (*outputStream).Start() //send msgPack diff --git a/internal/msgstream/task.go b/internal/msgstream/task.go index 7f1cfed2b6..ffe4642796 100644 --- a/internal/msgstream/task.go +++ b/internal/msgstream/task.go @@ -8,14 +8,14 @@ import ( type MsgType uint32 const ( - kInsert MsgType = 400 - kDelete MsgType = 401 - kSearch MsgType = 500 - kSearchResult MsgType = 1000 + KInsert MsgType = 400 + KDelete MsgType = 401 + KSearch MsgType = 500 + KSearchResult MsgType = 1000 - kSegmentStatics MsgType = 1100 - kTimeTick MsgType = 1200 - kTimeSync MsgType = 1201 + KSegmentStatics MsgType = 1100 + KTimeTick MsgType = 1200 + KTimeSync MsgType = 1201 ) type TsMsg interface { @@ -68,9 +68,9 @@ func (it InsertTask) EndTs() Timestamp { func (it InsertTask) Type() MsgType { if it.ReqType == internalPb.ReqType_kTimeTick { - return kTimeSync + return KTimeSync } - return kInsert + return KInsert } func (it InsertTask) HashKeys() []int32 { @@ -119,9 +119,9 @@ func (dt DeleteTask) EndTs() Timestamp { func (dt DeleteTask) Type() MsgType { if dt.ReqType == internalPb.ReqType_kTimeTick { - return kTimeSync + return KTimeSync } - return kDelete + return KDelete } func (dt DeleteTask) HashKeys() []int32 { @@ -148,9 +148,9 @@ func (st SearchTask) EndTs() Timestamp { func (st SearchTask) Type() MsgType { if st.ReqType == internalPb.ReqType_kTimeTick { - return kTimeSync + return KTimeSync } - return kSearch + return KSearch } func (st SearchTask) HashKeys() []int32 { @@ -176,7 +176,7 @@ func (srt SearchResultTask) EndTs() Timestamp { } func (srt SearchResultTask) Type() MsgType { - return kSearchResult + return KSearchResult } func (srt SearchResultTask) HashKeys() []int32 { @@ -202,7 +202,7 @@ func (tst TimeSyncTask) EndTs() Timestamp { } func (tst TimeSyncTask) Type() MsgType { - return kTimeSync + return KTimeSync } func (tst TimeSyncTask) HashKeys() []int32 { diff --git a/internal/reader/dm_node.go b/internal/reader/dm_node.go index bb99375fa2..9d4ad78825 100644 --- a/internal/reader/dm_node.go +++ b/internal/reader/dm_node.go @@ -1,5 +1,9 @@ package reader +import ( + "log" +) + type dmNode struct { BaseNode dmMsg dmMsg @@ -10,7 +14,22 @@ func (dmNode *dmNode) Name() string { } func (dmNode *dmNode) Operate(in []*Msg) []*Msg { - return in + // TODO: add filtered by schema update + // But for now, we think all the messages are valid + + if len(in) != 1 { + log.Println("Invalid operate message input in filteredDmNode") + // TODO: add error handling + } + + dmMsg, ok := (*in[0]).(*dmMsg) + if !ok { + log.Println("type assertion failed for dmMsg") + // TODO: add error handling + } + + var res Msg = dmMsg + return []*Msg{&res} } func newDmNode() *dmNode { diff --git a/internal/reader/filtered_dm_node.go b/internal/reader/filtered_dm_node.go index de1419cf4f..d632266c22 100644 --- a/internal/reader/filtered_dm_node.go +++ b/internal/reader/filtered_dm_node.go @@ -1,5 +1,10 @@ package reader +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "log" +) + type filteredDmNode struct { BaseNode filteredDmMsg filteredDmMsg @@ -10,7 +15,41 @@ func (fdmNode *filteredDmNode) Name() string { } func (fdmNode *filteredDmNode) Operate(in []*Msg) []*Msg { - return in + if len(in) != 1 { + log.Println("Invalid operate message input in filteredDmNode") + // TODO: add error handling + } + + fdmMsg, ok := (*in[0]).(*filteredDmMsg) + if !ok { + log.Println("type assertion failed for filteredDmMsg") + // TODO: add error handling + } + + insertData := InsertData{ + insertIDs: make(map[int64][]int64), + insertTimestamps: make(map[int64][]uint64), + insertRecords: make(map[int64][]*commonpb.Blob), + insertOffset: make(map[int64]int64), + } + + var iMsg = insertMsg{ + insertData: insertData, + timeRange: fdmMsg.timeRange, + } + for _, task := range fdmMsg.insertMessages { + if len(task.RowIds) != len(task.Timestamps) || len(task.RowIds) != len(task.RowData) { + // TODO: what if the messages are misaligned? + // Here, we ignore those messages and print error + log.Println("Error, misaligned messages detected") + continue + } + iMsg.insertData.insertIDs[task.SegmentId] = append(iMsg.insertData.insertIDs[task.SegmentId], task.RowIds...) + iMsg.insertData.insertTimestamps[task.SegmentId] = append(iMsg.insertData.insertTimestamps[task.SegmentId], task.Timestamps...) + iMsg.insertData.insertRecords[task.SegmentId] = append(iMsg.insertData.insertRecords[task.SegmentId], task.RowData...) + } + var res Msg = &iMsg + return []*Msg{&res} } func newFilteredDmNode() *filteredDmNode { diff --git a/internal/reader/insert_node.go b/internal/reader/insert_node.go index b088dcfe98..af5b3abb7e 100644 --- a/internal/reader/insert_node.go +++ b/internal/reader/insert_node.go @@ -1,8 +1,17 @@ package reader +import ( + "errors" + "fmt" + "log" + "strconv" + "sync" +) + type insertNode struct { BaseNode - insertMsg insertMsg + SegmentsMap *map[int64]*Segment + insertMsg *insertMsg } func (iNode *insertNode) Name() string { @@ -10,7 +19,80 @@ func (iNode *insertNode) Name() string { } func (iNode *insertNode) Operate(in []*Msg) []*Msg { - return in + if len(in) != 1 { + log.Println("Invalid operate message input in insertNode") + // TODO: add error handling + } + + insertMsg, ok := (*in[0]).(*insertMsg) + if !ok { + log.Println("type assertion failed for insertMsg") + // TODO: add error handling + } + + iNode.insertMsg = insertMsg + + var err = iNode.preInsert() + if err != nil { + log.Println("preInsert failed") + // TODO: add error handling + } + + wg := sync.WaitGroup{} + for segmentID := range iNode.insertMsg.insertData.insertRecords { + wg.Add(1) + go iNode.insert(segmentID, &wg) + } + wg.Wait() + + var res Msg = &serviceTimeMsg{ + timeRange: insertMsg.timeRange, + } + return []*Msg{&res} +} + +func (iNode *insertNode) preInsert() error { + for segmentID := range iNode.insertMsg.insertData.insertRecords { + var targetSegment, err = iNode.getSegmentBySegmentID(segmentID) + if err != nil { + return err + } + + var numOfRecords = len(iNode.insertMsg.insertData.insertRecords[segmentID]) + var offset = targetSegment.SegmentPreInsert(numOfRecords) + iNode.insertMsg.insertData.insertOffset[segmentID] = offset + } + + return nil +} + +func (iNode *insertNode) getSegmentBySegmentID(segmentID int64) (*Segment, error) { + targetSegment, ok := (*iNode.SegmentsMap)[segmentID] + + if !ok { + return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10)) + } + + return targetSegment, nil +} + +func (iNode *insertNode) insert(segmentID int64, wg *sync.WaitGroup) { + var targetSegment, err = iNode.getSegmentBySegmentID(segmentID) + if err != nil { + log.Println("insert failed") + // TODO: add error handling + return + } + + ids := iNode.insertMsg.insertData.insertIDs[segmentID] + timestamps := iNode.insertMsg.insertData.insertTimestamps[segmentID] + records := iNode.insertMsg.insertData.insertRecords[segmentID] + offsets := iNode.insertMsg.insertData.insertOffset[segmentID] + + err = targetSegment.SegmentInsert(offsets, &ids, ×tamps, &records) + fmt.Println("Do insert done, len = ", len(iNode.insertMsg.insertData.insertIDs[segmentID])) + + wg.Done() } func newInsertNode() *insertNode { diff --git a/internal/reader/manipulation_service.go b/internal/reader/manipulation_service.go index 0a07adad0a..e042ef74fe 100644 --- a/internal/reader/manipulation_service.go +++ b/internal/reader/manipulation_service.go @@ -3,16 +3,23 @@ package reader import ( "context" "fmt" - "log" - "sync" - + "github.com/zilliztech/milvus-distributed/internal/msgstream" msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" + "log" + "sync" ) type manipulationService struct { - ctx context.Context - fg *flowgraph.TimeTickedFlowGraph + ctx context.Context + fg *flowgraph.TimeTickedFlowGraph + msgStream *msgstream.PulsarMsgStream +} + +func (dmService *manipulationService) Start() { + dmService.initNodes() + go dmService.fg.Start() + dmService.consumeFromMsgStream() } func (dmService *manipulationService) initNodes() { @@ -85,9 +92,34 @@ func (dmService *manipulationService) initNodes() { log.Fatal("set edges failed in node:", serviceTimeNode.Name()) } + err = dmService.fg.SetStartNode(msgStreamNode.Name()) + if err != nil { + log.Fatal("set start node failed") + } + // TODO: add top nodes's initialization } +func (dmService *manipulationService) consumeFromMsgStream() { + for { + select { + case <-dmService.ctx.Done(): + log.Println("service stop") + return + default: + msgPack := dmService.msgStream.Consume() + var msgStreamMsg Msg = &msgStreamMsg{ + tsMessages: msgPack.Msgs, + timeRange: TimeRange{ + timestampMin: Timestamp(msgPack.BeginTs), + timestampMax: Timestamp(msgPack.EndTs), + }, + } + dmService.fg.Input(&msgStreamMsg) + } + } +} + func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, timeRange TimeRange) msgPb.Status { var tMax = timeRange.timestampMax @@ -116,7 +148,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr } node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid) node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp) - node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) + // node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) } else if msg.Op == msgPb.OpType_DELETE { var r = DeleteRecord{ entityID: msg.Uid, @@ -170,7 +202,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr } node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid) node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp) - node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) + // node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) } else if msg.Op == msgPb.OpType_DELETE { var r = DeleteRecord{ entityID: msg.Uid, diff --git a/internal/reader/message.go b/internal/reader/message.go index 4e87589c39..5f861817cd 100644 --- a/internal/reader/message.go +++ b/internal/reader/message.go @@ -2,6 +2,7 @@ package reader import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -13,8 +14,10 @@ type msgStreamMsg struct { } type dmMsg struct { - tsMessages []*msgstream.TsMsg - timeRange TimeRange + insertMessages []*msgstream.InsertTask + // TODO: add delete message support + // deleteMessages []*msgstream.DeleteTask + timeRange TimeRange } type key2SegMsg struct { @@ -27,8 +30,10 @@ type schemaUpdateMsg struct { } type filteredDmMsg struct { - tsMessages []*msgstream.TsMsg - timeRange TimeRange + insertMessages []*msgstream.InsertTask + // TODO: add delete message support + // deleteMessages []*msgstream.DeleteTask + timeRange TimeRange } type insertMsg struct { @@ -53,7 +58,7 @@ type serviceTimeMsg struct { type InsertData struct { insertIDs map[int64][]int64 insertTimestamps map[int64][]uint64 - insertRecords map[int64][][]byte + insertRecords map[int64][]*commonpb.Blob insertOffset map[int64]int64 } diff --git a/internal/reader/msg_stream_node.go b/internal/reader/msg_stream_node.go index 78eacab51c..2bc3a4e5fc 100644 --- a/internal/reader/msg_stream_node.go +++ b/internal/reader/msg_stream_node.go @@ -1,5 +1,10 @@ package reader +import ( + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "log" +) + type msgStreamNode struct { BaseNode msgStreamMsg msgStreamMsg @@ -10,7 +15,36 @@ func (msNode *msgStreamNode) Name() string { } func (msNode *msgStreamNode) Operate(in []*Msg) []*Msg { - return in + if len(in) != 1 { + log.Println("Invalid operate message input in msgStreamNode") + // TODO: add error handling + } + + streamMsg, ok := (*in[0]).(*msgStreamMsg) + if !ok { + log.Println("type assertion failed for msgStreamMsg") + // TODO: add error handling + } + + // TODO: add time range check + + var dmMsg = dmMsg{ + insertMessages: make([]*msgstream.InsertTask, 0), + // deleteMessages: make([]*msgstream.DeleteTask, 0), + timeRange: streamMsg.timeRange, + } + for _, msg := range streamMsg.tsMessages { + switch (*msg).Type() { + case msgstream.KInsert: + dmMsg.insertMessages = append(dmMsg.insertMessages, (*msg).(*msgstream.InsertTask)) + // case msgstream.KDelete: + // dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask)) + default: + log.Println("Non supporting message type:", (*msg).Type()) + } + } + var res Msg = &dmMsg + return []*Msg{&res} } func newMsgStreamNode() *msgStreamNode { diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index 5d52d1bee1..78011586b7 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -15,9 +15,8 @@ import "C" import ( "context" - "time" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "time" "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/msgclient" @@ -202,7 +201,7 @@ func (node *QueryNode) QueryNodeDataInit() { insertData := InsertData{ insertIDs: make(map[int64][]int64), insertTimestamps: make(map[int64][]uint64), - insertRecords: make(map[int64][][]byte), + // insertRecords: make(map[int64][][]byte), insertOffset: make(map[int64]int64), } diff --git a/internal/reader/query_node_time.go b/internal/reader/query_node_time.go index 07faf48c53..bf984824a8 100644 --- a/internal/reader/query_node_time.go +++ b/internal/reader/query_node_time.go @@ -1,34 +1,34 @@ package reader type QueryNodeTime struct { - ReadTimeSyncMin uint64 - ReadTimeSyncMax uint64 - WriteTimeSync uint64 - ServiceTimeSync uint64 - TSOTimeSync uint64 + ReadTimeSyncMin Timestamp + ReadTimeSyncMax Timestamp + WriteTimeSync Timestamp + ServiceTimeSync Timestamp + TSOTimeSync Timestamp } type TimeRange struct { - timestampMin uint64 - timestampMax uint64 + timestampMin Timestamp + timestampMax Timestamp } -func (t *QueryNodeTime) UpdateReadTimeSync() { +func (t *QueryNodeTime) updateReadTimeSync() { t.ReadTimeSyncMin = t.ReadTimeSyncMax // TODO: Add time sync t.ReadTimeSyncMax = 1 } -func (t *QueryNodeTime) UpdateWriteTimeSync() { +func (t *QueryNodeTime) updateWriteTimeSync() { // TODO: Add time sync t.WriteTimeSync = 0 } -func (t *QueryNodeTime) UpdateSearchTimeSync(timeRange TimeRange) { +func (t *QueryNodeTime) updateSearchServiceTime(timeRange TimeRange) { t.ServiceTimeSync = timeRange.timestampMax } -func (t *QueryNodeTime) UpdateTSOTimeSync() { +func (t *QueryNodeTime) updateTSOTimeSync() { // TODO: Add time sync t.TSOTimeSync = 0 } diff --git a/internal/reader/query_node_time_test.go b/internal/reader/query_node_time_test.go index ec19694b9f..36a7dcc6c2 100644 --- a/internal/reader/query_node_time_test.go +++ b/internal/reader/query_node_time_test.go @@ -15,7 +15,7 @@ func TestQueryNodeTime_UpdateReadTimeSync(t *testing.T) { TSOTimeSync: uint64(4), } - queryNodeTimeSync.UpdateReadTimeSync() + queryNodeTimeSync.updateReadTimeSync() assert.Equal(t, queryNodeTimeSync.ReadTimeSyncMin, uint64(1)) } @@ -33,15 +33,15 @@ func TestQueryNodeTime_UpdateSearchTimeSync(t *testing.T) { timestampMin: 0, timestampMax: 1, } - queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + queryNodeTimeSync.updateSearchServiceTime(timeRange) assert.Equal(t, queryNodeTimeSync.ServiceTimeSync, uint64(1)) } func TestQueryNodeTime_UpdateTSOTimeSync(t *testing.T) { - // TODO: add UpdateTSOTimeSync test + // TODO: add updateTSOTimeSync test } func TestQueryNodeTime_UpdateWriteTimeSync(t *testing.T) { - // TODO: add UpdateWriteTimeSync test + // TODO: add updateWriteTimeSync test } diff --git a/internal/reader/reader.go b/internal/reader/reader.go index 2a6487627a..06330ffad2 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -59,7 +59,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { } if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + node.queryNodeTimeSync.updateSearchServiceTime(timeRange) continue } @@ -71,7 +71,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { //fmt.Println("PreInsertAndDelete Done") node.DoInsertAndDelete() //fmt.Println("DoInsertAndDelete Done") - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + node.queryNodeTimeSync.updateSearchServiceTime(timeRange) } } } else { @@ -87,7 +87,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { assert.NotEqual(nil, 0, timeRange.timestampMax) if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + node.queryNodeTimeSync.updateSearchServiceTime(timeRange) continue } @@ -99,7 +99,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { //fmt.Println("PreInsertAndDelete Done") node.DoInsertAndDelete() //fmt.Println("DoInsertAndDelete Done") - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + node.queryNodeTimeSync.updateSearchServiceTime(timeRange) } } } diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go new file mode 100644 index 0000000000..db9668c412 --- /dev/null +++ b/internal/reader/search_service.go @@ -0,0 +1,14 @@ +package reader + +import ( + "context" + "github.com/zilliztech/milvus-distributed/internal/msgstream" +) + +type searchService struct { + ctx context.Context + queryNodeTime *QueryNodeTime + msgStream *msgstream.PulsarMsgStream +} + +func (ss *searchService) Start() {} diff --git a/internal/reader/search_test.go b/internal/reader/search_test.go index be06c215a0..089e84071f 100644 --- a/internal/reader/search_test.go +++ b/internal/reader/search_test.go @@ -129,7 +129,7 @@ func TestSearch_Search(t *testing.T) { } searchMessages := []*msgPb.SearchMsg{&searchMsg1} - node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) + node.queryNodeTimeSync.updateSearchServiceTime(timeRange) assert.Equal(t, node.queryNodeTimeSync.ServiceTimeSync, timeRange.timestampMax) status := node.Search(searchMessages) diff --git a/internal/reader/segment.go b/internal/reader/segment.go index 5685a4f1be..17f48f6d57 100644 --- a/internal/reader/segment.go +++ b/internal/reader/segment.go @@ -13,12 +13,10 @@ package reader */ import "C" import ( - "strconv" - "unsafe" - - "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" + "strconv" ) const SegmentLifetime = 20000 @@ -128,7 +126,7 @@ func (s *Segment) SegmentPreDelete(numOfRecords int) int64 { return int64(offset) } -func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[][]byte) error { +func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[]*commonpb.Blob) error { /* int Insert(CSegmentBase c_segment, @@ -141,37 +139,37 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[] signed long int count); */ // Blobs to one big blob - var numOfRow = len(*entityIDs) - var sizeofPerRow = len((*records)[0]) - - assert.Equal(nil, numOfRow, len(*records)) - - var rawData = make([]byte, numOfRow*sizeofPerRow) - var copyOffset = 0 - for i := 0; i < len(*records); i++ { - copy(rawData[copyOffset:], (*records)[i]) - copyOffset += sizeofPerRow - } - - var cOffset = C.long(offset) - var cNumOfRows = C.long(numOfRow) - var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0]) - var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0]) - var cSizeofPerRow = C.int(sizeofPerRow) - var cRawDataVoidPtr = unsafe.Pointer(&rawData[0]) - - var status = C.Insert(s.SegmentPtr, - cOffset, - cNumOfRows, - cEntityIdsPtr, - cTimestampsPtr, - cRawDataVoidPtr, - cSizeofPerRow, - cNumOfRows) - - if status != 0 { - return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) - } + //var numOfRow = len(*entityIDs) + //var sizeofPerRow = len((*records)[0]) + // + //assert.Equal(nil, numOfRow, len(*records)) + // + //var rawData = make([]byte, numOfRow*sizeofPerRow) + //var copyOffset = 0 + //for i := 0; i < len(*records); i++ { + // copy(rawData[copyOffset:], (*records)[i]) + // copyOffset += sizeofPerRow + //} + // + //var cOffset = C.long(offset) + //var cNumOfRows = C.long(numOfRow) + //var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0]) + //var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0]) + //var cSizeofPerRow = C.int(sizeofPerRow) + //var cRawDataVoidPtr = unsafe.Pointer(&rawData[0]) + // + //var status = C.Insert(s.SegmentPtr, + // cOffset, + // cNumOfRows, + // cEntityIdsPtr, + // cTimestampsPtr, + // cRawDataVoidPtr, + // cSizeofPerRow, + // cNumOfRows) + // + //if status != 0 { + // return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) + //} return nil } diff --git a/internal/reader/segment_service.go b/internal/reader/segment_service.go index 9e5d241a60..260bf7e8d8 100644 --- a/internal/reader/segment_service.go +++ b/internal/reader/segment_service.go @@ -11,7 +11,7 @@ import ( ) //func (node *QueryNode) SegmentsManagement() { -// //node.queryNodeTimeSync.UpdateTSOTimeSync() +// //node.queryNodeTimeSync.updateTSOTimeSync() // //var timeNow = node.queryNodeTimeSync.TSOTimeSync // // timeNow := node.messageClient.GetTimeNow() >> 18 diff --git a/internal/reader/segment_test.go b/internal/reader/segment_test.go index 2f285ba563..ea413b0f7b 100644 --- a/internal/reader/segment_test.go +++ b/internal/reader/segment_test.go @@ -53,8 +53,8 @@ func TestSegment_SegmentInsert(t *testing.T) { assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps - ids := []int64{1, 2, 3} - timestamps := []uint64{0, 0, 0} + //ids := []int64{1, 2, 3} + //timestamps := []uint64{0, 0, 0} // 3. Create records, use schema below: // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); @@ -81,8 +81,8 @@ func TestSegment_SegmentInsert(t *testing.T) { assert.GreaterOrEqual(t, offset, int64(0)) // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) - assert.NoError(t, err) + //var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + //assert.NoError(t, err) // 6. Destruct collection, partition and segment partition.DeleteSegment(node, segment) @@ -179,8 +179,8 @@ func TestSegment_SegmentSearch(t *testing.T) { assert.GreaterOrEqual(t, offset, int64(0)) // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) - assert.NoError(t, err) + //var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + //assert.NoError(t, err) // 6. Do search var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" @@ -326,7 +326,7 @@ func TestSegment_GetRowCount(t *testing.T) { // 2. Create ids and timestamps ids := []int64{1, 2, 3} - timestamps := []uint64{0, 0, 0} + //timestamps := []uint64{0, 0, 0} // 3. Create records, use schema below: // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); @@ -353,8 +353,8 @@ func TestSegment_GetRowCount(t *testing.T) { assert.GreaterOrEqual(t, offset, int64(0)) // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) - assert.NoError(t, err) + //var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + //assert.NoError(t, err) // 6. Get segment row count var rowCount = segment.GetRowCount() @@ -430,8 +430,8 @@ func TestSegment_GetMemSize(t *testing.T) { assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps - ids := []int64{1, 2, 3} - timestamps := []uint64{0, 0, 0} + //ids := []int64{1, 2, 3} + //timestamps := []uint64{0, 0, 0} // 3. Create records, use schema below: // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); @@ -458,8 +458,8 @@ func TestSegment_GetMemSize(t *testing.T) { assert.GreaterOrEqual(t, offset, int64(0)) // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) - assert.NoError(t, err) + //var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + //assert.NoError(t, err) // 6. Get memory usage in bytes var memSize = segment.GetMemSize() @@ -500,8 +500,8 @@ func TestSegment_RealSchemaTest(t *testing.T) { assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps - ids := []int64{1, 2, 3} - timestamps := []uint64{0, 0, 0} + //ids := []int64{1, 2, 3} + //timestamps := []uint64{0, 0, 0} // 3. Create records, use schema below: // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16); @@ -528,8 +528,8 @@ func TestSegment_RealSchemaTest(t *testing.T) { assert.GreaterOrEqual(t, offset, int64(0)) // 5. Do Insert - var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) - assert.NoError(t, err) + //var err = segment.SegmentInsert(offset, &ids, ×tamps, &records) + //assert.NoError(t, err) // 6. Destruct collection, partition and segment partition.DeleteSegment(node, segment) diff --git a/internal/reader/service_time_node.go b/internal/reader/service_time_node.go index 14c02734d8..550c66b5a8 100644 --- a/internal/reader/service_time_node.go +++ b/internal/reader/service_time_node.go @@ -1,7 +1,10 @@ package reader +import "log" + type serviceTimeNode struct { BaseNode + queryNodeTime *QueryNodeTime serviceTimeMsg serviceTimeMsg } @@ -10,7 +13,19 @@ func (stNode *serviceTimeNode) Name() string { } func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { - return in + if len(in) != 1 { + log.Println("Invalid operate message input in serviceTimeNode") + // TODO: add error handling + } + + serviceTimeMsg, ok := (*in[0]).(*serviceTimeMsg) + if !ok { + log.Println("type assertion failed for serviceTimeMsg") + // TODO: add error handling + } + + stNode.queryNodeTime.updateSearchServiceTime(serviceTimeMsg.timeRange) + return nil } func newServiceTimeNode() *serviceTimeNode { diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 57c87ab4b0..6b64f75fc6 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -17,9 +17,10 @@ type flowGraphStates struct { } type TimeTickedFlowGraph struct { - ctx context.Context - states *flowGraphStates - nodeCtx map[string]*nodeCtx + ctx context.Context + states *flowGraphStates + startNode *nodeCtx + nodeCtx map[string]*nodeCtx } func (fg *TimeTickedFlowGraph) AddNode(node *Node) { @@ -67,6 +68,17 @@ func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []stri return nil } +func (fg *TimeTickedFlowGraph) SetStartNode(nodeName string) error { + startNode, ok := fg.nodeCtx[nodeName] + if !ok { + errMsg := "Cannot find node:" + nodeName + return errors.New(errMsg) + } + + fg.startNode = startNode + return nil +} + func (fg *TimeTickedFlowGraph) Start() { wg := sync.WaitGroup{} for _, v := range fg.nodeCtx { @@ -76,6 +88,11 @@ func (fg *TimeTickedFlowGraph) Start() { wg.Wait() } +func (fg *TimeTickedFlowGraph) Input(msg *Msg) { + // start node should have only 1 input channel + fg.startNode.inputChannels[0] <- msg +} + func (fg *TimeTickedFlowGraph) Close() error { for _, v := range fg.nodeCtx { v.Close()