diff --git a/internal/proxynode/channels_mgr_test.go b/internal/proxynode/channels_mgr_test.go index 8e99427993..4426a72596 100644 --- a/internal/proxynode/channels_mgr_test.go +++ b/internal/proxynode/channels_mgr_test.go @@ -35,7 +35,7 @@ func newMockMaster() *mockMaster { } func genUniqueStr() string { - l := rand.Uint64() % 100 + l := rand.Uint64()%100 + 1 b := make([]byte, l) if _, err := rand.Read(b); err != nil { return "" @@ -50,7 +50,7 @@ func (m *mockMaster) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) } channels = make(map[vChan]pChan) - l := rand.Uint64() % 10 + l := rand.Uint64()%10 + 1 for i := 0; uint64(i) < l; i++ { channels[genUniqueStr()] = genUniqueStr() } diff --git a/internal/proxynode/channels_time_ticker.go b/internal/proxynode/channels_time_ticker.go new file mode 100644 index 0000000000..1c821be841 --- /dev/null +++ b/internal/proxynode/channels_time_ticker.go @@ -0,0 +1,196 @@ +package proxynode + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +type pChanStatistics struct { + minTs Timestamp + maxTs Timestamp + invalid bool // invalid is true when there is no task in queue +} + +// channelsTimeTickerCheckFunc(pchan, ts) return true only when all timestamp of tasks who use the pchan is greater than ts +type channelsTimeTickerCheckFunc func(string, Timestamp) bool + +// ticker can update ts only when the minTs greater than the ts of ticker, we can use maxTs to update current later +type getPChanStatisticsFunc func(pChan) (pChanStatistics, error) + +// use interface tsoAllocator to keep channelsTimeTickerImpl testable +type tsoAllocator interface { + //Start() error + AllocOne() (Timestamp, error) + //Alloc(count uint32) ([]Timestamp, error) + //ClearCache() +} + +type channelsTimeTicker interface { + start() error + close() error + addPChan(pchan pChan) error + getLastTick(pchan pChan) (Timestamp, error) +} + +type channelsTimeTickerImpl struct { + interval time.Duration // interval to synchronize + minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp + statisticsMtx sync.RWMutex + getStatistics getPChanStatisticsFunc + tso tsoAllocator + currents map[pChan]Timestamp + currentsMtx sync.RWMutex + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +func (ticker *channelsTimeTickerImpl) initStatistics() { + ticker.statisticsMtx.Lock() + defer ticker.statisticsMtx.Unlock() + + for pchan := range ticker.minTsStatistics { + ticker.minTsStatistics[pchan] = 0 + } +} + +func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) { + ticker.currentsMtx.Lock() + defer ticker.currentsMtx.Unlock() + + for pchan := range ticker.currents { + ticker.currents[pchan] = current + } +} + +// What if golang support generic? interface{} is not comparable now! +func getTs(ts1, ts2 Timestamp, comp func(ts1, ts2 Timestamp) bool) Timestamp { + if comp(ts1, ts2) { + return ts1 + } + return ts2 +} + +func (ticker *channelsTimeTickerImpl) tick() error { + ticker.statisticsMtx.Lock() + defer ticker.statisticsMtx.Unlock() + + ticker.currentsMtx.Lock() + defer ticker.currentsMtx.Unlock() + + for pchan := range ticker.currents { + current := ticker.currents[pchan] + + stats, err := ticker.getStatistics(pchan) + if err != nil { + continue + } + + if !stats.invalid && stats.minTs > current { + ticker.minTsStatistics[pchan] = current + ticker.currents[pchan] = getTs(current+Timestamp(ticker.interval), stats.maxTs, func(ts1, ts2 Timestamp) bool { + return ts1 > ts2 + }) + } + } + + return nil +} + +func (ticker *channelsTimeTickerImpl) tickLoop() { + defer ticker.wg.Done() + + timer := time.NewTicker(ticker.interval) + defer timer.Stop() + + for { + select { + case <-ticker.ctx.Done(): + return + case <-timer.C: + err := ticker.tick() + if err != nil { + log.Warn("channelsTimeTickerImpl.tickLoop", zap.Error(err)) + } + } + } +} + +func (ticker *channelsTimeTickerImpl) start() error { + ticker.initStatistics() + + current, err := ticker.tso.AllocOne() + if err != nil { + return err + } + ticker.initCurrents(current) + + ticker.wg.Add(1) + go ticker.tickLoop() + + return nil +} + +func (ticker *channelsTimeTickerImpl) close() error { + ticker.cancel() + ticker.wg.Wait() + return nil +} + +func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error { + ticker.statisticsMtx.Lock() + defer ticker.statisticsMtx.Unlock() + + if _, ok := ticker.minTsStatistics[pchan]; ok { + return fmt.Errorf("pChan %v already exist", pchan) + } + + ticker.minTsStatistics[pchan] = 0 + + return nil +} + +func (ticker *channelsTimeTickerImpl) getLastTick(pchan pChan) (Timestamp, error) { + ticker.statisticsMtx.RLock() + defer ticker.statisticsMtx.RUnlock() + + ts, ok := ticker.minTsStatistics[pchan] + if !ok { + return 0, fmt.Errorf("pChan %v not found", pchan) + } + + return ts, nil +} + +func newChannelsTimeTicker( + ctx context.Context, + interval time.Duration, + pchans []pChan, + getStatistics getPChanStatisticsFunc, + tso tsoAllocator, +) *channelsTimeTickerImpl { + + ctx1, cancel := context.WithCancel(ctx) + + ticker := &channelsTimeTickerImpl{ + interval: interval, + minTsStatistics: make(map[pChan]Timestamp), + getStatistics: getStatistics, + tso: tso, + currents: make(map[pChan]Timestamp), + ctx: ctx1, + cancel: cancel, + } + + for _, pchan := range pchans { + ticker.minTsStatistics[pchan] = 0 + ticker.currents[pchan] = 0 + } + + return ticker +} diff --git a/internal/proxynode/channels_time_ticker_test.go b/internal/proxynode/channels_time_ticker_test.go new file mode 100644 index 0000000000..973ff7d2f9 --- /dev/null +++ b/internal/proxynode/channels_time_ticker_test.go @@ -0,0 +1,154 @@ +package proxynode + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" + + "github.com/stretchr/testify/assert" +) + +type mockTsoAllocator struct { +} + +func (tso *mockTsoAllocator) AllocOne() (Timestamp, error) { + return Timestamp(time.Now().UnixNano()), nil +} + +func newMockTsoAllocator() *mockTsoAllocator { + return &mockTsoAllocator{} +} + +func getStatistics(pchan pChan) (pChanStatistics, error) { + stats := pChanStatistics{ + minTs: Timestamp(time.Now().UnixNano()), + invalid: false, + } + stats.maxTs = stats.minTs + Timestamp(time.Millisecond*10) + return stats, nil +} + +func TestChannelsTimeTickerImpl_start(t *testing.T) { + interval := time.Millisecond * 10 + pchanNum := rand.Uint64()%10 + 1 + pchans := make([]pChan, 0, pchanNum) + for i := 0; uint64(i) < pchanNum; i++ { + pchans = append(pchans, genUniqueStr()) + } + tso := newMockTsoAllocator() + ctx := context.Background() + + ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso) + err := ticker.start() + assert.Equal(t, nil, err) + + defer func() { + err := ticker.close() + assert.Equal(t, nil, err) + }() + + time.Sleep(time.Second) +} + +func TestChannelsTimeTickerImpl_close(t *testing.T) { + interval := time.Millisecond * 10 + pchanNum := rand.Uint64()%10 + 1 + pchans := make([]pChan, 0, pchanNum) + for i := 0; uint64(i) < pchanNum; i++ { + pchans = append(pchans, genUniqueStr()) + } + tso := newMockTsoAllocator() + ctx := context.Background() + + ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso) + err := ticker.start() + assert.Equal(t, nil, err) + + defer func() { + err := ticker.close() + assert.Equal(t, nil, err) + }() + + time.Sleep(time.Second) +} + +func TestChannelsTimeTickerImpl_addPChan(t *testing.T) { + interval := time.Millisecond * 10 + pchanNum := rand.Uint64()%10 + 1 + pchans := make([]pChan, 0, pchanNum) + for i := 0; uint64(i) < pchanNum; i++ { + pchans = append(pchans, genUniqueStr()) + } + tso := newMockTsoAllocator() + ctx := context.Background() + + ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso) + err := ticker.start() + assert.Equal(t, nil, err) + + newPChanNum := rand.Uint64()%10 + 1 + for i := 0; uint64(i) < newPChanNum; i++ { + err = ticker.addPChan(genUniqueStr()) + assert.Equal(t, nil, err) + } + + defer func() { + err := ticker.close() + assert.Equal(t, nil, err) + }() + + time.Sleep(time.Second) +} + +func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) { + interval := time.Millisecond * 10 + pchanNum := rand.Uint64()%10 + 1 + pchans := make([]pChan, 0, pchanNum) + for i := 0; uint64(i) < pchanNum; i++ { + pchans = append(pchans, genUniqueStr()) + } + tso := newMockTsoAllocator() + ctx := context.Background() + + ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso) + err := ticker.start() + assert.Equal(t, nil, err) + + var wg sync.WaitGroup + wg.Add(1) + b := make(chan struct{}, 1) + go func() { + defer wg.Done() + timer := time.NewTicker(interval * 40) + for { + select { + case <-b: + return + case <-timer.C: + for _, pchan := range pchans { + ts, err := ticker.getLastTick(pchan) + assert.Equal(t, nil, err) + log.Debug("TestChannelsTimeTickerImpl_getLastTick", + zap.Any("pchan", pchan), + zap.Any("minTs", ts)) + } + } + } + }() + time.Sleep(time.Second) + b <- struct{}{} + wg.Wait() + + defer func() { + err := ticker.close() + assert.Equal(t, nil, err) + }() + + time.Sleep(time.Second) +}