From ba84f521196e4936f8c02b7674f5118cda50128b Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 14 Apr 2023 09:54:28 +0800 Subject: [PATCH] Fix watcher loop quit and channel shouldDrop logic (#23402) Signed-off-by: Congqi Xia --- internal/datacoord/channel_checker.go | 7 ++++++- internal/datacoord/channel_manager.go | 22 +++++++++++++++++----- internal/datacoord/channel_manager_test.go | 17 +++++++++-------- internal/datacoord/channel_store.go | 2 +- internal/datacoord/handler.go | 18 ++---------------- internal/datacoord/server_test.go | 6 +----- internal/datanode/data_node.go | 1 + internal/datanode/flow_graph_dd_node.go | 2 +- pkg/common/common.go | 5 +++++ 9 files changed, 43 insertions(+), 37 deletions(-) diff --git a/internal/datacoord/channel_checker.go b/internal/datacoord/channel_checker.go index 79622e6d57..25eeb68fa7 100644 --- a/internal/datacoord/channel_checker.go +++ b/internal/datacoord/channel_checker.go @@ -59,6 +59,11 @@ func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan return c.etcdWatcher, c.timeoutWatcher } +func (c *channelStateTimer) getWatchersWithRevision(prefix string, revision int64) (clientv3.WatchChan, chan *ackEvent) { + c.etcdWatcher = c.watchkv.WatchWithRevision(prefix, revision) + return c.etcdWatcher, c.timeoutWatcher +} + func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) { prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), strconv.FormatInt(nodeID, 10)) @@ -113,7 +118,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe case <-ticker.C: // check tickle at path as :tickle/[prefix]/{channel_name} c.removeTimers([]string{channelName}) - log.Info("timeout and stop timer: wait for channel ACK timeout", + log.Warn("timeout and stop timer: wait for channel ACK timeout", zap.String("watch state", watchState.String()), zap.Int64("nodeID", nodeID), zap.String("channel name", channelName), diff --git 
a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 40b7335321..a0e539117e 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -164,7 +165,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error { checkerContext, cancel := context.WithCancel(ctx) c.stopChecker = cancel if c.stateChecker != nil { - go c.stateChecker(checkerContext) + // TODO get revision from reload logic + go c.stateChecker(checkerContext, common.LatestRevision) log.Info("starting etcd states checker") } @@ -651,15 +653,21 @@ func (c *ChannelManager) processAck(e *ackEvent) { } } -type channelStateChecker func(context.Context) +type channelStateChecker func(context.Context, int64) -func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { +func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision int64) { defer logutil.LogPanic() // REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name} watchPrefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue() // TODO, this is risky, we'd better watch etcd with revision rather simply a path - etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix) + var etcdWatcher clientv3.WatchChan + var timeoutWatcher chan *ackEvent + if revision == common.LatestRevision { + etcdWatcher, timeoutWatcher = c.stateTimer.getWatchers(watchPrefix) + } else { + etcdWatcher, timeoutWatcher = c.stateTimer.getWatchersWithRevision(watchPrefix, revision) + } for { select { @@ -674,14 +682,17 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { 
case event, ok := <-etcdWatcher: if !ok { log.Warn("datacoord failed to watch channel, return") + // rewatch for transient network error, session handles process quitting if connect is not recoverable + go c.watchChannelStatesLoop(ctx, revision) return } if err := event.Err(); err != nil { log.Warn("datacoord watch channel hit error", zap.Error(event.Err())) // https://github.com/etcd-io/etcd/issues/8980 + // TODO add list and watch with revision if event.Err() == v3rpc.ErrCompacted { - go c.watchChannelStatesLoop(ctx) + go c.watchChannelStatesLoop(ctx, event.CompactRevision) return } // if watch loop return due to event canceled, the datacoord is not functional anymore @@ -689,6 +700,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { return } + revision = event.Header.GetRevision() + 1 for _, evt := range event.Events { if evt.Type == clientv3.EventTypeDelete { continue diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 6c0ce68be8..b5804322c5 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/pkg/common" ) // waitAndStore simulates DataNode's action @@ -114,7 +115,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -144,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -175,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { 
- chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -213,7 +214,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -256,7 +257,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -302,7 +303,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -348,7 +349,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -967,7 +968,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) chManager.stopChecker = cancel defer cancel() - go chManager.stateChecker(ctx) + go chManager.stateChecker(ctx, common.LatestRevision) chManager.store = &ChannelStore{ store: metakv, diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 24242b5305..a0774368f7 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -331,10 +331,10 @@ func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int { func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) { for id, info := range c.channelsInfo { if id == nodeID { - delete(c.channelsInfo, id) if err := c.remove(nodeID); err != nil { return nil, err } + delete(c.channelsInfo, id) return info.Channels, nil } } diff --git a/internal/datacoord/handler.go 
b/internal/datacoord/handler.go index 3d1efeb492..7e55e19efa 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -333,22 +333,8 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID // CheckShouldDropChannel returns whether specified channel is marked to be removed func (h *ServerHandler) CheckShouldDropChannel(channel string) bool { - /* - segments := h.s.meta.GetSegmentsByChannel(channel) - for _, segment := range segments { - if segment.GetStartPosition() != nil && // filter empty segment - // FIXME: we filter compaction generated segments - // because datanode may not know the segment due to the network lag or - // datacoord crash when handling CompleteCompaction. - // FIXME: cancel this limitation for #12265 - // need to change a unified DropAndFlush to solve the root problem - //len(segment.CompactionFrom) == 0 && - segment.GetState() != commonpb.SegmentState_Dropped { - return false - } - } - return false*/ - return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) + return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) || + !h.s.meta.catalog.ChannelExists(h.s.ctx, channel) } // FinishDropChannel cleans up the remove flag for channels diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 1c9d328254..3c6f5927b5 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2483,7 +2483,7 @@ func TestShouldDropChannel(t *testing.T) { }) */ t.Run("channel name not in kv", func(t *testing.T) { - assert.False(t, svr.handler.CheckShouldDropChannel("ch99")) + assert.True(t, svr.handler.CheckShouldDropChannel("ch99")) }) t.Run("channel in remove flag", func(t *testing.T) { @@ -2492,10 +2492,6 @@ func TestShouldDropChannel(t *testing.T) { assert.True(t, svr.handler.CheckShouldDropChannel("ch1")) }) - - t.Run("channel name not matched", func(t *testing.T) { - assert.False(t, svr.handler.CheckShouldDropChannel("ch2")) - }) } func 
TestGetRecoveryInfo(t *testing.T) { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ed18d05be5..2bdbbc8760 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -293,6 +293,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) { case event, ok := <-evtChan: if !ok { log.Warn("datanode failed to watch channel, return") + go node.StartWatchChannels(ctx) return } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 8951e49fc8..134ed209ba 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -115,7 +115,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { } if load := ddn.dropMode.Load(); load != nil && load.(bool) { - log.Debug("ddNode in dropMode", + log.Info("ddNode in dropMode", zap.String("vChannelName", ddn.vChannelName), zap.Int64("collection ID", ddn.collectionID)) return []Msg{} diff --git a/pkg/common/common.go b/pkg/common/common.go index faef287a92..9d2f1d2b01 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -101,3 +101,8 @@ const ( func IsSystemField(fieldID int64) bool { return fieldID < StartOfUserFieldID } + +const ( + // LatestRevision is the magic number for watching the latest revision + LatestRevision = int64(-1) +)