diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 858fcd0c8a..371c7eb049 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -36,6 +36,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/proto/masterpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 )
 
@@ -249,8 +250,27 @@ func (node *DataNode) ReleaseDataSyncService(vchanName string) {
 	log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName))
 }
 
+var FilterThreshold Timestamp
+
 // Start will update DataNode state to HEALTHY
 func (node *DataNode) Start() error {
+
+	rep, err := node.masterService.AllocTimestamp(node.ctx, &masterpb.AllocTimestampRequest{
+		Base: &commonpb.MsgBase{
+			MsgType:   commonpb.MsgType_RequestTSO,
+			MsgID:     0,
+			Timestamp: 0,
+			SourceID:  node.NodeID,
+		},
+		Count: 1,
+	})
+
+	// Check err before touching rep: on RPC failure rep may be nil.
+	if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
+		return errors.New("DataNode fail to start")
+	}
+
+	FilterThreshold = rep.GetTimestamp()
+
 	go node.BackGroundGC(node.clearSignal)
 	node.UpdateStateCode(internalpb.StateCode_Healthy)
 	return nil
@@ -277,8 +297,8 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha
 		return status, errors.New(status.GetReason())
 
 	default:
-		for _, chanPair := range in.GetVchannels() {
-			node.NewDataSyncService(chanPair)
+		for _, chanInfo := range in.GetVchannels() {
+			node.NewDataSyncService(chanInfo)
 		}
 
 		status.ErrorCode = commonpb.ErrorCode_Success
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index b8599ba59f..9fd6db3bb5 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -135,9 +135,13 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
 		}
 		return nil
 	}
-
-	var
dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanInfo.GetChannelName(), vchanInfo.GetCheckPoints())
-	var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID)
+	var dmStreamNode Node = newDmInputNode(
+		dsService.ctx,
+		dsService.msFactory,
+		vchanInfo.GetChannelName(),
+		vchanInfo.GetSeekPosition(),
+	)
+	var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
 	var insertBufferNode Node = newInsertBufferNode(
 		dsService.ctx,
 		dsService.replica,
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 1f48ccfb3d..156717a062 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -12,11 +12,14 @@
 package datanode
 
 import (
+	"sync"
+
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 )
@@ -26,6 +29,10 @@ type ddNode struct {
 
 	clearSignal  chan<- UniqueID
 	collectionID UniqueID
+
+	mu        sync.RWMutex
+	seg2cp    map[UniqueID]*datapb.CheckPoint // Segment ID
+	vchanInfo *datapb.VchannelInfo
 }
 
 func (ddn *ddNode) Name() string {
@@ -70,10 +77,14 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 				log.Info("Destroying current flowgraph")
 			}
 		case commonpb.MsgType_Insert:
-			resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg))
-			if resMsg != nil {
-				iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
+			if msg.EndTs() < FilterThreshold {
+				// Below the start-time watermark: may duplicate already-flushed
+				// data, so drop it if the filter rejects it. Append exactly once.
+				if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) == nil {
+					continue
+				}
 			}
+
+			iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
 		}
 	}
 
@@ -86,17 +97,49 @@ func
(ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 }
 
 func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
-	// TODO fileter insert messages of flushed segments
+	ddn.mu.Lock()
+	defer ddn.mu.Unlock()
+
+	if ddn.isFlushed(msg.GetSegmentID()) {
+		return nil
+	}
+
+	if cp, ok := ddn.seg2cp[msg.GetSegmentID()]; ok {
+		// Messages at or before the segment checkpoint were already flushed: drop them.
+		if msg.EndTs() <= cp.GetPosition().GetTimestamp() {
+			return nil
+		}
+		delete(ddn.seg2cp, msg.GetSegmentID())
+	}
+
 	return msg
 }
 
-func newDDNode(clearSignal chan<- UniqueID, collID UniqueID) *ddNode {
+func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
+	// NOTE: caller (filterFlushedSegmentInsertMessages) already holds ddn.mu;
+	// locking again here would self-deadlock since sync.RWMutex is not reentrant.
+
+	for _, id := range ddn.vchanInfo.GetFlushedSegments() {
+		if id == segmentID {
+			return true
+		}
+	}
+	return false
+}
+
+func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo) *ddNode {
 	baseNode := BaseNode{}
 	baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
 
+	cp := make(map[UniqueID]*datapb.CheckPoint)
+	for _, c := range vchanInfo.GetCheckPoints() {
+		cp[c.GetSegmentID()] = c
+	}
+
 	return &ddNode{
 		BaseNode:     baseNode,
 		clearSignal:  clearSignal,
 		collectionID: collID,
+		seg2cp:       cp,
+		vchanInfo:    vchanInfo,
 	}
 }
diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go
index 5e5ae232ca..b40ee1012b 100644
--- a/internal/datanode/flow_graph_dmstream_input_node.go
+++ b/internal/datanode/flow_graph_dmstream_input_node.go
@@ -16,19 +16,22 @@ import (
 
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 )
 
-func newDmInputNode(ctx context.Context, factory msgstream.Factory, vchannelName string, checkPoints []*datapb.CheckPoint) *flowgraph.InputNode {
+func newDmInputNode(ctx context.Context, factory
msgstream.Factory, vchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
 	// TODO seek
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
-	consumeSubName := Params.MsgChannelSubName
+	// consumeSubName := Params.MsgChannelSubName
 
 	insertStream, _ := factory.NewTtMsgStream(ctx)
-	insertStream.AsConsumer([]string{vchannelName}, consumeSubName)
-	log.Debug("datanode AsConsumer: " + vchannelName + " : " + consumeSubName)
+	// Surface Seek failures instead of silently continuing with a stream
+	// positioned at the wrong offset.
+	if err := insertStream.Seek([]*internalpb.MsgPosition{seekPos}); err != nil {
+		log.Error("datanode Seek failed: " + vchannelName)
+	}
+	log.Debug("datanode Seek: " + vchannelName)
 
 	var stream msgstream.MsgStream = insertStream
 	node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go
index cd8babb1ea..47e5fd3509 100644
--- a/internal/datanode/mock_test.go
+++ b/internal/datanode/mock_test.go
@@ -523,6 +523,14 @@ func (m *MasterServiceFactory) AllocID(ctx context.Context, in *masterpb.AllocID
 	return resp, nil
 }
 
+func (m *MasterServiceFactory) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
+	resp := &masterpb.AllocTimestampResponse{
+		Status:    &commonpb.Status{},
+		Timestamp: 1000,
+	}
+	return resp, nil
+}
+
 func (m *MasterServiceFactory) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
 	resp := &milvuspb.ShowCollectionsResponse{
 		Status: &commonpb.Status{},