From 10116b85ac3fbe593fc8927b266a7b8536e5d2cb Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 19 Sep 2023 11:53:22 +0800 Subject: [PATCH] Release before remove when releasing flowgraphs (#27191) GetAndRemove removes the fg from manager immediately, while the flowgraph is still releasing. This PR will remove the fg from flowgraphManager AFTER flowgraphs released. - Add Remove for ConcurrentMap - Move collections() into flowgraph manager Signed-off-by: yangxuan --- internal/datanode/flow_graph_manager.go | 16 ++++++++++++++-- internal/datanode/flow_graph_manager_test.go | 1 + internal/datanode/metrics_info.go | 11 +---------- pkg/util/typeutil/map.go | 8 ++++++++ pkg/util/typeutil/map_test.go | 17 +++++++++++++++++ 5 files changed, 41 insertions(+), 12 deletions(-) diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 5e873d9c5a..10ab84e752 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -137,11 +137,13 @@ func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo } func (fm *flowgraphManager) release(vchanName string) { - if fg, loaded := fm.flowgraphs.GetAndRemove(vchanName); loaded { + if fg, loaded := fm.flowgraphs.Get(vchanName); loaded { fg.close() + fm.flowgraphs.Remove(vchanName) + metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() + rateCol.removeFlowGraphChannel(vchanName) } - rateCol.removeFlowGraphChannel(vchanName) } func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) { @@ -227,3 +229,13 @@ func (fm *flowgraphManager) dropAll() { return true }) } + +func (fm *flowgraphManager) collections() []int64 { + collectionSet := typeutil.UniqueSet{} + fm.flowgraphs.Range(func(key string, value *dataSyncService) bool { + collectionSet.Insert(value.channel.getCollectionID()) + return true + }) + + return collectionSet.Collect() +} diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 1be3b02bf0..d997a8df77 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -55,6 +55,7 @@ func TestFlowGraphManager(t *testing.T) { defer func() { fm.dropAll() }() + t.Run("Test addAndStart", func(t *testing.T) { vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-addAndStart" vchan := &datapb.VchannelInfo{ diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 3ba9612bfe..ca2ac31a48 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -49,15 +49,6 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro return nil, err } - getAllCollections := func() []int64 { - collectionSet := typeutil.UniqueSet{} - node.flowgraphManager.flowgraphs.Range(func(key string, fg *dataSyncService) bool { - collectionSet.Insert(fg.channel.getCollectionID()) - return true - }) - - return collectionSet.Collect() - } minFGChannel, minFGTt := rateCol.getMinFlowGraphTt() return &metricsinfo.DataNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, @@ -69,7 +60,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro }, Effect: metricsinfo.NodeEffect{ NodeID: node.GetSession().ServerID, - CollectionIDs: getAllCollections(), + CollectionIDs: node.flowgraphManager.collections(), }, }, nil } diff --git a/pkg/util/typeutil/map.go b/pkg/util/typeutil/map.go index 6da8d3a02d..c11b1528dc 100644 --- a/pkg/util/typeutil/map.go +++ b/pkg/util/typeutil/map.go @@ -101,6 +101,14 @@ func (m *ConcurrentMap[K, V]) GetAndRemove(key K) (V, bool) { return value.(V), true } +// Remove removes the `key`, `value` set if `key` is in the map, +// does nothing if `key` not in the map. +func (m *ConcurrentMap[K, V]) Remove(key K) { + if _, loaded := m.inner.LoadAndDelete(key); loaded { + m.len.Dec() + } +} + func (m *ConcurrentMap[K, V]) Len() int { return int(m.len.Load()) } diff --git a/pkg/util/typeutil/map_test.go b/pkg/util/typeutil/map_test.go index 2e96fa0167..b34a113ea9 100644 --- a/pkg/util/typeutil/map_test.go +++ b/pkg/util/typeutil/map_test.go @@ -127,6 +127,23 @@ func (suite *MapUtilSuite) TestConcurrentMap() { suite.FailNow("empty map range") return false }) + + suite.Run("TestRemove", func() { + currMap := NewConcurrentMap[int64, string]() + suite.Equal(0, currMap.Len()) + + currMap.Remove(100) + suite.Equal(0, currMap.Len()) + + suite.Equal(currMap.Len(), 0) + v, loaded := currMap.GetOrInsert(100, "v-100") + suite.Equal("v-100", v) + suite.Equal(false, loaded) + suite.Equal(1, currMap.Len()) + + currMap.Remove(100) + suite.Equal(0, currMap.Len()) + }) } func TestMapUtil(t *testing.T) {