From f0846fb79ba5bbfc0a5360b4f2867ebf9a1a3fc4 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 15 Jul 2022 17:12:27 +0800
Subject: [PATCH] Handles DropPartitionMsg in datanode flowgraph (#18292)

Signed-off-by: Congqi Xia
---
 internal/datanode/flow_graph_dd_node.go       |  10 +
 internal/datanode/flow_graph_dd_node_test.go  |  53 ++++
 .../datanode/flow_graph_insert_buffer_node.go |  73 ++++--
 .../flow_graph_insert_buffer_node_test.go     | 232 ++++++++++++++++++
 internal/datanode/flow_graph_message.go       |   1 +
 internal/datanode/segment_replica.go          |  33 ++-
 internal/datanode/segment_replica_test.go     |  11 +
 7 files changed, 385 insertions(+), 28 deletions(-)

diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 3484b483ca..90a2381312 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -134,6 +134,16 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
         fgMsg.dropCollection = true
       }

+    case commonpb.MsgType_DropPartition:
+      dpMsg := msg.(*msgstream.DropPartitionMsg)
+      if dpMsg.GetCollectionID() == ddn.collectionID {
+        log.Info("drop partition msg received",
+          zap.Int64("collectionID", dpMsg.GetCollectionID()),
+          zap.Int64("partitionID", dpMsg.GetPartitionID()),
+          zap.String("vChannelName", ddn.vChannelName))
+        fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
+      }
+
     case commonpb.MsgType_Insert:
       imsg := msg.(*msgstream.InsertMsg)
       if imsg.CollectionID != ddn.collectionID {
diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go
index 3e90dd2f19..8b02b9fe1d 100644
--- a/internal/datanode/flow_graph_dd_node_test.go
+++ b/internal/datanode/flow_graph_dd_node_test.go
@@ -161,6 +161,59 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
     }
   })

+  t.Run("Test DDNode Operate DropPartition Msg", func(t *testing.T) {
+    // valid inputs
+    tests := []struct {
+      ddnCollID UniqueID
+
+      msgCollID    UniqueID
+      msgPartID    UniqueID
+      expectOutput []UniqueID
+
+      description string
+    }{
+      {1, 1, 101, []UniqueID{101},
+        "DropPartitionMsg collID == ddNode collID"},
+      {1, 2, 101, []UniqueID{},
+        "DropPartitionMsg collID != ddNode collID"},
+    }
+
+    for _, test := range tests {
+      t.Run(test.description, func(t *testing.T) {
+        factory := dependency.NewDefaultFactory(true)
+        deltaStream, err := factory.NewMsgStream(context.Background())
+        assert.Nil(t, err)
+        deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
+        deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
+        ddn := ddNode{
+          ctx:                context.Background(),
+          collectionID:       test.ddnCollID,
+          deltaMsgStream:     deltaStream,
+          vChannelName:       "ddn_drop_msg",
+          compactionExecutor: newCompactionExecutor(),
+        }
+
+        var dropPartMsg msgstream.TsMsg = &msgstream.DropPartitionMsg{
+          DropPartitionRequest: internalpb.DropPartitionRequest{
+            Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition},
+            CollectionID: test.msgCollID,
+            PartitionID:  test.msgPartID,
+          },
+        }
+        tsMessages := []msgstream.TsMsg{dropPartMsg}
+        var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
+
+        rt := ddn.Operate([]Msg{msgStreamMsg})
+
+        assert.NotEmpty(t, rt)
+        fgMsg, ok := rt[0].(*flowGraphMsg)
+        assert.True(t, ok)
+        assert.ElementsMatch(t, test.expectOutput, fgMsg.dropPartitions)
+      })
+    }
+  })
+
   t.Run("Test DDNode Operate and filter insert msg", func(t *testing.T) {
     factory := dependency.NewDefaultFactory(true)
     deltaStream, err := factory.NewMsgStream(context.Background())
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 8f10db1ba8..5f99a08431 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -315,6 +315,35 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
     }
   }
+  mergeFlushTask := func(segmentID UniqueID, setupTask func(task *flushTask)) {
+    // Merge auto & manual flush tasks with the same segment ID.
+    dup := false
+    for i, task := range flushTaskList {
+      if task.segmentID == segmentID {
+        log.Info("merging flush task, updating flushed flag",
+          zap.Int64("segment ID", segmentID))
+        setupTask(&flushTaskList[i])
+        dup = true
+        break
+      }
+    }
+    // Load buffer and create new flush task if there's no existing flush task for this segment.
+    if !dup {
+      bd, ok := ibNode.insertBuffer.Load(segmentID)
+      var buf *BufferData
+      if ok {
+        buf = bd.(*BufferData)
+      }
+      task := flushTask{
+        buffer:    buf,
+        segmentID: segmentID,
+        dropped:   false,
+      }
+      setupTask(&task)
+      flushTaskList = append(flushTaskList, task)
+    }
+  }
+
   // Manual Flush
   select {
   case fmsg := <-ibNode.flushChan:
@@ -324,33 +353,27 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
       zap.Bool("flushed", fmsg.flushed),
       zap.String("v-channel name", ibNode.channelName),
     )
-    // Merge auto & manual flush tasks with the same segment ID.
-    dup := false
-    for i, task := range flushTaskList {
-      if task.segmentID == fmsg.segmentID {
-        log.Info("merging flush task, updating flushed flag",
-          zap.Int64("segment ID", fmsg.segmentID),
-          zap.Bool("flushed", fmsg.flushed))
-        flushTaskList[i].flushed = fmsg.flushed
-        dup = true
-        break
-      }
-    }
-    // Load buffer and create new flush task if there's no existing flush task for this segment.
-    if !dup {
-      bd, ok := ibNode.insertBuffer.Load(fmsg.segmentID)
-      var buf *BufferData
-      if ok {
-        buf = bd.(*BufferData)
-      }
-      flushTaskList = append(flushTaskList, flushTask{
-        buffer:    buf,
-        segmentID: fmsg.segmentID,
-        flushed:   fmsg.flushed,
-        dropped:   false,
+    mergeFlushTask(fmsg.segmentID, func(task *flushTask) {
+      task.flushed = fmsg.flushed
+    })
+  default:
+  }
+
+  // process drop partition
+  for _, partitionDrop := range fgMsg.dropPartitions {
+    segmentIDs := ibNode.replica.listPartitionSegments(partitionDrop)
+    log.Info("(Drop Partition) process drop partition",
+      zap.Int64("collectionID", ibNode.replica.getCollectionID()),
+      zap.Int64("partitionID", partitionDrop),
+      zap.Int64s("segmentIDs", segmentIDs),
+      zap.String("v-channel name", ibNode.channelName),
+    )
+    for _, segID := range segmentIDs {
+      mergeFlushTask(segID, func(task *flushTask) {
+        task.flushed = true
+        task.dropped = true
       })
     }
-  default:
   }
 }
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 1d9d3e4895..b79c1823cb 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -636,6 +636,238 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
   })
 }

+func TestFlowGraphInsertBufferNode_DropPartition(t *testing.T) {
+  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+  defer cancel()
+
+  partitionID := int64(1)
+
+  testPath := "/test/datanode/root/meta"
+  err := clearEtcd(testPath)
+  require.NoError(t, err)
+  Params.EtcdCfg.MetaRootPath = testPath
+
+  Factory := &MetaFactory{}
+  collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
+  dataFactory := NewDataFactory()
+
+  mockRootCoord := &RootCoordFactory{
+    pkType: schemapb.DataType_Int64,
+  }
+
+  colRep := &SegmentReplica{
+    collectionID:    collMeta.ID,
+    newSegments:     make(map[UniqueID]*Segment),
+    normalSegments:  make(map[UniqueID]*Segment),
+    flushedSegments: make(map[UniqueID]*Segment),
+  }
+
+  colRep.metaService = newMetaService(mockRootCoord, collMeta.ID)
+
+  factory := dependency.NewDefaultFactory(true)
+
+  flushPacks := []*segmentFlushPack{}
+  fpMut := sync.Mutex{}
+  wg := sync.WaitGroup{}
+
+  cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
+  defer cm.RemoveWithPrefix("")
+  fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, colRep, func(pack *segmentFlushPack) {
+    fpMut.Lock()
+    flushPacks = append(flushPacks, pack)
+    fpMut.Unlock()
+    colRep.listNewSegmentsStartPositions()
+    colRep.listSegmentsCheckPoints()
+    if pack.flushed || pack.dropped {
+      colRep.segmentFlushed(pack.segmentID)
+    }
+    wg.Done()
+  }, emptyFlushAndDropFunc)
+
+  flushChan := make(chan flushMsg, 100)
+  resendTTChan := make(chan resendTTMsg, 100)
+  c := &nodeConfig{
+    replica:      colRep,
+    msFactory:    factory,
+    allocator:    NewAllocatorFactory(),
+    vChannelName: "string",
+  }
+  iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
+  require.NoError(t, err)
+
+  // Auto flush number of rows set to 2
+
+  inMsg := genFlowGraphInsertMsg("datanode-03-test-autoflush")
+  inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
+  var iMsg flowgraph.Msg = &inMsg
+
+  t.Run("Only drop partition", func(t *testing.T) {
+    // iBNode.insertBuffer.maxSize = 2
+    tmp := Params.DataNodeCfg.FlushInsertBufferSize
+    Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
+    defer func() {
+      Params.DataNodeCfg.FlushInsertBufferSize = tmp
+    }()
+
+    for i := range inMsg.insertMessages {
+      inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
+      inMsg.insertMessages[i].PartitionID = partitionID
+    }
+    inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
+    inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
+
+    type Test struct {
+      expectedSegID       UniqueID
+      expectedNumOfRows   int64
+      expectedStartPosTs  Timestamp
+      expectedEndPosTs    Timestamp
+      expectedCpNumOfRows int64
+      expectedCpPosTs     Timestamp
+    }
+
+    beforeAutoFlushTests := []Test{
+      // segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
+      {1, 1, 100, 123, 0, 100},
+      {2, 1, 100, 123, 0, 100},
+    }
+    iBNode.Operate([]flowgraph.Msg{iMsg})
+
+    require.Equal(t, 2, len(colRep.newSegments))
+    require.Equal(t, 0, len(colRep.normalSegments))
+    assert.Equal(t, 0, len(flushPacks))
+
+    for i, test := range beforeAutoFlushTests {
+      colRep.segMu.Lock()
+      seg, ok := colRep.newSegments[UniqueID(i+1)]
+      colRep.segMu.Unlock()
+      assert.True(t, ok)
+      assert.Equal(t, partitionID, seg.partitionID)
+      assert.Equal(t, test.expectedSegID, seg.segmentID)
+      assert.Equal(t, test.expectedNumOfRows, seg.numRows)
+      assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
+      assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
+      assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
+    }
+
+    inMsg.insertMessages = nil
+    inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 200}}
+    inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
+    inMsg.dropPartitions = []int64{partitionID}
+    iMsg = &inMsg
+
+    // Trigger drop partition
+    output := iBNode.Operate([]flowgraph.Msg{iMsg})
+    fgm := output[0].(*flowGraphMsg)
+    wg.Add(len(fgm.segmentsToFlush))
+    t.Log("segments to flush", fgm.segmentsToFlush)
+
+    for _, im := range fgm.segmentsToFlush {
+      // send del done signal
+      err = fm.flushDelData(nil, im, fgm.endPositions[0])
+      assert.NoError(t, err)
+    }
+    wg.Wait()
+    require.Equal(t, 0, len(colRep.newSegments))
+    require.Equal(t, 0, len(colRep.normalSegments))
+    require.Equal(t, 2, len(colRep.flushedSegments))
+
+    assert.Equal(t, 2, len(flushPacks))
+    assert.Less(t, 0, len(flushPacks[0].insertLogs))
+    for _, flushPack := range flushPacks {
+      assert.True(t, flushPack.flushed)
+      assert.True(t, flushPack.dropped)
+    }
+  })
+
+  t.Run("drop partition with flush", func(t *testing.T) {
+    tmp := Params.DataNodeCfg.FlushInsertBufferSize
+    Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
+    defer func() {
+      Params.DataNodeCfg.FlushInsertBufferSize = tmp
+    }()
+
+    fpMut.Lock()
+    flushPacks = flushPacks[:0]
+    fpMut.Unlock()
+
+    inMsg := genFlowGraphInsertMsg("datanode-03-test-autoflush")
+    inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
+
+    for i := range inMsg.insertMessages {
+      inMsg.insertMessages[i].SegmentID = UniqueID(10 + i)
+      inMsg.insertMessages[i].PartitionID = partitionID
+    }
+    inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 300}}
+    inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 323}}
+    var iMsg flowgraph.Msg = &inMsg
+
+    type Test struct {
+      expectedSegID       UniqueID
+      expectedNumOfRows   int64
+      expectedStartPosTs  Timestamp
+      expectedEndPosTs    Timestamp
+      expectedCpNumOfRows int64
+      expectedCpPosTs     Timestamp
+    }
+
+    beforeAutoFlushTests := []Test{
+      // segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
+      {10, 1, 300, 323, 0, 300},
+      {11, 1, 300, 323, 0, 300},
+    }
+    iBNode.Operate([]flowgraph.Msg{iMsg})
+
+    require.Equal(t, 2, len(colRep.newSegments))
+    require.Equal(t, 0, len(colRep.normalSegments))
+    assert.Equal(t, 0, len(flushPacks))
+
+    for _, test := range beforeAutoFlushTests {
+      colRep.segMu.Lock()
+      seg, ok := colRep.newSegments[test.expectedSegID]
+      colRep.segMu.Unlock()
+      assert.True(t, ok)
+      assert.Equal(t, partitionID, seg.partitionID)
+      assert.Equal(t, test.expectedSegID, seg.segmentID)
+      assert.Equal(t, test.expectedNumOfRows, seg.numRows)
+      assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
+      assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
+      assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
+    }
+
+    inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 400}}
+    inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 434}}
+    inMsg.dropPartitions = []int64{partitionID}
+
+    // trigger manual flush
+    flushChan <- flushMsg{
+      segmentID: 10,
+      flushed:   true,
+    }
+
+    // trigger auto flush since buffer full
+    output := iBNode.Operate([]flowgraph.Msg{iMsg})
+    fgm := output[0].(*flowGraphMsg)
+    wg.Add(len(fgm.segmentsToFlush))
+    for _, im := range fgm.segmentsToFlush {
+      // send del done signal
+      err = fm.flushDelData(nil, im, fgm.endPositions[0])
+      assert.NoError(t, err)
+    }
+    wg.Wait()
+    require.Equal(t, 0, len(colRep.newSegments))
+    require.Equal(t, 0, len(colRep.normalSegments))
+    require.Equal(t, 4, len(colRep.flushedSegments))
+
+    assert.Equal(t, 4, len(flushPacks))
+    for _, pack := range flushPacks {
+      assert.True(t, pack.flushed)
+      assert.True(t, pack.dropped)
+    }
+  })
+
+}
+
 // CompactedRootCoord has meta info compacted at ts
 type CompactedRootCoord struct {
   types.RootCoord
diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go
index d15653334a..9b768440b0 100644
--- a/internal/datanode/flow_graph_message.go
+++ b/internal/datanode/flow_graph_message.go
@@ -39,6 +39,7 @@ type flowGraphMsg struct {
   //segmentsToFlush is the signal used by insertBufferNode to notify deleteNode to flush
   segmentsToFlush []UniqueID
   dropCollection  bool
+  dropPartitions  []UniqueID
 }

 func (fgMsg *flowGraphMsg) TimeTick() Timestamp {
diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go
index a9f8ce7149..d02db69fad 100644
--- a/internal/datanode/segment_replica.go
+++ b/internal/datanode/segment_replica.go
@@ -56,6 +56,7 @@ type Replica interface {
   listAllSegmentIDs() []UniqueID
   listNotFlushedSegmentIDs() []UniqueID
+  listPartitionSegments(partID UniqueID) []UniqueID
   addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error
   addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error
   filterSegments(channelName string, partitionID UniqueID) []*Segment
@@ -93,9 +94,8 @@ type Segment struct {
   endPos   *internalpb.MsgPosition
   pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
-  // TODO silverxia, needs to change to interface to support `string` type PK
-  minPK primaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment
-  maxPK primaryKey // maximal pk value, same above
+  minPK    primaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment
+  maxPK    primaryKey // maximal pk value, same above
 }

 // SegmentReplica is the data replication of persistent data in datanode.
@@ -824,6 +824,33 @@ func (replica *SegmentReplica) listAllSegmentIDs() []UniqueID {
   return segIDs
 }

+func (replica *SegmentReplica) listPartitionSegments(partID UniqueID) []UniqueID {
+  replica.segMu.RLock()
+  defer replica.segMu.RUnlock()
+
+  var segIDs []UniqueID
+
+  for _, seg := range replica.newSegments {
+    if seg.partitionID == partID {
+      segIDs = append(segIDs, seg.segmentID)
+    }
+  }
+
+  for _, seg := range replica.normalSegments {
+    if seg.partitionID == partID {
+      segIDs = append(segIDs, seg.segmentID)
+    }
+  }
+
+  for _, seg := range replica.flushedSegments {
+    if seg.partitionID == partID {
+      segIDs = append(segIDs, seg.segmentID)
+    }
+  }
+
+  return segIDs
+}
+
 func (replica *SegmentReplica) listNotFlushedSegmentIDs() []UniqueID {
   replica.segMu.RLock()
   defer replica.segMu.RUnlock()
diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go
index bd5f8687a9..d6660a7fff 100644
--- a/internal/datanode/segment_replica_test.go
+++ b/internal/datanode/segment_replica_test.go
@@ -593,6 +593,17 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
     assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids)
   })

+  t.Run("Test listPartitionSegments", func(t *testing.T) {
+    sr := &SegmentReplica{
+      newSegments:     map[UniqueID]*Segment{1: {segmentID: 1, partitionID: 1}, 4: {segmentID: 4, partitionID: 2}},
+      normalSegments:  map[UniqueID]*Segment{2: {segmentID: 2, partitionID: 1}, 5: {segmentID: 5, partitionID: 2}},
+      flushedSegments: map[UniqueID]*Segment{3: {segmentID: 3, partitionID: 1}, 6: {segmentID: 6, partitionID: 2}},
+    }
+
+    ids := sr.listPartitionSegments(1)
+    assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids)
+  })
+
   t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) {
     sr, err := newReplica(context.Background(), rc, cm, 1)
     assert.Nil(t, err)
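
Reviewer note: the sketch below is not part of the patch. It is a standalone, simplified illustration of the merge behavior the new mergeFlushTask helper introduces in flow_graph_insert_buffer_node.go; the stand-in types are hypothetical (the real flushTask also carries the segment's *BufferData, and the real helper captures flushTaskList as a closure rather than passing the slice around).

package main

import "fmt"

type UniqueID = int64

// Simplified stand-in for the datanode flushTask bookkeeping.
type flushTask struct {
	segmentID UniqueID
	flushed   bool
	dropped   bool
}

// mergeFlushTask reuses an already queued task for the segment if one
// exists, otherwise it appends a new one, so a manual flush and a
// drop-partition flush for the same segment never produce duplicates.
func mergeFlushTask(tasks []flushTask, segmentID UniqueID, setup func(*flushTask)) []flushTask {
	for i := range tasks {
		if tasks[i].segmentID == segmentID {
			setup(&tasks[i])
			return tasks
		}
	}
	task := flushTask{segmentID: segmentID}
	setup(&task)
	return append(tasks, task)
}

func main() {
	var tasks []flushTask

	// A manual flush request for segment 10 arrives first.
	tasks = mergeFlushTask(tasks, 10, func(t *flushTask) { t.flushed = true })

	// A DropPartition message then covers segments 10 and 11: every segment
	// of the dropped partition is flushed and marked dropped; segment 10
	// merges into the task already queued above.
	for _, segID := range []UniqueID{10, 11} {
		tasks = mergeFlushTask(tasks, segID, func(t *flushTask) {
			t.flushed = true
			t.dropped = true
		})
	}

	for _, task := range tasks {
		fmt.Printf("segment=%d flushed=%v dropped=%v\n", task.segmentID, task.flushed, task.dropped)
	}
}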