diff --git a/internal/proxynode/channels_time_ticker.go b/internal/proxynode/channels_time_ticker.go index 139ee653b7..fa3c27ee0e 100644 --- a/internal/proxynode/channels_time_ticker.go +++ b/internal/proxynode/channels_time_ticker.go @@ -101,7 +101,8 @@ func (ticker *channelsTimeTickerImpl) tick() error { log.Warn("failed to get statistics from scheduler", zap.Error(err)) continue } - + log.Debug("ProxyNode channelsTimeTickerImpl", zap.Any("invalid", stats.invalid), + zap.Any("stats.minTs", stats.minTs), zap.Any("current", current)) if !stats.invalid && stats.minTs > current { ticker.minTsStatistics[pchan] = current ticker.currents[pchan] = getTs(current+Timestamp(ticker.interval), stats.maxTs, func(ts1, ts2 Timestamp) bool { @@ -115,6 +116,8 @@ func (ticker *channelsTimeTickerImpl) tick() error { log.Warn("failed to get ts from tso", zap.Error(err)) continue } + log.Debug("ProxyNode channelsTimeTickerImpl, update currents", zap.Any("pchan", pchan), + zap.Any("t", t)) ticker.currents[pchan] = t } } @@ -167,11 +170,17 @@ func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error { defer ticker.statisticsMtx.Unlock() if _, ok := ticker.minTsStatistics[pchan]; ok { - return fmt.Errorf("pChan %v already exist", pchan) + return fmt.Errorf("pChan %v already exist in minTsStatistics", pchan) } - ticker.minTsStatistics[pchan] = 0 + ticker.currentsMtx.Lock() + defer ticker.currentsMtx.Unlock() + if _, ok := ticker.currents[pchan]; ok { + return fmt.Errorf("pChan %v already exist in currents", pchan) + } + ticker.currents[pchan] = 0 + return nil } @@ -180,11 +189,19 @@ func (ticker *channelsTimeTickerImpl) removePChan(pchan pChan) error { defer ticker.statisticsMtx.Unlock() if _, ok := ticker.minTsStatistics[pchan]; !ok { - return fmt.Errorf("pChan %v don't exist", pchan) + return fmt.Errorf("pChan %v don't exist in minTsStatistics", pchan) } delete(ticker.minTsStatistics, pchan) + ticker.currentsMtx.Lock() + defer ticker.currentsMtx.Unlock() + + if _, ok := ticker.currents[pchan]; !ok { + return fmt.Errorf("pChan %v don't exist in currents", pchan) + } + delete(ticker.currents, pchan) + return nil }