From ae717bf9912dc16448ac3d41eb5ce2b5e0789970 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 12 May 2022 18:09:53 +0800 Subject: [PATCH] Fix channelUnsubscribe data race and logic (#16946) - Add an RWMutex for container/list, which is not goroutine-safe - Fix the list element never being removed Signed-off-by: Congqi Xia --- internal/querycoord/channel_unsubscribe.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/internal/querycoord/channel_unsubscribe.go b/internal/querycoord/channel_unsubscribe.go index c364cbde10..7f5e157907 100644 --- a/internal/querycoord/channel_unsubscribe.go +++ b/internal/querycoord/channel_unsubscribe.go @@ -45,6 +45,7 @@ type channelUnsubscribeHandler struct { kvClient *etcdkv.EtcdKV factory msgstream.Factory + mut sync.RWMutex // mutex for channelInfos, since container/list is not goroutine-safe channelInfos *list.List downNodeChan chan int64 @@ -73,6 +74,13 @@ func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factor return handler, nil } +// appendUnsubInfo pushes unsub info safely +func (csh *channelUnsubscribeHandler) appendUnsubInfo(info *querypb.UnsubscribeChannelInfo) { + csh.mut.Lock() + defer csh.mut.Unlock() + csh.channelInfos.PushBack(info) +} + // reloadFromKV reload unsolved channels to unsubscribe func (csh *channelUnsubscribeHandler) reloadFromKV() error { log.Info("start reload unsubscribe channelInfo from kv") @@ -86,7 +94,7 @@ func (csh *channelUnsubscribeHandler) reloadFromKV() error { if err != nil { return err } - csh.channelInfos.PushBack(channelInfo) + csh.appendUnsubInfo(channelInfo) csh.downNodeChan <- channelInfo.NodeID } @@ -102,11 +110,14 @@ func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.Un } // when queryCoord is restarted multiple times, the nodeID of added channelInfo may be the same hasEnqueue := false + // reduce the lock range to iteration here, since `addUnsubscribeChannelInfo` is called one by one +
csh.mut.RLock() for e := csh.channelInfos.Back(); e != nil; e = e.Prev() { if e.Value.(*querypb.UnsubscribeChannelInfo).NodeID == nodeID { hasEnqueue = true } } + csh.mut.RUnlock() if !hasEnqueue { channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID) @@ -114,7 +125,7 @@ func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.Un if err != nil { panic(err) } - csh.channelInfos.PushBack(info) + csh.appendUnsubInfo(info) csh.downNodeChan <- info.NodeID log.Info("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID)) } @@ -129,7 +140,10 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { log.Info("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end") return case <-csh.downNodeChan: + csh.mut.RLock() + e := csh.channelInfos.Front() channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo) + csh.mut.RUnlock() nodeID := channelInfo.NodeID for _, collectionChannels := range channelInfo.CollectionChannels { collectionID := collectionChannels.CollectionID @@ -142,6 +156,10 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID)) panic(err) } + + csh.mut.Lock() + csh.channelInfos.Remove(e) + csh.mut.Unlock() log.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID)) } }