Handles DropPartitionMsg in datanode flowgraph (#18292)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2022-07-15 17:12:27 +08:00 committed by GitHub
parent cd826dc874
commit f0846fb79b
7 changed files with 385 additions and 28 deletions
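In short: ddNode now recognizes DropPartitionMsg and collects the affected partition IDs into flowGraphMsg.dropPartitions; insertBufferNode consumes that list, looks up the partition's segments via the new SegmentReplica.listPartitionSegments, and schedules each one as a flushed-and-dropped flush task through the extracted mergeFlushTask helper.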


@@ -134,6 +134,16 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
                fgMsg.dropCollection = true
            }

        case commonpb.MsgType_DropPartition:
            dpMsg := msg.(*msgstream.DropPartitionMsg)
            if dpMsg.GetCollectionID() == ddn.collectionID {
                log.Info("drop partition msg received",
                    zap.Int64("collectionID", dpMsg.GetCollectionID()),
                    zap.Int64("partitionID", dpMsg.GetPartitionID()),
                    zap.String("vChannelName", ddn.vChannelName))
                fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
            }

        case commonpb.MsgType_Insert:
            imsg := msg.(*msgstream.InsertMsg)
            if imsg.CollectionID != ddn.collectionID {
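Note that, unlike MsgType_DropCollection, which only flips the dropCollection flag, partition drops accumulate into a slice, so several partitions can be dropped within a single flowgraph message.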


@@ -161,6 +161,59 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
        }
    })

    t.Run("Test DDNode Operate DropPartition Msg", func(t *testing.T) {
        // valid inputs
        tests := []struct {
            ddnCollID UniqueID

            msgCollID UniqueID
            msgPartID UniqueID

            expectOutput []UniqueID

            description string
        }{
            {1, 1, 101, []UniqueID{101},
                "DropPartitionMsg collID == ddNode collID"},
            {1, 2, 101, []UniqueID{},
                "DropPartitionMsg collID != ddNode collID"},
        }

        for _, test := range tests {
            t.Run(test.description, func(t *testing.T) {
                factory := dependency.NewDefaultFactory(true)
                deltaStream, err := factory.NewMsgStream(context.Background())
                assert.Nil(t, err)
                deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
                deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
                ddn := ddNode{
                    ctx:                context.Background(),
                    collectionID:       test.ddnCollID,
                    deltaMsgStream:     deltaStream,
                    vChannelName:       "ddn_drop_msg",
                    compactionExecutor: newCompactionExecutor(),
                }

                var dropPartMsg msgstream.TsMsg = &msgstream.DropPartitionMsg{
                    DropPartitionRequest: internalpb.DropPartitionRequest{
                        Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition},
                        CollectionID: test.msgCollID,
                        PartitionID:  test.msgPartID,
                    },
                }
                tsMessages := []msgstream.TsMsg{dropPartMsg}
                var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)

                rt := ddn.Operate([]Msg{msgStreamMsg})
                assert.NotEmpty(t, rt)

                fgMsg, ok := rt[0].(*flowGraphMsg)
                assert.True(t, ok)
                assert.ElementsMatch(t, test.expectOutput, fgMsg.dropPartitions)
            })
        }
    })

    t.Run("Test DDNode Operate and filter insert msg", func(t *testing.T) {
        factory := dependency.NewDefaultFactory(true)
        deltaStream, err := factory.NewMsgStream(context.Background())


@@ -315,6 +315,35 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
            }
        }

    mergeFlushTask := func(segmentID UniqueID, setupTask func(task *flushTask)) {
        // Merge auto & manual flush tasks with the same segment ID.
        dup := false
        for i, task := range flushTaskList {
            if task.segmentID == segmentID {
                log.Info("merging flush task, updating flushed flag",
                    zap.Int64("segment ID", segmentID))
                setupTask(&flushTaskList[i])
                dup = true
                break
            }
        }

        // Load buffer and create new flush task if there's no existing flush task for this segment.
        if !dup {
            bd, ok := ibNode.insertBuffer.Load(segmentID)
            var buf *BufferData
            if ok {
                buf = bd.(*BufferData)
            }
            task := flushTask{
                buffer:    buf,
                segmentID: segmentID,
                dropped:   false,
            }
            setupTask(&task)
            flushTaskList = append(flushTaskList, task)
        }
    }
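The helper above is a create-or-update pattern: callers hand it a closure that mutates a flushTask, and mergeFlushTask decides whether that closure runs against an existing task for the segment or a freshly created one. A standalone sketch of the same idea (toy types for illustration, not the actual datanode code):

package main

import "fmt"

type toyTask struct {
    segmentID int64
    flushed   bool
    dropped   bool
}

// mergeTask applies setup to the existing task for segmentID, or appends a new one.
// Manual flush and drop-partition can therefore share a single task per segment.
func mergeTask(tasks []toyTask, segmentID int64, setup func(t *toyTask)) []toyTask {
    for i := range tasks {
        if tasks[i].segmentID == segmentID {
            setup(&tasks[i]) // duplicate: update in place
            return tasks
        }
    }
    t := toyTask{segmentID: segmentID}
    setup(&t) // first sight: create, then apply the caller's settings
    return append(tasks, t)
}

func main() {
    var tasks []toyTask
    tasks = mergeTask(tasks, 10, func(t *toyTask) { t.flushed = true })                   // manual flush
    tasks = mergeTask(tasks, 10, func(t *toyTask) { t.flushed = true; t.dropped = true }) // drop partition
    fmt.Printf("%+v\n", tasks) // still one task for segment 10, now flushed and dropped
}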
    // Manual Flush
    select {
    case fmsg := <-ibNode.flushChan:
@@ -324,33 +353,27 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
            zap.Bool("flushed", fmsg.flushed),
            zap.String("v-channel name", ibNode.channelName),
        )
        mergeFlushTask(fmsg.segmentID, func(task *flushTask) {
            task.flushed = fmsg.flushed
        })
    default:
    }

    // process drop partition
    for _, partitionDrop := range fgMsg.dropPartitions {
        segmentIDs := ibNode.replica.listPartitionSegments(partitionDrop)
        log.Info("(Drop Partition) process drop partition",
            zap.Int64("collectionID", ibNode.replica.getCollectionID()),
            zap.Int64("partitionID", partitionDrop),
            zap.Int64s("segmentIDs", segmentIDs),
            zap.String("v-channel name", ibNode.channelName),
        )
        for _, segID := range segmentIDs {
            mergeFlushTask(segID, func(task *flushTask) {
                task.flushed = true
                task.dropped = true
            })
        }
    }


@@ -636,6 +636,238 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
    })
}
func TestFlowGraphInsertBufferNode_DropPartition(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    partitionID := int64(1)
    testPath := "/test/datanode/root/meta"
    err := clearEtcd(testPath)
    require.NoError(t, err)
    Params.EtcdCfg.MetaRootPath = testPath

    Factory := &MetaFactory{}
    collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
    dataFactory := NewDataFactory()

    mockRootCoord := &RootCoordFactory{
        pkType: schemapb.DataType_Int64,
    }

    colRep := &SegmentReplica{
        collectionID:    collMeta.ID,
        newSegments:     make(map[UniqueID]*Segment),
        normalSegments:  make(map[UniqueID]*Segment),
        flushedSegments: make(map[UniqueID]*Segment),
    }

    colRep.metaService = newMetaService(mockRootCoord, collMeta.ID)

    factory := dependency.NewDefaultFactory(true)

    flushPacks := []*segmentFlushPack{}
    fpMut := sync.Mutex{}
    wg := sync.WaitGroup{}

    cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
    defer cm.RemoveWithPrefix("")
    fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, colRep, func(pack *segmentFlushPack) {
        fpMut.Lock()
        flushPacks = append(flushPacks, pack)
        fpMut.Unlock()
        colRep.listNewSegmentsStartPositions()
        colRep.listSegmentsCheckPoints()
        if pack.flushed || pack.dropped {
            colRep.segmentFlushed(pack.segmentID)
        }
        wg.Done()
    }, emptyFlushAndDropFunc)

    flushChan := make(chan flushMsg, 100)
    resendTTChan := make(chan resendTTMsg, 100)
    c := &nodeConfig{
        replica:      colRep,
        msFactory:    factory,
        allocator:    NewAllocatorFactory(),
        vChannelName: "string",
    }
    iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
    require.NoError(t, err)

    // Auto flush number of rows set to 2
    inMsg := genFlowGraphInsertMsg("datanode-03-test-autoflush")
    inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
    var iMsg flowgraph.Msg = &inMsg

    t.Run("Only drop partition", func(t *testing.T) {
        // iBNode.insertBuffer.maxSize = 2
        tmp := Params.DataNodeCfg.FlushInsertBufferSize
        Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
        defer func() {
            Params.DataNodeCfg.FlushInsertBufferSize = tmp
        }()

        for i := range inMsg.insertMessages {
            inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
            inMsg.insertMessages[i].PartitionID = partitionID
        }
        inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
        inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}

        type Test struct {
            expectedSegID       UniqueID
            expectedNumOfRows   int64
            expectedStartPosTs  Timestamp
            expectedEndPosTs    Timestamp
            expectedCpNumOfRows int64
            expectedCpPosTs     Timestamp
        }

        beforeAutoFlushTests := []Test{
            // segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
            {1, 1, 100, 123, 0, 100},
            {2, 1, 100, 123, 0, 100},
        }
        iBNode.Operate([]flowgraph.Msg{iMsg})

        require.Equal(t, 2, len(colRep.newSegments))
        require.Equal(t, 0, len(colRep.normalSegments))
        assert.Equal(t, 0, len(flushPacks))

        for i, test := range beforeAutoFlushTests {
            colRep.segMu.Lock()
            seg, ok := colRep.newSegments[UniqueID(i+1)]
            colRep.segMu.Unlock()
            assert.True(t, ok)
            assert.Equal(t, partitionID, seg.partitionID)
            assert.Equal(t, test.expectedSegID, seg.segmentID)
            assert.Equal(t, test.expectedNumOfRows, seg.numRows)
            assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
            assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
            assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
        }

        inMsg.insertMessages = nil
        inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 200}}
        inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
        inMsg.dropPartitions = []int64{partitionID}

        iMsg = &inMsg

        // Trigger drop partition
        output := iBNode.Operate([]flowgraph.Msg{iMsg})
        fgm := output[0].(*flowGraphMsg)
        wg.Add(len(fgm.segmentsToFlush))
        t.Log("segments to flush", fgm.segmentsToFlush)

        for _, im := range fgm.segmentsToFlush {
            // send del done signal
            err = fm.flushDelData(nil, im, fgm.endPositions[0])
            assert.NoError(t, err)
        }
        wg.Wait()

        require.Equal(t, 0, len(colRep.newSegments))
        require.Equal(t, 0, len(colRep.normalSegments))
        require.Equal(t, 2, len(colRep.flushedSegments))

        assert.Equal(t, 2, len(flushPacks))
        assert.Less(t, 0, len(flushPacks[0].insertLogs))
        for _, flushPack := range flushPacks {
            assert.True(t, flushPack.flushed)
            assert.True(t, flushPack.dropped)
        }
    })

    t.Run("drop partition with flush", func(t *testing.T) {
        tmp := Params.DataNodeCfg.FlushInsertBufferSize
        Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
        defer func() {
            Params.DataNodeCfg.FlushInsertBufferSize = tmp
        }()

        fpMut.Lock()
        flushPacks = flushPacks[:0]
        fpMut.Unlock()

        inMsg := genFlowGraphInsertMsg("datanode-03-test-autoflush")
        inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)

        for i := range inMsg.insertMessages {
            inMsg.insertMessages[i].SegmentID = UniqueID(10 + i)
            inMsg.insertMessages[i].PartitionID = partitionID
        }
        inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 300}}
        inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 323}}
        var iMsg flowgraph.Msg = &inMsg

        type Test struct {
            expectedSegID       UniqueID
            expectedNumOfRows   int64
            expectedStartPosTs  Timestamp
            expectedEndPosTs    Timestamp
            expectedCpNumOfRows int64
            expectedCpPosTs     Timestamp
        }

        beforeAutoFlushTests := []Test{
            // segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
            {10, 1, 300, 323, 0, 300},
            {11, 1, 300, 323, 0, 300},
        }
        iBNode.Operate([]flowgraph.Msg{iMsg})

        require.Equal(t, 2, len(colRep.newSegments))
        require.Equal(t, 0, len(colRep.normalSegments))
        assert.Equal(t, 0, len(flushPacks))

        for _, test := range beforeAutoFlushTests {
            colRep.segMu.Lock()
            seg, ok := colRep.newSegments[test.expectedSegID]
            colRep.segMu.Unlock()
            assert.True(t, ok)
            assert.Equal(t, partitionID, seg.partitionID)
            assert.Equal(t, test.expectedSegID, seg.segmentID)
            assert.Equal(t, test.expectedNumOfRows, seg.numRows)
            assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
            assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
            assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
        }

        inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 400}}
        inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 434}}
        inMsg.dropPartitions = []int64{partitionID}

        // trigger manual flush
        flushChan <- flushMsg{
            segmentID: 10,
            flushed:   true,
        }

        // trigger auto flush since buffer full
        output := iBNode.Operate([]flowgraph.Msg{iMsg})
        fgm := output[0].(*flowGraphMsg)
        wg.Add(len(fgm.segmentsToFlush))
        for _, im := range fgm.segmentsToFlush {
            // send del done signal
            err = fm.flushDelData(nil, im, fgm.endPositions[0])
            assert.NoError(t, err)
        }
        wg.Wait()

        require.Equal(t, 0, len(colRep.newSegments))
        require.Equal(t, 0, len(colRep.normalSegments))
        require.Equal(t, 4, len(colRep.flushedSegments))

        assert.Equal(t, 4, len(flushPacks))
        for _, pack := range flushPacks {
            assert.True(t, pack.flushed)
            assert.True(t, pack.dropped)
        }
    })
}
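Taken together, the two subtests exercise both mergeFlushTask paths: the first creates fresh flush tasks straight from the insert buffer, while the second merges the manual flush for segment 10 with the drop, so that segment ends up with a single task that is both flushed and dropped. The second subtest expects four packs because listPartitionSegments also picks up segments 1 and 2, which were already flushed in the first subtest.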
// CompactedRootCoord has meta info compacted at ts
type CompactedRootCoord struct {
    types.RootCoord


@@ -39,6 +39,7 @@ type flowGraphMsg struct {
    //segmentsToFlush is the signal used by insertBufferNode to notify deleteNode to flush
    segmentsToFlush []UniqueID
    dropCollection  bool
    dropPartitions  []UniqueID
}

func (fgMsg *flowGraphMsg) TimeTick() Timestamp {
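The new dropPartitions field is the hand-off point between the two nodes above: ddNode appends partition IDs to it, and insertBufferNode ranges over it to schedule the flush-and-drop tasks.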


@@ -56,6 +56,7 @@ type Replica interface {
    listAllSegmentIDs() []UniqueID
    listNotFlushedSegmentIDs() []UniqueID
    listPartitionSegments(partID UniqueID) []UniqueID
    addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error
    addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error
    filterSegments(channelName string, partitionID UniqueID) []*Segment
@@ -93,9 +94,8 @@ type Segment struct {
    endPos *internalpb.MsgPosition

    pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
    minPK    primaryKey         // minimal pk value, shortcut for checking whether a pk is inside this segment
    maxPK    primaryKey         // maximal pk value, same above
}

// SegmentReplica is the data replication of persistent data in datanode.
@@ -824,6 +824,33 @@ func (replica *SegmentReplica) listAllSegmentIDs() []UniqueID {
    return segIDs
}

func (replica *SegmentReplica) listPartitionSegments(partID UniqueID) []UniqueID {
    replica.segMu.RLock()
    defer replica.segMu.RUnlock()

    var segIDs []UniqueID

    for _, seg := range replica.newSegments {
        if seg.partitionID == partID {
            segIDs = append(segIDs, seg.segmentID)
        }
    }

    for _, seg := range replica.normalSegments {
        if seg.partitionID == partID {
            segIDs = append(segIDs, seg.segmentID)
        }
    }

    for _, seg := range replica.flushedSegments {
        if seg.partitionID == partID {
            segIDs = append(segIDs, seg.segmentID)
        }
    }

    return segIDs
}
func (replica *SegmentReplica) listNotFlushedSegmentIDs() []UniqueID {
    replica.segMu.RLock()
    defer replica.segMu.RUnlock()
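The three loops in listPartitionSegments scan the new, normal, and flushed segment maps under one read lock, so drop-partition flushes cover segments in every lifecycle state. If it helps to see the shape, they could be collapsed with a small iteration helper; a hypothetical sketch only, not part of this commit:

// Hypothetical helper, not in this commit: visit every segment in all three maps.
// Callers must hold replica.segMu.
func (replica *SegmentReplica) rangeAllSegments(fn func(seg *Segment)) {
    for _, segs := range []map[UniqueID]*Segment{
        replica.newSegments, replica.normalSegments, replica.flushedSegments,
    } {
        for _, seg := range segs {
            fn(seg)
        }
    }
}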


@@ -593,6 +593,17 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
        assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids)
    })

    t.Run("Test listPartitionSegments", func(t *testing.T) {
        sr := &SegmentReplica{
            newSegments:     map[UniqueID]*Segment{1: {segmentID: 1, partitionID: 1}, 4: {segmentID: 4, partitionID: 2}},
            normalSegments:  map[UniqueID]*Segment{2: {segmentID: 2, partitionID: 1}, 5: {segmentID: 5, partitionID: 2}},
            flushedSegments: map[UniqueID]*Segment{3: {segmentID: 3, partitionID: 1}, 6: {segmentID: 6, partitionID: 2}},
        }

        ids := sr.listPartitionSegments(1)
        assert.ElementsMatch(t, []UniqueID{1, 2, 3}, ids)
    })

    t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) {
        sr, err := newReplica(context.Background(), rc, cm, 1)
        assert.Nil(t, err)