mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
parent
4efdd0929c
commit
746aeea35b
@ -130,7 +130,7 @@ func (c *channelStateTimer) removeTimers(channels []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *channelStateTimer) stopIfExsit(e *ackEvent) {
|
func (c *channelStateTimer) stopIfExist(e *ackEvent) {
|
||||||
stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
|
stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
|
||||||
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
|
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
|
||||||
close(stop.(chan struct{}))
|
close(stop.(chan struct{}))
|
||||||
|
|||||||
@ -113,7 +113,7 @@ func TestChannelStateTimer(t *testing.T) {
|
|||||||
assert.Equal(t, watchTimeoutAck, e.ackType)
|
assert.Equal(t, watchTimeoutAck, e.ackType)
|
||||||
assert.Equal(t, test.channelName, e.channelName)
|
assert.Equal(t, test.channelName, e.channelName)
|
||||||
} else {
|
} else {
|
||||||
timer.stopIfExsit(&ackEvent{watchSuccessAck, test.channelName, 1})
|
timer.stopIfExist(&ackEvent{watchSuccessAck, test.channelName, 1})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -601,7 +601,7 @@ func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.Chan
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ChannelManager) processAck(e *ackEvent) {
|
func (c *ChannelManager) processAck(e *ackEvent) {
|
||||||
c.stateTimer.stopIfExsit(e)
|
c.stateTimer.stopIfExist(e)
|
||||||
|
|
||||||
switch e.ackType {
|
switch e.ackType {
|
||||||
case invalidAck:
|
case invalidAck:
|
||||||
|
|||||||
@ -174,7 +174,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||||||
nodeID: nodeID,
|
nodeID: nodeID,
|
||||||
}
|
}
|
||||||
chManager.stateTimer.notifyTimeoutWatcher(e)
|
chManager.stateTimer.notifyTimeoutWatcher(e)
|
||||||
chManager.stateTimer.stopIfExsit(e)
|
chManager.stateTimer.stopIfExist(e)
|
||||||
|
|
||||||
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
|
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
|
||||||
cancel()
|
cancel()
|
||||||
@ -374,7 +374,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
err = chManager.Watch(&channel{"channel-3", collectionID})
|
err = chManager.Watch(&channel{"channel-3", collectionID})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
|
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel-3", nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel-3", nodeID})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -424,7 +424,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
|
|
||||||
err = chManager.Release(nodeID, channelName)
|
err = chManager.Release(nodeID, channelName)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, channelName, nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, channelName, nodeID})
|
||||||
|
|
||||||
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
|
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
|
||||||
})
|
})
|
||||||
@ -459,7 +459,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
remainTest, reassignTest := tests[0], tests[1]
|
remainTest, reassignTest := tests[0], tests[1]
|
||||||
err = chManager.Reassign(reassignTest.nodeID, reassignTest.chName)
|
err = chManager.Reassign(reassignTest.nodeID, reassignTest.chName)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
||||||
|
|
||||||
// test nodes of reassignTest contains no channel
|
// test nodes of reassignTest contains no channel
|
||||||
// test all channels are assgined to node of remainTest
|
// test all channels are assgined to node of remainTest
|
||||||
@ -473,7 +473,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
|
|
||||||
err = chManager.Reassign(remainTest.nodeID, remainTest.chName)
|
err = chManager.Reassign(remainTest.nodeID, remainTest.chName)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
||||||
|
|
||||||
// channel is added to remainTest because there's only one node left
|
// channel is added to remainTest because there's only one node left
|
||||||
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
|
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
|
||||||
@ -539,7 +539,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
remainTest, reassignTest := tests[0], tests[1]
|
remainTest, reassignTest := tests[0], tests[1]
|
||||||
err = chManager.CleanupAndReassign(reassignTest.nodeID, reassignTest.chName)
|
err = chManager.CleanupAndReassign(reassignTest.nodeID, reassignTest.chName)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
||||||
|
|
||||||
// test nodes of reassignTest contains no channel
|
// test nodes of reassignTest contains no channel
|
||||||
assert.False(t, chManager.Match(reassignTest.nodeID, reassignTest.chName))
|
assert.False(t, chManager.Match(reassignTest.nodeID, reassignTest.chName))
|
||||||
@ -554,7 +554,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
|
|
||||||
err = chManager.CleanupAndReassign(remainTest.nodeID, remainTest.chName)
|
err = chManager.CleanupAndReassign(remainTest.nodeID, remainTest.chName)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
|
||||||
|
|
||||||
// channel is added to remainTest because there's only one node left
|
// channel is added to remainTest because there's only one node left
|
||||||
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
|
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
|
||||||
@ -640,7 +640,7 @@ func TestChannelManager(t *testing.T) {
|
|||||||
opSet := getReleaseOp(nodeID, &channel{channelName, collectionID})
|
opSet := getReleaseOp(nodeID, &channel{channelName, collectionID})
|
||||||
|
|
||||||
chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch)
|
chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, channelName, nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, channelName, nodeID})
|
||||||
|
|
||||||
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID)
|
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID)
|
||||||
})
|
})
|
||||||
@ -749,7 +749,7 @@ func TestChannelManager_Reload(t *testing.T) {
|
|||||||
assert.Empty(t, v)
|
assert.Empty(t, v)
|
||||||
|
|
||||||
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID)
|
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID)
|
||||||
chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, channelName, nodeID})
|
chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, channelName, nodeID})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ReleaseFail", func(t *testing.T) {
|
t.Run("ReleaseFail", func(t *testing.T) {
|
||||||
@ -815,8 +815,8 @@ func TestChannelManager_Reload(t *testing.T) {
|
|||||||
assert.True(t, cm2.Match(3, "channel1"))
|
assert.True(t, cm2.Match(3, "channel1"))
|
||||||
assert.True(t, cm2.Match(3, "channel2"))
|
assert.True(t, cm2.Match(3, "channel2"))
|
||||||
|
|
||||||
cm2.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel1", 3})
|
cm2.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel1", 3})
|
||||||
cm2.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel2", 3})
|
cm2.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel2", 3})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -763,7 +763,7 @@ func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.Sess
|
|||||||
zap.String("address", info.Address),
|
zap.String("address", info.Address),
|
||||||
zap.Int64("serverID", info.Version))
|
zap.Int64("serverID", info.Version))
|
||||||
if err := s.cluster.Register(node); err != nil {
|
if err := s.cluster.Register(node); err != nil {
|
||||||
log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
|
log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.metricsCacheManager.InvalidateSystemInfoMetrics()
|
s.metricsCacheManager.InvalidateSystemInfoMetrics()
|
||||||
@ -772,7 +772,7 @@ func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.Sess
|
|||||||
zap.String("address", info.Address),
|
zap.String("address", info.Address),
|
||||||
zap.Int64("serverID", info.Version))
|
zap.Int64("serverID", info.Version))
|
||||||
if err := s.cluster.UnRegister(node); err != nil {
|
if err := s.cluster.UnRegister(node); err != nil {
|
||||||
log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
|
log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.metricsCacheManager.InvalidateSystemInfoMetrics()
|
s.metricsCacheManager.InvalidateSystemInfoMetrics()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user