diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 53347188f6..6cd66cd79b 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -13,6 +13,7 @@ package querynode import ( "context" + "errors" "math" "sync" @@ -37,11 +38,6 @@ func (watcher *tSafeWatcher) notify() { } } -// deprecated -func (watcher *tSafeWatcher) hasUpdate() { - <-watcher.notifyChan -} - func (watcher *tSafeWatcher) watcherChan() <-chan bool { return watcher.notifyChan } @@ -49,7 +45,7 @@ func (watcher *tSafeWatcher) watcherChan() <-chan bool { type tSafer interface { get() Timestamp set(id UniqueID, t Timestamp) - registerTSafeWatcher(t *tSafeWatcher) + registerTSafeWatcher(t *tSafeWatcher) error start() close() removeRecord(partitionID UniqueID) @@ -100,6 +96,7 @@ func (ts *tSafe) start() { for _, watcher := range ts.watcherList { close(watcher.notifyChan) } + ts.watcherList = nil close(ts.tSafeChan) ts.tSafeMu.Unlock() return @@ -140,17 +137,37 @@ func (ts *tSafe) start() { func (ts *tSafe) removeRecord(partitionID UniqueID) { ts.tSafeMu.Lock() defer ts.tSafeMu.Unlock() - + if ts.isClose { + // should not happen if tsafe_replica guard correctly + log.Warn("Try to remove record with tsafe close ", + zap.Any("channel", ts.channel), + zap.Any("id", partitionID)) + return + } log.Debug("remove tSafeRecord", zap.Any("partitionID", partitionID), ) delete(ts.tSafeRecord, partitionID) + var tmpT Timestamp = math.MaxUint64 + for _, t := range ts.tSafeRecord { + if t <= tmpT { + tmpT = t + } + } + ts.tSafe = tmpT + for _, watcher := range ts.watcherList { + watcher.notify() + } } -func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) { +func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) error { ts.tSafeMu.Lock() + if ts.isClose { + return errors.New("Failed to register tsafe watcher because tsafe is closed " + ts.channel) + } defer ts.tSafeMu.Unlock() ts.watcherList = append(ts.watcherList, t) + return nil } func (ts *tSafe) get() Timestamp { @@ -164,9 +181,9 @@ func (ts *tSafe) set(id UniqueID, t Timestamp) { defer ts.tSafeMu.Unlock() if ts.isClose { // should not happen if tsafe_replica guard correctly - log.Warn("Try to set id with ts close ", + log.Warn("Try to set id with tsafe close ", zap.Any("channel", ts.channel), - zap.Any("it", id)) + zap.Any("id", id)) return } msg := tSafeMsg{ diff --git a/internal/querynode/tsafe_test.go b/internal/querynode/tsafe_test.go index 95bddfa640..4953ff2a67 100644 --- a/internal/querynode/tsafe_test.go +++ b/internal/querynode/tsafe_test.go @@ -13,6 +13,7 @@ package querynode import ( "context" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -20,14 +21,64 @@ import ( func TestTSafe_GetAndSet(t *testing.T) { tSafe := newTSafe(context.Background(), "TestTSafe-channel") + tSafe.start() watcher := newTSafeWatcher() tSafe.registerTSafeWatcher(watcher) + var wg sync.WaitGroup + wg.Add(1) go func() { - watcher.hasUpdate() + // wait work + <-watcher.watcherChan() timestamp := tSafe.get() assert.Equal(t, timestamp, Timestamp(1000)) + wg.Done() }() + tSafe.set(UniqueID(1), Timestamp(1000)) + wg.Wait() +} + +func TestTSafe_Remove(t *testing.T) { + tSafe := newTSafe(context.Background(), "TestTSafe-remove") + tSafe.start() + watcher := newTSafeWatcher() + tSafe.registerTSafeWatcher(watcher) tSafe.set(UniqueID(1), Timestamp(1000)) + tSafe.set(UniqueID(2), Timestamp(1001)) + + <-watcher.watcherChan() + timestamp := tSafe.get() + assert.Equal(t, timestamp, Timestamp(1000)) + + tSafe.removeRecord(UniqueID(1)) + timestamp = tSafe.get() + assert.Equal(t, timestamp, Timestamp(1001)) +} + +func TestTSafe_Close(t *testing.T) { + tSafe := newTSafe(context.Background(), "TestTSafe-close") + tSafe.start() + watcher := newTSafeWatcher() + tSafe.registerTSafeWatcher(watcher) + + // test set won't panic while close + go func() { + for i := 0; i <= 100; i++ { + tSafe.set(UniqueID(i), Timestamp(1000)) + } + }() + + tSafe.close() + + // wait until channel close + for range watcher.watcherChan() { + + } + + tSafe.set(UniqueID(101), Timestamp(1000)) + tSafe.removeRecord(UniqueID(1)) + // register TSafe will fail + err := tSafe.registerTSafeWatcher(watcher) + assert.Error(t, err) }