milvus/internal/datanode/channel_manager.go
wei liu a9beca44ef
fix: watch channel stuck due to misuse of timer.Reset (#37433) (#37542)
issue: #37166
pr: #37433
cause the misuse of timer.Reset, which cause dispatcher failed to send
msg to virtual channel buffer, and dispatcher do splitting again and
again, which hold the dispatcher manager's lock, block watching channel
progress.

This PR fix the misuse of timer.Reset

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2024-11-08 18:46:27 +08:00

561 lines
14 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type (
releaseFunc func(channel string)
watchFunc func(ctx context.Context, dn *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error)
)
type ChannelManager interface {
Submit(info *datapb.ChannelWatchInfo) error
GetProgress(info *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse
Close()
Start()
}
type ChannelManagerImpl struct {
mu sync.RWMutex
dn *DataNode
fgManager FlowgraphManager
communicateCh chan *opState
opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner
abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel
releaseFunc releaseFunc
closeCh lifetime.SafeChan
closeWaiter sync.WaitGroup
}
func NewChannelManager(dn *DataNode) *ChannelManagerImpl {
cm := ChannelManagerImpl{
dn: dn,
fgManager: dn.flowgraphManager,
communicateCh: make(chan *opState, 100),
opRunners: typeutil.NewConcurrentMap[string, *opRunner](),
abnormals: typeutil.NewConcurrentMap[int64, string](),
releaseFunc: dn.tryToReleaseFlowgraph,
closeCh: lifetime.NewSafeChan(),
}
return &cm
}
func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error {
channel := info.GetVchan().GetChannelName()
// skip enqueue the same operation resubmmited by datacoord
if runner, ok := m.opRunners.Get(channel); ok {
if _, exists := runner.Exist(info.GetOpID()); exists {
log.Warn("op already exist, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
}
if info.GetState() == datapb.ChannelWatchState_ToWatch &&
m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) {
log.Warn("Watch op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
if info.GetState() == datapb.ChannelWatchState_ToRelease &&
!m.fgManager.HasFlowgraph(channel) {
log.Warn("Release op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
runner := m.getOrCreateRunner(channel)
return runner.Enqueue(info)
}
func (m *ChannelManagerImpl) GetProgress(info *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse {
m.mu.RLock()
defer m.mu.RUnlock()
resp := &datapb.ChannelOperationProgressResponse{
Status: merr.Success(),
OpID: info.GetOpID(),
}
channel := info.GetVchan().GetChannelName()
switch info.GetState() {
case datapb.ChannelWatchState_ToWatch:
// running flowgraph means watch success
if m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) {
resp.State = datapb.ChannelWatchState_WatchSuccess
resp.Progress = 100
return resp
}
if runner, ok := m.opRunners.Get(channel); ok {
if progress, exists := runner.Exist(info.GetOpID()); exists {
resp.Progress = progress
resp.State = datapb.ChannelWatchState_ToWatch
} else {
resp.State = datapb.ChannelWatchState_WatchFailure
}
return resp
}
resp.State = datapb.ChannelWatchState_WatchFailure
return resp
case datapb.ChannelWatchState_ToRelease:
if !m.fgManager.HasFlowgraph(channel) {
resp.State = datapb.ChannelWatchState_ReleaseSuccess
return resp
}
runner, ok := m.opRunners.Get(channel)
if ok {
_, exists := runner.Exist(info.GetOpID())
if exists {
resp.State = datapb.ChannelWatchState_ToRelease
return resp
}
}
resp.State = datapb.ChannelWatchState_ReleaseFailure
return resp
default:
err := merr.WrapErrParameterInvalid("ToWatch or ToRelease", info.GetState().String())
log.Warn("fail to get progress", zap.Error(err))
resp.Status = merr.Status(err)
return resp
}
}
func (m *ChannelManagerImpl) Close() {
if m.opRunners != nil {
m.opRunners.Range(func(channel string, runner *opRunner) bool {
runner.Close()
return true
})
}
m.closeCh.Close()
m.closeWaiter.Wait()
}
func (m *ChannelManagerImpl) Start() {
m.closeWaiter.Add(1)
go func() {
defer m.closeWaiter.Done()
log.Info("DataNode ChannelManager start")
for {
select {
case opState := <-m.communicateCh:
m.handleOpState(opState)
case <-m.closeCh.CloseCh():
log.Info("DataNode ChannelManager exit")
return
}
}
}()
}
func (m *ChannelManagerImpl) handleOpState(opState *opState) {
m.mu.Lock()
defer m.mu.Unlock()
log := log.With(
zap.Int64("opID", opState.opID),
zap.String("channel", opState.channel),
zap.String("State", opState.state.String()),
)
switch opState.state {
case datapb.ChannelWatchState_WatchSuccess:
log.Info("Success to watch")
m.fgManager.AddFlowgraph(opState.fg)
case datapb.ChannelWatchState_WatchFailure:
log.Info("Fail to watch")
case datapb.ChannelWatchState_ReleaseSuccess:
log.Info("Success to release")
case datapb.ChannelWatchState_ReleaseFailure:
log.Info("Fail to release, add channel to abnormal lists")
m.abnormals.Insert(opState.opID, opState.channel)
}
m.finishOp(opState.opID, opState.channel)
}
func (m *ChannelManagerImpl) getOrCreateRunner(channel string) *opRunner {
runner, loaded := m.opRunners.GetOrInsert(channel, NewOpRunner(channel, m.dn, m.releaseFunc, executeWatch, m.communicateCh))
if !loaded {
runner.Start()
}
return runner
}
func (m *ChannelManagerImpl) finishOp(opID int64, channel string) {
if runner, loaded := m.opRunners.GetAndRemove(channel); loaded {
runner.FinishOp(opID)
runner.Close()
}
}
type opInfo struct {
tickler *tickler
}
type opRunner struct {
channel string
dn *DataNode
releaseFunc releaseFunc
watchFunc watchFunc
guard sync.RWMutex
allOps map[UniqueID]*opInfo // opID -> tickler
opsInQueue chan *datapb.ChannelWatchInfo
resultCh chan *opState
closeCh lifetime.SafeChan
closeWg sync.WaitGroup
}
func NewOpRunner(channel string, dn *DataNode, releaseF releaseFunc, watchF watchFunc, resultCh chan *opState) *opRunner {
return &opRunner{
channel: channel,
dn: dn,
releaseFunc: releaseF,
watchFunc: watchF,
opsInQueue: make(chan *datapb.ChannelWatchInfo, 10),
allOps: make(map[UniqueID]*opInfo),
resultCh: resultCh,
closeCh: lifetime.NewSafeChan(),
}
}
func (r *opRunner) Start() {
r.closeWg.Add(1)
go func() {
defer r.closeWg.Done()
for {
select {
case info := <-r.opsInQueue:
r.NotifyState(r.Execute(info))
case <-r.closeCh.CloseCh():
return
}
}
}()
}
func (r *opRunner) FinishOp(opID UniqueID) {
r.guard.Lock()
defer r.guard.Unlock()
delete(r.allOps, opID)
}
func (r *opRunner) Exist(opID UniqueID) (progress int32, exists bool) {
r.guard.RLock()
defer r.guard.RUnlock()
info, ok := r.allOps[opID]
if !ok {
return -1, false
}
if info.tickler == nil {
return 0, true
}
return info.tickler.progress(), true
}
func (r *opRunner) Enqueue(info *datapb.ChannelWatchInfo) error {
if info.GetState() != datapb.ChannelWatchState_ToWatch &&
info.GetState() != datapb.ChannelWatchState_ToRelease {
return errors.New("Invalid channel watch state")
}
r.guard.Lock()
defer r.guard.Unlock()
if _, ok := r.allOps[info.GetOpID()]; !ok {
r.opsInQueue <- info
r.allOps[info.GetOpID()] = &opInfo{}
}
return nil
}
func (r *opRunner) UnfinishedOpSize() int {
r.guard.RLock()
defer r.guard.RUnlock()
return len(r.allOps)
}
// Execute excutes channel operations, channel state is validated during enqueue
func (r *opRunner) Execute(info *datapb.ChannelWatchInfo) *opState {
log.Info("Start to execute channel operation",
zap.String("channel", info.GetVchan().GetChannelName()),
zap.Int64("opID", info.GetOpID()),
zap.String("state", info.GetState().String()),
)
if info.GetState() == datapb.ChannelWatchState_ToWatch {
return r.watchWithTimer(info)
}
// ToRelease state
return r.releaseWithTimer(r.releaseFunc, info.GetVchan().GetChannelName(), info.GetOpID())
}
func (r *opRunner) updateTickler(opID int64, tickler *tickler) bool {
r.guard.Lock()
defer r.guard.Unlock()
opInfo, ok := r.allOps[opID]
if !ok {
return false
}
opInfo.tickler = tickler
return true
}
// watchWithTimer will return WatchFailure after WatchTimeoutInterval
func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {
opState := &opState{
channel: info.GetVchan().GetChannelName(),
opID: info.GetOpID(),
}
log := log.With(zap.String("channel", opState.channel), zap.Int64("opID", opState.opID))
tickler := newTickler()
ok := r.updateTickler(info.GetOpID(), tickler)
if !ok {
opState.state = datapb.ChannelWatchState_WatchFailure
return opState
}
var (
successSig = make(chan struct{}, 1)
finishWaiter sync.WaitGroup
)
watchTimeout := paramtable.Get().DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startTimer := func(finishWg *sync.WaitGroup) {
defer finishWg.Done()
timer := time.NewTimer(watchTimeout)
defer timer.Stop()
log := log.With(zap.Duration("timeout", watchTimeout))
log.Info("Start timer for ToWatch operation")
for {
select {
case <-timer.C:
// watch timeout
tickler.close()
cancel()
log.Info("Stop timer for ToWatch operation timeout")
return
case <-r.closeCh.CloseCh():
// runner closed from outside
tickler.close()
cancel()
log.Info("Suspend ToWatch operation from outside of opRunner")
return
case <-tickler.progressSig:
log.Info("Reset timer for tickler updated", zap.Int32("current progress", tickler.progress()))
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(watchTimeout)
case <-successSig:
// watch success
log.Info("Stop timer for ToWatch operation succeeded")
return
}
}
}
finishWaiter.Add(2)
go startTimer(&finishWaiter)
go func() {
defer finishWaiter.Done()
fg, err := r.watchFunc(ctx, r.dn, info, tickler)
if err != nil {
log.Warn("failed to watch channel", zap.Error(err))
opState.state = datapb.ChannelWatchState_WatchFailure
} else {
opState.state = datapb.ChannelWatchState_WatchSuccess
opState.fg = fg
successSig <- struct{}{}
}
}()
finishWaiter.Wait()
return opState
}
// releaseWithTimer will return ReleaseFailure after WatchTimeoutInterval
func (r *opRunner) releaseWithTimer(releaseFunc releaseFunc, channel string, opID UniqueID) *opState {
opState := &opState{
channel: channel,
opID: opID,
}
var (
successSig = make(chan struct{}, 1)
finishWaiter sync.WaitGroup
)
log := log.With(zap.Int64("opID", opID), zap.String("channel", channel))
startTimer := func(finishWaiter *sync.WaitGroup) {
defer finishWaiter.Done()
releaseTimeout := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)
timer := time.NewTimer(releaseTimeout)
defer timer.Stop()
log := log.With(zap.Duration("timeout", releaseTimeout))
log.Info("Start ToRelease timer")
for {
select {
case <-timer.C:
log.Info("Stop timer for ToRelease operation timeout")
opState.state = datapb.ChannelWatchState_ReleaseFailure
return
case <-r.closeCh.CloseCh():
// runner closed from outside
log.Info("Stop timer for opRunner closed")
return
case <-successSig:
log.Info("Stop timer for ToRelease operation succeeded")
opState.state = datapb.ChannelWatchState_ReleaseSuccess
return
}
}
}
finishWaiter.Add(1)
go startTimer(&finishWaiter)
go func() {
// TODO: failure should panic this DN, but we're not sure how
// to recover when releaseFunc stuck.
// Whenever we see a stuck, it's a bug need to be fixed.
// In case of the unknown behavior after the stuck of release,
// we'll mark this channel abnormal in this DN. This goroutine might never return.
//
// The channel can still be balanced into other DNs, but not on this one.
// ExclusiveConsumer error happens when the same DN subscribes the same pchannel twice.
releaseFunc(opState.channel)
successSig <- struct{}{}
}()
finishWaiter.Wait()
return opState
}
func (r *opRunner) NotifyState(state *opState) {
r.resultCh <- state
}
func (r *opRunner) Close() {
r.closeCh.Close()
r.closeWg.Wait()
}
type opState struct {
channel string
opID int64
state datapb.ChannelWatchState
fg *dataSyncService
}
// executeWatch will always return, won't be stuck, either success or fail.
func executeWatch(ctx context.Context, dn *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error) {
dataSyncService, err := newDataSyncService(ctx, dn, info, tickler)
if err != nil {
return nil, err
}
dataSyncService.start()
return dataSyncService, nil
}
// tickler counts every time when called inc(),
type tickler struct {
count *atomic.Int32
total *atomic.Int32
closedSig *atomic.Bool
progressSig chan struct{}
}
func (t *tickler) inc() {
t.count.Inc()
t.progressSig <- struct{}{}
}
func (t *tickler) setTotal(total int32) {
t.total.Store(total)
}
// progress returns the count over total if total is set
// else just return the count number.
func (t *tickler) progress() int32 {
if t.total.Load() == 0 {
return t.count.Load()
}
return t.count.Load() * 100 / t.total.Load()
}
func (t *tickler) close() {
t.closedSig.CompareAndSwap(false, true)
}
func (t *tickler) closed() bool {
return t.closedSig.Load()
}
func newTickler() *tickler {
return &tickler{
count: atomic.NewInt32(0),
total: atomic.NewInt32(0),
closedSig: atomic.NewBool(false),
progressSig: make(chan struct{}, 200),
}
}