From e3f003e0a6aba5f57c85a06c27cba0a5c64450bc Mon Sep 17 00:00:00 2001 From: DongHun Kwak Date: Mon, 17 Apr 2023 10:06:38 +0900 Subject: [PATCH 1/1] Import tokio-test 0.4.2 --- .cargo_vcs_info.json | 5 + CHANGELOG.md | 27 +++ Cargo.toml | 46 +++++ Cargo.toml.orig | 34 ++++ LICENSE | 25 +++ README.md | 13 ++ src/io.rs | 494 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 36 ++++ src/macros.rs | 295 ++++++++++++++++++++++++++++++ src/task.rs | 253 ++++++++++++++++++++++++++ tests/block_on.rs | 27 +++ tests/io.rs | 86 +++++++++ tests/macros.rs | 107 +++++++++++ 13 files changed, 1448 insertions(+) create mode 100644 .cargo_vcs_info.json create mode 100644 CHANGELOG.md create mode 100644 Cargo.toml create mode 100644 Cargo.toml.orig create mode 100644 LICENSE create mode 100644 README.md create mode 100644 src/io.rs create mode 100644 src/lib.rs create mode 100644 src/macros.rs create mode 100644 src/task.rs create mode 100644 tests/block_on.rs create mode 100644 tests/io.rs create mode 100644 tests/macros.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..eb7e2b0 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,5 @@ +{ + "git": { + "sha1": "ff9b0ef7ca5a93f8fb10baba52996e755500546f" + } +} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..5b1786f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,27 @@ +# 0.4.2 (May 14, 2021) + +- test: add `assert_elapsed!` macro ([#3728]) + +[#3728]: https://github.com/tokio-rs/tokio/pull/3728 + +# 0.4.1 (March 10, 2021) + +- Fix `io::Mock` to be `Send` and `Sync` ([#3594]) + +[#3594]: https://github.com/tokio-rs/tokio/pull/3594 + +# 0.4.0 (December 23, 2020) + +- Track `tokio` 1.0 release. + +# 0.3.0 (October 15, 2020) + +- Track `tokio` 0.3 release. + +# 0.2.1 (April 17, 2020) + +- Add `Future` and `Stream` implementations for `task::Spawn`. 
+ +# 0.2.0 (November 25, 2019) + +- Initial release diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1ebd109 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,46 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "tokio-test" +version = "0.4.2" +authors = ["Tokio Contributors "] +description = "Testing utilities for Tokio- and futures-based code\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-test/0.4.2/tokio_test" +categories = ["asynchronous", "testing"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[package.metadata.docs.rs] +all-features = true +[dependencies.async-stream] +version = "0.3" + +[dependencies.bytes] +version = "1.0.0" + +[dependencies.futures-core] +version = "0.3.0" + +[dependencies.tokio] +version = "1.2.0" +features = ["rt", "sync", "time", "test-util"] + +[dependencies.tokio-stream] +version = "0.1" +[dev-dependencies.futures-util] +version = "0.3.0" + +[dev-dependencies.tokio] +version = "1.2.0" +features = ["full"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..55d5aaf --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,34 @@ +[package] +name = "tokio-test" +# When releasing to crates.io: +# - Remove path dependencies +# - Update doc url +# - Cargo.toml +# - Update CHANGELOG.md. +# - Create "tokio-test-0.4.x" git tag. 
+version = "0.4.2" +edition = "2018" +authors = ["Tokio Contributors "] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-test/0.4.2/tokio_test" +description = """ +Testing utilities for Tokio- and futures-based code +""" +categories = ["asynchronous", "testing"] + +[dependencies] +tokio = { version = "1.2.0", path = "../tokio", features = ["rt", "sync", "time", "test-util"] } +tokio-stream = { version = "0.1", path = "../tokio-stream" } +async-stream = "0.3" + +bytes = "1.0.0" +futures-core = "0.3.0" + +[dev-dependencies] +tokio = { version = "1.2.0", path = "../tokio", features = ["full"] } +futures-util = "0.3.0" + +[package.metadata.docs.rs] +all-features = true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ffa38bb --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2021 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..64174d9 --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# tokio-test + +Tokio and Futures based testing utilities + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..4ec66a4 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,494 @@ +#![cfg(not(loom))] + +//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. +//! +//! +//! # Overview +//! +//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured +//! to handle an arbitrary sequence of read and write operations. This is useful +//! for writing unit tests for networking services as using an actual network +//! type is fairly non deterministic. +//! +//! # Usage +//! +//! Attempting to write data that the mock isn't expecting will result in a +//! panic. +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::sync::mpsc; +use tokio::time::{self, Duration, Instant, Sleep}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use futures_core::{ready, Stream}; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{self, Poll, Waker}; +use std::{cmp, io}; + +/// An I/O object that follows a predefined script. +/// +/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It +/// follows the scenario described by the builder and panics otherwise. +#[derive(Debug)] +pub struct Mock { + inner: Inner, +} + +/// A handle to send additional actions to the related `Mock`. 
+#[derive(Debug)] +pub struct Handle { + tx: mpsc::UnboundedSender, +} + +/// Builds `Mock` instances. +#[derive(Debug, Clone, Default)] +pub struct Builder { + // Sequence of actions for the Mock to take + actions: VecDeque, +} + +#[derive(Debug, Clone)] +enum Action { + Read(Vec), + Write(Vec), + Wait(Duration), + // Wrapped in Arc so that Builder can be cloned and Send. + // Mock is not cloned as does not need to check Rc for ref counts. + ReadError(Option>), + WriteError(Option>), +} + +struct Inner { + actions: VecDeque, + waiting: Option, + sleep: Option>>, + read_wait: Option, + rx: UnboundedReceiverStream, +} + +impl Builder { + /// Return a new, empty `Builder`. + pub fn new() -> Self { + Self::default() + } + + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Read(buf.into())); + self + } + + /// Sequence a `read` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::ReadError(error)); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.actions.push_back(Action::Write(buf.into())); + self + } + + /// Sequence a `write` operation that produces an error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call that provides `error`. + pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.actions.push_back(Action::WriteError(error)); + self + } + + /// Sequence a wait. 
+ /// + /// The next operation in the mock's script will be to wait without doing so + /// for `duration` amount of time. + pub fn wait(&mut self, duration: Duration) -> &mut Self { + let duration = cmp::max(duration, Duration::from_millis(1)); + self.actions.push_back(Action::Wait(duration)); + self + } + + /// Build a `Mock` value according to the defined script. + pub fn build(&mut self) -> Mock { + let (mock, _) = self.build_with_handle(); + mock + } + + /// Build a `Mock` value paired with a handle + pub fn build_with_handle(&mut self) -> (Mock, Handle) { + let (inner, handle) = Inner::new(self.actions.clone()); + + let mock = Mock { inner }; + + (mock, handle) + } +} + +impl Handle { + /// Sequence a `read` operation. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `buf`. + pub fn read(&mut self, buf: &[u8]) -> &mut Self { + self.tx.send(Action::Read(buf.into())).unwrap(); + self + } + + /// Sequence a `read` operation error. + /// + /// The next operation in the mock's script will be to expect a `read` call + /// and return `error`. + pub fn read_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.tx.send(Action::ReadError(error)).unwrap(); + self + } + + /// Sequence a `write` operation. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call. + pub fn write(&mut self, buf: &[u8]) -> &mut Self { + self.tx.send(Action::Write(buf.into())).unwrap(); + self + } + + /// Sequence a `write` operation error. + /// + /// The next operation in the mock's script will be to expect a `write` + /// call error. 
+ pub fn write_error(&mut self, error: io::Error) -> &mut Self { + let error = Some(error.into()); + self.tx.send(Action::WriteError(error)).unwrap(); + self + } +} + +impl Inner { + fn new(actions: VecDeque) -> (Inner, Handle) { + let (tx, rx) = mpsc::unbounded_channel(); + + let rx = UnboundedReceiverStream::new(rx); + + let inner = Inner { + actions, + sleep: None, + read_wait: None, + rx, + waiting: None, + }; + + let handle = Handle { tx }; + + (inner, handle) + } + + fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll> { + Pin::new(&mut self.rx).poll_next(cx) + } + + fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> { + match self.action() { + Some(&mut Action::Read(ref mut data)) => { + // Figure out how much to copy + let n = cmp::min(dst.remaining(), data.len()); + + // Copy the data into the `dst` slice + dst.put_slice(&data[..n]); + + // Drain the data from the source + data.drain(..n); + + Ok(()) + } + Some(&mut Action::ReadError(ref mut err)) => { + // Take the error out of its slot so it is returned exactly once + let err = err.take().expect("Should have been removed from actions."); + let err = Arc::try_unwrap(err).expect("There are no other references."); + Err(err) + } + Some(_) => { + // Either waiting or expecting a write + Err(io::ErrorKind::WouldBlock.into()) + } + None => Ok(()), + } + } + + fn write(&mut self, mut src: &[u8]) -> io::Result { + let mut ret = 0; + + if self.actions.is_empty() { + return Err(io::ErrorKind::BrokenPipe.into()); + } + + if let Some(&mut Action::Wait(..)) = self.action() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + if let Some(&mut Action::WriteError(ref mut err)) = self.action() { + let err = err.take().expect("Should have been removed from actions."); + let err = Arc::try_unwrap(err).expect("There are no other references."); + return Err(err); + } + + for i in 0..self.actions.len() { + match self.actions[i] { + Action::Write(ref mut expect) => { + let n = cmp::min(src.len(), expect.len()); + + assert_eq!(&src[..n], &expect[..n]); + + 
// Drop data that was matched + expect.drain(..n); + src = &src[n..]; + + ret += n; + + if src.is_empty() { + return Ok(ret); + } + } + Action::Wait(..) | Action::WriteError(..) => { + break; + } + _ => {} + } + + // TODO: remove write + } + + Ok(ret) + } + + fn remaining_wait(&mut self) -> Option { + match self.action() { + Some(&mut Action::Wait(dur)) => Some(dur), + _ => None, + } + } + + fn action(&mut self) -> Option<&mut Action> { + loop { + if self.actions.is_empty() { + return None; + } + + match self.actions[0] { + Action::Read(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Write(ref mut data) => { + if !data.is_empty() { + break; + } + } + Action::Wait(ref mut dur) => { + if let Some(until) = self.waiting { + let now = Instant::now(); + + if now < until { + break; + } + } else { + self.waiting = Some(Instant::now() + *dur); + break; + } + } + Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { + if error.is_some() { + break; + } + } + } + + let _action = self.actions.pop_front(); + } + + self.actions.front_mut() + } +} + +// ===== impl Inner ===== + +impl Mock { + fn maybe_wakeup_reader(&mut self) { + match self.inner.action() { + Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => { + if let Some(waker) = self.inner.read_wait.take() { + waker.wake(); + } + } + _ => {} + } + } +} + +impl AsyncRead for Mock { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + loop { + if let Some(ref mut sleep) = self.inner.sleep { + ready!(Pin::new(sleep).poll(cx)); + } + + // If a sleep is set, it has already fired + self.inner.sleep = None; + + // Capture 'filled' to monitor if it changed + let filled = buf.filled().len(); + + match self.inner.read(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + if let Some(rem) = self.inner.remaining_wait() { + let until = Instant::now() + rem; + self.inner.sleep = 
Some(Box::pin(time::sleep_until(until))); + } else { + self.inner.read_wait = Some(cx.waker().clone()); + return Poll::Pending; + } + } + Ok(()) => { + if buf.filled().len() == filled { + match ready!(self.inner.poll_action(cx)) { + Some(action) => { + self.inner.actions.push_back(action); + continue; + } + None => { + return Poll::Ready(Ok(())); + } + } + } else { + return Poll::Ready(Ok(())); + } + } + Err(e) => return Poll::Ready(Err(e)), + } + } + } +} + +impl AsyncWrite for Mock { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + loop { + if let Some(ref mut sleep) = self.inner.sleep { + ready!(Pin::new(sleep).poll(cx)); + } + + // If a sleep is set, it has already fired + self.inner.sleep = None; + + match self.inner.write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + if let Some(rem) = self.inner.remaining_wait() { + let until = Instant::now() + rem; + self.inner.sleep = Some(Box::pin(time::sleep_until(until))); + } else { + panic!("unexpected WouldBlock"); + } + } + Ok(0) => { + // TODO: Is this correct? + if !self.inner.actions.is_empty() { + return Poll::Pending; + } + + // TODO: Extract + match ready!(self.inner.poll_action(cx)) { + Some(action) => { + self.inner.actions.push_back(action); + continue; + } + None => { + panic!("unexpected write"); + } + } + } + ret => { + self.maybe_wakeup_reader(); + return Poll::Ready(ret); + } + } + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +/// Ensures that Mock isn't dropped with data "inside". +impl Drop for Mock { + fn drop(&mut self) { + // Avoid double panicking, since makes debugging much harder. 
+ if std::thread::panicking() { + return; + } + + self.inner.actions.iter().for_each(|a| match a { + Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."), + Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."), + _ => (), + }) + } +} +/* +/// Returns `true` if called from the context of a futures-rs Task +fn is_task_ctx() -> bool { + use std::panic; + + // Save the existing panic hook + let h = panic::take_hook(); + + // Install a new one that does nothing + panic::set_hook(Box::new(|_| {})); + + // Attempt to call the fn + let r = panic::catch_unwind(|| task::current()).is_ok(); + + // Re-install the old one + panic::set_hook(h); + + // Return the result + r +} +*/ + +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Inner {{...}}") + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c510454 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,36 @@ +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + unreachable_pub +)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Tokio and Futures based testing utilities + +pub mod io; + +mod macros; +pub mod task; + +/// Runs the provided future, blocking the current thread until the +/// future completes. +/// +/// For more information, see the documentation for +/// [`tokio::runtime::Runtime::block_on`][runtime-block-on]. 
+/// +/// [runtime-block-on]: https://docs.rs/tokio/1.3.0/tokio/runtime/struct.Runtime.html#method.block_on +pub fn block_on(future: F) -> F::Output { + use tokio::runtime; + + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(future) +} diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 0000000..7ca7345 --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,295 @@ +//! A collection of useful macros for testing futures and tokio based code + +/// Asserts a `Poll` is ready, returning the value. +/// +/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready` at +/// runtime. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future; +/// use tokio_test::{assert_ready, task}; +/// +/// let mut fut = task::spawn(future::ready(())); +/// assert_ready!(fut.poll()); +/// ``` +#[macro_export] +macro_rules! assert_ready { + ($e:expr) => {{ + use core::task::Poll::*; + match $e { + Ready(v) => v, + Pending => panic!("pending"), + } + }}; + ($e:expr, $($msg:tt)+) => {{ + use core::task::Poll::*; + match $e { + Ready(v) => v, + Pending => { + panic!("pending; {}", format_args!($($msg)+)) + } + } + }}; +} + +/// Asserts a `Poll>` is ready and `Ok`, returning the value. +/// +/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready(Ok(..))` at +/// runtime. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future; +/// use tokio_test::{assert_ready_ok, task}; +/// +/// let mut fut = task::spawn(future::ok::<_, ()>(())); +/// assert_ready_ok!(fut.poll()); +/// ``` +#[macro_export] +macro_rules! 
assert_ready_ok { + ($e:expr) => {{ + use tokio_test::{assert_ready, assert_ok}; + let val = assert_ready!($e); + assert_ok!(val) + }}; + ($e:expr, $($msg:tt)+) => {{ + use tokio_test::{assert_ready, assert_ok}; + let val = assert_ready!($e, $($msg)*); + assert_ok!(val, $($msg)*) + }}; +} + +/// Asserts a `Poll>` is ready and `Err`, returning the error. +/// +/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready(Err(..))` at +/// runtime. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future; +/// use tokio_test::{assert_ready_err, task}; +/// +/// let mut fut = task::spawn(future::err::<(), _>(())); +/// assert_ready_err!(fut.poll()); +/// ``` +#[macro_export] +macro_rules! assert_ready_err { + ($e:expr) => {{ + use tokio_test::{assert_ready, assert_err}; + let val = assert_ready!($e); + assert_err!(val) + }}; + ($e:expr, $($msg:tt)+) => {{ + use tokio_test::{assert_ready, assert_err}; + let val = assert_ready!($e, $($msg)*); + assert_err!(val, $($msg)*) + }}; +} + +/// Asserts a `Poll` is pending. +/// +/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Pending` at +/// runtime. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future; +/// use tokio_test::{assert_pending, task}; +/// +/// let mut fut = task::spawn(future::pending::<()>()); +/// assert_pending!(fut.poll()); +/// ``` +#[macro_export] +macro_rules! 
assert_pending { + ($e:expr) => {{ + use core::task::Poll::*; + match $e { + Pending => {} + Ready(v) => panic!("ready; value = {:?}", v), + } + }}; + ($e:expr, $($msg:tt)+) => {{ + use core::task::Poll::*; + match $e { + Pending => {} + Ready(v) => { + panic!("ready; value = {:?}; {}", v, format_args!($($msg)+)) + } + } + }}; +} + +/// Asserts if a poll is ready and check for equality on the value +/// +/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready` at +/// runtime and the value produced does not partially equal the expected value. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future; +/// use tokio_test::{assert_ready_eq, task}; +/// +/// let mut fut = task::spawn(future::ready(42)); +/// assert_ready_eq!(fut.poll(), 42); +/// ``` +#[macro_export] +macro_rules! assert_ready_eq { + ($e:expr, $expect:expr) => { + let val = $crate::assert_ready!($e); + assert_eq!(val, $expect) + }; + + ($e:expr, $expect:expr, $($msg:tt)+) => { + let val = $crate::assert_ready!($e, $($msg)*); + assert_eq!(val, $expect, $($msg)*) + }; +} + +/// Asserts that the expression evaluates to `Ok` and returns the value. +/// +/// This will invoke the `panic!` macro if the provided expression does not evaluate to `Ok` at +/// runtime. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use tokio_test::assert_ok; +/// +/// let n: u32 = assert_ok!("123".parse()); +/// +/// let s = "123"; +/// let n: u32 = assert_ok!(s.parse(), "testing parsing {:?} as a u32", s); +/// ``` +#[macro_export] +macro_rules! 
assert_ok { + ($e:expr) => { + assert_ok!($e,) + }; + ($e:expr,) => {{ + use std::result::Result::*; + match $e { + Ok(v) => v, + Err(e) => panic!("assertion failed: Err({:?})", e), + } + }}; + ($e:expr, $($arg:tt)+) => {{ + use std::result::Result::*; + match $e { + Ok(v) => v, + Err(e) => panic!("assertion failed: Err({:?}): {}", e, format_args!($($arg)+)), + } + }}; +} + +/// Asserts that the expression evaluates to `Err` and returns the error. +/// +/// This will invoke the `panic!` macro if the provided expression does not evaluate to `Err` at +/// runtime. +/// +/// # Custom Messages +/// +/// This macro has a second form, where a custom panic message can be provided with or without +/// arguments for formatting. +/// +/// # Examples +/// +/// ``` +/// use tokio_test::assert_err; +/// use std::str::FromStr; +/// +/// +/// let err = assert_err!(u32::from_str("fail")); +/// +/// let msg = "fail"; +/// let err = assert_err!(u32::from_str(msg), "testing parsing {:?} as u32", msg); +/// ``` +#[macro_export] +macro_rules! assert_err { + ($e:expr) => { + assert_err!($e,); + }; + ($e:expr,) => {{ + use std::result::Result::*; + match $e { + Ok(v) => panic!("assertion failed: Ok({:?})", v), + Err(e) => e, + } + }}; + ($e:expr, $($arg:tt)+) => {{ + use std::result::Result::*; + match $e { + Ok(v) => panic!("assertion failed: Ok({:?}): {}", v, format_args!($($arg)+)), + Err(e) => e, + } + }}; +} + +/// Asserts that an exact duration has elapsed since the start instant ±1ms. +/// +/// ```rust +/// use tokio::time::{self, Instant}; +/// use std::time::Duration; +/// use tokio_test::assert_elapsed; +/// # async fn test_time_passed() { +/// +/// let start = Instant::now(); +/// let dur = Duration::from_millis(50); +/// time::sleep(dur).await; +/// assert_elapsed!(start, dur); +/// # } +/// ``` +/// +/// This 1ms buffer is required because Tokio's hashed-wheel timer has finite time resolution and +/// will not always sleep for the exact interval. 
+#[macro_export] +macro_rules! assert_elapsed { + ($start:expr, $dur:expr) => {{ + let elapsed = $start.elapsed(); + // type ascription improves compiler error when wrong type is passed + let lower: std::time::Duration = $dur; + + // Handles ms rounding + assert!( + elapsed >= lower && elapsed <= lower + std::time::Duration::from_millis(1), + "actual = {:?}, expected = {:?}", + elapsed, + lower + ); + }}; +} diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..fa98bae --- /dev/null +++ b/src/task.rs @@ -0,0 +1,253 @@ +//! Futures task based helpers + +#![allow(clippy::mutex_atomic)] + +use std::future::Future; +use std::mem; +use std::ops; +use std::pin::Pin; +use std::sync::{Arc, Condvar, Mutex}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use tokio_stream::Stream; + +/// TODO: dox +pub fn spawn(task: T) -> Spawn { + Spawn { + task: MockTask::new(), + future: Box::pin(task), + } +} + +/// Future spawned on a mock task +#[derive(Debug)] +pub struct Spawn { + task: MockTask, + future: Pin>, +} + +/// Mock task +/// +/// A mock task is able to intercept and track wake notifications. +#[derive(Debug, Clone)] +struct MockTask { + waker: Arc, +} + +#[derive(Debug)] +struct ThreadWaker { + state: Mutex, + condvar: Condvar, +} + +const IDLE: usize = 0; +const WAKE: usize = 1; +const SLEEP: usize = 2; + +impl Spawn { + /// Consumes `self` returning the inner value + pub fn into_inner(self) -> T + where + T: Unpin, + { + *Pin::into_inner(self.future) + } + + /// Returns `true` if the inner future has received a wake notification + /// since the last call to `enter`. + pub fn is_woken(&self) -> bool { + self.task.is_woken() + } + + /// Returns the number of references to the task waker + /// + /// The task itself holds a reference. The return value will never be zero. 
+ pub fn waker_ref_count(&self) -> usize { + self.task.waker_ref_count() + } + + /// Enter the task context + pub fn enter(&mut self, f: F) -> R + where + F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R, + { + let fut = self.future.as_mut(); + self.task.enter(|cx| f(cx, fut)) + } +} + +impl ops::Deref for Spawn { + type Target = T; + + fn deref(&self) -> &T { + &self.future + } +} + +impl ops::DerefMut for Spawn { + fn deref_mut(&mut self) -> &mut T { + &mut self.future + } +} + +impl Spawn { + /// Polls a future + pub fn poll(&mut self) -> Poll { + let fut = self.future.as_mut(); + self.task.enter(|cx| fut.poll(cx)) + } +} + +impl Spawn { + /// Polls a stream + pub fn poll_next(&mut self) -> Poll> { + let stream = self.future.as_mut(); + self.task.enter(|cx| stream.poll_next(cx)) + } +} + +impl Future for Spawn { + type Output = T::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.future.as_mut().poll(cx) + } +} + +impl Stream for Spawn { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.future.as_mut().poll_next(cx) + } +} + +impl MockTask { + /// Creates new mock task + fn new() -> Self { + MockTask { + waker: Arc::new(ThreadWaker::new()), + } + } + + /// Runs a closure from the context of the task. + /// + /// Any wake notifications resulting from the execution of the closure are + /// tracked. + fn enter(&mut self, f: F) -> R + where + F: FnOnce(&mut Context<'_>) -> R, + { + self.waker.clear(); + let waker = self.waker(); + let mut cx = Context::from_waker(&waker); + + f(&mut cx) + } + + /// Returns `true` if the inner future has received a wake notification + /// since the last call to `enter`. + fn is_woken(&self) -> bool { + self.waker.is_woken() + } + + /// Returns the number of references to the task waker + /// + /// The task itself holds a reference. The return value will never be zero. 
+ fn waker_ref_count(&self) -> usize { + Arc::strong_count(&self.waker) + } + + fn waker(&self) -> Waker { + unsafe { + let raw = to_raw(self.waker.clone()); + Waker::from_raw(raw) + } + } +} + +impl Default for MockTask { + fn default() -> Self { + Self::new() + } +} + +impl ThreadWaker { + fn new() -> Self { + ThreadWaker { + state: Mutex::new(IDLE), + condvar: Condvar::new(), + } + } + + /// Clears any previously received wakes, avoiding potential spurious + /// wake notifications. This should only be called immediately before running the + /// task. + fn clear(&self) { + *self.state.lock().unwrap() = IDLE; + } + + fn is_woken(&self) -> bool { + match *self.state.lock().unwrap() { + IDLE => false, + WAKE => true, + _ => unreachable!(), + } + } + + fn wake(&self) { + // Take the lock and record the wake (any state -> WAKE); a repeated wake is a no-op. + let mut state = self.state.lock().unwrap(); + let prev = *state; + + if prev == WAKE { + return; + } + + *state = WAKE; + + if prev == IDLE { + return; + } + + // The other half is sleeping, so we wake it up. 
+ assert_eq!(prev, SLEEP); + self.condvar.notify_one(); + } +} + +static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker); + +unsafe fn to_raw(waker: Arc) -> RawWaker { + RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE) +} + +unsafe fn from_raw(raw: *const ()) -> Arc { + Arc::from_raw(raw as *const ThreadWaker) +} + +unsafe fn clone(raw: *const ()) -> RawWaker { + let waker = from_raw(raw); + + // Increment the ref count + mem::forget(waker.clone()); + + to_raw(waker) +} + +unsafe fn wake(raw: *const ()) { + let waker = from_raw(raw); + waker.wake(); +} + +unsafe fn wake_by_ref(raw: *const ()) { + let waker = from_raw(raw); + waker.wake(); + + // We don't actually own a reference to the unparker + mem::forget(waker); +} + +unsafe fn drop_waker(raw: *const ()) { + let _ = from_raw(raw); +} diff --git a/tests/block_on.rs b/tests/block_on.rs new file mode 100644 index 0000000..efaaf51 --- /dev/null +++ b/tests/block_on.rs @@ -0,0 +1,27 @@ +#![warn(rust_2018_idioms)] + +use tokio::time::{sleep_until, Duration, Instant}; +use tokio_test::block_on; + +#[test] +fn async_block() { + assert_eq!(4, block_on(async { 4 })); +} + +async fn five() -> u8 { + 5 +} + +#[test] +fn async_fn() { + assert_eq!(5, block_on(five())); +} + +#[test] +fn test_sleep() { + let deadline = Instant::now() + Duration::from_millis(100); + + block_on(async { + sleep_until(deadline).await; + }); +} diff --git a/tests/io.rs b/tests/io.rs new file mode 100644 index 0000000..f164aba --- /dev/null +++ b/tests/io.rs @@ -0,0 +1,86 @@ +#![warn(rust_2018_idioms)] + +use std::io; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio_test::io::Builder; + +#[tokio::test] +async fn read() { + let mut mock = Builder::new().read(b"hello ").read(b"world!").build(); + + let mut buf = [0; 256]; + + let n = mock.read(&mut buf).await.expect("read 1"); + assert_eq!(&buf[..n], b"hello "); + + let n = mock.read(&mut buf).await.expect("read 2"); + assert_eq!(&buf[..n], 
b"world!"); +} + +#[tokio::test] +async fn read_error() { + let error = io::Error::new(io::ErrorKind::Other, "cruel"); + let mut mock = Builder::new() + .read(b"hello ") + .read_error(error) + .read(b"world!") + .build(); + let mut buf = [0; 256]; + + let n = mock.read(&mut buf).await.expect("read 1"); + assert_eq!(&buf[..n], b"hello "); + + match mock.read(&mut buf).await { + Err(error) => { + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!("cruel", format!("{}", error)); + } + Ok(_) => panic!("error not received"), + } + + let n = mock.read(&mut buf).await.expect("read 1"); + assert_eq!(&buf[..n], b"world!"); +} + +#[tokio::test] +async fn write() { + let mut mock = Builder::new().write(b"hello ").write(b"world!").build(); + + mock.write_all(b"hello ").await.expect("write 1"); + mock.write_all(b"world!").await.expect("write 2"); +} + +#[tokio::test] +async fn write_error() { + let error = io::Error::new(io::ErrorKind::Other, "cruel"); + let mut mock = Builder::new() + .write(b"hello ") + .write_error(error) + .write(b"world!") + .build(); + mock.write_all(b"hello ").await.expect("write 1"); + + match mock.write_all(b"whoa").await { + Err(error) => { + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!("cruel", format!("{}", error)); + } + Ok(_) => panic!("error not received"), + } + + mock.write_all(b"world!").await.expect("write 2"); +} + +#[tokio::test] +#[should_panic] +async fn mock_panics_read_data_left() { + use tokio_test::io::Builder; + Builder::new().read(b"read").build(); +} + +#[tokio::test] +#[should_panic] +async fn mock_panics_write_data_left() { + use tokio_test::io::Builder; + Builder::new().write(b"write").build(); +} diff --git a/tests/macros.rs b/tests/macros.rs new file mode 100644 index 0000000..2183fc8 --- /dev/null +++ b/tests/macros.rs @@ -0,0 +1,107 @@ +#![warn(rust_2018_idioms)] + +use std::task::Poll; +use tokio_test::{ + assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok, +}; + +fn 
+ ready() -> Poll<()> { + Poll::Ready(()) +} + +fn ready_ok() -> Poll<Result<(), ()>> { + Poll::Ready(Ok(())) +} + +fn ready_err() -> Poll<Result<(), ()>> { + Poll::Ready(Err(())) +} + +fn pending() -> Poll<()> { + Poll::Pending +} + +#[derive(Debug)] +enum Test { + Data, +} + +#[test] +fn assert_ready() { + let poll = ready(); + assert_ready!(poll); + assert_ready!(poll, "some message"); + assert_ready!(poll, "{:?}", ()); + assert_ready!(poll, "{:?}", Test::Data); +} + +#[test] +#[should_panic] +fn assert_ready_on_pending() { + let poll = pending(); + assert_ready!(poll); +} + +#[test] +fn assert_pending() { + let poll = pending(); + assert_pending!(poll); + assert_pending!(poll, "some message"); + assert_pending!(poll, "{:?}", ()); + assert_pending!(poll, "{:?}", Test::Data); +} + +#[test] +#[should_panic] +fn assert_pending_on_ready() { + let poll = ready(); + assert_pending!(poll); +} + +#[test] +fn assert_ready_ok() { + let poll = ready_ok(); + assert_ready_ok!(poll); + assert_ready_ok!(poll, "some message"); + assert_ready_ok!(poll, "{:?}", ()); + assert_ready_ok!(poll, "{:?}", Test::Data); +} + +#[test] +#[should_panic] +fn assert_ok_on_err() { + let poll = ready_err(); + assert_ready_ok!(poll); +} + +#[test] +fn assert_ready_err() { + let poll = ready_err(); + assert_ready_err!(poll); + assert_ready_err!(poll, "some message"); + assert_ready_err!(poll, "{:?}", ()); + assert_ready_err!(poll, "{:?}", Test::Data); +} + +#[test] +#[should_panic] +fn assert_err_on_ok() { + let poll = ready_ok(); + assert_ready_err!(poll); +} + +#[test] +fn assert_ready_eq() { + let poll = ready(); + assert_ready_eq!(poll, ()); + assert_ready_eq!(poll, (), "some message"); + assert_ready_eq!(poll, (), "{:?}", ()); + assert_ready_eq!(poll, (), "{:?}", Test::Data); +} + +#[test] +#[should_panic] +fn assert_eq_on_not_eq() { + let poll = ready_err(); + assert_ready_eq!(poll, Ok(())); +} -- 2.7.4