Release before remove when releasing flowgraphs (#27191)

GetAndRemove removes the fg from manager immediately,
while the flowgraph is still releasing. This PR will remove
the fg from flowgraphManager AFTER flowgraphs released.

- Add Remove for ConcurrentMap
- Move collections() into flowgraph manager

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2023-09-19 11:53:22 +08:00 committed by GitHub
parent b9ab18d692
commit 10116b85ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 12 deletions

View File

@ -137,11 +137,13 @@ func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo
} }
func (fm *flowgraphManager) release(vchanName string) { func (fm *flowgraphManager) release(vchanName string) {
if fg, loaded := fm.flowgraphs.GetAndRemove(vchanName); loaded { if fg, loaded := fm.flowgraphs.Get(vchanName); loaded {
fg.close() fg.close()
fm.flowgraphs.Remove(vchanName)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
rateCol.removeFlowGraphChannel(vchanName)
} }
rateCol.removeFlowGraphChannel(vchanName)
} }
func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) { func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) {
@ -227,3 +229,13 @@ func (fm *flowgraphManager) dropAll() {
return true return true
}) })
} }
func (fm *flowgraphManager) collections() []int64 {
collectionSet := typeutil.UniqueSet{}
fm.flowgraphs.Range(func(key string, value *dataSyncService) bool {
collectionSet.Insert(value.channel.getCollectionID())
return true
})
return collectionSet.Collect()
}

View File

@ -55,6 +55,7 @@ func TestFlowGraphManager(t *testing.T) {
defer func() { defer func() {
fm.dropAll() fm.dropAll()
}() }()
t.Run("Test addAndStart", func(t *testing.T) { t.Run("Test addAndStart", func(t *testing.T) {
vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-addAndStart" vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-addAndStart"
vchan := &datapb.VchannelInfo{ vchan := &datapb.VchannelInfo{

View File

@ -49,15 +49,6 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
return nil, err return nil, err
} }
getAllCollections := func() []int64 {
collectionSet := typeutil.UniqueSet{}
node.flowgraphManager.flowgraphs.Range(func(key string, fg *dataSyncService) bool {
collectionSet.Insert(fg.channel.getCollectionID())
return true
})
return collectionSet.Collect()
}
minFGChannel, minFGTt := rateCol.getMinFlowGraphTt() minFGChannel, minFGTt := rateCol.getMinFlowGraphTt()
return &metricsinfo.DataNodeQuotaMetrics{ return &metricsinfo.DataNodeQuotaMetrics{
Hms: metricsinfo.HardwareMetrics{}, Hms: metricsinfo.HardwareMetrics{},
@ -69,7 +60,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
}, },
Effect: metricsinfo.NodeEffect{ Effect: metricsinfo.NodeEffect{
NodeID: node.GetSession().ServerID, NodeID: node.GetSession().ServerID,
CollectionIDs: getAllCollections(), CollectionIDs: node.flowgraphManager.collections(),
}, },
}, nil }, nil
} }

View File

@ -101,6 +101,14 @@ func (m *ConcurrentMap[K, V]) GetAndRemove(key K) (V, bool) {
return value.(V), true return value.(V), true
} }
// Remove removes the `key`, `value` set if `key` is in the map,
// does nothing if `key` not in the map.
func (m *ConcurrentMap[K, V]) Remove(key K) {
if _, loaded := m.inner.LoadAndDelete(key); loaded {
m.len.Dec()
}
}
func (m *ConcurrentMap[K, V]) Len() int { func (m *ConcurrentMap[K, V]) Len() int {
return int(m.len.Load()) return int(m.len.Load())
} }

View File

@ -127,6 +127,23 @@ func (suite *MapUtilSuite) TestConcurrentMap() {
suite.FailNow("empty map range") suite.FailNow("empty map range")
return false return false
}) })
suite.Run("TestRemove", func() {
currMap := NewConcurrentMap[int64, string]()
suite.Equal(0, currMap.Len())
currMap.Remove(100)
suite.Equal(0, currMap.Len())
suite.Equal(currMap.Len(), 0)
v, loaded := currMap.GetOrInsert(100, "v-100")
suite.Equal("v-100", v)
suite.Equal(false, loaded)
suite.Equal(1, currMap.Len())
currMap.Remove(100)
suite.Equal(0, currMap.Len())
})
} }
func TestMapUtil(t *testing.T) { func TestMapUtil(t *testing.T) {