Forward delete msg (#11210)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-11-05 11:59:02 +08:00 committed by GitHub
parent 35e8779bd9
commit 9186d5527e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 155 additions and 24 deletions

View File

@ -132,7 +132,7 @@ func TestDataNode(t *testing.T) {
node2 := newIDLEDataNodeMock(ctx) node2 := newIDLEDataNodeMock(ctx)
err = node2.Start() err = node2.Start()
assert.Nil(t, err) assert.Nil(t, err)
dmChannelName := "fake-dm-channel-test-NewDataSyncService" dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService"
vchan := &datapb.VchannelInfo{ vchan := &datapb.VchannelInfo{
CollectionID: 1, CollectionID: 1,
@ -160,7 +160,7 @@ func TestDataNode(t *testing.T) {
}) })
t.Run("Test FlushSegments", func(t *testing.T) { t.Run("Test FlushSegments", func(t *testing.T) {
dmChannelName := "fake-dm-channel-test-FlushSegments" dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments"
node1 := newIDLEDataNodeMock(context.TODO()) node1 := newIDLEDataNodeMock(context.TODO())
err = node1.Init() err = node1.Init()
@ -368,9 +368,9 @@ func TestDataNode(t *testing.T) {
collID UniqueID collID UniqueID
dmChannelName string dmChannelName string
}{ }{
{1, "fake-dm-backgroundgc-1"}, {1, "fake-by-dev-rootcoord-dml-backgroundgc-1"},
{2, "fake-dm-backgroundgc-2"}, {2, "fake-by-dev-rootcoord-dml-backgroundgc-2"},
{3, "fake-dm-backgroundgc-3"}, {3, "fake-by-dev-rootcoord-dml-backgroundgc-3"},
{4, ""}, {4, ""},
{1, ""}, {1, ""},
} }
@ -394,7 +394,7 @@ func TestDataNode(t *testing.T) {
}) })
t.Run("Test ReleaseDataSyncService", func(t *testing.T) { t.Run("Test ReleaseDataSyncService", func(t *testing.T) {
dmChannelName := "fake-dm-channel-test-NewDataSyncService" dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService"
vchan := &datapb.VchannelInfo{ vchan := &datapb.VchannelInfo{
CollectionID: 1, CollectionID: 1,
@ -485,12 +485,12 @@ func TestWatchChannel(t *testing.T) {
t.Run("test watch channel", func(t *testing.T) { t.Run("test watch channel", func(t *testing.T) {
kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath) kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
require.NoError(t, err) require.NoError(t, err)
oldInvalidCh := "datanode-etcd-test-channel-invalid" oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
path := fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, oldInvalidCh) path := fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, oldInvalidCh)
err = kv.Save(path, string([]byte{23})) err = kv.Save(path, string([]byte{23}))
assert.NoError(t, err) assert.NoError(t, err)
ch := fmt.Sprintf("datanode-etcd-test-channel_%d", rand.Int31()) ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
path = fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, ch) path = fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, ch)
c := make(chan struct{}) c := make(chan struct{})
go func() { go func() {

View File

@ -279,7 +279,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err return err
} }
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) var ddNode Node = newDDNode(dsService.ctx, dsService.clearSignal, dsService.collectionID, vchanInfo, dsService.msFactory)
var insertBufferNode Node var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode( insertBufferNode, err = newInsertBufferNode(
dsService.ctx, dsService.ctx,

View File

@ -96,32 +96,32 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
tests := []*testInfo{ tests := []*testInfo{
{false, false, &mockMsgStreamFactory{false, true}, {false, false, &mockMsgStreamFactory{false, true},
0, "", 0, "by-dev-rootcoord-dml_test",
0, 0, "", 0, 0, 0, "", 0,
0, 0, "", 0, 0, 0, "", 0,
"SetParamsReturnError"}, "SetParamsReturnError"},
{true, false, &mockMsgStreamFactory{true, true}, {true, false, &mockMsgStreamFactory{true, true},
0, "", 0, "by-dev-rootcoord-dml_test",
1, 0, "", 0, 1, 0, "", 0,
1, 1, "", 0, 1, 1, "", 0,
"CollID 0 mismach with seginfo collID 1"}, "CollID 0 mismach with seginfo collID 1"},
{true, false, &mockMsgStreamFactory{true, true}, {true, false, &mockMsgStreamFactory{true, true},
1, "c1", 1, "by-dev-rootcoord-dml_1",
1, 0, "c2", 0, 1, 0, "by-dev-rootcoord-dml_2", 0,
1, 1, "c3", 0, 1, 1, "by-dev-rootcoord-dml_3", 0,
"chanName c1 mismach with seginfo chanName c2"}, "chanName c1 mismach with seginfo chanName c2"},
{true, false, &mockMsgStreamFactory{true, true}, {true, false, &mockMsgStreamFactory{true, true},
1, "c1", 1, "by-dev-rootcoord-dml_1",
1, 0, "c1", 0, 1, 0, "by-dev-rootcoord-dml_1", 0,
1, 1, "c2", 0, 1, 1, "by-dev-rootcoord-dml_2", 0,
"add normal segments"}, "add normal segments"},
{false, false, &mockMsgStreamFactory{true, false}, {false, false, &mockMsgStreamFactory{true, false},
0, "", 0, "by-dev-rootcoord-dml",
0, 0, "", 0, 0, 0, "", 0,
0, 0, "", 0, 0, 0, "", 0,
"error when newinsertbufernode"}, "error when newinsertbufernode"},
{false, true, &mockMsgStreamFactory{true, false}, {false, true, &mockMsgStreamFactory{true, false},
0, "", 0, "by-dev-rootcoord-dml",
0, 0, "", 0, 0, 0, "", 0,
0, 0, "", 0, 0, 0, "", 0,
"replica nil"}, "replica nil"},

View File

@ -17,6 +17,7 @@
package datanode package datanode
import ( import (
"context"
"sync" "sync"
"go.uber.org/zap" "go.uber.org/zap"
@ -26,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
@ -56,6 +58,8 @@ type ddNode struct {
segID2SegInfo sync.Map // segment ID to *SegmentInfo segID2SegInfo sync.Map // segment ID to *SegmentInfo
flushedSegments []*datapb.SegmentInfo flushedSegments []*datapb.SegmentInfo
deltaMsgStream msgstream.MsgStream
} }
// Name returns node name, implementing flowgraph.Node // Name returns node name, implementing flowgraph.Node
@ -95,6 +99,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
endPositions: make([]*internalpb.MsgPosition, 0), endPositions: make([]*internalpb.MsgPosition, 0),
} }
forwardMsgs := make([]msgstream.TsMsg, 0)
for _, msg := range msMsg.TsMessages() { for _, msg := range msMsg.TsMessages() {
switch msg.Type() { switch msg.Type() {
case commonpb.MsgType_DropCollection: case commonpb.MsgType_DropCollection:
@ -124,6 +129,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
fgMsg.insertMessages = append(fgMsg.insertMessages, imsg) fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
case commonpb.MsgType_Delete: case commonpb.MsgType_Delete:
log.Debug("DDNode receive delete messages") log.Debug("DDNode receive delete messages")
forwardMsgs = append(forwardMsgs, msg)
dmsg := msg.(*msgstream.DeleteMsg) dmsg := msg.(*msgstream.DeleteMsg)
if dmsg.CollectionID != ddn.collectionID { if dmsg.CollectionID != ddn.collectionID {
//log.Debug("filter invalid DeleteMsg, collection mis-match", //log.Debug("filter invalid DeleteMsg, collection mis-match",
@ -134,6 +140,11 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg) fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
} }
} }
err := ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
if err != nil {
// TODO: proper deal with error
log.Warn("DDNode forward delete msg failed", zap.Error(err))
}
fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)
fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...) fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...)
@ -169,7 +180,60 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
return false return false
} }
func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo) *ddNode { func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
if err := ddn.sendDeltaTimeTick(minTs); err != nil {
return err
}
if len(msgs) != 0 {
var msgPack = msgstream.MsgPack{
Msgs: msgs,
BeginTs: minTs,
EndTs: maxTs,
}
if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil {
return err
}
}
if err := ddn.sendDeltaTimeTick(maxTs); err != nil {
return err
}
return nil
}
// sendDeltaTimeTick publishes a single TimeTick message carrying ts to the
// delta channel. Hash value 0 pins the message to a fixed stream partition.
func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error {
	tickMsg := &msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
		TimeTickMsg: internalpb.TimeTickMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_TimeTick,
				MsgID:     0,
				Timestamp: ts,
				SourceID:  Params.NodeID,
			},
		},
	}
	pack := msgstream.MsgPack{Msgs: []msgstream.TsMsg{tickMsg}}
	return ddn.deltaMsgStream.Produce(&pack)
}
// Close shuts down the delta message stream, if one was created.
func (ddn *ddNode) Close() {
	if ddn.deltaMsgStream == nil {
		return
	}
	ddn.deltaMsgStream.Close()
}
func newDDNode(ctx context.Context, clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode {
baseNode := BaseNode{} baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength) baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism) baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
@ -181,11 +245,26 @@ func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.V
zap.Int("No. Segment", len(vchanInfo.GetFlushedSegments())), zap.Int("No. Segment", len(vchanInfo.GetFlushedSegments())),
) )
deltaStream, err := msFactory.NewMsgStream(ctx)
if err != nil {
return nil
}
deltaChannelName, err := rootcoord.ConvertChannelName(vchanInfo.ChannelName, Params.DmlChannelName, Params.DeltaChannelName)
if err != nil {
log.Error(err.Error())
return nil
}
deltaStream.AsProducer([]string{deltaChannelName})
log.Debug("datanode AsProducer", zap.String("DeltaChannelName", Params.SegmentStatisticsChannelName))
var deltaMsgStream msgstream.MsgStream = deltaStream
deltaMsgStream.Start()
dd := &ddNode{ dd := &ddNode{
BaseNode: baseNode, BaseNode: baseNode,
clearSignal: clearSignal, clearSignal: clearSignal,
collectionID: collID, collectionID: collID,
flushedSegments: fs, flushedSegments: fs,
deltaMsgStream: deltaMsgStream,
} }
for _, us := range vchanInfo.GetUnflushedSegments() { for _, us := range vchanInfo.GetUnflushedSegments() {

View File

@ -17,6 +17,7 @@
package datanode package datanode
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -28,6 +29,10 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/flowgraph"
) )
type mockFactory struct {
msgstream.Factory
}
func TestFlowGraph_DDNode_newDDNode(te *testing.T) { func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
tests := []struct { tests := []struct {
inCollID UniqueID inCollID UniqueID
@ -65,12 +70,15 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
} }
ddNode := newDDNode( ddNode := newDDNode(
context.Background(),
make(chan UniqueID), make(chan UniqueID),
test.inCollID, test.inCollID,
&datapb.VchannelInfo{ &datapb.VchannelInfo{
FlushedSegments: fi, FlushedSegments: fi,
UnflushedSegments: []*datapb.SegmentInfo{di}, UnflushedSegments: []*datapb.SegmentInfo{di},
ChannelName: "by-dev-rootcoord-dml-test",
}, },
msgstream.NewPmsFactory(),
) )
flushedSegIDs := make([]int64, 0) flushedSegIDs := make([]int64, 0)
@ -131,9 +139,13 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
for _, test := range tests { for _, test := range tests {
te.Run(test.description, func(t *testing.T) { te.Run(test.description, func(t *testing.T) {
factory := msgstream.NewPmsFactory()
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
ddn := ddNode{ ddn := ddNode{
clearSignal: test.ddnClearSignal, clearSignal: test.ddnClearSignal,
collectionID: test.ddnCollID, collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
} }
var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{ var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{
@ -186,10 +198,14 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
for _, test := range tests { for _, test := range tests {
te.Run(test.description, func(t *testing.T) { te.Run(test.description, func(t *testing.T) {
fs := &datapb.SegmentInfo{ID: test.ddnFlushedSegment} fs := &datapb.SegmentInfo{ID: test.ddnFlushedSegment}
factory := msgstream.NewPmsFactory()
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
// Prepare ddNode states // Prepare ddNode states
ddn := ddNode{ ddn := ddNode{
flushedSegments: []*datapb.SegmentInfo{fs}, flushedSegments: []*datapb.SegmentInfo{fs},
collectionID: test.ddnCollID, collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
} }
FilterThreshold = test.threshold FilterThreshold = test.threshold
@ -228,9 +244,13 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
for _, test := range tests { for _, test := range tests {
te.Run(test.description, func(t *testing.T) { te.Run(test.description, func(t *testing.T) {
factory := msgstream.NewPmsFactory()
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
// Prepare ddNode states // Prepare ddNode states
ddn := ddNode{ ddn := ddNode{
collectionID: test.ddnCollID, collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
} }
// Prepare delete messages // Prepare delete messages
@ -286,9 +306,13 @@ func TestFlowGraph_DDNode_filterMessages(te *testing.T) {
s := &datapb.SegmentInfo{ID: id} s := &datapb.SegmentInfo{ID: id}
fs = append(fs, s) fs = append(fs, s)
} }
factory := msgstream.NewPmsFactory()
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
// Prepare ddNode states // Prepare ddNode states
ddn := ddNode{ ddn := ddNode{
flushedSegments: fs, flushedSegments: fs,
deltaMsgStream: deltaStream,
} }
for k, v := range test.ddnSegID2Ts { for k, v := range test.ddnSegID2Ts {
@ -346,7 +370,10 @@ func TestFlowGraph_DDNode_isFlushed(te *testing.T) {
s := &datapb.SegmentInfo{ID: id} s := &datapb.SegmentInfo{ID: id}
fs = append(fs, s) fs = append(fs, s)
} }
ddn := &ddNode{flushedSegments: fs} factory := msgstream.NewPmsFactory()
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
ddn := &ddNode{flushedSegments: fs, deltaMsgStream: deltaStream}
assert.Equal(t, test.expectedOut, ddn.isFlushed(test.inSeg)) assert.Equal(t, test.expectedOut, ddn.isFlushed(test.inSeg))
}) })
} }

View File

@ -46,6 +46,10 @@ type ParamTable struct {
DeleteBinlogRootPath string DeleteBinlogRootPath string
Alias string // Different datanode in one machine Alias string // Different datanode in one machine
// Channel Name
DmlChannelName string
DeltaChannelName string
// Pulsar address // Pulsar address
PulsarAddress string PulsarAddress string
@ -130,6 +134,9 @@ func (p *ParamTable) Init() {
p.initMinioUseSSL() p.initMinioUseSSL()
p.initMinioBucketName() p.initMinioBucketName()
p.initDmlChannelName()
p.initDeltaChannelName()
p.initRoleName() p.initRoleName()
} }
@ -289,3 +296,21 @@ func (p *ParamTable) initMinioBucketName() {
func (p *ParamTable) initRoleName() { func (p *ParamTable) initRoleName() {
p.RoleName = "datanode" p.RoleName = "datanode"
} }
// initDmlChannelName derives the DML channel name from the configured
// rootcoord DML prefix, namespaced by the cluster channel prefix.
// The config key is mandatory; a missing entry is treated as a fatal
// misconfiguration.
func (p *ParamTable) initDmlChannelName() {
	prefix, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml")
	if err != nil {
		panic(err)
	}
	p.DmlChannelName = strings.Join([]string{p.ClusterChannelPrefix, prefix}, "-")
}
// initDeltaChannelName derives the delta channel name from the configured
// rootcoord delta prefix, namespaced by the cluster channel prefix.
// NOTE(review): unlike the DML key, a missing config entry falls back to a
// built-in default instead of panicking — presumably for configs that
// predate the delta channel; confirm this is intentional.
func (p *ParamTable) initDeltaChannelName() {
	prefix, err := p.Load("msgChannel.chanNamePrefix.rootCoordDelta")
	if err != nil {
		// Key absent — use the default prefix.
		prefix = "rootcoord-delta"
	}
	p.DeltaChannelName = strings.Join([]string{p.ClusterChannelPrefix, prefix}, "-")
}