mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
fix: Make TestScannerAdaptorReadError stable (#41303)
Related to #41302 Previously wait for 200 milliseconds could cause unsable behavior of this unittest. This PR make unittest wait for certain function call instead of wait for some time. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
a53f3024cf
commit
1d564a2d95
@ -1,12 +1,13 @@
|
|||||||
package adaptor
|
package adaptor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/mock_wab"
|
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/mock_wab"
|
||||||
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector"
|
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector"
|
||||||
@ -16,21 +17,34 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls"
|
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestScannerAdaptorReadError(t *testing.T) {
|
func TestScannerAdaptorReadError(t *testing.T) {
|
||||||
resource.InitForTest(t)
|
resource.InitForTest(t)
|
||||||
|
|
||||||
|
sig1 := lifetime.NewSafeChan()
|
||||||
|
sig2 := lifetime.NewSafeChan()
|
||||||
|
backoffTime := atomic.NewInt32(0)
|
||||||
|
|
||||||
operator := mock_inspector.NewMockTimeTickSyncOperator(t)
|
operator := mock_inspector.NewMockTimeTickSyncOperator(t)
|
||||||
operator.EXPECT().Channel().Return(types.PChannelInfo{})
|
operator.EXPECT().Channel().Return(types.PChannelInfo{})
|
||||||
operator.EXPECT().Sync(mock.Anything).Return()
|
operator.EXPECT().Sync(mock.Anything).Run(func(ctx context.Context) {
|
||||||
|
sig1.Close()
|
||||||
|
})
|
||||||
wb := mock_wab.NewMockROWriteAheadBuffer(t)
|
wb := mock_wab.NewMockROWriteAheadBuffer(t)
|
||||||
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(wb, nil)
|
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(wb, nil)
|
||||||
resource.Resource().TimeTickInspector().RegisterSyncOperator(operator)
|
resource.Resource().TimeTickInspector().RegisterSyncOperator(operator)
|
||||||
|
|
||||||
err := errors.New("read error")
|
err := errors.New("read error")
|
||||||
l := mock_walimpls.NewMockWALImpls(t)
|
l := mock_walimpls.NewMockWALImpls(t)
|
||||||
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err)
|
l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, _ walimpls.ReadOption) (walimpls.ScannerImpls, error) {
|
||||||
|
if backoffTime.Inc() > 1 {
|
||||||
|
sig2.Close()
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
})
|
||||||
l.EXPECT().Channel().Return(types.PChannelInfo{})
|
l.EXPECT().Channel().Return(types.PChannelInfo{})
|
||||||
|
|
||||||
s := newScannerAdaptor("scanner", l,
|
s := newScannerAdaptor("scanner", l,
|
||||||
@ -41,7 +55,10 @@ func TestScannerAdaptorReadError(t *testing.T) {
|
|||||||
},
|
},
|
||||||
metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
|
metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
|
||||||
func() {})
|
func() {})
|
||||||
time.Sleep(200 * time.Millisecond)
|
// wait for timetick inspector first round
|
||||||
|
<-sig1.CloseCh()
|
||||||
|
// wait for scanner backoff 2 rounds
|
||||||
|
<-sig2.CloseCh()
|
||||||
s.Close()
|
s.Close()
|
||||||
<-s.Chan()
|
<-s.Chan()
|
||||||
<-s.Done()
|
<-s.Done()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user