Import tokio-test 0.4.2 upstream upstream/0.4.2
authorDongHun Kwak <dh0128.kwak@samsung.com>
Mon, 17 Apr 2023 01:06:38 +0000 (10:06 +0900)
committerDongHun Kwak <dh0128.kwak@samsung.com>
Mon, 17 Apr 2023 01:06:38 +0000 (10:06 +0900)
13 files changed:
.cargo_vcs_info.json [new file with mode: 0644]
CHANGELOG.md [new file with mode: 0644]
Cargo.toml [new file with mode: 0644]
Cargo.toml.orig [new file with mode: 0644]
LICENSE [new file with mode: 0644]
README.md [new file with mode: 0644]
src/io.rs [new file with mode: 0644]
src/lib.rs [new file with mode: 0644]
src/macros.rs [new file with mode: 0644]
src/task.rs [new file with mode: 0644]
tests/block_on.rs [new file with mode: 0644]
tests/io.rs [new file with mode: 0644]
tests/macros.rs [new file with mode: 0644]

diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644 (file)
index 0000000..eb7e2b0
--- /dev/null
@@ -0,0 +1,5 @@
+{
+  "git": {
+    "sha1": "ff9b0ef7ca5a93f8fb10baba52996e755500546f"
+  }
+}
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644 (file)
index 0000000..5b1786f
--- /dev/null
@@ -0,0 +1,27 @@
+# 0.4.2 (May 14, 2021)
+
+- test: add `assert_elapsed!` macro ([#3728])
+
+[#3728]: https://github.com/tokio-rs/tokio/pull/3728
+
+# 0.4.1 (March 10, 2021)
+
+- Fix `io::Mock` to be `Send` and `Sync` ([#3594])
+
+[#3594]: https://github.com/tokio-rs/tokio/pull/3594
+
+# 0.4.0 (December 23, 2020)
+
+- Track `tokio` 1.0 release.
+
+# 0.3.0 (October 15, 2020)
+
+- Track `tokio` 0.3 release.
+
+# 0.2.1 (April 17, 2020)
+
+- Add `Future` and `Stream` implementations for `task::Spawn<T>`.
+
+# 0.2.0 (November 25, 2019)
+
+- Initial release
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644 (file)
index 0000000..1ebd109
--- /dev/null
@@ -0,0 +1,46 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "tokio-test"
+version = "0.4.2"
+authors = ["Tokio Contributors <team@tokio.rs>"]
+description = "Testing utilities for Tokio- and futures-based code\n"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-test/0.4.2/tokio_test"
+categories = ["asynchronous", "testing"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+[package.metadata.docs.rs]
+all-features = true
+[dependencies.async-stream]
+version = "0.3"
+
+[dependencies.bytes]
+version = "1.0.0"
+
+[dependencies.futures-core]
+version = "0.3.0"
+
+[dependencies.tokio]
+version = "1.2.0"
+features = ["rt", "sync", "time", "test-util"]
+
+[dependencies.tokio-stream]
+version = "0.1"
+[dev-dependencies.futures-util]
+version = "0.3.0"
+
+[dev-dependencies.tokio]
+version = "1.2.0"
+features = ["full"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644 (file)
index 0000000..55d5aaf
--- /dev/null
@@ -0,0 +1,34 @@
+[package]
+name = "tokio-test"
+# When releasing to crates.io:
+# - Remove path dependencies
+# - Update doc url
+#   - Cargo.toml
+# - Update CHANGELOG.md.
+# - Create "tokio-test-0.4.x" git tag.
+version = "0.4.2"
+edition = "2018"
+authors = ["Tokio Contributors <team@tokio.rs>"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-test/0.4.2/tokio_test"
+description = """
+Testing utilities for Tokio- and futures-based code
+"""
+categories = ["asynchronous", "testing"]
+
+[dependencies]
+tokio = { version = "1.2.0", path = "../tokio", features = ["rt", "sync", "time", "test-util"] }
+tokio-stream = { version = "0.1", path = "../tokio-stream" }
+async-stream = "0.3"
+
+bytes = "1.0.0"
+futures-core = "0.3.0"
+
+[dev-dependencies]
+tokio = { version = "1.2.0", path = "../tokio", features = ["full"] }
+futures-util = "0.3.0"
+
+[package.metadata.docs.rs]
+all-features = true
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..ffa38bb
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2021 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..64174d9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+# tokio-test
+
+Tokio and Futures based testing utilities
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/src/io.rs b/src/io.rs
new file mode 100644 (file)
index 0000000..4ec66a4
--- /dev/null
+++ b/src/io.rs
@@ -0,0 +1,494 @@
+#![cfg(not(loom))]
+
+//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
+//!
+//!
+//! # Overview
+//!
+//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
+//! to handle an arbitrary sequence of read and write operations. This is useful
+//! for writing unit tests for networking services as using an actual network
+//! type is fairly non deterministic.
+//!
+//! # Usage
+//!
+//! Attempting to write data that the mock isn't expecting will result in a
+//! panic.
+//!
+//! [`AsyncRead`]: tokio::io::AsyncRead
+//! [`AsyncWrite`]: tokio::io::AsyncWrite
+
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::sync::mpsc;
+use tokio::time::{self, Duration, Instant, Sleep};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+
+use futures_core::{ready, Stream};
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{self, Poll, Waker};
+use std::{cmp, io};
+
+/// An I/O object that follows a predefined script.
+///
+/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
+/// follows the scenario described by the builder and panics otherwise.
+#[derive(Debug)]
+pub struct Mock {
+    inner: Inner,
+}
+
+/// A handle to send additional actions to the related `Mock`.
+#[derive(Debug)]
+pub struct Handle {
+    tx: mpsc::UnboundedSender<Action>,
+}
+
+/// Builds `Mock` instances.
+#[derive(Debug, Clone, Default)]
+pub struct Builder {
+    // Sequence of actions for the Mock to take
+    actions: VecDeque<Action>,
+}
+
+#[derive(Debug, Clone)]
+enum Action {
+    Read(Vec<u8>),
+    Write(Vec<u8>),
+    Wait(Duration),
+    // Wrapped in Arc so that Builder can be cloned and Send.
+    // Mock is not cloned as does not need to check Rc for ref counts.
+    ReadError(Option<Arc<io::Error>>),
+    WriteError(Option<Arc<io::Error>>),
+}
+
+struct Inner {
+    actions: VecDeque<Action>,
+    waiting: Option<Instant>,
+    sleep: Option<Pin<Box<Sleep>>>,
+    read_wait: Option<Waker>,
+    rx: UnboundedReceiverStream<Action>,
+}
+
+impl Builder {
+    /// Return a new, empty `Builder.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sequence a `read` operation.
+    ///
+    /// The next operation in the mock's script will be to expect a `read` call
+    /// and return `buf`.
+    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
+        self.actions.push_back(Action::Read(buf.into()));
+        self
+    }
+
+    /// Sequence a `read` operation that produces an error.
+    ///
+    /// The next operation in the mock's script will be to expect a `read` call
+    /// and return `error`.
+    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
+        let error = Some(error.into());
+        self.actions.push_back(Action::ReadError(error));
+        self
+    }
+
+    /// Sequence a `write` operation.
+    ///
+    /// The next operation in the mock's script will be to expect a `write`
+    /// call.
+    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
+        self.actions.push_back(Action::Write(buf.into()));
+        self
+    }
+
+    /// Sequence a `write` operation that produces an error.
+    ///
+    /// The next operation in the mock's script will be to expect a `write`
+    /// call that provides `error`.
+    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
+        let error = Some(error.into());
+        self.actions.push_back(Action::WriteError(error));
+        self
+    }
+
+    /// Sequence a wait.
+    ///
+    /// The next operation in the mock's script will be to wait without doing so
+    /// for `duration` amount of time.
+    pub fn wait(&mut self, duration: Duration) -> &mut Self {
+        let duration = cmp::max(duration, Duration::from_millis(1));
+        self.actions.push_back(Action::Wait(duration));
+        self
+    }
+
+    /// Build a `Mock` value according to the defined script.
+    pub fn build(&mut self) -> Mock {
+        let (mock, _) = self.build_with_handle();
+        mock
+    }
+
+    /// Build a `Mock` value paired with a handle
+    pub fn build_with_handle(&mut self) -> (Mock, Handle) {
+        let (inner, handle) = Inner::new(self.actions.clone());
+
+        let mock = Mock { inner };
+
+        (mock, handle)
+    }
+}
+
+impl Handle {
+    /// Sequence a `read` operation.
+    ///
+    /// The next operation in the mock's script will be to expect a `read` call
+    /// and return `buf`.
+    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
+        self.tx.send(Action::Read(buf.into())).unwrap();
+        self
+    }
+
+    /// Sequence a `read` operation error.
+    ///
+    /// The next operation in the mock's script will be to expect a `read` call
+    /// and return `error`.
+    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
+        let error = Some(error.into());
+        self.tx.send(Action::ReadError(error)).unwrap();
+        self
+    }
+
+    /// Sequence a `write` operation.
+    ///
+    /// The next operation in the mock's script will be to expect a `write`
+    /// call.
+    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
+        self.tx.send(Action::Write(buf.into())).unwrap();
+        self
+    }
+
+    /// Sequence a `write` operation error.
+    ///
+    /// The next operation in the mock's script will be to expect a `write`
+    /// call error.
+    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
+        let error = Some(error.into());
+        self.tx.send(Action::WriteError(error)).unwrap();
+        self
+    }
+}
+
+impl Inner {
+    fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
+        let (tx, rx) = mpsc::unbounded_channel();
+
+        let rx = UnboundedReceiverStream::new(rx);
+
+        let inner = Inner {
+            actions,
+            sleep: None,
+            read_wait: None,
+            rx,
+            waiting: None,
+        };
+
+        let handle = Handle { tx };
+
+        (inner, handle)
+    }
+
+    fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
+        Pin::new(&mut self.rx).poll_next(cx)
+    }
+
+    fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
+        match self.action() {
+            Some(&mut Action::Read(ref mut data)) => {
+                // Figure out how much to copy
+                let n = cmp::min(dst.remaining(), data.len());
+
+                // Copy the data into the `dst` slice
+                dst.put_slice(&data[..n]);
+
+                // Drain the data from the source
+                data.drain(..n);
+
+                Ok(())
+            }
+            Some(&mut Action::ReadError(ref mut err)) => {
+                // As the
+                let err = err.take().expect("Should have been removed from actions.");
+                let err = Arc::try_unwrap(err).expect("There are no other references.");
+                Err(err)
+            }
+            Some(_) => {
+                // Either waiting or expecting a write
+                Err(io::ErrorKind::WouldBlock.into())
+            }
+            None => Ok(()),
+        }
+    }
+
+    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
+        let mut ret = 0;
+
+        if self.actions.is_empty() {
+            return Err(io::ErrorKind::BrokenPipe.into());
+        }
+
+        if let Some(&mut Action::Wait(..)) = self.action() {
+            return Err(io::ErrorKind::WouldBlock.into());
+        }
+
+        if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
+            let err = err.take().expect("Should have been removed from actions.");
+            let err = Arc::try_unwrap(err).expect("There are no other references.");
+            return Err(err);
+        }
+
+        for i in 0..self.actions.len() {
+            match self.actions[i] {
+                Action::Write(ref mut expect) => {
+                    let n = cmp::min(src.len(), expect.len());
+
+                    assert_eq!(&src[..n], &expect[..n]);
+
+                    // Drop data that was matched
+                    expect.drain(..n);
+                    src = &src[n..];
+
+                    ret += n;
+
+                    if src.is_empty() {
+                        return Ok(ret);
+                    }
+                }
+                Action::Wait(..) | Action::WriteError(..) => {
+                    break;
+                }
+                _ => {}
+            }
+
+            // TODO: remove write
+        }
+
+        Ok(ret)
+    }
+
+    fn remaining_wait(&mut self) -> Option<Duration> {
+        match self.action() {
+            Some(&mut Action::Wait(dur)) => Some(dur),
+            _ => None,
+        }
+    }
+
+    fn action(&mut self) -> Option<&mut Action> {
+        loop {
+            if self.actions.is_empty() {
+                return None;
+            }
+
+            match self.actions[0] {
+                Action::Read(ref mut data) => {
+                    if !data.is_empty() {
+                        break;
+                    }
+                }
+                Action::Write(ref mut data) => {
+                    if !data.is_empty() {
+                        break;
+                    }
+                }
+                Action::Wait(ref mut dur) => {
+                    if let Some(until) = self.waiting {
+                        let now = Instant::now();
+
+                        if now < until {
+                            break;
+                        }
+                    } else {
+                        self.waiting = Some(Instant::now() + *dur);
+                        break;
+                    }
+                }
+                Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
+                    if error.is_some() {
+                        break;
+                    }
+                }
+            }
+
+            let _action = self.actions.pop_front();
+        }
+
+        self.actions.front_mut()
+    }
+}
+
+// ===== impl Inner =====
+
+impl Mock {
+    fn maybe_wakeup_reader(&mut self) {
+        match self.inner.action() {
+            Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
+                if let Some(waker) = self.inner.read_wait.take() {
+                    waker.wake();
+                }
+            }
+            _ => {}
+        }
+    }
+}
+
+impl AsyncRead for Mock {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut task::Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        loop {
+            if let Some(ref mut sleep) = self.inner.sleep {
+                ready!(Pin::new(sleep).poll(cx));
+            }
+
+            // If a sleep is set, it has already fired
+            self.inner.sleep = None;
+
+            // Capture 'filled' to monitor if it changed
+            let filled = buf.filled().len();
+
+            match self.inner.read(buf) {
+                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                    if let Some(rem) = self.inner.remaining_wait() {
+                        let until = Instant::now() + rem;
+                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
+                    } else {
+                        self.inner.read_wait = Some(cx.waker().clone());
+                        return Poll::Pending;
+                    }
+                }
+                Ok(()) => {
+                    if buf.filled().len() == filled {
+                        match ready!(self.inner.poll_action(cx)) {
+                            Some(action) => {
+                                self.inner.actions.push_back(action);
+                                continue;
+                            }
+                            None => {
+                                return Poll::Ready(Ok(()));
+                            }
+                        }
+                    } else {
+                        return Poll::Ready(Ok(()));
+                    }
+                }
+                Err(e) => return Poll::Ready(Err(e)),
+            }
+        }
+    }
+}
+
+impl AsyncWrite for Mock {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        loop {
+            if let Some(ref mut sleep) = self.inner.sleep {
+                ready!(Pin::new(sleep).poll(cx));
+            }
+
+            // If a sleep is set, it has already fired
+            self.inner.sleep = None;
+
+            match self.inner.write(buf) {
+                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                    if let Some(rem) = self.inner.remaining_wait() {
+                        let until = Instant::now() + rem;
+                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
+                    } else {
+                        panic!("unexpected WouldBlock");
+                    }
+                }
+                Ok(0) => {
+                    // TODO: Is this correct?
+                    if !self.inner.actions.is_empty() {
+                        return Poll::Pending;
+                    }
+
+                    // TODO: Extract
+                    match ready!(self.inner.poll_action(cx)) {
+                        Some(action) => {
+                            self.inner.actions.push_back(action);
+                            continue;
+                        }
+                        None => {
+                            panic!("unexpected write");
+                        }
+                    }
+                }
+                ret => {
+                    self.maybe_wakeup_reader();
+                    return Poll::Ready(ret);
+                }
+            }
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+}
+
+/// Ensures that Mock isn't dropped with data "inside".
+impl Drop for Mock {
+    fn drop(&mut self) {
+        // Avoid double panicking, since makes debugging much harder.
+        if std::thread::panicking() {
+            return;
+        }
+
+        self.inner.actions.iter().for_each(|a| match a {
+            Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
+            Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
+            _ => (),
+        })
+    }
+}
+/*
+/// Returns `true` if called from the context of a futures-rs Task
+fn is_task_ctx() -> bool {
+    use std::panic;
+
+    // Save the existing panic hook
+    let h = panic::take_hook();
+
+    // Install a new one that does nothing
+    panic::set_hook(Box::new(|_| {}));
+
+    // Attempt to call the fn
+    let r = panic::catch_unwind(|| task::current()).is_ok();
+
+    // Re-install the old one
+    panic::set_hook(h);
+
+    // Return the result
+    r
+}
+*/
+
+impl fmt::Debug for Inner {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Inner {{...}}")
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644 (file)
index 0000000..c510454
--- /dev/null
@@ -0,0 +1,36 @@
+#![warn(
+    missing_debug_implementations,
+    missing_docs,
+    rust_2018_idioms,
+    unreachable_pub
+)]
+#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![doc(test(
+    no_crate_inject,
+    attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
+))]
+
+//! Tokio and Futures based testing utilites
+
+pub mod io;
+
+mod macros;
+pub mod task;
+
+/// Runs the provided future, blocking the current thread until the
+/// future completes.
+///
+/// For more information, see the documentation for
+/// [`tokio::runtime::Runtime::block_on`][runtime-block-on].
+///
+/// [runtime-block-on]: https://docs.rs/tokio/1.3.0/tokio/runtime/struct.Runtime.html#method.block_on
+pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
+    use tokio::runtime;
+
+    let rt = runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    rt.block_on(future)
+}
diff --git a/src/macros.rs b/src/macros.rs
new file mode 100644 (file)
index 0000000..7ca7345
--- /dev/null
@@ -0,0 +1,295 @@
+//! A collection of useful macros for testing futures and tokio based code
+
+/// Asserts a `Poll` is ready, returning the value.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready, task};
+///
+/// let mut fut = task::spawn(future::ready(()));
+/// assert_ready!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_ready {
+    ($e:expr) => {{
+        use core::task::Poll::*;
+        match $e {
+            Ready(v) => v,
+            Pending => panic!("pending"),
+        }
+    }};
+    ($e:expr, $($msg:tt)+) => {{
+        use core::task::Poll::*;
+        match $e {
+            Ready(v) => v,
+            Pending => {
+                panic!("pending; {}", format_args!($($msg)+))
+            }
+        }
+    }};
+}
+
+/// Asserts a `Poll<Result<...>>` is ready and `Ok`, returning the value.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready(Ok(..))` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready_ok, task};
+///
+/// let mut fut = task::spawn(future::ok::<_, ()>(()));
+/// assert_ready_ok!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_ready_ok {
+    ($e:expr) => {{
+        use tokio_test::{assert_ready, assert_ok};
+        let val = assert_ready!($e);
+        assert_ok!(val)
+    }};
+    ($e:expr, $($msg:tt)+) => {{
+        use tokio_test::{assert_ready, assert_ok};
+        let val = assert_ready!($e, $($msg)*);
+        assert_ok!(val, $($msg)*)
+    }};
+}
+
+/// Asserts a `Poll<Result<...>>` is ready and `Err`, returning the error.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready(Err(..))` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready_err, task};
+///
+/// let mut fut = task::spawn(future::err::<(), _>(()));
+/// assert_ready_err!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_ready_err {
+    ($e:expr) => {{
+        use tokio_test::{assert_ready, assert_err};
+        let val = assert_ready!($e);
+        assert_err!(val)
+    }};
+    ($e:expr, $($msg:tt)+) => {{
+        use tokio_test::{assert_ready, assert_err};
+        let val = assert_ready!($e, $($msg)*);
+        assert_err!(val, $($msg)*)
+    }};
+}
+
+/// Asserts a `Poll` is pending.
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Pending` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_pending, task};
+///
+/// let mut fut = task::spawn(future::pending::<()>());
+/// assert_pending!(fut.poll());
+/// ```
+#[macro_export]
+macro_rules! assert_pending {
+    ($e:expr) => {{
+        use core::task::Poll::*;
+        match $e {
+            Pending => {}
+            Ready(v) => panic!("ready; value = {:?}", v),
+        }
+    }};
+    ($e:expr, $($msg:tt)+) => {{
+        use core::task::Poll::*;
+        match $e {
+            Pending => {}
+            Ready(v) => {
+                panic!("ready; value = {:?}; {}", v, format_args!($($msg)+))
+            }
+        }
+    }};
+}
+
+/// Asserts if a poll is ready and check for equality on the value
+///
+/// This will invoke `panic!` if the provided `Poll` does not evaluate to `Poll::Ready` at
+/// runtime and the value produced does not partially equal the expected value.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future;
+/// use tokio_test::{assert_ready_eq, task};
+///
+/// let mut fut = task::spawn(future::ready(42));
+/// assert_ready_eq!(fut.poll(), 42);
+/// ```
+#[macro_export]
+macro_rules! assert_ready_eq {
+    ($e:expr, $expect:expr) => {
+        let val = $crate::assert_ready!($e);
+        assert_eq!(val, $expect)
+    };
+
+    ($e:expr, $expect:expr, $($msg:tt)+) => {
+        let val = $crate::assert_ready!($e, $($msg)*);
+        assert_eq!(val, $expect, $($msg)*)
+    };
+}
+
+/// Asserts that the expression evaluates to `Ok` and returns the value.
+///
+/// This will invoke the `panic!` macro if the provided expression does not evaluate to `Ok` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use tokio_test::assert_ok;
+///
+/// let n: u32 = assert_ok!("123".parse());
+///
+/// let s = "123";
+/// let n: u32 = assert_ok!(s.parse(), "testing parsing {:?} as a u32", s);
+/// ```
+#[macro_export]
+macro_rules! assert_ok {
+    ($e:expr) => {
+        assert_ok!($e,)
+    };
+    ($e:expr,) => {{
+        use std::result::Result::*;
+        match $e {
+            Ok(v) => v,
+            Err(e) => panic!("assertion failed: Err({:?})", e),
+        }
+    }};
+    ($e:expr, $($arg:tt)+) => {{
+        use std::result::Result::*;
+        match $e {
+            Ok(v) => v,
+            Err(e) => panic!("assertion failed: Err({:?}): {}", e, format_args!($($arg)+)),
+        }
+    }};
+}
+
+/// Asserts that the expression evaluates to `Err` and returns the error.
+///
+/// This will invoke the `panic!` macro if the provided expression does not evaluate to `Err` at
+/// runtime.
+///
+/// # Custom Messages
+///
+/// This macro has a second form, where a custom panic message can be provided with or without
+/// arguments for formatting.
+///
+/// # Examples
+///
+/// ```
+/// use tokio_test::assert_err;
+/// use std::str::FromStr;
+///
+///
+/// let err = assert_err!(u32::from_str("fail"));
+///
+/// let msg = "fail";
+/// let err = assert_err!(u32::from_str(msg), "testing parsing {:?} as u32", msg);
+/// ```
+#[macro_export]
+macro_rules! assert_err {
+    ($e:expr) => {
+        assert_err!($e,);
+    };
+    ($e:expr,) => {{
+        use std::result::Result::*;
+        match $e {
+            Ok(v) => panic!("assertion failed: Ok({:?})", v),
+            Err(e) => e,
+        }
+    }};
+    ($e:expr, $($arg:tt)+) => {{
+        use std::result::Result::*;
+        match $e {
+            Ok(v) => panic!("assertion failed: Ok({:?}): {}", v, format_args!($($arg)+)),
+            Err(e) => e,
+        }
+    }};
+}
+
+/// Asserts that an exact duration has elapsed since since the start instant ±1ms.
+///
+/// ```rust
+/// use tokio::time::{self, Instant};
+/// use std::time::Duration;
+/// use tokio_test::assert_elapsed;
+/// # async fn test_time_passed() {
+///
+/// let start = Instant::now();
+/// let dur = Duration::from_millis(50);
+/// time::sleep(dur).await;
+/// assert_elapsed!(start, dur);
+/// # }
+/// ```
+///
+/// This 1ms buffer is required because Tokio's hashed-wheel timer has finite time resolution and
+/// will not always sleep for the exact interval.
+#[macro_export]
+macro_rules! assert_elapsed {
+    ($start:expr, $dur:expr) => {{
+        let elapsed = $start.elapsed();
+        // type ascription improves compiler error when wrong type is passed
+        let lower: std::time::Duration = $dur;
+
+        // Handles ms rounding
+        assert!(
+            elapsed >= lower && elapsed <= lower + std::time::Duration::from_millis(1),
+            "actual = {:?}, expected = {:?}",
+            elapsed,
+            lower
+        );
+    }};
+}
diff --git a/src/task.rs b/src/task.rs
new file mode 100644 (file)
index 0000000..fa98bae
--- /dev/null
@@ -0,0 +1,253 @@
+//! Futures task based helpers
+
+#![allow(clippy::mutex_atomic)]
+
+use std::future::Future;
+use std::mem;
+use std::ops;
+use std::pin::Pin;
+use std::sync::{Arc, Condvar, Mutex};
+use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+use tokio_stream::Stream;
+
+/// TODO: dox
+pub fn spawn<T>(task: T) -> Spawn<T> {
+    Spawn {
+        task: MockTask::new(),
+        future: Box::pin(task),
+    }
+}
+
+/// Future spawned on a mock task
+#[derive(Debug)]
+pub struct Spawn<T> {
+    task: MockTask,
+    future: Pin<Box<T>>,
+}
+
+/// Mock task
+///
+/// A mock task is able to intercept and track wake notifications.
+#[derive(Debug, Clone)]
+struct MockTask {
+    waker: Arc<ThreadWaker>,
+}
+
+#[derive(Debug)]
+struct ThreadWaker {
+    state: Mutex<usize>,
+    condvar: Condvar,
+}
+
+const IDLE: usize = 0;
+const WAKE: usize = 1;
+const SLEEP: usize = 2;
+
+impl<T> Spawn<T> {
+    /// Consumes `self` returning the inner value
+    pub fn into_inner(self) -> T
+    where
+        T: Unpin,
+    {
+        *Pin::into_inner(self.future)
+    }
+
+    /// Returns `true` if the inner future has received a wake notification
+    /// since the last call to `enter`.
+    pub fn is_woken(&self) -> bool {
+        self.task.is_woken()
+    }
+
+    /// Returns the number of references to the task waker
+    ///
+    /// The task itself holds a reference. The return value will never be zero.
+    pub fn waker_ref_count(&self) -> usize {
+        self.task.waker_ref_count()
+    }
+
+    /// Enter the task context
+    pub fn enter<F, R>(&mut self, f: F) -> R
+    where
+        F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R,
+    {
+        let fut = self.future.as_mut();
+        self.task.enter(|cx| f(cx, fut))
+    }
+}
+
+impl<T: Unpin> ops::Deref for Spawn<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.future
+    }
+}
+
+impl<T: Unpin> ops::DerefMut for Spawn<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.future
+    }
+}
+
+impl<T: Future> Spawn<T> {
+    /// Polls a future
+    pub fn poll(&mut self) -> Poll<T::Output> {
+        let fut = self.future.as_mut();
+        self.task.enter(|cx| fut.poll(cx))
+    }
+}
+
+impl<T: Stream> Spawn<T> {
+    /// Polls a stream
+    pub fn poll_next(&mut self) -> Poll<Option<T::Item>> {
+        let stream = self.future.as_mut();
+        self.task.enter(|cx| stream.poll_next(cx))
+    }
+}
+
+impl<T: Future> Future for Spawn<T> {
+    type Output = T::Output;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.future.as_mut().poll(cx)
+    }
+}
+
+impl<T: Stream> Stream for Spawn<T> {
+    type Item = T::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.future.as_mut().poll_next(cx)
+    }
+}
+
+impl MockTask {
+    /// Creates new mock task
+    fn new() -> Self {
+        MockTask {
+            waker: Arc::new(ThreadWaker::new()),
+        }
+    }
+
+    /// Runs a closure from the context of the task.
+    ///
+    /// Any wake notifications resulting from the execution of the closure are
+    /// tracked.
+    fn enter<F, R>(&mut self, f: F) -> R
+    where
+        F: FnOnce(&mut Context<'_>) -> R,
+    {
+        self.waker.clear();
+        let waker = self.waker();
+        let mut cx = Context::from_waker(&waker);
+
+        f(&mut cx)
+    }
+
+    /// Returns `true` if the inner future has received a wake notification
+    /// since the last call to `enter`.
+    fn is_woken(&self) -> bool {
+        self.waker.is_woken()
+    }
+
+    /// Returns the number of references to the task waker
+    ///
+    /// The task itself holds a reference. The return value will never be zero.
+    fn waker_ref_count(&self) -> usize {
+        Arc::strong_count(&self.waker)
+    }
+
+    fn waker(&self) -> Waker {
+        unsafe {
+            let raw = to_raw(self.waker.clone());
+            Waker::from_raw(raw)
+        }
+    }
+}
+
+impl Default for MockTask {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ThreadWaker {
+    fn new() -> Self {
+        ThreadWaker {
+            state: Mutex::new(IDLE),
+            condvar: Condvar::new(),
+        }
+    }
+
+    /// Clears any previously received wakes, avoiding potential spurrious
+    /// wake notifications. This should only be called immediately before running the
+    /// task.
+    fn clear(&self) {
+        *self.state.lock().unwrap() = IDLE;
+    }
+
+    fn is_woken(&self) -> bool {
+        match *self.state.lock().unwrap() {
+            IDLE => false,
+            WAKE => true,
+            _ => unreachable!(),
+        }
+    }
+
+    fn wake(&self) {
+        // First, try transitioning from IDLE -> NOTIFY, this does not require a lock.
+        let mut state = self.state.lock().unwrap();
+        let prev = *state;
+
+        if prev == WAKE {
+            return;
+        }
+
+        *state = WAKE;
+
+        if prev == IDLE {
+            return;
+        }
+
+        // The other half is sleeping, so we wake it up.
+        assert_eq!(prev, SLEEP);
+        self.condvar.notify_one();
+    }
+}
+
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);
+
+unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker {
+    RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE)
+}
+
+unsafe fn from_raw(raw: *const ()) -> Arc<ThreadWaker> {
+    Arc::from_raw(raw as *const ThreadWaker)
+}
+
+unsafe fn clone(raw: *const ()) -> RawWaker {
+    let waker = from_raw(raw);
+
+    // Increment the ref count
+    mem::forget(waker.clone());
+
+    to_raw(waker)
+}
+
+unsafe fn wake(raw: *const ()) {
+    let waker = from_raw(raw);
+    waker.wake();
+}
+
+unsafe fn wake_by_ref(raw: *const ()) {
+    let waker = from_raw(raw);
+    waker.wake();
+
+    // We don't actually own a reference to the unparker
+    mem::forget(waker);
+}
+
+unsafe fn drop_waker(raw: *const ()) {
+    let _ = from_raw(raw);
+}
diff --git a/tests/block_on.rs b/tests/block_on.rs
new file mode 100644 (file)
index 0000000..efaaf51
--- /dev/null
@@ -0,0 +1,27 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::time::{sleep_until, Duration, Instant};
+use tokio_test::block_on;
+
+#[test]
+fn async_block() {
+    assert_eq!(4, block_on(async { 4 }));
+}
+
+async fn five() -> u8 {
+    5
+}
+
+#[test]
+fn async_fn() {
+    assert_eq!(5, block_on(five()));
+}
+
+#[test]
+fn test_sleep() {
+    let deadline = Instant::now() + Duration::from_millis(100);
+
+    block_on(async {
+        sleep_until(deadline).await;
+    });
+}
diff --git a/tests/io.rs b/tests/io.rs
new file mode 100644 (file)
index 0000000..f164aba
--- /dev/null
@@ -0,0 +1,86 @@
+#![warn(rust_2018_idioms)]
+
+use std::io;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio_test::io::Builder;
+
+#[tokio::test]
+async fn read() {
+    let mut mock = Builder::new().read(b"hello ").read(b"world!").build();
+
+    let mut buf = [0; 256];
+
+    let n = mock.read(&mut buf).await.expect("read 1");
+    assert_eq!(&buf[..n], b"hello ");
+
+    let n = mock.read(&mut buf).await.expect("read 2");
+    assert_eq!(&buf[..n], b"world!");
+}
+
+#[tokio::test]
+async fn read_error() {
+    let error = io::Error::new(io::ErrorKind::Other, "cruel");
+    let mut mock = Builder::new()
+        .read(b"hello ")
+        .read_error(error)
+        .read(b"world!")
+        .build();
+    let mut buf = [0; 256];
+
+    let n = mock.read(&mut buf).await.expect("read 1");
+    assert_eq!(&buf[..n], b"hello ");
+
+    match mock.read(&mut buf).await {
+        Err(error) => {
+            assert_eq!(error.kind(), io::ErrorKind::Other);
+            assert_eq!("cruel", format!("{}", error));
+        }
+        Ok(_) => panic!("error not received"),
+    }
+
+    let n = mock.read(&mut buf).await.expect("read 1");
+    assert_eq!(&buf[..n], b"world!");
+}
+
+#[tokio::test]
+async fn write() {
+    let mut mock = Builder::new().write(b"hello ").write(b"world!").build();
+
+    mock.write_all(b"hello ").await.expect("write 1");
+    mock.write_all(b"world!").await.expect("write 2");
+}
+
+#[tokio::test]
+async fn write_error() {
+    let error = io::Error::new(io::ErrorKind::Other, "cruel");
+    let mut mock = Builder::new()
+        .write(b"hello ")
+        .write_error(error)
+        .write(b"world!")
+        .build();
+    mock.write_all(b"hello ").await.expect("write 1");
+
+    match mock.write_all(b"whoa").await {
+        Err(error) => {
+            assert_eq!(error.kind(), io::ErrorKind::Other);
+            assert_eq!("cruel", format!("{}", error));
+        }
+        Ok(_) => panic!("error not received"),
+    }
+
+    mock.write_all(b"world!").await.expect("write 2");
+}
+
+#[tokio::test]
+#[should_panic]
+async fn mock_panics_read_data_left() {
+    use tokio_test::io::Builder;
+    Builder::new().read(b"read").build();
+}
+
+#[tokio::test]
+#[should_panic]
+async fn mock_panics_write_data_left() {
+    use tokio_test::io::Builder;
+    Builder::new().write(b"write").build();
+}
diff --git a/tests/macros.rs b/tests/macros.rs
new file mode 100644 (file)
index 0000000..2183fc8
--- /dev/null
@@ -0,0 +1,107 @@
+#![warn(rust_2018_idioms)]
+
+use std::task::Poll;
+use tokio_test::{
+    assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
+};
+
+fn ready() -> Poll<()> {
+    Poll::Ready(())
+}
+
+fn ready_ok() -> Poll<Result<(), ()>> {
+    Poll::Ready(Ok(()))
+}
+
+fn ready_err() -> Poll<Result<(), ()>> {
+    Poll::Ready(Err(()))
+}
+
+fn pending() -> Poll<()> {
+    Poll::Pending
+}
+
+#[derive(Debug)]
+enum Test {
+    Data,
+}
+
+#[test]
+fn assert_ready() {
+    let poll = ready();
+    assert_ready!(poll);
+    assert_ready!(poll, "some message");
+    assert_ready!(poll, "{:?}", ());
+    assert_ready!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_ready_on_pending() {
+    let poll = pending();
+    assert_ready!(poll);
+}
+
+#[test]
+fn assert_pending() {
+    let poll = pending();
+    assert_pending!(poll);
+    assert_pending!(poll, "some message");
+    assert_pending!(poll, "{:?}", ());
+    assert_pending!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_pending_on_ready() {
+    let poll = ready();
+    assert_pending!(poll);
+}
+
+#[test]
+fn assert_ready_ok() {
+    let poll = ready_ok();
+    assert_ready_ok!(poll);
+    assert_ready_ok!(poll, "some message");
+    assert_ready_ok!(poll, "{:?}", ());
+    assert_ready_ok!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_ok_on_err() {
+    let poll = ready_err();
+    assert_ready_ok!(poll);
+}
+
+#[test]
+fn assert_ready_err() {
+    let poll = ready_err();
+    assert_ready_err!(poll);
+    assert_ready_err!(poll, "some message");
+    assert_ready_err!(poll, "{:?}", ());
+    assert_ready_err!(poll, "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_err_on_ok() {
+    let poll = ready_ok();
+    assert_ready_err!(poll);
+}
+
+#[test]
+fn assert_ready_eq() {
+    let poll = ready();
+    assert_ready_eq!(poll, ());
+    assert_ready_eq!(poll, (), "some message");
+    assert_ready_eq!(poll, (), "{:?}", ());
+    assert_ready_eq!(poll, (), "{:?}", Test::Data);
+}
+
+#[test]
+#[should_panic]
+fn assert_eq_on_not_eq() {
+    let poll = ready_err();
+    assert_ready_eq!(poll, Ok(()));
+}