--- /dev/null
+{
+ "git": {
+ "sha1": "ff9b0ef7ca5a93f8fb10baba52996e755500546f"
+ }
+}
--- /dev/null
+# 0.4.2 (May 14, 2021)
+
+- test: add `assert_elapsed!` macro ([#3728])
+
+[#3728]: https://github.com/tokio-rs/tokio/pull/3728
+
+# 0.4.1 (March 10, 2021)
+
+- Fix `io::Mock` to be `Send` and `Sync` ([#3594])
+
+[#3594]: https://github.com/tokio-rs/tokio/pull/3594
+
+# 0.4.0 (December 23, 2020)
+
+- Track `tokio` 1.0 release.
+
+# 0.3.0 (October 15, 2020)
+
+- Track `tokio` 0.3 release.
+
+# 0.2.1 (April 17, 2020)
+
+- Add `Future` and `Stream` implementations for `task::Spawn<T>`.
+
+# 0.2.0 (November 25, 2019)
+
+- Initial release
--- /dev/null
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "tokio-test"
+version = "0.4.2"
+authors = ["Tokio Contributors <team@tokio.rs>"]
+description = "Testing utilities for Tokio- and futures-based code\n"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-test/0.4.2/tokio_test"
+categories = ["asynchronous", "testing"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+[package.metadata.docs.rs]
+all-features = true
+[dependencies.async-stream]
+version = "0.3"
+
+[dependencies.bytes]
+version = "1.0.0"
+
+[dependencies.futures-core]
+version = "0.3.0"
+
+[dependencies.tokio]
+version = "1.2.0"
+features = ["rt", "sync", "time", "test-util"]
+
+[dependencies.tokio-stream]
+version = "0.1"
+[dev-dependencies.futures-util]
+version = "0.3.0"
+
+[dev-dependencies.tokio]
+version = "1.2.0"
+features = ["full"]
--- /dev/null
+[package]
+name = "tokio-test"
+# When releasing to crates.io:
+# - Remove path dependencies
+# - Update doc url
+# - Cargo.toml
+# - Update CHANGELOG.md.
+# - Create "tokio-test-0.4.x" git tag.
+version = "0.4.2"
+edition = "2018"
+authors = ["Tokio Contributors <team@tokio.rs>"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-test/0.4.2/tokio_test"
+description = """
+Testing utilities for Tokio- and futures-based code
+"""
+categories = ["asynchronous", "testing"]
+
+[dependencies]
+tokio = { version = "1.2.0", path = "../tokio", features = ["rt", "sync", "time", "test-util"] }
+tokio-stream = { version = "0.1", path = "../tokio-stream" }
+async-stream = "0.3"
+
+bytes = "1.0.0"
+futures-core = "0.3.0"
+
+[dev-dependencies]
+tokio = { version = "1.2.0", path = "../tokio", features = ["full"] }
+futures-util = "0.3.0"
+
+[package.metadata.docs.rs]
+all-features = true
--- /dev/null
+Copyright (c) 2021 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
--- /dev/null
+# tokio-test
+
+Tokio and Futures based testing utilities
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
--- /dev/null
+#![cfg(not(loom))]
+
+//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
+//!
+//!
+//! # Overview
+//!
+//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
+//! to handle an arbitrary sequence of read and write operations. This is useful
+//! for writing unit tests for networking services as using an actual network
+//! type is fairly non deterministic.
+//!
+//! # Usage
+//!
+//! Attempting to write data that the mock isn't expecting will result in a
+//! panic.
+//!
+//! [`AsyncRead`]: tokio::io::AsyncRead
+//! [`AsyncWrite`]: tokio::io::AsyncWrite
+
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::sync::mpsc;
+use tokio::time::{self, Duration, Instant, Sleep};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+
+use futures_core::{ready, Stream};
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{self, Poll, Waker};
+use std::{cmp, io};
+
+/// An I/O object that follows a predefined script.
+///
+/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
+/// follows the scenario described by the builder and panics otherwise.
+#[derive(Debug)]
+pub struct Mock {
+ inner: Inner,
+}
+
+/// A handle to send additional actions to the related `Mock`.
+#[derive(Debug)]
+pub struct Handle {
+ tx: mpsc::UnboundedSender<Action>,
+}
+
+/// Builds `Mock` instances.
+#[derive(Debug, Clone, Default)]
+pub struct Builder {
+ // Sequence of actions for the Mock to take
+ actions: VecDeque<Action>,
+}
+
+#[derive(Debug, Clone)]
+enum Action {
+ Read(Vec<u8>),
+ Write(Vec<u8>),
+ Wait(Duration),
+ // Wrapped in Arc so that Builder can be cloned and Send.
+ // Mock is not cloned as does not need to check Rc for ref counts.
+ ReadError(Option<Arc<io::Error>>),
+ WriteError(Option<Arc<io::Error>>),
+}
+
+struct Inner {
+ actions: VecDeque<Action>,
+ waiting: Option<Instant>,
+ sleep: Option<Pin<Box<Sleep>>>,
+ read_wait: Option<Waker>,
+ rx: UnboundedReceiverStream<Action>,
+}
+
+impl Builder {
+ /// Return a new, empty `Builder.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Sequence a `read` operation.
+ ///
+ /// The next operation in the mock's script will be to expect a `read` call
+ /// and return `buf`.
+ pub fn read(&mut self, buf: &[u8]) -> &mut Self {
+ self.actions.push_back(Action::Read(buf.into()));
+ self
+ }
+
+ /// Sequence a `read` operation that produces an error.
+ ///
+ /// The next operation in the mock's script will be to expect a `read` call
+ /// and return `error`.
+ pub fn read_error(&mut self, error: io::Error) -> &mut Self {
+ let error = Some(error.into());
+ self.actions.push_back(Action::ReadError(error));
+ self
+ }
+
+ /// Sequence a `write` operation.
+ ///
+ /// The next operation in the mock's script will be to expect a `write`
+ /// call.
+ pub fn write(&mut self, buf: &[u8]) -> &mut Self {
+ self.actions.push_back(Action::Write(buf.into()));
+ self
+ }
+
+ /// Sequence a `write` operation that produces an error.
+ ///
+ /// The next operation in the mock's script will be to expect a `write`
+ /// call that provides `error`.
+ pub fn write_error(&mut self, error: io::Error) -> &mut Self {
+ let error = Some(error.into());
+ self.actions.push_back(Action::WriteError(error));
+ self
+ }
+
+ /// Sequence a wait.
+ ///
+ /// The next operation in the mock's script will be to wait without doing so
+ /// for `duration` amount of time.
+ pub fn wait(&mut self, duration: Duration) -> &mut Self {
+ let duration = cmp::max(duration, Duration::from_millis(1));
+ self.actions.push_back(Action::Wait(duration));
+ self
+ }
+
+ /// Build a `Mock` value according to the defined script.
+ pub fn build(&mut self) -> Mock {
+ let (mock, _) = self.build_with_handle();
+ mock
+ }
+
+ /// Build a `Mock` value paired with a handle
+ pub fn build_with_handle(&mut self) -> (Mock, Handle) {
+ let (inner, handle) = Inner::new(self.actions.clone());
+
+ let mock = Mock { inner };
+
+ (mock, handle)
+ }
+}
+
+impl Handle {
+ /// Sequence a `read` operation.
+ ///
+ /// The next operation in the mock's script will be to expect a `read` call
+ /// and return `buf`.
+ pub fn read(&mut self, buf: &[u8]) -> &mut Self {
+ self.tx.send(Action::Read(buf.into())).unwrap();
+ self
+ }
+
+ /// Sequence a `read` operation error.
+ ///
+ /// The next operation in the mock's script will be to expect a `read` call
+ /// and return `error`.
+ pub fn read_error(&mut self, error: io::Error) -> &mut Self {
+ let error = Some(error.into());
+ self.tx.send(Action::ReadError(error)).unwrap();
+ self
+ }
+
+ /// Sequence a `write` operation.
+ ///
+ /// The next operation in the mock's script will be to expect a `write`
+ /// call.
+ pub fn write(&mut self, buf: &[u8]) -> &mut Self {
+ self.tx.send(Action::Write(buf.into())).unwrap();
+ self
+ }
+
+ /// Sequence a `write` operation error.
+ ///
+ /// The next operation in the mock's script will be to expect a `write`
+ /// call error.
+ pub fn write_error(&mut self, error: io::Error) -> &mut Self {
+ let error = Some(error.into());
+ self.tx.send(Action::WriteError(error)).unwrap();
+ self
+ }
+}
+
+impl Inner {
+ fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ let rx = UnboundedReceiverStream::new(rx);
+
+ let inner = Inner {
+ actions,
+ sleep: None,
+ read_wait: None,
+ rx,
+ waiting: None,
+ };
+
+ let handle = Handle { tx };
+
+ (inner, handle)
+ }
+
+ fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
+ Pin::new(&mut self.rx).poll_next(cx)
+ }
+
+ fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
+ match self.action() {
+ Some(&mut Action::Read(ref mut data)) => {
+ // Figure out how much to copy
+ let n = cmp::min(dst.remaining(), data.len());
+
+ // Copy the data into the `dst` slice
+ dst.put_slice(&data[..n]);
+
+ // Drain the data from the source
+ data.drain(..n);
+
+ Ok(())
+ }
+ Some(&mut Action::ReadError(ref mut err)) => {
+ // As the
+ let err = err.take().expect("Should have been removed from actions.");
+ let err = Arc::try_unwrap(err).expect("There are no other references.");
+ Err(err)
+ }
+ Some(_) => {
+ // Either waiting or expecting a write
+ Err(io::ErrorKind::WouldBlock.into())
+ }
+ None => Ok(()),
+ }
+ }
+
+ fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
+ let mut ret = 0;
+
+ if self.actions.is_empty() {
+ return Err(io::ErrorKind::BrokenPipe.into());
+ }
+
+ if let Some(&mut Action::Wait(..)) = self.action() {
+ return Err(io::ErrorKind::WouldBlock.into());
+ }
+
+ if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
+ let err = err.take().expect("Should have been removed from actions.");
+ let err = Arc::try_unwrap(err).expect("There are no other references.");
+ return Err(err);
+ }
+
+ for i in 0..self.actions.len() {
+ match self.actions[i] {
+ Action::Write(ref mut expect) => {
+ let n = cmp::min(src.len(), expect.len());
+
+ assert_eq!(&src[..n], &expect[..n]);
+
+ // Drop data that was matched
+ expect.drain(..n);
+ src = &src[n..];
+
+ ret += n;
+
+ if src.is_empty() {
+ return Ok(ret);
+ }
+ }
+ Action::Wait(..) | Action::WriteError(..) => {
+ break;
+ }
+ _ => {}
+ }
+
+ // TODO: remove write
+ }
+
+ Ok(ret)
+ }
+
+ fn remaining_wait(&mut self) -> Option<Duration> {
+ match self.action() {
+ Some(&mut Action::Wait(dur)) => Some(dur),
+ _ => None,
+ }
+ }
+
+ fn action(&mut self) -> Option<&mut Action> {
+ loop {
+ if self.actions.is_empty() {
+ return None;
+ }
+
+ match self.actions[0] {
+ Action::Read(ref mut data) => {
+ if !data.is_empty() {
+ break;
+ }
+ }
+ Action::Write(ref mut data) => {
+ if !data.is_empty() {
+ break;
+ }
+ }
+ Action::Wait(ref mut dur) => {
+ if let Some(until) = self.waiting {
+ let now = Instant::now();
+
+ if now < until {
+ break;
+ }
+ } else {
+ self.waiting = Some(Instant::now() + *dur);
+ break;
+ }
+ }
+ Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
+ if error.is_some() {
+ break;
+ }
+ }
+ }
+
+ let _action = self.actions.pop_front();
+ }
+
+ self.actions.front_mut()
+ }
+}
+
+// ===== impl Inner =====
+
+impl Mock {
+ fn maybe_wakeup_reader(&mut self) {
+ match self.inner.action() {
+ Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
+ if let Some(waker) = self.inner.read_wait.take() {
+ waker.wake();
+ }
+ }
+ _ => {}
+ }
+ }
+}
+
+impl AsyncRead for Mock {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ loop {
+ if let Some(ref mut sleep) = self.inner.sleep {
+ ready!(Pin::new(sleep).poll(cx));
+ }
+
+ // If a sleep is set, it has already fired
+ self.inner.sleep = None;
+
+ // Capture 'filled' to monitor if it changed
+ let filled = buf.filled().len();
+
+ match self.inner.read(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ if let Some(rem) = self.inner.remaining_wait() {
+ let until = Instant::now() + rem;
+ self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
+ } else {
+ self.inner.read_wait = Some(cx.waker().clone());
+ return Poll::Pending;
+ }
+ }
+ Ok(()) => {
+ if buf.filled().len() == filled {
+ match ready!(self.inner.poll_action(cx)) {
+ Some(action) => {
+ self.inner.actions.push_back(action);
+ continue;
+ }
+ None => {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ } else {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ if let Some(ref mut sleep) = self.inner.sleep {
+ ready!(Pin::new(sleep).poll(cx));
+ }
+
+ // If a sleep is set, it has already fired
+ self.inner.sleep = None;
+
+ match self.inner.write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ if let Some(rem) = self.inner.remaining_wait() {
+ let until = Instant::now() + rem;
+ self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
+ } else {
+ panic!("unexpected WouldBlock");
+ }
+ }
+ Ok(0) => {
+ // TODO: Is this correct?
+ if !self.inner.actions.is_empty() {
+ return Poll::Pending;
+ }
+
+ // TODO: Extract
+ match ready!(self.inner.poll_action(cx)) {
+ Some(action) => {
+ self.inner.actions.push_back(action);
+ continue;
+ }
+ None => {
+ panic!("unexpected write");
+ }
+ }
+ }
+ ret => {
+ self.maybe_wakeup_reader();
+ return Poll::Ready(ret);
+ }
+ }
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// Ensures that Mock isn't dropped with data "inside".
+impl Drop for Mock {
+ fn drop(&mut self) {
+ // Avoid double panicking, since makes debugging much harder.
+ if std::thread::panicking() {
+ return;
+ }
+
+ self.inner.actions.iter().for_each(|a| match a {
+ Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
+ Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
+ _ => (),
+ })
+ }
+}
+/*
+/// Returns `true` if called from the context of a futures-rs Task
+fn is_task_ctx() -> bool {
+ use std::panic;
+
+ // Save the existing panic hook
+ let h = panic::take_hook();
+
+ // Install a new one that does nothing
+ panic::set_hook(Box::new(|_| {}));
+
+ // Attempt to call the fn
+ let r = panic::catch_unwind(|| task::current()).is_ok();
+
+ // Re-install the old one
+ panic::set_hook(h);
+
+ // Return the result
+ r
+}
+*/
+
+impl fmt::Debug for Inner {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Inner {{...}}")
+ }
+}
--- /dev/null
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ unreachable_pub
+)]
+#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![doc(test(
+ no_crate_inject,
+ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
+))]
+
+//! Tokio and Futures based testing utilites
+
+pub mod io;
+
+mod macros;
+pub mod task;
+
+/// Runs the provided future, blocking the current thread until the
+/// future completes.
+///
+/// For more information, see the documentation for
+/// [`tokio::runtime::Runtime::block_on`][runtime-block-on].
+///
+/// [runtime-block-on]: https://docs.rs/tokio/1.3.0/tokio/runtime/struct.Runtime.html#method.block_on
+pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
+ use tokio::runtime;
+
+ let rt = runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ rt.block_on(future)
+}
--- /dev/null
+//! A collection of useful macros for testing futures and tokio based code
+
+/// Asserts a `Poll` is ready, returning the value.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready, task};
+///
+/// let mut fut = task::spawn(future::ready(()));
+/// assert_ready!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_ready {
+ ($e:expr) => {{
+ use core::task::Poll::*;
+ match $e {
+ Ready(v) => v,
+ Pending => panic!("pending"),
+ }
+ }};
+ ($e:expr, $($msg:tt)+) => {{
+ use core::task::Poll::*;
+ match $e {
+ Ready(v) => v,
+ Pending => {
+ panic!("pending; {}", format_args!($($msg)+))
+ }
+ }
+ }};
+}
+
+/// Asserts a `Poll<Result<...>>` is ready and `Ok`, returning the value.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready(Ok(..))` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready_ok, task};
+///
+/// let mut fut = task::spawn(future::ok::<_, ()>(()));
+/// assert_ready_ok!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_ready_ok {
+ ($e:expr) => {{
+ use tokio_test::{assert_ready, assert_ok};
+ let val = assert_ready!($e);
+ assert_ok!(val)
+ }};
+ ($e:expr, $($msg:tt)+) => {{
+ use tokio_test::{assert_ready, assert_ok};
+ let val = assert_ready!($e, $($msg)*);
+ assert_ok!(val, $($msg)*)
+ }};
+}
+
+/// Asserts a `Poll<Result<...>>` is ready and `Err`, returning the error.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready(Err(..))` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready_err, task};
+///
+/// let mut fut = task::spawn(future::err::<(), _>(()));
+/// assert_ready_err!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_ready_err {
+ ($e:expr) => {{
+ use tokio_test::{assert_ready, assert_err};
+ let val = assert_ready!($e);
+ assert_err!(val)
+ }};
+ ($e:expr, $($msg:tt)+) => {{
+ use tokio_test::{assert_ready, assert_err};
+ let val = assert_ready!($e, $($msg)*);
+ assert_err!(val, $($msg)*)
+ }};
+}
+
+/// Asserts a `Poll` is pending.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Pending` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_pending, task};
+///
+/// let mut fut = task::spawn(future::pending::<()>());
+/// assert_pending!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_pending {
+ ($e:expr) => {{
+ use core::task::Poll::*;
+ match $e {
+ Pending => {}
+ Ready(v) => panic!("ready; value = {:?}", v),
+ }
+ }};
+ ($e:expr, $($msg:tt)+) => {{
+ use core::task::Poll::*;
+ match $e {
+ Pending => {}
+ Ready(v) => {
+ panic!("ready; value = {:?}; {}", v, format_args!($($msg)+))
+ }
+ }
+ }};
+}
+
+/// Asserts if a poll is ready and check for equality on the value
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready` at
+/// runtime and the value produced does not partially equal the expected value.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready_eq, task};
+///
+/// let mut fut = task::spawn(future::ready(42));
+/// assert_ready_eq!(fut.poll(), 42);
+/// ```
+#[macro_export]
+macro_rules! assert_ready_eq {
+ ($e:expr, $expect:expr) => {
+ let val = $crate::assert_ready!($e);
+ assert_eq!(val, $expect)
+ };
+
+ ($e:expr, $expect:expr, $($msg:tt)+) => {
+ let val = $crate::assert_ready!($e, $($msg)*);
+ assert_eq!(val, $expect, $($msg)*)
+ };
+}
+
+/// Asserts that the expression evaluates to `Ok` and returns the value.
+///
+/// This will invoke the `panic!` macro if the provided expression does not evaluate to `Ok` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use tokio_test::assert_ok;
+///
+/// let n: u32 = assert_ok!("123".parse());
+///
+/// let s = "123";
+/// let n: u32 = assert_ok!(s.parse(), "testing parsing {:?} as a u32", s);
+/// ```
+#[macro_export]
+macro_rules! assert_ok {
+ ($e:expr) => {
+ assert_ok!($e,)
+ };
+ ($e:expr,) => {{
+ use std::result::Result::*;
+ match $e {
+ Ok(v) => v,
+ Err(e) => panic!("assertion failed: Err({:?})", e),
+ }
+ }};
+ ($e:expr, $($arg:tt)+) => {{
+ use std::result::Result::*;
+ match $e {
+ Ok(v) => v,
+ Err(e) => panic!("assertion failed: Err({:?}): {}", e, format_args!($($arg)+)),
+ }
+ }};
+}
+
+/// Asserts that the expression evaluates to `Err` and returns the error.
+///
+/// This will invoke the `panic!` macro if the provided expression does not evaluate to `Err` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use tokio_test::assert_err;
+/// use std::str::FromStr;
+///
+///
+/// let err = assert_err!(u32::from_str("fail"));
+///
+/// let msg = "fail";
+/// let err = assert_err!(u32::from_str(msg), "testing parsing {:?} as u32", msg);
+/// ```
+#[macro_export]
+macro_rules! assert_err {
+ ($e:expr) => {
+ assert_err!($e,);
+ };
+ ($e:expr,) => {{
+ use std::result::Result::*;
+ match $e {
+ Ok(v) => panic!("assertion failed: Ok({:?})", v),
+ Err(e) => e,
+ }
+ }};
+ ($e:expr, $($arg:tt)+) => {{
+ use std::result::Result::*;
+ match $e {
+ Ok(v) => panic!("assertion failed: Ok({:?}): {}", v, format_args!($($arg)+)),
+ Err(e) => e,
+ }
+ }};
+}
+
+/// Asserts that an exact duration has elapsed since since the start instant ±1ms.
+///
+/// ```rust
+/// use tokio::time::{self, Instant};
+/// use std::time::Duration;
+/// use tokio_test::assert_elapsed;
+/// # async fn test_time_passed() {
+///
+/// let start = Instant::now();
+/// let dur = Duration::from_millis(50);
+/// time::sleep(dur).await;
+/// assert_elapsed!(start, dur);
+/// # }
+/// ```
+///
+/// This 1ms buffer is required because Tokio's hashed-wheel timer has finite time resolution and
+/// will not always sleep for the exact interval.
+#[macro_export]
+macro_rules! assert_elapsed {
+ ($start:expr, $dur:expr) => {{
+ let elapsed = $start.elapsed();
+ // type ascription improves compiler error when wrong type is passed
+ let lower: std::time::Duration = $dur;
+
+ // Handles ms rounding
+ assert!(
+ elapsed >= lower && elapsed <= lower + std::time::Duration::from_millis(1),
+ "actual = {:?}, expected = {:?}",
+ elapsed,
+ lower
+ );
+ }};
+}
--- /dev/null
+//! Futures task based helpers
+
+#![allow(clippy::mutex_atomic)]
+
+use std::future::Future;
+use std::mem;
+use std::ops;
+use std::pin::Pin;
+use std::sync::{Arc, Condvar, Mutex};
+use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+use tokio_stream::Stream;
+
+/// TODO: dox
+pub fn spawn<T>(task: T) -> Spawn<T> {
+ Spawn {
+ task: MockTask::new(),
+ future: Box::pin(task),
+ }
+}
+
+/// Future spawned on a mock task
+#[derive(Debug)]
+pub struct Spawn<T> {
+ task: MockTask,
+ future: Pin<Box<T>>,
+}
+
+/// Mock task
+///
+/// A mock task is able to intercept and track wake notifications.
+#[derive(Debug, Clone)]
+struct MockTask {
+ waker: Arc<ThreadWaker>,
+}
+
+#[derive(Debug)]
+struct ThreadWaker {
+ state: Mutex<usize>,
+ condvar: Condvar,
+}
+
+const IDLE: usize = 0;
+const WAKE: usize = 1;
+const SLEEP: usize = 2;
+
+impl<T> Spawn<T> {
+ /// Consumes `self` returning the inner value
+ pub fn into_inner(self) -> T
+ where
+ T: Unpin,
+ {
+ *Pin::into_inner(self.future)
+ }
+
+ /// Returns `true` if the inner future has received a wake notification
+ /// since the last call to `enter`.
+ pub fn is_woken(&self) -> bool {
+ self.task.is_woken()
+ }
+
+ /// Returns the number of references to the task waker
+ ///
+ /// The task itself holds a reference. The return value will never be zero.
+ pub fn waker_ref_count(&self) -> usize {
+ self.task.waker_ref_count()
+ }
+
+ /// Enter the task context
+ pub fn enter<F, R>(&mut self, f: F) -> R
+ where
+ F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R,
+ {
+ let fut = self.future.as_mut();
+ self.task.enter(|cx| f(cx, fut))
+ }
+}
+
+impl<T: Unpin> ops::Deref for Spawn<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ &self.future
+ }
+}
+
+impl<T: Unpin> ops::DerefMut for Spawn<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ &mut self.future
+ }
+}
+
+impl<T: Future> Spawn<T> {
+ /// Polls a future
+ pub fn poll(&mut self) -> Poll<T::Output> {
+ let fut = self.future.as_mut();
+ self.task.enter(|cx| fut.poll(cx))
+ }
+}
+
+impl<T: Stream> Spawn<T> {
+ /// Polls a stream
+ pub fn poll_next(&mut self) -> Poll<Option<T::Item>> {
+ let stream = self.future.as_mut();
+ self.task.enter(|cx| stream.poll_next(cx))
+ }
+}
+
+impl<T: Future> Future for Spawn<T> {
+ type Output = T::Output;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.future.as_mut().poll(cx)
+ }
+}
+
+impl<T: Stream> Stream for Spawn<T> {
+ type Item = T::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.future.as_mut().poll_next(cx)
+ }
+}
+
+impl MockTask {
+ /// Creates new mock task
+ fn new() -> Self {
+ MockTask {
+ waker: Arc::new(ThreadWaker::new()),
+ }
+ }
+
+ /// Runs a closure from the context of the task.
+ ///
+ /// Any wake notifications resulting from the execution of the closure are
+ /// tracked.
+ fn enter<F, R>(&mut self, f: F) -> R
+ where
+ F: FnOnce(&mut Context<'_>) -> R,
+ {
+ self.waker.clear();
+ let waker = self.waker();
+ let mut cx = Context::from_waker(&waker);
+
+ f(&mut cx)
+ }
+
+ /// Returns `true` if the inner future has received a wake notification
+ /// since the last call to `enter`.
+ fn is_woken(&self) -> bool {
+ self.waker.is_woken()
+ }
+
+ /// Returns the number of references to the task waker
+ ///
+ /// The task itself holds a reference. The return value will never be zero.
+ fn waker_ref_count(&self) -> usize {
+ Arc::strong_count(&self.waker)
+ }
+
+ fn waker(&self) -> Waker {
+ unsafe {
+ let raw = to_raw(self.waker.clone());
+ Waker::from_raw(raw)
+ }
+ }
+}
+
+impl Default for MockTask {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ThreadWaker {
+ fn new() -> Self {
+ ThreadWaker {
+ state: Mutex::new(IDLE),
+ condvar: Condvar::new(),
+ }
+ }
+
+ /// Clears any previously received wakes, avoiding potential spurrious
+ /// wake notifications. This should only be called immediately before running the
+ /// task.
+ fn clear(&self) {
+ *self.state.lock().unwrap() = IDLE;
+ }
+
+ fn is_woken(&self) -> bool {
+ match *self.state.lock().unwrap() {
+ IDLE => false,
+ WAKE => true,
+ _ => unreachable!(),
+ }
+ }
+
+ fn wake(&self) {
+ // First, try transitioning from IDLE -> NOTIFY, this does not require a lock.
+ let mut state = self.state.lock().unwrap();
+ let prev = *state;
+
+ if prev == WAKE {
+ return;
+ }
+
+ *state = WAKE;
+
+ if prev == IDLE {
+ return;
+ }
+
+ // The other half is sleeping, so we wake it up.
+ assert_eq!(prev, SLEEP);
+ self.condvar.notify_one();
+ }
+}
+
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);
+
+unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker {
+ RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE)
+}
+
+unsafe fn from_raw(raw: *const ()) -> Arc<ThreadWaker> {
+ Arc::from_raw(raw as *const ThreadWaker)
+}
+
+unsafe fn clone(raw: *const ()) -> RawWaker {
+ let waker = from_raw(raw);
+
+ // Increment the ref count
+ mem::forget(waker.clone());
+
+ to_raw(waker)
+}
+
+unsafe fn wake(raw: *const ()) {
+ let waker = from_raw(raw);
+ waker.wake();
+}
+
+unsafe fn wake_by_ref(raw: *const ()) {
+ let waker = from_raw(raw);
+ waker.wake();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(waker);
+}
+
+unsafe fn drop_waker(raw: *const ()) {
+ let _ = from_raw(raw);
+}
--- /dev/null
+#![warn(rust_2018_idioms)]
+
+use tokio::time::{sleep_until, Duration, Instant};
+use tokio_test::block_on;
+
+#[test]
+fn async_block() {
+ assert_eq!(4, block_on(async { 4 }));
+}
+
+async fn five() -> u8 {
+ 5
+}
+
+#[test]
+fn async_fn() {
+ assert_eq!(5, block_on(five()));
+}
+
+#[test]
+fn test_sleep() {
+ let deadline = Instant::now() + Duration::from_millis(100);
+
+ block_on(async {
+ sleep_until(deadline).await;
+ });
+}
--- /dev/null
+#![warn(rust_2018_idioms)]
+
+use std::io;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio_test::io::Builder;
+
+#[tokio::test]
+async fn read() {
+ let mut mock = Builder::new().read(b"hello ").read(b"world!").build();
+
+ let mut buf = [0; 256];
+
+ let n = mock.read(&mut buf).await.expect("read 1");
+ assert_eq!(&buf[..n], b"hello ");
+
+ let n = mock.read(&mut buf).await.expect("read 2");
+ assert_eq!(&buf[..n], b"world!");
+}
+
+#[tokio::test]
+async fn read_error() {
+ let error = io::Error::new(io::ErrorKind::Other, "cruel");
+ let mut mock = Builder::new()
+ .read(b"hello ")
+ .read_error(error)
+ .read(b"world!")
+ .build();
+ let mut buf = [0; 256];
+
+ let n = mock.read(&mut buf).await.expect("read 1");
+ assert_eq!(&buf[..n], b"hello ");
+
+ match mock.read(&mut buf).await {
+ Err(error) => {
+ assert_eq!(error.kind(), io::ErrorKind::Other);
+ assert_eq!("cruel", format!("{}", error));
+ }
+ Ok(_) => panic!("error not received"),
+ }
+
+ let n = mock.read(&mut buf).await.expect("read 1");
+ assert_eq!(&buf[..n], b"world!");
+}
+
+#[tokio::test]
+async fn write() {
+ let mut mock = Builder::new().write(b"hello ").write(b"world!").build();
+
+ mock.write_all(b"hello ").await.expect("write 1");
+ mock.write_all(b"world!").await.expect("write 2");
+}
+
+#[tokio::test]
+async fn write_error() {
+ let error = io::Error::new(io::ErrorKind::Other, "cruel");
+ let mut mock = Builder::new()
+ .write(b"hello ")
+ .write_error(error)
+ .write(b"world!")
+ .build();
+ mock.write_all(b"hello ").await.expect("write 1");
+
+ match mock.write_all(b"whoa").await {
+ Err(error) => {
+ assert_eq!(error.kind(), io::ErrorKind::Other);
+ assert_eq!("cruel", format!("{}", error));
+ }
+ Ok(_) => panic!("error not received"),
+ }
+
+ mock.write_all(b"world!").await.expect("write 2");
+}
+
+#[tokio::test]
+#[should_panic]
+async fn mock_panics_read_data_left() {
+ use tokio_test::io::Builder;
+ Builder::new().read(b"read").build();
+}
+
+#[tokio::test]
+#[should_panic]
+async fn mock_panics_write_data_left() {
+ use tokio_test::io::Builder;
+ Builder::new().write(b"write").build();
+}
--- /dev/null
+#![warn(rust_2018_idioms)]
+
+use std::task::Poll;
+use tokio_test::{
+ assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
+};
+
+fn ready() -> Poll<()> {
+ Poll::Ready(())
+}
+
+fn ready_ok() -> Poll<Result<(), ()>> {
+ Poll::Ready(Ok(()))
+}
+
+fn ready_err() -> Poll<Result<(), ()>> {
+ Poll::Ready(Err(()))
+}
+
+fn pending() -> Poll<()> {
+ Poll::Pending
+}
+
+#[derive(Debug)]
+enum Test {
+ Data,
+}
+
+#[test]
+fn assert_ready() {
+ let poll = ready();
+ assert_ready!(poll);
+ assert_ready!(poll, "some message");
+ assert_ready!(poll, "{:?}", ());
+ assert_ready!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_ready_on_pending() {
+ let poll = pending();
+ assert_ready!(poll);
+}
+
+#[test]
+fn assert_pending() {
+ let poll = pending();
+ assert_pending!(poll);
+ assert_pending!(poll, "some message");
+ assert_pending!(poll, "{:?}", ());
+ assert_pending!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_pending_on_ready() {
+ let poll = ready();
+ assert_pending!(poll);
+}
+
+#[test]
+fn assert_ready_ok() {
+ let poll = ready_ok();
+ assert_ready_ok!(poll);
+ assert_ready_ok!(poll, "some message");
+ assert_ready_ok!(poll, "{:?}", ());
+ assert_ready_ok!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_ok_on_err() {
+ let poll = ready_err();
+ assert_ready_ok!(poll);
+}
+
+#[test]
+fn assert_ready_err() {
+ let poll = ready_err();
+ assert_ready_err!(poll);
+ assert_ready_err!(poll, "some message");
+ assert_ready_err!(poll, "{:?}", ());
+ assert_ready_err!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_err_on_ok() {
+ let poll = ready_ok();
+ assert_ready_err!(poll);
+}
+
+#[test]
+fn assert_ready_eq() {
+ let poll = ready();
+ assert_ready_eq!(poll, ());
+ assert_ready_eq!(poll, (), "some message");
+ assert_ready_eq!(poll, (), "{:?}", ());
+ assert_ready_eq!(poll, (), "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_eq_on_not_eq() {
+ let poll = ready_err();
+ assert_ready_eq!(poll, Ok(()));
+}