Fix flush panic after compaction (#18677)

See also: #18565

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2022-08-17 17:08:49 +08:00 committed by GitHub
parent 0db281c4ff
commit 16dcd9718b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 269 additions and 115 deletions

View File

@ -105,26 +105,138 @@ func (dn *deleteNode) Close() {
log.Info("Flowgraph Delete Node closing")
}
func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([]UniqueID, error) {
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
// showDelBuf logs the delta-buffer status (buffered delete entry count)
// for every segment in segIDs that currently has an entry in dn.delBuf,
// tagged with the given timestamp. Segments without buffered delta data
// are skipped silently.
func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
	for _, segID := range segIDs {
		v, ok := dn.delBuf.Load(segID)
		if !ok {
			continue
		}
		// delBuf only ever holds *DelDataBuf values, so the assertion result is ignored.
		buf, _ := v.(*DelDataBuf)
		log.Debug("delta buffer status",
			zap.Uint64("timestamp", ts),
			zap.Int64("segment ID", segID),
			zap.Int64("entries", buf.GetEntriesNum()),
			zap.String("vChannel", dn.channelName))
	}
}
// Update delBuf for merged segments
compactedTo2From := dn.replica.listCompactedSegmentIDs()
for compactedTo, compactedFrom := range compactedTo2From {
compactToDelBuff := newDelDataBuf()
for _, segID := range compactedFrom {
value, loaded := dn.delBuf.LoadAndDelete(segID)
if loaded {
compactToDelBuff.updateFromBuf(value.(*DelDataBuf))
// Operate implementing flowgraph.Node, performs delete data process
func (dn *deleteNode) Operate(in []Msg) []Msg {
//log.Debug("deleteNode Operating")
if len(in) != 1 {
log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
return nil
}
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
if in[0] == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
} else {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range fgMsg.deleteMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
// update compacted segment before operation
if len(fgMsg.deleteMessages) > 0 || len(fgMsg.segmentsToFlush) > 0 {
dn.updateCompactedSegments()
}
// process delete messages
var segIDs []UniqueID
for i, msg := range fgMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID))
tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange)
if err != nil {
// error occurs only when deleteMsg is misaligned, should not happen
err = fmt.Errorf("buffer delete msg failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
segIDs = append(segIDs, tmpSegIDs...)
}
// display changed segment's status in dn.delBuf of a certain ts
if len(fgMsg.deleteMessages) != 0 {
dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax)
}
// process flush messages
if len(fgMsg.segmentsToFlush) > 0 {
log.Debug("DeleteNode receives flush message",
zap.Int64s("segIDs", fgMsg.segmentsToFlush),
zap.String("vChannelName", dn.channelName))
for _, segmentToFlush := range fgMsg.segmentsToFlush {
buf, ok := dn.delBuf.Load(segmentToFlush)
if !ok {
// no related delta data to flush, send empty buf to complete flush life-cycle
dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
} else {
err := retry.Do(dn.ctx, func() error {
return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
}, flowGraphRetryOpt)
if err != nil {
err = fmt.Errorf("failed to flush delete data, err = %s", err)
log.Error(err.Error())
panic(err)
}
// remove delete buf
dn.delBuf.Delete(segmentToFlush)
}
}
dn.delBuf.Store(compactedTo, compactToDelBuff)
dn.replica.removeSegments(compactedFrom...)
log.Debug("update delBuf for merged segments",
}
// process drop collection message, delete node shall notify flush manager all data are cleared and send signal to DataSyncService cleaner
if fgMsg.dropCollection {
dn.flushManager.notifyAllFlushed()
log.Debug("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
dn.clearSignal <- dn.channelName
}
for _, sp := range spans {
sp.Finish()
}
return nil
}
// updateCompactedSegments folds the delete buffer of every compacted-away
// segment into the buffer of the segment it was compacted to, then removes
// the compacted-from segments from the replica. The target's buffer is only
// (re)stored when at least one source buffer was actually merged, so no
// empty buffers are created for targets that had nothing to inherit.
func (dn *deleteNode) updateCompactedSegments() {
	compactedTo2From := dn.replica.listCompactedSegmentIDs()

	for targetID, sourceIDs := range compactedTo2From {
		// Reuse the target's existing buffer so previously buffered
		// deletes are kept; otherwise start from a fresh one.
		targetBuf := newDelDataBuf()
		if existing, ok := dn.delBuf.Load(targetID); ok {
			targetBuf = existing.(*DelDataBuf)
		}

		for _, srcID := range sourceIDs {
			value, ok := dn.delBuf.LoadAndDelete(srcID)
			if !ok {
				continue
			}
			targetBuf.updateFromBuf(value.(*DelDataBuf))
			dn.delBuf.Store(targetID, targetBuf)
		}

		log.Debug("update delBuf for compacted segments",
			zap.Int64("compactedTo segmentID", targetID),
			zap.Int64s("compactedFrom segmentIDs", sourceIDs),
		)
		dn.replica.removeSegments(sourceIDs...)
	}
}
func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([]UniqueID, error) {
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
primaryKeys := storage.ParseIDs2PrimaryKeys(msg.PrimaryKeys)
segIDToPks, segIDToTss := dn.filterSegmentByPK(msg.PartitionID, primaryKeys, msg.Timestamps)
@ -170,103 +282,6 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([
return segIDs, nil
}
// showDelBuf logs, for each segment ID in segIDs that currently has an entry
// in dn.delBuf, the number of buffered delete entries at timestamp ts.
// Segments with no buffered delta data are skipped silently.
func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
for _, segID := range segIDs {
if v, ok := dn.delBuf.Load(segID); ok {
// delBuf only ever holds *DelDataBuf values, so the assertion result is ignored.
delDataBuf, _ := v.(*DelDataBuf)
log.Debug("delta buffer status",
zap.Uint64("timestamp", ts),
zap.Int64("segment ID", segID),
zap.Int64("entries", delDataBuf.GetEntriesNum()),
zap.String("vChannel", dn.channelName))
}
}
}
// Operate implementing flowgraph.Node, performs delete data process.
//
// Fix for issue #18565: before buffering or flushing anything, the delete
// buffers of segments that were compacted away must be folded into their
// compaction target via updateCompactedSegments(). Without this, a flush
// request for a compacted-to segment misses (or panics on) delta data that
// is still keyed by the now-removed compacted-from segment IDs.
func (dn *deleteNode) Operate(in []Msg) []Msg {
	//log.Debug("deleteNode Operating")

	if len(in) != 1 {
		log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
		return nil
	}

	fgMsg, ok := in[0].(*flowGraphMsg)
	if !ok {
		if in[0] == nil {
			log.Debug("type assertion failed for flowGraphMsg because it's nil")
		} else {
			log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
		}
		return []Msg{}
	}

	var spans []opentracing.Span
	for _, msg := range fgMsg.deleteMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
	}

	// update compacted segments before buffering or flushing (see #18565)
	if len(fgMsg.deleteMessages) > 0 || len(fgMsg.segmentsToFlush) > 0 {
		dn.updateCompactedSegments()
	}

	var segIDs []UniqueID
	for i, msg := range fgMsg.deleteMessages {
		traceID, _, _ := trace.InfoFromSpan(spans[i])
		log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID))

		tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange)
		if err != nil {
			// error occurs only when deleteMsg is misaligned, should not happen
			err = fmt.Errorf("buffer delete msg failed, err = %s", err)
			log.Error(err.Error())
			panic(err)
		}
		segIDs = append(segIDs, tmpSegIDs...)
	}

	// show changed segment's status in dn.delBuf of a certain ts
	if len(fgMsg.deleteMessages) != 0 {
		dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax)
	}

	// handle flush
	if len(fgMsg.segmentsToFlush) > 0 {
		log.Debug("DeleteNode receives flush message",
			zap.Int64s("segIDs", fgMsg.segmentsToFlush),
			zap.String("vChannelName", dn.channelName))
		for _, segmentToFlush := range fgMsg.segmentsToFlush {
			buf, ok := dn.delBuf.Load(segmentToFlush)
			if !ok {
				// no related delta data to flush, send empty buf to complete flush life-cycle
				dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
			} else {
				err := retry.Do(dn.ctx, func() error {
					return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
				}, flowGraphRetryOpt)
				if err != nil {
					err = fmt.Errorf("failed to flush delete data, err = %s", err)
					log.Error(err.Error())
					panic(err)
				}
				// remove delete buf
				dn.delBuf.Delete(segmentToFlush)
			}
		}
	}

	// drop collection signal, delete node shall notify flush manager all data are cleared and send signal to DataSyncService cleaner
	if fgMsg.dropCollection {
		dn.flushManager.notifyAllFlushed()
		log.Debug("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
		dn.clearSignal <- dn.channelName
	}

	for _, sp := range spans {
		sp.Finish()
	}
	return nil
}
// filterSegmentByPK returns the bloom filter check result.
// If the key may exists in the segment, returns it in map.
// If the key not exists in the segment, the segment is filter out.

View File

@ -358,6 +358,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
// send again shall trigger empty buffer flush
delNode.Operate([]flowgraph.Msg{fgMsg})
})
t.Run("Test deleteNode Operate valid with dropCollection", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -426,6 +427,56 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
delNode.Operate([]flowgraph.Msg{fgMsg})
})
})
t.Run("Test issue#18565", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
chanName := "datanode-test-FlowGraphDeletenode-issue18565"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.EtcdCfg.MetaRootPath = testPath
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
replica := &SegmentReplica{
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
compactedSegments: make(map[UniqueID]*Segment),
}
c := &nodeConfig{
replica: replica,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
assert.Nil(t, err)
compactedSegment := UniqueID(10020987)
replica.compactedSegments[compactedSegment] = &Segment{
segmentID: compactedSegment,
compactedTo: 100,
}
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.deleteMessages = []*msgstream.DeleteMsg{}
msg.segmentsToFlush = []UniqueID{compactedSegment}
delNode.delBuf.Store(compactedSegment, &DelDataBuf{delData: &DeleteData{}})
delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
var fgMsg flowgraph.Msg = &msg
flowGraphRetryOpt = retry.Attempts(1)
assert.NotPanics(t, func() {
delNode.Operate([]flowgraph.Msg{fgMsg})
})
_, ok := delNode.delBuf.Load(100)
assert.False(t, ok)
_, ok = delNode.delBuf.Load(compactedSegment)
assert.False(t, ok)
})
}
func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
@ -468,3 +519,87 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
delNode.showDelBuf([]UniqueID{111, 112, 113}, 100)
}
// TestFlowGraphDeleteNode_updateCompactedSegments drives updateCompactedSegments
// through a table of compaction scenarios and checks exactly which segment IDs
// remain in delNode.delBuf afterwards.
//
// NOTE(review): delNode, replica, and their state are shared across the
// subtests, so each table entry runs against buffers/compaction mappings left
// over from the previous one — the expectedSegsRemain values appear to account
// for that accumulated state; confirm before reordering entries.
func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix("")
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, &mockReplica{}, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.EtcdCfg.MetaRootPath = testPath
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
// Replica with empty segment maps; compactedSegments is populated per test case.
replica := SegmentReplica{
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
compactedSegments: make(map[UniqueID]*Segment),
}
c := &nodeConfig{
replica: &replica,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
require.NoError(t, err)
// Each case: seed delBuf with segIDsInBuffer, declare compactedFromIDs[i] as
// compacted to compactedToIDs[i], run the update, and expect exactly
// expectedSegsRemain to be left in delBuf.
tests := []struct {
description string
segIDsInBuffer []UniqueID
compactedToIDs []UniqueID
compactedFromIDs []UniqueID
expectedSegsRemain []UniqueID
}{
{"zero segments",
[]UniqueID{}, []UniqueID{}, []UniqueID{}, []UniqueID{}},
{"segment no compaction",
[]UniqueID{100, 101}, []UniqueID{}, []UniqueID{}, []UniqueID{100, 101}},
{"segment compacted not in buffer",
[]UniqueID{100, 101}, []UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}},
{"segment compacted in buffer one",
[]UniqueID{100, 101}, []UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}},
{"segment compacted in buffer all-1",
[]UniqueID{100, 101}, []UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}},
{"segment compacted in buffer all-2",
[]UniqueID{100, 101}, []UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
for _, seg := range test.segIDsInBuffer {
delBuf := newDelDataBuf()
delNode.delBuf.Store(seg, delBuf)
}
for i, seg := range test.compactedFromIDs {
replica.compactedSegments[seg] = &Segment{
segmentID: seg,
compactedTo: test.compactedToIDs[i],
}
}
delNode.updateCompactedSegments()
// Every expected survivor must still be present...
for _, remain := range test.expectedSegsRemain {
_, ok := delNode.delBuf.Load(remain)
assert.True(t, ok)
}
// ...and nothing beyond the expected survivors may remain.
var count int
delNode.delBuf.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, len(test.expectedSegsRemain), count)
})
}
}

View File

@ -41,12 +41,16 @@ const (
maxBloomFalsePositive float64 = 0.005
)
type primaryKey = storage.PrimaryKey
type int64PrimaryKey = storage.Int64PrimaryKey
type varCharPrimaryKey = storage.VarCharPrimaryKey
type (
primaryKey = storage.PrimaryKey
int64PrimaryKey = storage.Int64PrimaryKey
varCharPrimaryKey = storage.VarCharPrimaryKey
)
var newInt64PrimaryKey = storage.NewInt64PrimaryKey
var newVarCharPrimaryKey = storage.NewVarCharPrimaryKey
var (
newInt64PrimaryKey = storage.NewInt64PrimaryKey
newVarCharPrimaryKey = storage.NewVarCharPrimaryKey
)
// Replica is DataNode unique replication
type Replica interface {