diff --git a/pkg/mq/msgstream/mq_factory_test.go b/pkg/mq/msgstream/mq_factory_test.go index 1f1d161335..33566edca0 100644 --- a/pkg/mq/msgstream/mq_factory_test.go +++ b/pkg/mq/msgstream/mq_factory_test.go @@ -46,13 +46,13 @@ func TestPmsFactory(t *testing.T) { var cancel context.CancelFunc ctx := context.Background() if test.withTimeout { - ctx, cancel = context.WithTimeout(ctx, time.Millisecond) + if test.ctxTimeouted { + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(-1*time.Minute)) + } else { + ctx, cancel = context.WithTimeout(ctx, time.Second*10) + } defer cancel() } - - if test.ctxTimeouted { - time.Sleep(time.Millisecond) - } stream, err := pmsFactory.NewMsgStream(ctx) if test.expectedError { assert.Error(t, err) @@ -120,15 +120,14 @@ func TestKafkaFactory(t *testing.T) { t.Run(test.description, func(t *testing.T) { var cancel context.CancelFunc ctx := context.Background() - timeoutDur := time.Millisecond * 30 if test.withTimeout { - ctx, cancel = context.WithTimeout(ctx, timeoutDur) + if test.ctxTimeouted { + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(-1*time.Minute)) + } else { + ctx, cancel = context.WithTimeout(ctx, time.Second*10) + } defer cancel() } - - if test.ctxTimeouted { - time.Sleep(timeoutDur) - } stream, err := kmsFactory.NewMsgStream(ctx) if test.expectedError { assert.Error(t, err)