mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
issue: #37166 pr: #37433 cause the misuse of timer.Reset, which cause dispatcher failed to send msg to virtual channel buffer, and dispatcher do splitting again and again, which hold the dispatcher manager's lock, block watching channel progress. This PR fix the misuse of timer.Reset Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
bff0113cbb
commit
a9beca44ef
@ -392,6 +392,12 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {
|
||||
|
||||
case <-tickler.progressSig:
|
||||
log.Info("Reset timer for tickler updated", zap.Int32("current progress", tickler.progress()))
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(watchTimeout)
|
||||
|
||||
case <-successSig:
|
||||
|
||||
@ -73,6 +73,13 @@ func (t *target) send(pack *MsgPack) error {
|
||||
if t.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !t.timer.Stop() {
|
||||
select {
|
||||
case <-t.timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
t.timer.Reset(t.maxLag)
|
||||
select {
|
||||
case <-t.cancelCh.CloseCh():
|
||||
|
||||
30
pkg/mq/msgdispatcher/target_test.go
Normal file
30
pkg/mq/msgdispatcher/target_test.go
Normal file
@ -0,0 +1,30 @@
|
||||
package msgdispatcher
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
func TestSendTimeout(t *testing.T) {
|
||||
target := newTarget("test1", &msgpb.MsgPosition{})
|
||||
|
||||
time.Sleep(paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second))
|
||||
|
||||
counter := 0
|
||||
for i := 0; i < 10; i++ {
|
||||
err := target.send(&msgstream.MsgPack{})
|
||||
if err != nil {
|
||||
log.Error("send failed", zap.Int("idx", i), zap.Error(err))
|
||||
counter++
|
||||
}
|
||||
}
|
||||
assert.Equal(t, counter, 0)
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user