Fix watcher loop quit and channel shouldDrop logic (#23401)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
parent 16b133749d
commit d0e8711381
@@ -91,3 +91,8 @@ const (
 func IsSystemField(fieldID int64) bool {
     return fieldID < StartOfUserFieldID
 }
+
+const (
+    // LatestVerision is the magic number for watch latest revision
+    LatestRevision = int64(-1)
+)
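Illustrative sketch only, not part of this commit: it just shows the intent of the LatestRevision sentinel added above, where -1 means "watch from the current state" and any other value means "resume from a known etcd revision". The helper name resolveWatchStart is made up for this example.

package main

import "fmt"

// LatestRevision mirrors the sentinel added in the hunk above: -1 means "watch from latest".
const LatestRevision = int64(-1)

// resolveWatchStart describes where a watch would begin for a given revision value.
func resolveWatchStart(revision int64) string {
    if revision == LatestRevision {
        return "watch from current state, no historical events replayed"
    }
    return fmt.Sprintf("watch resuming at revision %d", revision)
}

func main() {
    fmt.Println(resolveWatchStart(LatestRevision)) // fresh watch
    fmt.Println(resolveWatchStart(42))             // resumed watch
}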
@@ -59,6 +59,11 @@ func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan
     return c.etcdWatcher, c.timeoutWatcher
 }
 
+func (c *channelStateTimer) getWatchersWithRevision(prefix string, revision int64) (clientv3.WatchChan, chan *ackEvent) {
+    c.etcdWatcher = c.watchkv.WatchWithRevision(prefix, revision)
+    return c.etcdWatcher, c.timeoutWatcher
+}
+
 func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) {
     prefix := path.Join(Params.DataCoordCfg.ChannelWatchSubPath, strconv.FormatInt(nodeID, 10))
 
@@ -113,7 +118,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
         case <-ticker.C:
             // check tickle at path as :tickle/[prefix]/{channel_name}
             c.removeTimers([]string{channelName})
-            log.Info("timeout and stop timer: wait for channel ACK timeout",
+            log.Warn("timeout and stop timer: wait for channel ACK timeout",
                 zap.String("watch state", watchState.String()),
                 zap.Int64("nodeID", nodeID),
                 zap.String("channel name", channelName),
@@ -24,6 +24,7 @@ import (
 
     "github.com/milvus-io/milvus-proto/go-api/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/schemapb"
+    "github.com/milvus-io/milvus/internal/common"
     "github.com/milvus-io/milvus/internal/kv"
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/mq/msgstream"
@@ -164,7 +165,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
     checkerContext, cancel := context.WithCancel(ctx)
     c.stopChecker = cancel
     if c.stateChecker != nil {
-        go c.stateChecker(checkerContext)
+        // TODO get revision from reload logic
+        go c.stateChecker(checkerContext, common.LatestRevision)
         log.Info("starting etcd states checker")
     }
 
@@ -651,15 +653,20 @@ func (c *ChannelManager) processAck(e *ackEvent) {
     }
 }
 
-type channelStateChecker func(context.Context)
+type channelStateChecker func(context.Context, int64)
 
-func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
+func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision int64) {
     defer logutil.LogPanic()
 
     // REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name}
     watchPrefix := Params.DataCoordCfg.ChannelWatchSubPath
-    // TODO, this is risky, we'd better watch etcd with revision rather simply a path
-    etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix)
+    var etcdWatcher clientv3.WatchChan
+    var timeoutWatcher chan *ackEvent
+    if revision == common.LatestRevision {
+        etcdWatcher, timeoutWatcher = c.stateTimer.getWatchers(watchPrefix)
+    } else {
+        etcdWatcher, timeoutWatcher = c.stateTimer.getWatchersWithRevision(watchPrefix, revision)
+    }
 
     for {
         select {
@@ -674,14 +681,17 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
         case event, ok := <-etcdWatcher:
             if !ok {
                 log.Warn("datacoord failed to watch channel, return")
+                // rewatch for transient network error, session handles process quiting if connect is not recoverable
+                go c.watchChannelStatesLoop(ctx, revision)
                 return
             }
 
             if err := event.Err(); err != nil {
                 log.Warn("datacoord watch channel hit error", zap.Error(event.Err()))
                 // https://github.com/etcd-io/etcd/issues/8980
+                // TODO add list and wathc with revision
                 if event.Err() == v3rpc.ErrCompacted {
-                    go c.watchChannelStatesLoop(ctx)
+                    go c.watchChannelStatesLoop(ctx, event.CompactRevision)
                     return
                 }
                 // if watch loop return due to event canceled, the datacoord is not functional anymore
@@ -689,6 +699,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
                 return
             }
 
+            revision = event.Header.GetRevision() + 1
             for _, evt := range event.Events {
                 if evt.Type == clientv3.EventTypeDelete {
                     continue
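The two hunks above follow the standard etcd client pattern for keeping a watch loop alive: after every response, remember Header.Revision + 1 as the resume point, and when the server reports ErrCompacted, restart the watch from the reported CompactRevision. Below is a self-contained sketch of that generic pattern; the endpoint, prefix, and function name are placeholders, not Milvus code.

package main

import (
    "context"
    "log"
    "time"

    v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    clientv3 "go.etcd.io/etcd/client/v3"
)

// watchLoop keeps a prefix watch alive, resuming from the last seen revision
// and falling back to the compact revision when history was compacted away.
func watchLoop(ctx context.Context, cli *clientv3.Client, prefix string, revision int64) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        ch := cli.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
        for resp := range ch {
            if err := resp.Err(); err != nil {
                if err == v3rpc.ErrCompacted {
                    // Requested revision is gone; resume from the oldest one still available.
                    revision = resp.CompactRevision
                    break
                }
                log.Printf("watch error: %v", err)
                return
            }
            // Remember where to resume if the channel closes later.
            revision = resp.Header.Revision + 1
            for _, ev := range resp.Events {
                log.Printf("event %s on %s", ev.Type, ev.Kv.Key)
            }
        }
        // Watch channel closed (transient error or compaction): loop and re-watch.
    }
}

func main() {
    cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()
    watchLoop(context.Background(), cli, "demo/prefix", 1)
}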
@@ -26,6 +26,7 @@ import (
 
     "github.com/golang/protobuf/proto"
 
+    "github.com/milvus-io/milvus/internal/common"
     "github.com/milvus-io/milvus/internal/kv"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/util/dependency"
@@ -114,7 +115,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -144,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -175,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -213,7 +214,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -256,7 +257,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -302,7 +303,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -348,7 +349,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
-            chManager.watchChannelStatesLoop(ctx)
+            chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
             wg.Done()
         }()
 
@@ -927,7 +928,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
     ctx, cancel := context.WithCancel(context.TODO())
     chManager.stopChecker = cancel
     defer cancel()
-    go chManager.stateChecker(ctx)
+    go chManager.stateChecker(ctx, common.LatestRevision)
 
     chManager.store = &ChannelStore{
         store: metakv,
@@ -331,10 +331,10 @@ func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
 func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
     for id, info := range c.channelsInfo {
         if id == nodeID {
-            delete(c.channelsInfo, id)
             if err := c.remove(nodeID); err != nil {
                 return nil, err
             }
+            delete(c.channelsInfo, id)
             return info.Channels, nil
         }
     }
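The reorder in ChannelStore.Delete above applies a general rule: write to the backing store first and only mutate the in-memory map once that write succeeds, so a failed remove leaves the cache and the KV store consistent. A generic sketch of the same rule follows; the Store interface and cache type here are hypothetical, not Milvus APIs.

package main

import (
    "errors"
    "fmt"
)

type Store interface {
    Remove(key string) error
}

type cache struct {
    kv    Store
    items map[string][]string
}

// Delete removes key from the backing store first; the in-memory entry is
// dropped only after the store confirms the removal, so a failure leaves
// both views consistent.
func (c *cache) Delete(key string) ([]string, error) {
    vals, ok := c.items[key]
    if !ok {
        return nil, errors.New("not found")
    }
    if err := c.kv.Remove(key); err != nil {
        return nil, err // in-memory entry intentionally kept
    }
    delete(c.items, key)
    return vals, nil
}

type fakeStore struct{ fail bool }

func (f fakeStore) Remove(string) error {
    if f.fail {
        return errors.New("kv unavailable")
    }
    return nil
}

func main() {
    c := &cache{kv: fakeStore{fail: true}, items: map[string][]string{"node-1": {"ch-a"}}}
    _, err := c.Delete("node-1")
    fmt.Println(err, len(c.items)) // kv unavailable 1 -> entry survives the failed remove
}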
@@ -334,22 +334,8 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
 
 // CheckShouldDropChannel returns whether specified channel is marked to be removed
 func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
-    /*
-        segments := h.s.meta.GetSegmentsByChannel(channel)
-        for _, segment := range segments {
-            if segment.GetStartPosition() != nil && // filter empty segment
-                // FIXME: we filter compaction generated segments
-                // because datanode may not know the segment due to the network lag or
-                // datacoord crash when handling CompleteCompaction.
-                // FIXME: cancel this limitation for #12265
-                // need to change a unified DropAndFlush to solve the root problem
-                //len(segment.CompactionFrom) == 0 &&
-                segment.GetState() != commonpb.SegmentState_Dropped {
-                return false
-            }
-        }
-        return false*/
-    return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
+    return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) ||
+        !h.s.meta.catalog.ChannelExists(h.s.ctx, channel)
 }
 
 // FinishDropChannel cleans up the remove flag for channels
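The new predicate above treats a channel as droppable when it is either explicitly flagged for removal or no longer present in the catalog at all, which is what the updated test below relies on. A small table-style sketch of that truth table, with a simplified catalog interface that is only illustrative:

package main

import "fmt"

type catalog interface {
    ShouldDropChannel(channel string) bool
    ChannelExists(channel string) bool
}

// checkShouldDropChannel mirrors the new logic: drop when flagged OR when unknown.
func checkShouldDropChannel(c catalog, channel string) bool {
    return c.ShouldDropChannel(channel) || !c.ChannelExists(channel)
}

type fakeCatalog struct {
    flagged map[string]bool
    exists  map[string]bool
}

func (f fakeCatalog) ShouldDropChannel(ch string) bool { return f.flagged[ch] }
func (f fakeCatalog) ChannelExists(ch string) bool     { return f.exists[ch] }

func main() {
    cat := fakeCatalog{
        flagged: map[string]bool{"ch1": true},
        exists:  map[string]bool{"ch1": true, "ch2": true},
    }
    fmt.Println(checkShouldDropChannel(cat, "ch1"))  // true: remove flag set
    fmt.Println(checkShouldDropChannel(cat, "ch2"))  // false: exists and not flagged
    fmt.Println(checkShouldDropChannel(cat, "ch99")) // true: channel not in the catalog
}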
@@ -2596,7 +2596,7 @@ func TestShouldDropChannel(t *testing.T) {
     })
     */
     t.Run("channel name not in kv", func(t *testing.T) {
-        assert.False(t, svr.handler.CheckShouldDropChannel("ch99"))
+        assert.True(t, svr.handler.CheckShouldDropChannel("ch99"))
     })
 
     t.Run("channel in remove flag", func(t *testing.T) {
@@ -2605,10 +2605,6 @@ func TestShouldDropChannel(t *testing.T) {
 
         assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
     })
-
-    t.Run("channel name not matched", func(t *testing.T) {
-        assert.False(t, svr.handler.CheckShouldDropChannel("ch2"))
-    })
 }
 
 func TestGetRecoveryInfo(t *testing.T) {
@@ -279,6 +279,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
         case event, ok := <-evtChan:
             if !ok {
                 log.Warn("datanode failed to watch channel, return")
+                go node.StartWatchChannels(ctx)
                 return
             }
 
@@ -122,7 +122,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
     }
 
     if load := ddn.dropMode.Load(); load != nil && load.(bool) {
-        log.Debug("ddNode in dropMode",
+        log.Info("ddNode in dropMode",
             zap.String("vChannelName", ddn.vChannelName),
             zap.Int64("collection ID", ddn.collectionID))
         return []Msg{}
@@ -1236,7 +1236,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
     log.Info("IndexCoord start watching flushed segments...")
     defer i.loopWg.Done()
 
-    watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1)
+    watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision)
     for {
         select {
         case <-i.loopCtx.Done():
@@ -1244,7 +1244,8 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
             return
         case resp, ok := <-watchChan:
             if !ok {
-                log.Warn("IndexCoord watch flush segments loop failed because watch channel closed")
+                log.Warn("IndexCoord watch flush segments loop failed because watch channel closed, retry...")
+                go i.watchFlushedSegmentLoop()
                 return
             }
             if err := resp.Err(); err != nil {
@@ -1264,6 +1265,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
                     zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err))
                 panic("failed to handle etcd request, exit..")
             }
+            i.flushedSegmentWatcher.etcdRevision = resp.Header.GetRevision() + 1
             events := resp.Events
             for _, event := range events {
                 switch event.Type {
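The two IndexCoord hunks above move the "+1" to a single place: the loop now stores resp.Header.GetRevision()+1 as "the next revision to watch from", and WatchWithRevision receives that value unchanged, avoiding a double increment on restart. A small sketch of that bookkeeping; the watcher type here is illustrative only, not Milvus code.

package main

import "fmt"

type segmentWatcher struct {
    // etcdRevision always holds the next revision to pass to WatchWithRevision,
    // so callers never add 1 again.
    etcdRevision int64
}

// observe records a watch response header revision and returns the revision
// the next watch should start from.
func (w *segmentWatcher) observe(headerRevision int64) int64 {
    w.etcdRevision = headerRevision + 1
    return w.etcdRevision
}

func main() {
    w := &segmentWatcher{etcdRevision: 10}
    fmt.Println(w.etcdRevision) // 10: start the first watch here, no extra +1
    fmt.Println(w.observe(15))  // 16: resume after the last seen response
}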
@@ -43,6 +43,7 @@ func TestShardClusterService_SyncReplicaSegments(t *testing.T) {
     defer client.Close()
     session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
     clusterService := newShardClusterService(client, session, qn)
+    defer clusterService.close()
 
     t.Run("sync non-exist shard cluster", func(t *testing.T) {
         err := clusterService.SyncReplicaSegments(defaultDMLChannel, nil)
@@ -24,6 +24,7 @@ import (
     "github.com/golang/protobuf/proto"
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/proto/querypb"
+    "github.com/milvus-io/milvus/internal/util/retry"
     "go.etcd.io/etcd/api/v3/mvccpb"
     v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
     clientv3 "go.etcd.io/etcd/client/v3"
@@ -110,10 +111,11 @@ func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64,
 
     ctx, cancel := context.WithCancel(context.Background())
     go nd.cancelClose(cancel)
-    watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithRev(resp.Header.Revision+1), clientv3.WithPrefix(), clientv3.WithPrevKV())
+    revision := resp.Header.GetRevision() + 1
+    watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithRev(revision), clientv3.WithPrefix(), clientv3.WithPrevKV())
 
     nd.wg.Add(1)
-    go nd.watch(watchCh, collectionID, replicaID)
+    go nd.watch(watchCh, collectionID, replicaID, revision)
 
     return nodes, nd.evtCh
 }
@@ -123,7 +125,7 @@ func (nd *etcdShardNodeDetector) cancelClose(cancel func()) {
     cancel()
 }
 
-func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID int64) {
+func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID, revision int64) {
     defer nd.wg.Done()
     for {
         select {
@@ -132,33 +134,91 @@ func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, repl
             return
         case evt, ok := <-ch:
             if !ok {
-                log.Warn("event ch closed")
+                log.Warn("watch channel closed, retry...")
+                var watchCh clientv3.WatchChan
+                var ok bool
+                watchCh, ok, revision = nd.rewatch(collectionID, replicaID, revision)
+                if !ok {
+                    // detector closed
+                    return
+                }
+                nd.wg.Add(1)
+                go nd.watch(watchCh, collectionID, replicaID, revision)
                 return
             }
             if err := evt.Err(); err != nil {
                 if err == v3rpc.ErrCompacted {
-                    ctx, cancel := context.WithCancel(context.Background())
-                    watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithPrefix())
-                    go nd.cancelClose(cancel)
+                    watchCh, ok, revision := nd.rewatch(collectionID, replicaID, evt.CompactRevision)
+                    if !ok {
+                        // detector closed
+                        return
+                    }
                     nd.wg.Add(1)
-                    go nd.watch(watchCh, collectionID, replicaID)
+                    go nd.watch(watchCh, collectionID, replicaID, revision)
                     return
                 }
                 log.Error("failed to handle watch node error", zap.Error(err))
                 panic(err)
             }
-            for _, e := range evt.Events {
-                switch e.Type {
-                case mvccpb.PUT:
-                    nd.handlePutEvent(e, collectionID, replicaID)
-                case mvccpb.DELETE:
-                    nd.handleDelEvent(e, collectionID, replicaID)
-                }
-            }
+            revision = evt.Header.GetRevision() + 1
+            nd.handleEvt(evt, collectionID, replicaID)
         }
     }
 }
 
+func (nd *etcdShardNodeDetector) handleEvt(evt clientv3.WatchResponse, collectionID, replicaID int64) {
+    for _, e := range evt.Events {
+        switch e.Type {
+        case mvccpb.PUT:
+            nd.handlePutEvent(e, collectionID, replicaID)
+        case mvccpb.DELETE:
+            nd.handleDelEvent(e, collectionID, replicaID)
+        }
+    }
+}
+
+func (nd *etcdShardNodeDetector) rewatch(collectionID, replicaID, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    revision = rev
+    err := retry.Do(ctx, func() error {
+        ch = nd.client.Watch(ctx, nd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
+        select {
+        case <-nd.closeCh:
+            return retry.Unrecoverable(errors.New("detector closed"))
+
+        case evt, ok := <-ch:
+            if !ok {
+                return errors.New("rewatch got closed ch")
+            }
+            if err := evt.Err(); err != nil {
+                if err == v3rpc.ErrCompacted {
+                    revision = evt.CompactRevision
+                    return err
+                }
+                log.Error("failed to handle watch node error", zap.Error(err))
+                panic(err)
+            }
+            revision = evt.Header.GetRevision() + 1
+            nd.handleEvt(evt, collectionID, replicaID)
+        default:
+            // blocked, fine
+        }
+        return nil
+    })
+    // check detector closed
+    if err != nil {
+        select {
+        case <-nd.closeCh:
+            return nil, false, revision
+        default:
+            panic(err)
+        }
+    }
+
+    return ch, true, revision
+}
+
 func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID, replicaID int64) {
     var err error
     var info, prevInfo *querypb.Replica
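The new rewatch helper above wraps re-subscription in retry.Do and uses retry.Unrecoverable to stop as soon as the detector's close channel fires. A reduced sketch of that control flow follows, using a plain retry loop instead of the internal retry utility so the give-up condition is easy to see; closeCh and the subscribe function are stand-ins, not Milvus code.

package main

import (
    "errors"
    "fmt"
    "time"
)

var errClosed = errors.New("detector closed")

// rewatch keeps trying subscribe until it succeeds, the detector is closed,
// or the attempt budget runs out.
func rewatch(closeCh <-chan struct{}, subscribe func() error) (bool, error) {
    for attempt := 0; attempt < 5; attempt++ {
        select {
        case <-closeCh:
            return false, errClosed // unrecoverable: stop retrying immediately
        default:
        }
        if err := subscribe(); err != nil {
            time.Sleep(10 * time.Millisecond) // simple backoff for the sketch
            continue
        }
        return true, nil
    }
    return false, errors.New("out of retries")
}

func main() {
    closeCh := make(chan struct{})
    calls := 0
    ok, err := rewatch(closeCh, func() error {
        calls++
        if calls < 3 {
            return errors.New("transient watch failure")
        }
        return nil
    })
    fmt.Println(ok, err, calls) // true <nil> 3
}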
@@ -18,11 +18,13 @@ package querynode
 
 import (
     "context"
+    "errors"
     "sync"
 
     "github.com/golang/protobuf/proto"
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/proto/querypb"
+    "github.com/milvus-io/milvus/internal/util/retry"
     "go.etcd.io/etcd/api/v3/mvccpb"
     v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
     clientv3 "go.etcd.io/etcd/client/v3"
@@ -103,14 +105,16 @@ func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID
         }
     }
 
+    revision := resp.Header.GetRevision() + 1
     sd.wg.Add(1)
-    watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithRev(resp.Header.GetRevision()+1), clientv3.WithPrefix(), clientv3.WithPrevKV())
-    go sd.watch(watchCh, collectionID, replicaID, vchannelName)
+    watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithRev(revision), clientv3.WithPrefix(), clientv3.WithPrevKV())
+
+    go sd.watch(watchCh, collectionID, replicaID, vchannelName, revision)
 
     return events, sd.evtCh
 }
 
-func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string) {
+func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string, revision int64) {
     defer sd.wg.Done()
     for {
         select {
@@ -119,31 +123,86 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
             return
         case evt, ok := <-ch:
             if !ok {
-                log.Warn("SegmentDetector event channel closed")
+                log.Warn("SegmentDetector event channel closed, retry...")
+                watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, revision)
+                if !ok {
+                    return
+                }
+                sd.wg.Add(1)
+                go sd.watch(watchCh, collectionID, replicaID, vchannel, revision)
                 return
             }
             if err := evt.Err(); err != nil {
                 if err == v3rpc.ErrCompacted {
+                    watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision)
+                    if !ok {
+                        return
+                    }
                     sd.wg.Add(1)
-                    watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithPrefix())
-                    go sd.watch(watchCh, collectionID, replicaID, vchannel)
+                    go sd.watch(watchCh, collectionID, replicaID, vchannel, revision)
                     return
                 }
                 log.Error("failed to handle watch segment error, panic", zap.Error(err))
                 panic(err)
             }
-            for _, e := range evt.Events {
-                switch e.Type {
-                case mvccpb.PUT:
-                    sd.handlePutEvent(e, collectionID, replicaID, vchannel)
-                case mvccpb.DELETE:
-                    sd.handleDelEvent(e, collectionID, replicaID, vchannel)
-                }
-            }
+            revision = evt.Header.GetRevision() + 1
+            sd.handleEvt(evt, collectionID, replicaID, vchannel)
         }
     }
 }
 
+func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, revision int64) (ch clientv3.WatchChan, ok bool) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    err := retry.Do(ctx, func() error {
+        ch = sd.client.Watch(ctx, sd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
+        select {
+        case <-sd.closeCh:
+            return retry.Unrecoverable(errors.New("detector closed"))
+        case evt, ok := <-ch:
+            if !ok {
+                return errors.New("rewatch got closed ch")
+            }
+            if err := evt.Err(); err != nil {
+                if err == v3rpc.ErrCompacted {
+                    revision = evt.CompactRevision
+                    return err
+                }
+                log.Error("failed to handle watch segment error", zap.Error(err))
+                panic(err)
+            }
+            revision = evt.Header.GetRevision() + 1
+            sd.handleEvt(evt, collectionID, replicaID, vchannel)
+        default:
+            // blocked, fine
+        }
+        return nil
+    })
+    // check detector closed
+    if err != nil {
+        select {
+        case <-sd.closeCh:
+            return nil, false
+        default:
+            panic(err)
+        }
+    }
+
+    return ch, true
+}
+
+func (sd *etcdShardSegmentDetector) handleEvt(evt clientv3.WatchResponse, collectionID int64, replicaID int64, vchannel string) {
+    for _, e := range evt.Events {
+        switch e.Type {
+        case mvccpb.PUT:
+            sd.handlePutEvent(e, collectionID, replicaID, vchannel)
+        case mvccpb.DELETE:
+            sd.handleDelEvent(e, collectionID, replicaID, vchannel)
+        }
+    }
+
+}
+
 func (sd *etcdShardSegmentDetector) handlePutEvent(e *clientv3.Event, collectionID int64, replicaID int64, vchannel string) {
     info, err := sd.parseSegmentInfo(e.Kv.Value)
     if err != nil {