From 8f9e62fa187f1159d76d0c7fcfc601854429f0be Mon Sep 17 00:00:00 2001
From: congqixia
Date: Thu, 2 Dec 2021 16:39:33 +0800
Subject: [PATCH] Apply DropVirtualChannel and FlushManager drop mode (#12563)

Signed-off-by: Congqi Xia
---
 internal/datacoord/channel_manager.go         |   2 +
 internal/datacoord/handler.go                 |  33 ++-
 internal/datacoord/meta.go                    |  14 +-
 internal/datacoord/mock_test.go               |   2 +
 internal/datacoord/server_test.go             | 261 +++++++++---------
 internal/datacoord/services.go                |  18 +-
 internal/datanode/compaction_executor.go      |  11 +-
 internal/datanode/compaction_executor_test.go |  75 ++++-
 internal/datanode/compactor.go                |  10 +
 internal/datanode/data_node.go                |  19 +-
 internal/datanode/data_sync_service.go        |   6 +-
 internal/datanode/data_sync_service_test.go   |   3 +-
 internal/datanode/flow_graph_dd_node.go       |  26 +-
 internal/datanode/flow_graph_dd_node_test.go  |   7 +-
 internal/datanode/flow_graph_delete_node.go   |   1 +
 .../datanode/flow_graph_delete_node_test.go   |  35 +++
 .../datanode/flow_graph_insert_buffer_node.go |   4 +
 .../flow_graph_insert_buffer_node_test.go     |   8 +-
 internal/datanode/flush_manager.go            |  10 +
 internal/datanode/flush_manager_test.go       | 135 ++++++---
 20 files changed, 466 insertions(+), 214 deletions(-)

diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 97b1c3097a..f61fe13549 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
func (c *ChannelManager) unwatchDroppedChannels() {
 err := c.remove(nodeChannel.NodeID, ch)
 if err != nil {
 log.Warn("unable to remove channel", zap.String("channel", ch.Name), zap.Error(err))
+ continue
 }
+ c.h.FinishDropChannel(ch.Name)
 }
 }
}
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index 77d7c8724c..e30a3df1ac 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
type Handler interface {
 // GetVChanPositions gets the information recovery needed of a channel
 GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
 CheckShouldDropChannel(channel string) bool
+ FinishDropChannel(channel string)
 }

// Handler is a helper of Server
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
 }

 func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
- segments := h.s.meta.GetSegmentsByChannel(channel)
- for _, segment := range segments {
- if segment.GetStartPosition() != nil && // filter empty segment
- // FIXME: we filter compaction generated segments
- // because datanode may not know the segment due to the network lag or
- // datacoord crash when handling CompleteCompaction.
- // FIXME: cancel this limitation for #12265
- // need to change a unified DropAndFlush to solve the root problem
- //len(segment.CompactionFrom) == 0 &&
- segment.GetState() != commonpb.SegmentState_Dropped {
- return false
+ /*
+ segments := h.s.meta.GetSegmentsByChannel(channel)
+ for _, segment := range segments {
+ if segment.GetStartPosition() != nil && // filter empty segment
+ // FIXME: we filter compaction generated segments
+ // because datanode may not know the segment due to the network lag or
+ // datacoord crash when handling CompleteCompaction.
+ // FIXME: cancel this limitation for #12265
+ // need to change a unified DropAndFlush to solve the root problem
+ //len(segment.CompactionFrom) == 0 &&
+ segment.GetState() != commonpb.SegmentState_Dropped {
+ return false
+ }
 }
- }
- return true
+ return false*/
+ return h.s.meta.ChannelHasRemoveFlag(channel)
+}
+
+func (h *ServerHandler) FinishDropChannel(channel string) {
+ h.s.meta.FinishRemoveChannel(channel)
 }
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 0267ace715..fc6061dba1 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
const (
 segmentPrefix = metaPrefix + "/s"
 channelRemovePrefix = metaPrefix + "/channel-removal"
 handoffSegmentPrefix = "querycoord-handoff"
+
+ removeFlagTombstone = "removed"
)

type meta struct {
func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*S
 // add removal flag into meta, preventing non-atomic removal channel failure
 removalFlag := buildChannelRemovePath(channel)
- kv[removalFlag] = ""
+ kv[removalFlag] = removeFlagTombstone
 }

 err := m.saveKvTxn(kv)
func (m *meta) FinishRemoveChannel(channel string) error {
 return m.client.Remove(key)
}

+// ChannelHasRemoveFlag returns whether the channel-removal tombstone flag is set for the given channel
+func (m *meta) ChannelHasRemoveFlag(channel string) bool {
+ key := buildChannelRemovePath(channel)
+ v, err := m.client.Load(key)
+ if err != nil || v != removeFlagTombstone {
+ return false
+ }
+ return true
+}
+
// ListSegmentFiles lists all segments' logs
func (m *meta) ListSegmentFiles() []string {
 m.RLock()
diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go
index 03ae902a55..ed50c85041 100644
--- a/internal/datacoord/mock_test.go
+++ b/internal/datacoord/mock_test.go
func (h *mockHandler) GetVChanPositions(channel string, collectionID UniqueID, p
 func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
 return false
}
+
+func (h *mockHandler) FinishDropChannel(channel string) {}
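The handler, meta, and mock changes above replace segment-state scanning with an explicit removal flag: saveDropSegmentAndRemove writes the tombstone in the same transaction that drops the segments, CheckShouldDropChannel only reads it back, and FinishDropChannel clears it once the channel watch is gone. A minimal sketch of that lifecycle against a plain map follows; the kvStore type, key prefix, and helper names are illustrative stand-ins, not the patch's etcd client:

package main

import "fmt"

// kvStore stands in for datacoord's etcd-backed meta client (illustrative only).
type kvStore map[string]string

const tombstone = "removed"

// removePath mirrors buildChannelRemovePath; the prefix here is illustrative.
func removePath(channel string) string { return "datacoord-meta/channel-removal/" + channel }

// Phase 1: set the flag alongside dropping the channel's segments
// (one kv transaction in the real code).
func markChannelRemoving(kv kvStore, channel string) { kv[removePath(channel)] = tombstone }

// Recovery and DropVirtualChannel read the flag instead of scanning segment states.
func channelHasRemoveFlag(kv kvStore, channel string) bool { return kv[removePath(channel)] == tombstone }

// Phase 2: clear the flag once the channel watch has actually been removed.
func finishRemoveChannel(kv kvStore, channel string) { delete(kv, removePath(channel)) }

func main() {
	kv := kvStore{}
	markChannelRemoving(kv, "ch1")
	fmt.Println(channelHasRemoveFlag(kv, "ch1")) // true: safe to finish the drop even after a crash
	finishRemoveChannel(kv, "ch1")
	fmt.Println(channelHasRemoveFlag(kv, "ch1")) // false: removal completed
}

Because the flag and the segment drop commit atomically, a datacoord crash between the two phases leaves the tombstone behind, and unwatchDroppedChannels can finish the removal on restart.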
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 0763c0a952..bf85569e8d 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
func TestSaveBinlogPaths(t *testing.T) {
 assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
 assert.Equal(t, serverNotServingErrMsg, resp.GetReason())
 })
+ /*
+ t.Run("test save dropped segment and remove channel", func(t *testing.T) {
+ spyCh := make(chan struct{}, 1)
+ svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
+ defer closeTestServer(t, svr)

- t.Run("test save dropped segment and remove channel", func(t *testing.T) {
- spyCh := make(chan struct{}, 1)
- svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
- defer closeTestServer(t, svr)
+ svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
+ err := svr.meta.AddSegment(&SegmentInfo{
+ SegmentInfo: &datapb.SegmentInfo{
+ ID: 1,
+ CollectionID: 1,
+ InsertChannel: "ch1",
+ State: commonpb.SegmentState_Growing,
+ },
+ })
+ assert.Nil(t, err)

- svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
- err := svr.meta.AddSegment(&SegmentInfo{
- SegmentInfo: &datapb.SegmentInfo{
- ID: 1,
- CollectionID: 1,
- InsertChannel: "ch1",
- State: commonpb.SegmentState_Growing,
- },
- })
- assert.Nil(t, err)
+ err = svr.channelManager.AddNode(0)
+ assert.Nil(t, err)
+ err = svr.channelManager.Watch(&channel{"ch1", 1})
+ assert.Nil(t, err)

- err = svr.channelManager.AddNode(0)
- assert.Nil(t, err)
- err = svr.channelManager.Watch(&channel{"ch1", 1})
- assert.Nil(t, err)
-
- _, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
- SegmentID: 1,
- Dropped: true,
- })
- assert.Nil(t, err)
- <-spyCh
- })
+ _, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
+ SegmentID: 1,
+ Dropped: true,
+ })
+ assert.Nil(t, err)
+ <-spyCh
+ })*/
 }

 func TestDropVirtualChannel(t *testing.T) {
func TestShouldDropChannel(t *testing.T) {
 },
 },
 })
+ /*
+ s1 := &datapb.SegmentInfo{
+ ID: 1,
+ CollectionID: 0,
+ PartitionID: 0,
+ InsertChannel: "ch1",
+ State: commonpb.SegmentState_Dropped,
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{8, 9, 10},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ DmlPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{1, 2, 3},
+ MsgGroup: "",
+ Timestamp: 0,
+ },
+ }
+ s2 := &datapb.SegmentInfo{
+ ID: 2,
+ CollectionID: 0,
+ PartitionID: 0,
+ InsertChannel: "ch1",
+ State: commonpb.SegmentState_Flushed,
+ CompactionFrom: []int64{4, 5},
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{8, 9, 10},
+ MsgGroup: "",
+ },
+ DmlPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{1, 2, 3},
+ MsgGroup: "",
+ Timestamp: 1,
+ },
+ }
+ s3 := &datapb.SegmentInfo{
+ ID: 3,
+ CollectionID: 0,
+ PartitionID: 1,
+ InsertChannel: "ch1",
+ State: commonpb.SegmentState_Growing,
+ StartPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{8, 9, 10},
+ MsgGroup: "",
+ },
+ DmlPosition: &internalpb.MsgPosition{
+ ChannelName: "ch1",
+ MsgID: []byte{11, 12, 13},
+ MsgGroup: "",
+ Timestamp: 2,
+ },
+ }
+ s4 := &datapb.SegmentInfo{
+ ID: 4,
+ CollectionID: 0,
+ PartitionID: 1,
+ InsertChannel: "ch1",
+ State: commonpb.SegmentState_Growing,
+ }*/
+ /*
+ t.Run("channel without segments", func(t *testing.T) {
+ r := svr.handler.CheckShouldDropChannel("ch1")
+ assert.True(t, r)

- s1 := &datapb.SegmentInfo{
- ID: 1,
- CollectionID: 0,
- PartitionID: 0,
- InsertChannel: "ch1",
- State: commonpb.SegmentState_Dropped,
- StartPosition: &internalpb.MsgPosition{
- ChannelName: "ch1",
- MsgID: []byte{8, 9, 10},
- MsgGroup: "",
- Timestamp: 0,
- },
- DmlPosition: &internalpb.MsgPosition{
- ChannelName: "ch1",
- MsgID: []byte{1, 2, 3},
- MsgGroup: "",
- Timestamp: 0,
- },
- }
- s2 := &datapb.SegmentInfo{
- ID: 2,
- CollectionID: 0,
- PartitionID: 0,
- InsertChannel: "ch1",
- State: commonpb.SegmentState_Flushed,
- CompactionFrom: []int64{4, 5},
- StartPosition: &internalpb.MsgPosition{
- ChannelName: "ch1",
- MsgID: []byte{8, 9, 10},
- MsgGroup: "",
- },
- DmlPosition: &internalpb.MsgPosition{
- ChannelName: "ch1",
- MsgID: []byte{1, 2, 3},
- MsgGroup: "",
- Timestamp: 1,
- },
- }
- s3 := &datapb.SegmentInfo{
- ID: 3,
- CollectionID: 0,
- PartitionID: 1,
- InsertChannel: "ch1",
- State: commonpb.SegmentState_Growing,
- StartPosition: &internalpb.MsgPosition{
- ChannelName: "ch1",
- MsgID: []byte{8, 9, 10},
- MsgGroup: "",
- },
- DmlPosition: &internalpb.MsgPosition{
- ChannelName: "ch1",
- MsgID: []byte{11, 12, 13},
- MsgGroup: "",
- Timestamp: 2,
- },
- }
- s4 := &datapb.SegmentInfo{
- ID: 4,
- CollectionID: 0,
- PartitionID: 1,
- InsertChannel: "ch1",
- State: commonpb.SegmentState_Growing,
- }

- t.Run("channel without segments", func(t *testing.T) {
- r := svr.handler.CheckShouldDropChannel("ch1")
- assert.True(t, r)
+ })
+
+ t.Run("channel with all dropped segments", func(t *testing.T) {
+ err := svr.meta.AddSegment(NewSegmentInfo(s1))
+ require.NoError(t, err)
+ r := svr.handler.CheckShouldDropChannel("ch1")
+ assert.True(t, r)
+ })
+
+ t.Run("channel with all dropped segments and flushed compacted segments", func(t *testing.T) {
+ err := svr.meta.AddSegment(NewSegmentInfo(s2))
+ require.Nil(t, err)
+
+ r := svr.handler.CheckShouldDropChannel("ch1")
+ assert.False(t, r)
+ })
+
+ t.Run("channel with other state segments", func(t *testing.T) {
+ err := svr.meta.DropSegment(2)
+ require.Nil(t, err)
+ err = svr.meta.AddSegment(NewSegmentInfo(s3))
+ require.Nil(t, err)
+
+ r := svr.handler.CheckShouldDropChannel("ch1")
+ assert.False(t, r)
+ })
+
+ t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) {
+ err := svr.meta.DropSegment(3)
+ require.Nil(t, err)
+ err = svr.meta.AddSegment(NewSegmentInfo(s4))
+ require.Nil(t, err)
+
+ r := svr.handler.CheckShouldDropChannel("ch1")
+ assert.True(t, r)
+ })
+ */
+ t.Run("channel name not in kv", func(t *testing.T) {
+ assert.False(t, svr.handler.CheckShouldDropChannel("ch99"))
 })

- t.Run("channel with all dropped segments", func(t *testing.T) {
- err := svr.meta.AddSegment(NewSegmentInfo(s1))
+ t.Run("channel in remove flag", func(t *testing.T) {
+ key := buildChannelRemovePath("ch1")
+ err := svr.meta.client.Save(key, removeFlagTombstone)
 require.NoError(t, err)
- r := svr.handler.CheckShouldDropChannel("ch1")
- assert.True(t, r)
+ assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
 })

- t.Run("channel with all dropped segments and flushed compacted segments", func(t *testing.T) {
- err := svr.meta.AddSegment(NewSegmentInfo(s2))
- require.Nil(t, err)
-
- r := svr.handler.CheckShouldDropChannel("ch1")
- assert.False(t, r)
- })
-
- t.Run("channel with other state segments", func(t *testing.T) {
- err := svr.meta.DropSegment(2)
- require.Nil(t, err)
- err = svr.meta.AddSegment(NewSegmentInfo(s3))
- require.Nil(t, err)
-
- r := svr.handler.CheckShouldDropChannel("ch1")
- assert.False(t, r)
- })
-
- t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) {
- err := svr.meta.DropSegment(3)
- require.Nil(t, err)
- err = svr.meta.AddSegment(NewSegmentInfo(s4))
- require.Nil(t, err)
-
- r := svr.handler.CheckShouldDropChannel("ch1")
- assert.True(t, r)
+ t.Run("channel name not matched", func(t *testing.T) {
+ assert.False(t, svr.handler.CheckShouldDropChannel("ch2"))
 })
}
func TestGetRecoveryInfo(t *testing.T) {
 assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID())
 assert.ElementsMatch(t, []string{"/binlog/file1", "/binlog/file2"}, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs())
 })
-
 t.Run("with dropped segments", func(t *testing.T) {
 svr := newTestServer(t, nil)
 defer closeTestServer(t, svr)
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index b4fa5b6dd5..31034ce3a5 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
 zap.Any("meta", req.GetField2BinlogPaths()))

- if req.GetDropped() && s.handler.CheckShouldDropChannel(channel) {
- log.Debug("remove channel", zap.String("channel", channel))
- err = s.channelManager.RemoveChannel(channel)
- if err != nil {
- log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err))
- }
- s.segmentManager.DropSegmentsOfChannel(ctx, channel)
- }
+ // Drop logic is handled in DropVirtualChannel
+ /*
+ if req.GetDropped() && s.handler.CheckShouldDropChannel(channel) {
+ log.Debug("remove channel", zap.String("channel", channel))
+ err = s.channelManager.RemoveChannel(channel)
+ if err != nil {
+ log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err))
+ }
+ s.segmentManager.DropSegmentsOfChannel(ctx, channel)
+ }*/

 if req.GetFlushed() {
 s.segmentManager.DropSegment(ctx, req.SegmentID)
diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go
index 1dcc5a6565..0ee23255e8 100644
--- a/internal/datanode/compaction_executor.go
+++ b/internal/datanode/compaction_executor.go
type compactionExecutor struct {
 parallelCh chan struct{}
 executing sync.Map // planID to compactor
 taskCh chan compactor
+ dropped sync.Map // vchannel dropped
}

// 0.5*min(8, NumCPU/2)
func (c *compactionExecutor) stopTask(planID UniqueID) {
 }
}

+func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool {
+ // if vchannel marked dropped, compaction should not proceed
+ _, loaded := c.dropped.Load(vChannelName)
+ return !loaded
+}
+
 func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) {
+ c.dropped.Store(vChannelName, struct{}{})
 c.executing.Range(func(key interface{}, value interface{}) bool {
- if value.(*compactionTask).plan.GetChannel() == vChannelName {
+ if value.(compactor).getChannelName() == vChannelName {
 c.stopTask(key.(UniqueID))
 }
+ log.Warn(value.(compactor).getChannelName())
 return true
 })
}
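The guard added above works in both directions: stopExecutingtaskByVChannelName records the vchannel in the dropped map and cancels matching in-flight tasks, while channelValidateForCompaction rejects any new plan on a channel already marked dropped. A minimal sketch of that gate, assuming only sync.Map (the droppedGate type is illustrative and the task cancellation it comments on is elided):

package main

import (
	"fmt"
	"sync"
)

// droppedGate mimics the executor's dropped map: once a vchannel is marked,
// no further compaction may start on it.
type droppedGate struct {
	dropped sync.Map // vchannel name -> struct{}
}

func (g *droppedGate) markDropped(vchannel string) {
	g.dropped.Store(vchannel, struct{}{})
	// the real executor also ranges over its executing tasks here and stops
	// every task whose plan channel matches vchannel.
}

func (g *droppedGate) validForCompaction(vchannel string) bool {
	_, loaded := g.dropped.Load(vchannel)
	return !loaded
}

func main() {
	g := &droppedGate{}
	fmt.Println(g.validForCompaction("ch1")) // true: compaction may proceed
	g.markDropped("ch1")
	fmt.Println(g.validForCompaction("ch1")) // false: new plans are rejected
}

The tests in the next file exercise both directions of this gate.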
diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go
index b729b5159b..717b97567f 100644
--- a/internal/datanode/compaction_executor_test.go
+++ b/internal/datanode/compaction_executor_test.go
import (
 "context"
+ "sync"
 "testing"
+
+ "github.com/stretchr/testify/assert"
)

func TestCompactionExecutor(t *testing.T) {
 }
 })

+ t.Run("Test channel valid check", func(t *testing.T) {
+ tests := []struct {
+ expected bool
+ channel string
+ desc string
+ }{
+ {expected: true, channel: "ch1", desc: "not in dropped"},
+ {expected: false, channel: "ch2", desc: "in dropped"},
+ }
+ ex := newCompactionExecutor()
+ ex.stopExecutingtaskByVChannelName("ch2")
+ for _, test := range tests {
+ t.Run(test.desc, func(t *testing.T) {
+ assert.Equal(t, test.expected, ex.channelValidateForCompaction(test.channel))
+ })
+ }
+ })
+
+ t.Run("test stop vchannel tasks", func(t *testing.T) {
+ ex := newCompactionExecutor()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go ex.start(ctx)
+ mc := newMockCompactor(true)
+ mc.alwaysWorking = true
+
+ ex.execute(mc)
+
+ // wait for task enqueued
+ found := false
+ for !found {
+ ex.executing.Range(func(key, value interface{}) bool {
+ found = true
+ return true
+ })
+ }
+
+ ex.stopExecutingtaskByVChannelName("mock")
+
+ select {
+ case <-mc.ctx.Done():
+ default:
+ t.FailNow()
+ }
+ })
+
 }

 func newMockCompactor(isvalid bool) *mockCompactor {
- return &mockCompactor{isvalid: isvalid}
+ ctx, cancel := context.WithCancel(context.TODO())
+ return &mockCompactor{
+ ctx: ctx,
+ cancel: cancel,
+ isvalid: isvalid,
+ }
}

type mockCompactor struct {
- ctx context.Context
- cancel context.CancelFunc
- isvalid bool
+ sync.WaitGroup
+ ctx context.Context
+ cancel context.CancelFunc
+ isvalid bool
+ alwaysWorking bool
}

var _ compactor = (*mockCompactor)(nil)

func (mc *mockCompactor) compact() error {
+ mc.Add(1)
+ defer mc.Done()
 if !mc.isvalid {
 return errStart
 }
+ if mc.alwaysWorking {
+ <-mc.ctx.Done()
+ return mc.ctx.Err()
+ }
 return nil
}

func (mc *mockCompactor) getPlanID() UniqueID {
 func (mc *mockCompactor) stop() {
 if mc.cancel != nil {
 mc.cancel()
+ mc.Wait()
 }
}

 func (mc *mockCompactor) getCollection() UniqueID {
 return 1
}
+
+func (mc *mockCompactor) getChannelName() string {
+ return "mock"
+}
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index b179e37917..e6f05cafaa 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
type compactor interface {
 stop()
 getPlanID() UniqueID
 getCollection() UniqueID
+ getChannelName() string
}

// make sure compactionTask implements compactor interface
type compactionTask struct {
 ctx context.Context
 cancel context.CancelFunc
+
+ wg sync.WaitGroup
}

// check if compactionTask implements compactor
func newCompactionTask(
 func (t *compactionTask) stop() {
 t.cancel()
+ t.wg.Wait()
}

func (t *compactionTask) getPlanID() UniqueID {
 return t.plan.GetPlanID()
}

+func (t *compactionTask) getChannelName() string {
+ return t.plan.GetChannel()
+}
+
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[UniqueID]Timestamp, *DelDataBuf, error) {
 dCodec := storage.NewDeleteCodec()
func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
 }

 func (t *compactionTask) compact() error {
+ t.wg.Add(1)
+ defer t.wg.Done()
 ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
 defer cancelAll()
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index a9263d0c30..f7b656aa7f 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
 case clientv3.EventTypeDelete:
 // guaranteed there is no "/" in channel name
 parts := strings.Split(string(evt.Kv.Key), "/")
- node.ReleaseDataSyncService(parts[len(parts)-1])
+ vchanName := parts[len(parts)-1]
+ log.Warn("handle channel delete event", zap.Int64("node id", Params.NodeID), zap.String("vchannel", vchanName))
+ node.ReleaseDataSyncService(vchanName)
 }
}

func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
 flushCh := make(chan flushMsg, 100)

- dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv)
+ dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv, node.compactionExecutor)
 if err != nil {
 return err
 }
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
 select {
 case vChan := <-vChannelCh:
 log.Info("GC flowgraph", zap.String("vChan", vChan))
- node.stopCompactionOfVChannel(vChan)
 node.ReleaseDataSyncService(vChan)
 case <-node.ctx.Done():
 log.Info("DataNode ctx done")
func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
 }, nil
}

-func (node *DataNode) stopCompactionOfVChannel(vChan string) {
zap.String("vChannelName", vChan)) - - node.compactionExecutor.stopExecutingtaskByVChannelName(vChan) -} - func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -759,6 +754,12 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan return status, nil } + if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) { + log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel())) + status.Reason = "channel marked invalid" + return status, nil + } + binlogIO := &binlogIO{node.blobKv, ds.idAllocator} task := newCompactionTask( node.ctx, diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index f1bfbd9ac8..5d6c71c3b3 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -47,6 +47,7 @@ type dataSyncService struct { flushingSegCache *Cache // a guarding cache stores currently flushing segment ids flushManager flushManager // flush manager handles flush process blobKV kv.BaseKV + compactor *compactionExecutor // reference to compaction executor } func newDataSyncService(ctx context.Context, @@ -59,7 +60,7 @@ func newDataSyncService(ctx context.Context, dataCoord types.DataCoord, flushingSegCache *Cache, blobKV kv.BaseKV, - + compactor *compactionExecutor, ) (*dataSyncService, error) { if replica == nil { @@ -82,6 +83,7 @@ func newDataSyncService(ctx context.Context, clearSignal: clearSignal, flushingSegCache: flushingSegCache, blobKV: blobKV, + compactor: compactor, } if err := service.initNodes(vchan); err != nil { @@ -212,7 +214,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, dsService.msFactory) + var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, dsService.msFactory, dsService.compactor) var insertBufferNode Node insertBufferNode, err = newInsertBufferNode( dsService.ctx, diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 2752234bd1..33a2fbbd2c 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -148,6 +148,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { df, newCache(), memkv.NewMemoryKV(), + newCompactionExecutor(), ) if !test.isValidCase { @@ -224,7 +225,7 @@ func TestDataSyncService_Start(t *testing.T) { } signalCh := make(chan string, 100) - sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV()) + sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV(), newCompactionExecutor()) assert.Nil(t, err) // sync.replica.addCollection(collMeta.ID, collMeta.Schema) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 290c38bbda..35cd6b0a55 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -61,8 +61,9 @@ type ddNode struct { droppedSegments []*datapb.SegmentInfo vchannelName string - deltaMsgStream msgstream.MsgStream - dropMode atomic.Value + deltaMsgStream msgstream.MsgStream + dropMode atomic.Value + 
+ compactionExecutor *compactionExecutor
}

// Name returns node name, implementing flowgraph.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
 dropCollection: false,
 }

- forwardMsgs := make([]msgstream.TsMsg, 0)
+ var forwardMsgs []msgstream.TsMsg
 for _, msg := range msMsg.TsMessages() {
 switch msg.Type() {
 case commonpb.MsgType_DropCollection:
 zap.Any("collectionID", ddn.collectionID),
 zap.String("vChannelName", ddn.vchannelName))
 ddn.dropMode.Store(true)
+
+ log.Debug("Stop compaction of vChannel", zap.String("vChannelName", ddn.vchannelName))
+ ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vchannelName)
 fgMsg.dropCollection = true
 }
 case commonpb.MsgType_Insert:
func (ddn *ddNode) Close() {
 }
}

-func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode {
+func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo,
+ msFactory msgstream.Factory, compactor *compactionExecutor) *ddNode {
 baseNode := BaseNode{}
 baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
 baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
 deltaMsgStream.Start()

 dd := &ddNode{
- BaseNode: baseNode,
- collectionID: collID,
- flushedSegments: fs,
- droppedSegments: vchanInfo.GetDroppedSegments(),
- vchannelName: vchanInfo.ChannelName,
- deltaMsgStream: deltaMsgStream,
+ BaseNode: baseNode,
+ collectionID: collID,
+ flushedSegments: fs,
+ droppedSegments: vchanInfo.GetDroppedSegments(),
+ vchannelName: vchanInfo.ChannelName,
+ deltaMsgStream: deltaMsgStream,
+ compactionExecutor: compactor,
 }

 dd.dropMode.Store(false)
diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go
index d91109fc17..2c9b2429f1 100644
--- a/internal/datanode/flow_graph_dd_node_test.go
+++ b/internal/datanode/flow_graph_dd_node_test.go
func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
 ChannelName: "by-dev-rootcoord-dml-test",
 },
 mmf,
+ newCompactionExecutor(),
 )

 require.NotNil(t, ddNode)
func TestFlowGraph_DDNode_Operate(to *testing.T) {
 deltaStream, err := factory.NewMsgStream(context.Background())
 assert.Nil(t, err)
 ddn := ddNode{
- collectionID: test.ddnCollID,
- deltaMsgStream: deltaStream,
+ collectionID: test.ddnCollID,
+ deltaMsgStream: deltaStream,
+ vchannelName: "ddn_drop_msg",
+ compactionExecutor: newCompactionExecutor(),
 }

 var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{
diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go
index 98e8486022..ad6e57da6f 100644
--- a/internal/datanode/flow_graph_delete_node.go
+++ b/internal/datanode/flow_graph_delete_node.go
func (dn *deleteNode) Operate(in []Msg) []Msg {
 }

 if fgMsg.dropCollection {
+ dn.flushManager.notifyAllFlushed()
 log.Debug("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
 dn.clearSignal <- dn.channelName
 }
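Taken together, the flowgraph changes order a collection drop end to end: ddNode sees DropCollection, stops compaction on the vchannel, and sets dropCollection on the flowgraph message; insertBufferNode then switches the flush manager into drop mode before its final flush; deleteNode, last in the graph, calls notifyAllFlushed and signals BackgroundGC. A compressed sketch of that ordering contract (the flushDropper type and channel payload below are illustrative, not the real flushManager interface):

package main

import "fmt"

// flushDropper reduces the flush manager to the two calls the flowgraph nodes
// make during a collection drop; everything else is illustrative.
type flushDropper struct{ dropping bool }

// called by insertBufferNode when it sees fgMsg.dropCollection
func (f *flushDropper) startDropping() { f.dropping = true }

// called by deleteNode, the final node, once remaining buffers are handed over
func (f *flushDropper) notifyAllFlushed() { fmt.Println("flush-and-drop, dropping =", f.dropping) }

func main() {
	fm := &flushDropper{}
	clearSignal := make(chan string, 1)
	dropCollection := true // set by ddNode on MsgType_DropCollection, after it stops compaction

	// insertBufferNode stage: enter drop mode before the final flush
	if dropCollection {
		fm.startDropping()
	}
	// deleteNode stage: release accumulated packs, then ask BackgroundGC to tear down the vchannel
	if dropCollection {
		fm.notifyAllFlushed()
		clearSignal <- "vchannel-1"
	}
	fmt.Println("GC releases:", <-clearSignal)
}

The ordering matters: startDropping must run before notifyAllFlushed, which is why the dropCollection test below arms the flush manager with fm.startDropping() before invoking deleteNode's Operate.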
diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go
index be9532359e..005f382b09 100644
--- a/internal/datanode/flow_graph_delete_node_test.go
+++ b/internal/datanode/flow_graph_delete_node_test.go
func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 // send again shall trigger empty buffer flush
 delNode.Operate([]flowgraph.Msg{fgMsg})
 })
+ t.Run("Test deleteNode Operate valid with dropCollection", func(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ chanName := "datanode-test-FlowGraphDeletenode-operate"
+ testPath := "/test/datanode/root/meta"
+ assert.NoError(t, clearEtcd(testPath))
+ Params.MetaRootPath = testPath
+ Params.DeleteBinlogRootPath = testPath
+
+ c := &nodeConfig{
+ replica: replica,
+ allocator: NewAllocatorFactory(),
+ vChannelName: chanName,
+ }
+ sig := make(chan string, 1)
+ delNode, err := newDeleteNode(ctx, fm, sig, c)
+ assert.Nil(t, err)
+
+ msg := genFlowGraphDeleteMsg(pks, chanName)
+ msg.segmentsToFlush = segIDs
+
+ msg.endPositions[0].Timestamp = 100 // set to normal timestamp
+ msg.dropCollection = true
+ assert.NotPanics(t, func() {
+ fm.startDropping()
+ delNode.Operate([]flowgraph.Msg{&msg})
+ })
+ timer := time.NewTimer(time.Millisecond)
+ select {
+ case <-timer.C:
+ t.FailNow()
+ case <-sig:
+ }
+ })
}
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 03c303fdd5..a428d5b5f0 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 return []Msg{}
 }

+ if fgMsg.dropCollection {
+ ibNode.flushManager.startDropping()
+ }
+
 var spans []opentracing.Span
 for _, msg := range fgMsg.insertMessages {
 sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 1fef49c1af..d2b364c8e4 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 }

 inMsg := genFlowGraphInsertMsg(insertChannelName)
- var fgMsg flowgraph.Msg = &inMsg
- iBNode.Operate([]flowgraph.Msg{fgMsg})
+ assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
+
+ // test drop collection operate
+ inMsg = genFlowGraphInsertMsg(insertChannelName)
+ inMsg.dropCollection = true
+ assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
 }

 /*
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index 6e193e3be5..150f7e998e 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
func (m *rendezvousFlushManager) startDropping() {
 m.dropHandler.dropFlushWg.Wait() // waits for all drop mode task done
 m.dropHandler.Lock()
 defer m.dropHandler.Unlock()
+ // apply injection if any
+ for _, pack := range m.dropHandler.packs {
+ q := m.getFlushQueue(pack.segmentID)
+ // queue will never be nil, since getFlushQueue will initialize one if not found
+ q.injectMut.Lock()
+ if q.postInjection != nil {
+ q.postInjection(pack)
+ }
+ q.injectMut.Unlock()
+ }
 m.dropHandler.flushAndDrop(m.dropHandler.packs) // invoke drop & flush
 }()
}
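In drop mode the flush manager parks incoming packs instead of committing them; once notifyAllFlushed fires, the loop added above applies any registered post-injection to each parked pack before handing the whole batch to flushAndDrop, which is what the injection test in the next file relies on. A minimal sketch of that collect-then-apply shape (segmentFlushPack is reduced to an ID, and the hook field stands in for the queue's postInjection):

package main

import "fmt"

type pack struct{ segmentID int64 }

// dropBuffer sketches the dropHandler: while dropping, packs are collected
// rather than flushed one by one.
type dropBuffer struct {
	dropping      bool
	packs         []*pack
	postInjection func(*pack) // applied to every pack at notifyAllFlushed time
}

func (d *dropBuffer) handle(p *pack) {
	if d.dropping {
		d.packs = append(d.packs, p) // defer until notifyAllFlushed
		return
	}
	fmt.Println("flush immediately:", p.segmentID)
}

func (d *dropBuffer) notifyAllFlushed(flushAndDrop func([]*pack)) {
	for _, p := range d.packs {
		if d.postInjection != nil {
			d.postInjection(p) // e.g. remap segment IDs set by compaction injection
		}
	}
	flushAndDrop(d.packs) // commit everything in one batch
}

func main() {
	d := &dropBuffer{dropping: true, postInjection: func(p *pack) { p.segmentID = 100 }}
	d.handle(&pack{segmentID: 1})
	d.handle(&pack{segmentID: 2})
	d.notifyAllFlushed(func(ps []*pack) {
		for _, p := range ps {
			fmt.Println("drop-mode flush:", p.segmentID)
		}
	})
}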
diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go
index 8331fdf1b3..8afe42459a 100644
--- a/internal/datanode/flush_manager_test.go
+++ b/internal/datanode/flush_manager_test.go
func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
 }

 func TestRendezvousFlushManager_dropMode(t *testing.T) {
- kv := memkv.NewMemoryKV()
+ t.Run("test drop mode", func(t *testing.T) {
+ kv := memkv.NewMemoryKV()

- var mut sync.Mutex
- var result []*segmentFlushPack
- signal := make(chan struct{})
+ var mut sync.Mutex
+ var result []*segmentFlushPack
+ signal := make(chan struct{})

- m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
- }, func(packs []*segmentFlushPack) {
+ m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
+ }, func(packs []*segmentFlushPack) {
+ mut.Lock()
+ result = packs
+ mut.Unlock()
+ close(signal)
+ })
+
+ halfMsgID := []byte{1, 1, 1}
+ m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
+ MsgID: halfMsgID,
+ })
+
+ m.startDropping()
+ // half normal, half drop mode, should not appear in final packs
+ m.flushDelData(nil, -1, &internalpb.MsgPosition{
+ MsgID: halfMsgID,
+ })
+
+ target := make(map[int64]struct{})
+ for i := 1; i < 11; i++ {
+ target[int64(i)] = struct{}{}
+ m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
+ MsgID: []byte{1},
+ })
+ m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
+ MsgID: []byte{1},
+ })
+ }
+
+ m.notifyAllFlushed()
+
+ <-signal
 mut.Lock()
- result = packs
- mut.Unlock()
- close(signal)
- })
+ defer mut.Unlock()

- halfMsgID := []byte{1, 1, 1}
- m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
- MsgID: halfMsgID,
+ output := make(map[int64]struct{})
+ for _, pack := range result {
+ assert.NotEqual(t, -1, pack.segmentID)
+ output[pack.segmentID] = struct{}{}
+ _, has := target[pack.segmentID]
+ assert.True(t, has)
+ }
+ assert.Equal(t, len(target), len(output))
 })
+ t.Run("test drop mode with injection", func(t *testing.T) {
+ kv := memkv.NewMemoryKV()

- m.startDropping()
- // half normal, half drop mode, should not appear in final packs
- m.flushDelData(nil, -1, &internalpb.MsgPosition{
- MsgID: halfMsgID,
- })
+ var mut sync.Mutex
+ var result []*segmentFlushPack
+ signal := make(chan struct{})

- target := make(map[int64]struct{})
- for i := 1; i < 11; i++ {
- target[int64(i)] = struct{}{}
- m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
- MsgID: []byte{1},
+ m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
+ }, func(packs []*segmentFlushPack) {
+ mut.Lock()
+ result = packs
+ mut.Unlock()
+ close(signal)
 })
- m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
- MsgID: []byte{1},
+
+ halfMsgID := []byte{1, 1, 1}
+ m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
+ MsgID: halfMsgID,
 })
- }

- m.notifyAllFlushed()
+ injFunc := func(pack *segmentFlushPack) {
+ pack.segmentID = 100
+ }
+ for i := 1; i < 11; i++ {
+ it := newTaskInjection(1, injFunc)
+ m.injectFlush(it, int64(i))
+ <-it.Injected()
+ it.injectDone(true)
+ }

- <-signal
- mut.Lock()
- defer mut.Unlock()
+ m.startDropping()
+ // half normal, half drop mode, should not appear in final packs
+ m.flushDelData(nil, -1, &internalpb.MsgPosition{
+ MsgID: halfMsgID,
+ })
+
+ for i := 1; i < 11; i++ {
+ m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
+ MsgID: []byte{1},
+ })
+ m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
+ MsgID: []byte{1},
+ })
+ }
+
+ m.notifyAllFlushed()
+
+ <-signal
+ mut.Lock()
+ defer mut.Unlock()
+
+ for _, pack := range result {
+ assert.NotEqual(t, -1, pack.segmentID)
+ assert.Equal(t, int64(100), pack.segmentID)
+ }
+ })

- output := make(map[int64]struct{})
- for _, pack := range result {
- assert.NotEqual(t, -1, pack.segmentID)
- output[pack.segmentID] = struct{}{}
- _, has := target[pack.segmentID]
- assert.True(t, has)
- }
- assert.Equal(t, len(target), len(output))
 }

 func TestRendezvousFlushManager_close(t *testing.T) {