From 684018fe3c85d48c74f489a8ed87e3dbaa5cb0f6 Mon Sep 17 00:00:00 2001 From: DongHun Kwak Date: Mon, 27 Feb 2023 14:32:21 +0900 Subject: [PATCH] Import spin 0.9.5 --- .cargo_vcs_info.json | 6 + .github/workflows/rust.yml | 56 ++ .gitignore | 3 + .travis.yml | 34 ++ CHANGELOG.md | 128 ++++ Cargo.toml | 72 +++ Cargo.toml.orig | 62 ++ LICENSE | 21 + README.md | 137 +++++ examples/debug.rs | 21 + script/doc-upload.cfg | 3 + src/barrier.rs | 239 ++++++++ src/lazy.rs | 118 ++++ src/lib.rs | 221 +++++++ src/mutex.rs | 340 +++++++++++ src/mutex/fair.rs | 732 +++++++++++++++++++++++ src/mutex/spin.rs | 540 +++++++++++++++++ src/mutex/ticket.rs | 537 +++++++++++++++++ src/once.rs | 731 +++++++++++++++++++++++ src/relax.rs | 61 ++ src/rwlock.rs | 1156 ++++++++++++++++++++++++++++++++++++ 21 files changed, 5218 insertions(+) create mode 100644 .cargo_vcs_info.json create mode 100644 .github/workflows/rust.yml create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 CHANGELOG.md create mode 100644 Cargo.toml create mode 100644 Cargo.toml.orig create mode 100644 LICENSE create mode 100644 README.md create mode 100644 examples/debug.rs create mode 100644 script/doc-upload.cfg create mode 100644 src/barrier.rs create mode 100644 src/lazy.rs create mode 100644 src/lib.rs create mode 100644 src/mutex.rs create mode 100644 src/mutex/fair.rs create mode 100644 src/mutex/spin.rs create mode 100644 src/mutex/ticket.rs create mode 100644 src/once.rs create mode 100644 src/relax.rs create mode 100644 src/rwlock.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..6f677e3 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "5087c8ddb5d080b5bd6c898f95e239bcb3512c22" + }, + "path_in_vcs": "" +} \ No newline at end of file diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..04280c5 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,56 @@ +name: Rust + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +env: + CARGO_TERM_COLOR: always + +jobs: + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + rust: [stable, beta, nightly] + + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} + - name: Run Tests + run: cargo test --verbose + - run: cargo build --all --all-features --all-targets + - name: Catch missing feature flags + if: startsWith(matrix.rust, 'nightly') + run: cargo check -Z features=dev_dep + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: rustup target add thumbv7m-none-eabi + - name: Ensure we don't depend on libstd + run: cargo hack build --target thumbv7m-none-eabi --no-dev-deps --no-default-features + + msrv: + runs-on: ubuntu-latest + strategy: + matrix: + version: [1.38.0] + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.version }} && rustup default ${{ matrix.version }} + - run: cargo build --all --all-features --all-targets + + miri: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup toolchain install nightly --component miri && rustup default nightly + - run: cargo miri test + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout diff --git a/.gitignore b/.gitignore new file 
mode 100644 index 0000000..ac33bc0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*~ +**/target/ +**/Cargo.lock diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..6a1c30f --- /dev/null +++ b/.travis.yml @@ -0,0 +1,34 @@ +language: rust + +rust: + - stable + - beta + - nightly + +sudo: false + +notifications: + email: + on_success: never + on_failure: always + +before_script: + - | + pip install 'travis-cargo<0.2' --user && + export PATH=$HOME/.local/bin:$PATH + +script: + - travis-cargo build + - travis-cargo test + - travis-cargo doc -- --no-deps + # TODO: Reenable later + #- rustdoc --test README.md -L target/debug + +after_success: + - curl https://mvdnes.github.io/rust-docs/travis-doc-upload.sh | bash + +env: + global: + # override the default `--features unstable` used by travis-cargo + # since unstable is activated by default + - TRAVIS_CARGO_NIGHTLY_FEATURE="" diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..5093ea9 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,128 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +# Unreleased + +### Added + +### Changed + +### Fixed + +# [0.9.5] - 2023-02-07 + +### Added + +- `FairMutex`, a new mutex implementation that reduces writer starvation. +- A MSRV policy: Rust 1.38 is currently required + +### Changed + +- The crate's CI now has full MIRI integration, further improving the confidence you can have in the implementation. + +### Fixed + +- Ensured that the crate's abstractions comply with stacked borrows rules. +- Unsoundness in the `RwLock` that could be triggered via a reader overflow +- Relaxed various `Send`/`Sync` bound requirements to make the crate more flexible + +# [0.9.4] - 2022-07-14 + +### Fixed + +- Fixed unsoundness in `RwLock` on reader overflow +- Relaxed `Send`/`Sync` bounds for `SpinMutex` and `TicketMutex` (doesn't affect `Mutex` itself) + +# [0.9.3] - 2022-04-17 + +### Added + +- Implemented `Default` for `Once` +- `Once::try_call_once` + +### Fixed + +- Fixed bug that caused `Once::call_once` to incorrectly fail + +# [0.9.2] - 2021-07-09 + +### Changed + +- Improved `Once` performance by reducing the memory footprint of internal state to one byte + +### Fixed + +- Improved performance of `Once` by relaxing ordering guarantees and removing redundant checks + +# [0.9.1] - 2021-06-21 + +### Added + +- Default type parameter on `Once` for better ergonomics + +# [0.9.0] - 2021-03-18 + +### Changed + +- Placed all major API features behind feature flags + +### Fixed + +- A compilation bug with the `lock_api` feature + +# [0.8.0] - 2021-03-15 + +### Added + +- `Once::get_unchecked` +- `RelaxStrategy` trait with type parameter on all locks to support switching between relax strategies + +### Changed + +- `lock_api1` feature is now named `lock_api` + +# [0.7.1] - 2021-01-12 + +### Fixed + +- Prevented `Once` leaking the inner value upon drop + +# [0.7.0] - 2020-10-18 + +### Added + +- `Once::initialized` +- `Once::get_mut` +- `Once::try_into_inner` +- `Once::poll` +- `RwLock`, `Mutex` and `Once` now implement `From` +- `Lazy` type for lazy initialization +- `TicketMutex`, an alternative mutex implementation +- `std` feature flag to enable thread yielding instead of spinning +- `Mutex::is_locked`/`SpinMutex::is_locked`/`TicketMutex::is_locked` +- `Barrier` + +### Changed 
+ +- `Once::wait` now spins even if initialization has not yet started +- `Guard::leak` is now an associated function instead of a method +- Improved the performance of `SpinMutex` by relaxing unnecessarily conservative + ordering requirements + +# [0.6.0] - 2020-10-08 + +### Added + +- More dynamic `Send`/`Sync` bounds for lock guards +- `lock_api` compatibility +- `Guard::leak` methods +- `RwLock::reader_count` and `RwLock::writer_count` +- `Display` implementation for guard types + +### Changed + +- Made `Debug` impls of lock guards just show the inner type like `std` diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..33ea628 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,72 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +rust-version = "1.38" +name = "spin" +version = "0.9.5" +authors = [ + "Mathijs van de Nes ", + "John Ericson ", + "Joshua Barretto ", +] +description = "Spin-based synchronization primitives" +readme = "README.md" +keywords = [ + "spinlock", + "mutex", + "rwlock", +] +license = "MIT" +repository = "https://github.com/mvdnes/spin-rs.git" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = [ + "--cfg", + "docsrs", +] + +[dependencies.lock_api_crate] +version = "0.4" +optional = true +package = "lock_api" + +[dependencies.portable-atomic] +version = "0.3" +optional = true +default-features = false + +[features] +barrier = ["mutex"] +default = [ + "lock_api", + "mutex", + "spin_mutex", + "rwlock", + "once", + "lazy", + "barrier", +] +fair_mutex = ["mutex"] +lazy = ["once"] +lock_api = ["lock_api_crate"] +mutex = [] +once = [] +portable_atomic = ["portable-atomic"] +rwlock = [] +spin_mutex = ["mutex"] +std = [] +ticket_mutex = ["mutex"] +use_ticket_mutex = [ + "mutex", + "ticket_mutex", +] diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..cb9df1d --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,62 @@ +[package] +name = "spin" +version = "0.9.5" +authors = [ + "Mathijs van de Nes ", + "John Ericson ", + "Joshua Barretto ", +] +license = "MIT" +repository = "https://github.com/mvdnes/spin-rs.git" +keywords = ["spinlock", "mutex", "rwlock"] +description = "Spin-based synchronization primitives" +rust-version = "1.38" + +[dependencies] +lock_api_crate = { package = "lock_api", version = "0.4", optional = true } +portable-atomic = { version = "0.3", optional = true, default-features = false } + +[features] +default = ["lock_api", "mutex", "spin_mutex", "rwlock", "once", "lazy", "barrier"] + +# Enables `Mutex`. Must be used with either `spin_mutex` or `use_ticket_mutex`. +mutex = [] + +# Enables `SpinMutex` and the default spin mutex implementation for `Mutex`. +spin_mutex = ["mutex"] + +# Enables `TicketMutex`. +ticket_mutex = ["mutex"] + +# Enables `FairMutex`. +fair_mutex = ["mutex"] + +# Enables the non-default ticket mutex implementation for `Mutex`. +use_ticket_mutex = ["mutex", "ticket_mutex"] + +# Enables `RwLock`. +rwlock = [] + +# Enables `Once`. +once = [] + +# Enables `Lazy`. +lazy = ["once"] + +# Enables `Barrier`. 
Because this feature uses `mutex`, either `spin_mutex` or `use_ticket_mutex` must be enabled. +barrier = ["mutex"] + +# Enables `lock_api`-compatible types that use the primitives in this crate internally. +lock_api = ["lock_api_crate"] + +# Enables std-only features such as yield-relaxing. +std = [] + +# Use the portable_atomic crate to support platforms without native atomic operations +# cfg 'portable_atomic_unsafe_assume_single_core' must also be set by the final binary crate. +# This cfg is unsafe and enabling it for multicore systems is unsound. +portable_atomic = ["portable-atomic"] + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b2d7f7b --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Mathijs van de Nes + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4af6cf9 --- /dev/null +++ b/README.md @@ -0,0 +1,137 @@ +# spin-rs + +[![Crates.io version](https://img.shields.io/crates/v/spin.svg)](https://crates.io/crates/spin) +[![docs.rs](https://docs.rs/spin/badge.svg)](https://docs.rs/spin/) +[![Build Status](https://travis-ci.org/mvdnes/spin-rs.svg)](https://travis-ci.org/mvdnes/spin-rs) + +Spin-based synchronization primitives. + +This crate provides [spin-based](https://en.wikipedia.org/wiki/Spinlock) +versions of the primitives in `std::sync`. Because synchronization is done +through spinning, the primitives are suitable for use in `no_std` environments. + +Before deciding to use `spin`, we recommend reading +[this superb blog post](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html) +by [@matklad](https://github.com/matklad/) that discusses the pros and cons of +spinlocks. If you have access to `std`, it's likely that the primitives in +`std::sync` will serve you better except in very specific circumstances. + +## Features + +- `Mutex`, `RwLock`, `Once`, `Lazy` and `Barrier` equivalents +- Support for `no_std` environments +- [`lock_api`](https://crates.io/crates/lock_api) compatibility +- Upgradeable `RwLock` guards +- Guards can be sent and shared between threads +- Guard leaking +- Ticket locks +- Different strategies for dealing with contention + +## Usage + +Include the following under the `[dependencies]` section in your `Cargo.toml` file. 
+
+```toml
+spin = "x.y"
+```
+
+## Example
+
+When calling `lock` on a `Mutex` you will get a guard value that provides access
+to the data. When this guard is dropped, the mutex will become available again.
+
+```rust
+extern crate spin;
+use std::{sync::Arc, thread};
+
+fn main() {
+    let counter = Arc::new(spin::Mutex::new(0));
+
+    let thread = thread::spawn({
+        let counter = counter.clone();
+        move || {
+            for _ in 0..100 {
+                *counter.lock() += 1;
+            }
+        }
+    });
+
+    for _ in 0..100 {
+        *counter.lock() += 1;
+    }
+
+    thread.join().unwrap();
+
+    assert_eq!(*counter.lock(), 200);
+}
+```
+
+## Feature flags
+
+The crate comes with a few feature flags that you may wish to use.
+
+- `mutex` enables the `Mutex` type.
+
+- `spin_mutex` enables the `SpinMutex` type.
+
+- `ticket_mutex` enables the `TicketMutex` type.
+
+- `use_ticket_mutex` switches to a ticket lock for the implementation of `Mutex`. This
+  is recommended only on targets for which ordinary spinning locks perform very badly,
+  because it will change the implementation used by other crates that depend on `spin`.
+
+- `rwlock` enables the `RwLock` type.
+
+- `once` enables the `Once` type.
+
+- `lazy` enables the `Lazy` type.
+
+- `barrier` enables the `Barrier` type.
+
+- `lock_api` enables support for [`lock_api`](https://crates.io/crates/lock_api).
+
+- `std` enables support for thread yielding instead of spinning.
+
+- `portable_atomic` enables usage of the `portable-atomic` crate
+  to support platforms without native atomic operations (Cortex-M0, etc.).
+  The `portable_atomic_unsafe_assume_single_core` cfg flag
+  must also be set by the final binary crate.
+  This can be done by adapting the following snippet to the `.cargo/config` file:
+  ```
+  [target.<target>]
+  rustflags = [ "--cfg", "portable_atomic_unsafe_assume_single_core" ]
+  ```
+  Note that this cfg is unsafe by nature, and enabling it for multicore systems is unsound.
+
+## Remarks
+
+It is often desirable to have a lock shared between threads. Wrapping the lock in an
+`std::sync::Arc` is one route through which this might be achieved. (When the lock lives
+in a `static`, no `Arc` is needed; see the further example at the end of this README.)
+
+Locks provide zero-overhead access to their data when accessed through a mutable
+reference by using their `get_mut` methods.
+
+The behaviour of these locks is similar to that of their namesakes in `std::sync`.
+They differ in the following ways:
+
+- Locks will not be poisoned in case of failure.
+- Threads will not yield to the OS scheduler when they encounter a lock that cannot be
+  accessed. Instead, they will 'spin' in a busy loop until the lock becomes available.
+
+Many of the feature flags listed above are enabled by default. If you're writing a
+library, we recommend disabling those that you don't use to avoid increasing compilation
+time for your crate's users. You can do this like so:
+
+```
+[dependencies]
+spin = { version = "x.y", default-features = false, features = [...] }
+```
+
+## Minimum Safe Rust Version (MSRV)
+
+This crate is guaranteed to compile on a Minimum Safe Rust Version (MSRV) of 1.38.0 and above.
+This version will not be changed without a minor version bump.
+
+## License
+
+`spin` is distributed under the MIT License (see `LICENSE`).
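+
+## Further example
+
+As a complement to the remarks above, here is a minimal sketch showing that, because
+the lock types have `const fn` constructors, they can live in `static`s and be shared
+between threads without an `Arc`. It assumes the default features; names such as
+`CONFIG` and `TOTAL` are purely illustrative.
+
+```rust
+use spin::{Lazy, RwLock};
+
+// Illustrative statics: a lazily-initialised value and a reader-writer lock.
+// Statics are already shared between threads, so no `Arc` is needed here.
+static CONFIG: Lazy<&'static str> = Lazy::new(|| "default configuration");
+static TOTAL: RwLock<u32> = RwLock::new(0);
+
+fn main() {
+    // The initialiser runs exactly once, on first dereference.
+    assert_eq!(*CONFIG, "default configuration");
+
+    // A writer takes exclusive access; the guard releases the lock when dropped.
+    *TOTAL.write() += 42;
+
+    // Multiple readers may hold the lock at the same time.
+    let a = TOTAL.read();
+    let b = TOTAL.read();
+    assert_eq!(*a + *b, 84);
+}
+```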
diff --git a/examples/debug.rs b/examples/debug.rs new file mode 100644 index 0000000..64654f6 --- /dev/null +++ b/examples/debug.rs @@ -0,0 +1,21 @@ +extern crate spin; + +fn main() { + let mutex = spin::Mutex::new(42); + println!("{:?}", mutex); + { + let x = mutex.lock(); + println!("{:?}, {:?}", mutex, *x); + } + + let rwlock = spin::RwLock::new(42); + println!("{:?}", rwlock); + { + let x = rwlock.read(); + println!("{:?}, {:?}", rwlock, *x); + } + { + let x = rwlock.write(); + println!("{:?}, {:?}", rwlock, *x); + } +} diff --git a/script/doc-upload.cfg b/script/doc-upload.cfg new file mode 100644 index 0000000..c6dfbdc --- /dev/null +++ b/script/doc-upload.cfg @@ -0,0 +1,3 @@ +PROJECT_NAME=spin-rs +DOCS_REPO=mvdnes/rust-docs.git +DOC_RUST_VERSION=stable diff --git a/src/barrier.rs b/src/barrier.rs new file mode 100644 index 0000000..c3a1c92 --- /dev/null +++ b/src/barrier.rs @@ -0,0 +1,239 @@ +//! Synchronization primitive allowing multiple threads to synchronize the +//! beginning of some computation. +//! +//! Implementation adapted from the 'Barrier' type of the standard library. See: +//! +//! +//! Copyright 2014 The Rust Project Developers. See the COPYRIGHT +//! file at the top-level directory of this distribution and at +//! . +//! +//! Licensed under the Apache License, Version 2.0 > or the MIT license +//! >, at your +//! option. This file may not be copied, modified, or distributed +//! except according to those terms. + +use crate::{mutex::Mutex, RelaxStrategy, Spin}; + +/// A primitive that synchronizes the execution of multiple threads. +/// +/// # Example +/// +/// ``` +/// use spin; +/// use std::sync::Arc; +/// use std::thread; +/// +/// let mut handles = Vec::with_capacity(10); +/// let barrier = Arc::new(spin::Barrier::new(10)); +/// for _ in 0..10 { +/// let c = barrier.clone(); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// handles.push(thread::spawn(move|| { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// })); +/// } +/// // Wait for other threads to finish. +/// for handle in handles { +/// handle.join().unwrap(); +/// } +/// ``` +pub struct Barrier { + lock: Mutex, + num_threads: usize, +} + +// The inner state of a double barrier +struct BarrierState { + count: usize, + generation_id: usize, +} + +/// A `BarrierWaitResult` is returned by [`wait`] when all threads in the [`Barrier`] +/// have rendezvoused. +/// +/// [`wait`]: struct.Barrier.html#method.wait +/// [`Barrier`]: struct.Barrier.html +/// +/// # Examples +/// +/// ``` +/// use spin; +/// +/// let barrier = spin::Barrier::new(1); +/// let barrier_wait_result = barrier.wait(); +/// ``` +pub struct BarrierWaitResult(bool); + +impl Barrier { + /// Blocks the current thread until all threads have rendezvoused here. + /// + /// Barriers are re-usable after all threads have rendezvoused once, and can + /// be used continuously. + /// + /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that + /// returns `true` from [`is_leader`] when returning from this function, and + /// all other threads will receive a result that will return `false` from + /// [`is_leader`]. 
+ /// + /// [`BarrierWaitResult`]: struct.BarrierWaitResult.html + /// [`is_leader`]: struct.BarrierWaitResult.html#method.is_leader + /// + /// # Examples + /// + /// ``` + /// use spin; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let mut handles = Vec::with_capacity(10); + /// let barrier = Arc::new(spin::Barrier::new(10)); + /// for _ in 0..10 { + /// let c = barrier.clone(); + /// // The same messages will be printed together. + /// // You will NOT see any interleaving. + /// handles.push(thread::spawn(move|| { + /// println!("before wait"); + /// c.wait(); + /// println!("after wait"); + /// })); + /// } + /// // Wait for other threads to finish. + /// for handle in handles { + /// handle.join().unwrap(); + /// } + /// ``` + pub fn wait(&self) -> BarrierWaitResult { + let mut lock = self.lock.lock(); + lock.count += 1; + + if lock.count < self.num_threads { + // not the leader + let local_gen = lock.generation_id; + + while local_gen == lock.generation_id && lock.count < self.num_threads { + drop(lock); + R::relax(); + lock = self.lock.lock(); + } + BarrierWaitResult(false) + } else { + // this thread is the leader, + // and is responsible for incrementing the generation + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + BarrierWaitResult(true) + } + } +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`wait`] and then wake up + /// all threads at once when the `n`th thread calls [`wait`]. A Barrier created + /// with n = 0 will behave identically to one created with n = 1. + /// + /// [`wait`]: #method.wait + /// + /// # Examples + /// + /// ``` + /// use spin; + /// + /// let barrier = spin::Barrier::new(10); + /// ``` + pub const fn new(n: usize) -> Self { + Self { + lock: Mutex::new(BarrierState { + count: 0, + generation_id: 0, + }), + num_threads: n, + } + } +} + +impl BarrierWaitResult { + /// Returns whether this thread from [`wait`] is the "leader thread". + /// + /// Only one thread will have `true` returned from their result, all other + /// threads will have `false` returned. + /// + /// [`wait`]: struct.Barrier.html#method.wait + /// + /// # Examples + /// + /// ``` + /// use spin; + /// + /// let barrier = spin::Barrier::new(1); + /// let barrier_wait_result = barrier.wait(); + /// println!("{:?}", barrier_wait_result.is_leader()); + /// ``` + pub fn is_leader(&self) -> bool { + self.0 + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::mpsc::{channel, TryRecvError}; + use std::sync::Arc; + use std::thread; + + type Barrier = super::Barrier; + + fn use_barrier(n: usize, barrier: Arc) { + let (tx, rx) = channel(); + + let mut ts = Vec::new(); + for _ in 0..n - 1 { + let c = barrier.clone(); + let tx = tx.clone(); + ts.push(thread::spawn(move || { + tx.send(c.wait().is_leader()).unwrap(); + })); + } + + // At this point, all spawned threads should be blocked, + // so we shouldn't get anything from the port + assert!(match rx.try_recv() { + Err(TryRecvError::Empty) => true, + _ => false, + }); + + let mut leader_found = barrier.wait().is_leader(); + + // Now, the barrier is cleared and we should get data. 
+ for _ in 0..n - 1 { + if rx.recv().unwrap() { + assert!(!leader_found); + leader_found = true; + } + } + assert!(leader_found); + + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn test_barrier() { + const N: usize = 10; + + let barrier = Arc::new(Barrier::new(N)); + + use_barrier(N, barrier.clone()); + + // use barrier twice to ensure it is reusable + use_barrier(N, barrier.clone()); + } +} diff --git a/src/lazy.rs b/src/lazy.rs new file mode 100644 index 0000000..6e5efe4 --- /dev/null +++ b/src/lazy.rs @@ -0,0 +1,118 @@ +//! Synchronization primitives for lazy evaluation. +//! +//! Implementation adapted from the `SyncLazy` type of the standard library. See: +//! + +use crate::{once::Once, RelaxStrategy, Spin}; +use core::{cell::Cell, fmt, ops::Deref}; + +/// A value which is initialized on the first access. +/// +/// This type is a thread-safe `Lazy`, and can be used in statics. +/// +/// # Examples +/// +/// ``` +/// use std::collections::HashMap; +/// use spin::Lazy; +/// +/// static HASHMAP: Lazy> = Lazy::new(|| { +/// println!("initializing"); +/// let mut m = HashMap::new(); +/// m.insert(13, "Spica".to_string()); +/// m.insert(74, "Hoyten".to_string()); +/// m +/// }); +/// +/// fn main() { +/// println!("ready"); +/// std::thread::spawn(|| { +/// println!("{:?}", HASHMAP.get(&13)); +/// }).join().unwrap(); +/// println!("{:?}", HASHMAP.get(&74)); +/// +/// // Prints: +/// // ready +/// // initializing +/// // Some("Spica") +/// // Some("Hoyten") +/// } +/// ``` +pub struct Lazy T, R = Spin> { + cell: Once, + init: Cell>, +} + +impl fmt::Debug for Lazy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Lazy") + .field("cell", &self.cell) + .field("init", &"..") + .finish() + } +} + +// We never create a `&F` from a `&Lazy` so it is fine +// to not impl `Sync` for `F` +// we do create a `&mut Option` in `force`, but this is +// properly synchronized, so it only happens once +// so it also does not contribute to this impl. +unsafe impl Sync for Lazy where Once: Sync {} +// auto-derived `Send` impl is OK. + +impl Lazy { + /// Creates a new lazy value with the given initializing + /// function. + pub const fn new(f: F) -> Self { + Self { + cell: Once::new(), + init: Cell::new(Some(f)), + } + } + /// Retrieves a mutable pointer to the inner data. + /// + /// This is especially useful when interfacing with low level code or FFI where the caller + /// explicitly knows that it has exclusive access to the inner data. Note that reading from + /// this pointer is UB until initialized or directly written to. + pub fn as_mut_ptr(&self) -> *mut T { + self.cell.as_mut_ptr() + } +} + +impl T, R: RelaxStrategy> Lazy { + /// Forces the evaluation of this lazy value and + /// returns a reference to result. This is equivalent + /// to the `Deref` impl, but is explicit. + /// + /// # Examples + /// + /// ``` + /// use spin::Lazy; + /// + /// let lazy = Lazy::new(|| 92); + /// + /// assert_eq!(Lazy::force(&lazy), &92); + /// assert_eq!(&*lazy, &92); + /// ``` + pub fn force(this: &Self) -> &T { + this.cell.call_once(|| match this.init.take() { + Some(f) => f(), + None => panic!("Lazy instance has previously been poisoned"), + }) + } +} + +impl T, R: RelaxStrategy> Deref for Lazy { + type Target = T; + + fn deref(&self) -> &T { + Self::force(self) + } +} + +impl Default for Lazy T, R> { + /// Creates a new lazy value using `Default` as the initializing function. 
+ fn default() -> Self { + Self::new(T::default) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..50768bc --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,221 @@ +#![cfg_attr(all(not(feature = "std"), not(test)), no_std)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![deny(missing_docs)] + +//! This crate provides [spin-based](https://en.wikipedia.org/wiki/Spinlock) versions of the +//! primitives in `std::sync` and `std::lazy`. Because synchronization is done through spinning, +//! the primitives are suitable for use in `no_std` environments. +//! +//! # Features +//! +//! - `Mutex`, `RwLock`, `Once`/`SyncOnceCell`, and `SyncLazy` equivalents +//! +//! - Support for `no_std` environments +//! +//! - [`lock_api`](https://crates.io/crates/lock_api) compatibility +//! +//! - Upgradeable `RwLock` guards +//! +//! - Guards can be sent and shared between threads +//! +//! - Guard leaking +//! +//! - Ticket locks +//! +//! - Different strategies for dealing with contention +//! +//! # Relationship with `std::sync` +//! +//! While `spin` is not a drop-in replacement for `std::sync` (and +//! [should not be considered as such](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html)) +//! an effort is made to keep this crate reasonably consistent with `std::sync`. +//! +//! Many of the types defined in this crate have 'additional capabilities' when compared to `std::sync`: +//! +//! - Because spinning does not depend on the thread-driven model of `std::sync`, guards ([`MutexGuard`], +//! [`RwLockReadGuard`], [`RwLockWriteGuard`], etc.) may be sent and shared between threads. +//! +//! - [`RwLockUpgradableGuard`] supports being upgraded into a [`RwLockWriteGuard`]. +//! +//! - Guards support [leaking](https://doc.rust-lang.org/nomicon/leaking.html). +//! +//! - [`Once`] owns the value returned by its `call_once` initializer. +//! +//! - [`RwLock`] supports counting readers and writers. +//! +//! Conversely, the types in this crate do not have some of the features `std::sync` has: +//! +//! - Locks do not track [panic poisoning](https://doc.rust-lang.org/nomicon/poisoning.html). +//! +//! ## Feature flags +//! +//! The crate comes with a few feature flags that you may wish to use. +//! +//! - `lock_api` enables support for [`lock_api`](https://crates.io/crates/lock_api) +//! +//! - `ticket_mutex` uses a ticket lock for the implementation of `Mutex` +//! +//! - `fair_mutex` enables a fairer implementation of `Mutex` that uses eventual fairness to avoid +//! starvation +//! +//! 
- `std` enables support for thread yielding instead of spinning + +#[cfg(any(test, feature = "std"))] +extern crate core; + +#[cfg(feature = "portable_atomic")] +extern crate portable_atomic; + +#[cfg(not(feature = "portable_atomic"))] +use core::sync::atomic; +#[cfg(feature = "portable_atomic")] +use portable_atomic as atomic; + +#[cfg(feature = "barrier")] +#[cfg_attr(docsrs, doc(cfg(feature = "barrier")))] +pub mod barrier; +#[cfg(feature = "lazy")] +#[cfg_attr(docsrs, doc(cfg(feature = "lazy")))] +pub mod lazy; +#[cfg(feature = "mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] +pub mod mutex; +#[cfg(feature = "once")] +#[cfg_attr(docsrs, doc(cfg(feature = "once")))] +pub mod once; +pub mod relax; +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub mod rwlock; + +#[cfg(feature = "mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] +pub use mutex::MutexGuard; +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +pub use relax::Yield; +pub use relax::{RelaxStrategy, Spin}; +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub use rwlock::RwLockReadGuard; + +// Avoid confusing inference errors by aliasing away the relax strategy parameter. Users that need to use a different +// relax strategy can do so by accessing the types through their fully-qualified path. This is a little bit horrible +// but sadly adding a default type parameter is *still* a breaking change in Rust (for understandable reasons). + +/// A primitive that synchronizes the execution of multiple threads. See [`barrier::Barrier`] for documentation. +/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "barrier")] +#[cfg_attr(docsrs, doc(cfg(feature = "barrier")))] +pub type Barrier = crate::barrier::Barrier; + +/// A value which is initialized on the first access. See [`lazy::Lazy`] for documentation. +/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "lazy")] +#[cfg_attr(docsrs, doc(cfg(feature = "lazy")))] +pub type Lazy T> = crate::lazy::Lazy; + +/// A primitive that synchronizes the execution of multiple threads. See [`mutex::Mutex`] for documentation. +/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] +pub type Mutex = crate::mutex::Mutex; + +/// A primitive that provides lazy one-time initialization. See [`once::Once`] for documentation. +/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "once")] +#[cfg_attr(docsrs, doc(cfg(feature = "once")))] +pub type Once = crate::once::Once; + +/// A lock that provides data access to either one writer or many readers. See [`rwlock::RwLock`] for documentation. 
+/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub type RwLock = crate::rwlock::RwLock; + +/// A guard that provides immutable data access but can be upgraded to [`RwLockWriteGuard`]. See +/// [`rwlock::RwLockUpgradableGuard`] for documentation. +/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub type RwLockUpgradableGuard<'a, T> = crate::rwlock::RwLockUpgradableGuard<'a, T>; + +/// A guard that provides mutable data access. See [`rwlock::RwLockWriteGuard`] for documentation. +/// +/// A note for advanced users: this alias exists to avoid subtle type inference errors due to the default relax +/// strategy type parameter. If you need a non-default relax strategy, use the fully-qualified path. +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub type RwLockWriteGuard<'a, T> = crate::rwlock::RwLockWriteGuard<'a, T>; + +/// Spin synchronisation primitives, but compatible with [`lock_api`](https://crates.io/crates/lock_api). +#[cfg(feature = "lock_api")] +#[cfg_attr(docsrs, doc(cfg(feature = "lock_api")))] +pub mod lock_api { + /// A lock that provides mutually exclusive data access (compatible with [`lock_api`](https://crates.io/crates/lock_api)). + #[cfg(feature = "mutex")] + #[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] + pub type Mutex = lock_api_crate::Mutex, T>; + + /// A guard that provides mutable data access (compatible with [`lock_api`](https://crates.io/crates/lock_api)). + #[cfg(feature = "mutex")] + #[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] + pub type MutexGuard<'a, T> = lock_api_crate::MutexGuard<'a, crate::Mutex<()>, T>; + + /// A lock that provides data access to either one writer or many readers (compatible with [`lock_api`](https://crates.io/crates/lock_api)). + #[cfg(feature = "rwlock")] + #[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] + pub type RwLock = lock_api_crate::RwLock, T>; + + /// A guard that provides immutable data access (compatible with [`lock_api`](https://crates.io/crates/lock_api)). + #[cfg(feature = "rwlock")] + #[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] + pub type RwLockReadGuard<'a, T> = lock_api_crate::RwLockReadGuard<'a, crate::RwLock<()>, T>; + + /// A guard that provides mutable data access (compatible with [`lock_api`](https://crates.io/crates/lock_api)). + #[cfg(feature = "rwlock")] + #[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] + pub type RwLockWriteGuard<'a, T> = lock_api_crate::RwLockWriteGuard<'a, crate::RwLock<()>, T>; + + /// A guard that provides immutable data access but can be upgraded to [`RwLockWriteGuard`] (compatible with [`lock_api`](https://crates.io/crates/lock_api)). + #[cfg(feature = "rwlock")] + #[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] + pub type RwLockUpgradableReadGuard<'a, T> = + lock_api_crate::RwLockUpgradableReadGuard<'a, crate::RwLock<()>, T>; +} + +/// In the event of an invalid operation, it's best to abort the current process. +#[cfg(feature = "fair_mutex")] +fn abort() -> ! 
{ + #[cfg(not(feature = "std"))] + { + // Panicking while panicking is defined by Rust to result in an abort. + struct Panic; + + impl Drop for Panic { + fn drop(&mut self) { + panic!("aborting due to invalid operation"); + } + } + + let _panic = Panic; + panic!("aborting due to invalid operation"); + } + + #[cfg(feature = "std")] + { + std::process::abort(); + } +} diff --git a/src/mutex.rs b/src/mutex.rs new file mode 100644 index 0000000..e333d8a --- /dev/null +++ b/src/mutex.rs @@ -0,0 +1,340 @@ +//! Locks that have the same behaviour as a mutex. +//! +//! The [`Mutex`] in the root of the crate, can be configured using the `ticket_mutex` feature. +//! If it's enabled, [`TicketMutex`] and [`TicketMutexGuard`] will be re-exported as [`Mutex`] +//! and [`MutexGuard`], otherwise the [`SpinMutex`] and guard will be re-exported. +//! +//! `ticket_mutex` is disabled by default. +//! +//! [`Mutex`]: ../struct.Mutex.html +//! [`MutexGuard`]: ../struct.MutexGuard.html +//! [`TicketMutex`]: ./struct.TicketMutex.html +//! [`TicketMutexGuard`]: ./struct.TicketMutexGuard.html +//! [`SpinMutex`]: ./struct.SpinMutex.html +//! [`SpinMutexGuard`]: ./struct.SpinMutexGuard.html + +#[cfg(feature = "spin_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "spin_mutex")))] +pub mod spin; +#[cfg(feature = "spin_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "spin_mutex")))] +pub use self::spin::{SpinMutex, SpinMutexGuard}; + +#[cfg(feature = "ticket_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "ticket_mutex")))] +pub mod ticket; +#[cfg(feature = "ticket_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "ticket_mutex")))] +pub use self::ticket::{TicketMutex, TicketMutexGuard}; + +#[cfg(feature = "fair_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))] +pub mod fair; +#[cfg(feature = "fair_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))] +pub use self::fair::{FairMutex, FairMutexGuard, Starvation}; + +use crate::{RelaxStrategy, Spin}; +use core::{ + fmt, + ops::{Deref, DerefMut}, +}; + +#[cfg(all(not(feature = "spin_mutex"), not(feature = "use_ticket_mutex")))] +compile_error!("The `mutex` feature flag was used (perhaps through another feature?) without either `spin_mutex` or `use_ticket_mutex`. One of these is required."); + +#[cfg(all(not(feature = "use_ticket_mutex"), feature = "spin_mutex"))] +type InnerMutex = self::spin::SpinMutex; +#[cfg(all(not(feature = "use_ticket_mutex"), feature = "spin_mutex"))] +type InnerMutexGuard<'a, T> = self::spin::SpinMutexGuard<'a, T>; + +#[cfg(feature = "use_ticket_mutex")] +type InnerMutex = self::ticket::TicketMutex; +#[cfg(feature = "use_ticket_mutex")] +type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>; + +/// A spin-based lock providing mutually exclusive access to data. +/// +/// The implementation uses either a ticket mutex or a regular spin mutex depending on whether the `spin_mutex` or +/// `ticket_mutex` feature flag is enabled. 
+/// +/// # Example +/// +/// ``` +/// use spin; +/// +/// let lock = spin::Mutex::new(0); +/// +/// // Modify the data +/// *lock.lock() = 2; +/// +/// // Read the data +/// let answer = *lock.lock(); +/// assert_eq!(answer, 2); +/// ``` +/// +/// # Thread safety example +/// +/// ``` +/// use spin; +/// use std::sync::{Arc, Barrier}; +/// +/// let thread_count = 1000; +/// let spin_mutex = Arc::new(spin::Mutex::new(0)); +/// +/// // We use a barrier to ensure the readout happens after all writing +/// let barrier = Arc::new(Barrier::new(thread_count + 1)); +/// +/// # let mut ts = Vec::new(); +/// for _ in (0..thread_count) { +/// let my_barrier = barrier.clone(); +/// let my_lock = spin_mutex.clone(); +/// # let t = +/// std::thread::spawn(move || { +/// let mut guard = my_lock.lock(); +/// *guard += 1; +/// +/// // Release the lock to prevent a deadlock +/// drop(guard); +/// my_barrier.wait(); +/// }); +/// # ts.push(t); +/// } +/// +/// barrier.wait(); +/// +/// let answer = { *spin_mutex.lock() }; +/// assert_eq!(answer, thread_count); +/// +/// # for t in ts { +/// # t.join().unwrap(); +/// # } +/// ``` +pub struct Mutex { + inner: InnerMutex, +} + +unsafe impl Sync for Mutex {} +unsafe impl Send for Mutex {} + +/// A generic guard that will protect some data access and +/// uses either a ticket lock or a normal spin mutex. +/// +/// For more info see [`TicketMutexGuard`] or [`SpinMutexGuard`]. +/// +/// [`TicketMutexGuard`]: ./struct.TicketMutexGuard.html +/// [`SpinMutexGuard`]: ./struct.SpinMutexGuard.html +pub struct MutexGuard<'a, T: 'a + ?Sized> { + inner: InnerMutexGuard<'a, T>, +} + +impl Mutex { + /// Creates a new [`Mutex`] wrapping the supplied data. + /// + /// # Example + /// + /// ``` + /// use spin::Mutex; + /// + /// static MUTEX: Mutex<()> = Mutex::new(()); + /// + /// fn demo() { + /// let lock = MUTEX.lock(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline(always)] + pub const fn new(value: T) -> Self { + Self { + inner: InnerMutex::new(value), + } + } + + /// Consumes this [`Mutex`] and unwraps the underlying data. + /// + /// # Example + /// + /// ``` + /// let lock = spin::Mutex::new(42); + /// assert_eq!(42, lock.into_inner()); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl Mutex { + /// Locks the [`Mutex`] and returns a guard that permits access to the inner data. + /// + /// The returned value may be dereferenced for data access + /// and the lock will be dropped when the guard falls out of scope. + /// + /// ``` + /// let lock = spin::Mutex::new(0); + /// { + /// let mut data = lock.lock(); + /// // The lock is now locked and the data can be accessed + /// *data += 1; + /// // The lock is implicitly dropped at the end of the scope + /// } + /// ``` + #[inline(always)] + pub fn lock(&self) -> MutexGuard { + MutexGuard { + inner: self.inner.lock(), + } + } +} + +impl Mutex { + /// Returns `true` if the lock is currently held. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + #[inline(always)] + pub fn is_locked(&self) -> bool { + self.inner.is_locked() + } + + /// Force unlock this [`Mutex`]. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the lock is not held by the current + /// thread. 
However, this can be useful in some instances for exposing the + /// lock to FFI that doesn't know how to deal with RAII. + #[inline(always)] + pub unsafe fn force_unlock(&self) { + self.inner.force_unlock() + } + + /// Try to lock this [`Mutex`], returning a lock guard if successful. + /// + /// # Example + /// + /// ``` + /// let lock = spin::Mutex::new(42); + /// + /// let maybe_guard = lock.try_lock(); + /// assert!(maybe_guard.is_some()); + /// + /// // `maybe_guard` is still held, so the second call fails + /// let maybe_guard2 = lock.try_lock(); + /// assert!(maybe_guard2.is_none()); + /// ``` + #[inline(always)] + pub fn try_lock(&self) -> Option> { + self.inner + .try_lock() + .map(|guard| MutexGuard { inner: guard }) + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the [`Mutex`] mutably, and a mutable reference is guaranteed to be exclusive in Rust, + /// no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As such, + /// this is a 'zero-cost' operation. + /// + /// # Example + /// + /// ``` + /// let mut lock = spin::Mutex::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.lock(), 10); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } +} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.inner, f) + } +} + +impl Default for Mutex { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl From for Mutex { + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl<'a, T: ?Sized> MutexGuard<'a, T> { + /// Leak the lock guard, yielding a mutable reference to the underlying data. + /// + /// Note that this function will permanently lock the original [`Mutex`]. + /// + /// ``` + /// let mylock = spin::Mutex::new(0); + /// + /// let data: &mut i32 = spin::MutexGuard::leak(mylock.lock()); + /// + /// *data = 1; + /// assert_eq!(*data, 1); + /// ``` + #[inline(always)] + pub fn leak(this: Self) -> &'a mut T { + InnerMutexGuard::leak(this.inner) + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + &*self.inner + } +} + +impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + &mut *self.inner + } +} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawMutex for Mutex<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + fn lock(&self) { + // Prevent guard destructor running + core::mem::forget(Self::lock(self)); + } + + fn try_lock(&self) -> bool { + // Prevent guard destructor running + Self::try_lock(self).map(core::mem::forget).is_some() + } + + unsafe fn unlock(&self) { + self.force_unlock(); + } + + fn is_locked(&self) -> bool { + self.inner.is_locked() + } +} diff --git a/src/mutex/fair.rs b/src/mutex/fair.rs new file mode 100644 index 0000000..dde3994 --- /dev/null +++ b/src/mutex/fair.rs @@ -0,0 +1,732 @@ +//! A spinning mutex with a fairer unlock algorithm. +//! +//! This mutex is similar to the `SpinMutex` in that it uses spinning to avoid +//! context switches. 
However, it uses a fairer unlock algorithm that avoids +//! starvation of threads that are waiting for the lock. + +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, +}; + +// The lowest bit of `lock` is used to indicate whether the mutex is locked or not. The rest of the bits are used to +// store the number of starving threads. +const LOCKED: usize = 1; +const STARVED: usize = 2; + +/// Number chosen by fair roll of the dice, adjust as needed. +const STARVATION_SPINS: usize = 1024; + +/// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data, but with a fairer +/// algorithm. +/// +/// # Example +/// +/// ``` +/// use spin; +/// +/// let lock = spin::mutex::FairMutex::<_>::new(0); +/// +/// // Modify the data +/// *lock.lock() = 2; +/// +/// // Read the data +/// let answer = *lock.lock(); +/// assert_eq!(answer, 2); +/// ``` +/// +/// # Thread safety example +/// +/// ``` +/// use spin; +/// use std::sync::{Arc, Barrier}; +/// +/// let thread_count = 1000; +/// let spin_mutex = Arc::new(spin::mutex::FairMutex::<_>::new(0)); +/// +/// // We use a barrier to ensure the readout happens after all writing +/// let barrier = Arc::new(Barrier::new(thread_count + 1)); +/// +/// for _ in (0..thread_count) { +/// let my_barrier = barrier.clone(); +/// let my_lock = spin_mutex.clone(); +/// std::thread::spawn(move || { +/// let mut guard = my_lock.lock(); +/// *guard += 1; +/// +/// // Release the lock to prevent a deadlock +/// drop(guard); +/// my_barrier.wait(); +/// }); +/// } +/// +/// barrier.wait(); +/// +/// let answer = { *spin_mutex.lock() }; +/// assert_eq!(answer, thread_count); +/// ``` +pub struct FairMutex { + phantom: PhantomData, + pub(crate) lock: AtomicUsize, + data: UnsafeCell, +} + +/// A guard that provides mutable data access. +/// +/// When the guard falls out of scope it will release the lock. +pub struct FairMutexGuard<'a, T: ?Sized + 'a> { + lock: &'a AtomicUsize, + data: *mut T, +} + +/// A handle that indicates that we have been trying to acquire the lock for a while. +/// +/// This handle is used to prevent starvation. +pub struct Starvation<'a, T: ?Sized + 'a, R> { + lock: &'a FairMutex, +} + +/// Indicates whether a lock was rejected due to the lock being held by another thread or due to starvation. +#[derive(Debug)] +pub enum LockRejectReason { + /// The lock was rejected due to the lock being held by another thread. + Locked, + + /// The lock was rejected due to starvation. + Starved, +} + +// Same unsafe impls as `std::sync::Mutex` +unsafe impl Sync for FairMutex {} +unsafe impl Send for FairMutex {} + +impl FairMutex { + /// Creates a new [`FairMutex`] wrapping the supplied data. + /// + /// # Example + /// + /// ``` + /// use spin::mutex::FairMutex; + /// + /// static MUTEX: FairMutex<()> = FairMutex::<_>::new(()); + /// + /// fn demo() { + /// let lock = MUTEX.lock(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline(always)] + pub const fn new(data: T) -> Self { + FairMutex { + lock: AtomicUsize::new(0), + data: UnsafeCell::new(data), + phantom: PhantomData, + } + } + + /// Consumes this [`FairMutex`] and unwraps the underlying data. 
+ /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// assert_eq!(42, lock.into_inner()); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock. + let FairMutex { data, .. } = self; + data.into_inner() + } + + /// Returns a mutable pointer to the underlying data. + /// + /// This is mostly meant to be used for applications which require manual unlocking, but where + /// storing both the lock and the pointer to the inner data gets inefficient. + /// + /// # Example + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// unsafe { + /// core::mem::forget(lock.lock()); + /// + /// assert_eq!(lock.as_mut_ptr().read(), 42); + /// lock.as_mut_ptr().write(58); + /// + /// lock.force_unlock(); + /// } + /// + /// assert_eq!(*lock.lock(), 58); + /// + /// ``` + #[inline(always)] + pub fn as_mut_ptr(&self) -> *mut T { + self.data.get() + } +} + +impl FairMutex { + /// Locks the [`FairMutex`] and returns a guard that permits access to the inner data. + /// + /// The returned value may be dereferenced for data access + /// and the lock will be dropped when the guard falls out of scope. + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(0); + /// { + /// let mut data = lock.lock(); + /// // The lock is now locked and the data can be accessed + /// *data += 1; + /// // The lock is implicitly dropped at the end of the scope + /// } + /// ``` + #[inline(always)] + pub fn lock(&self) -> FairMutexGuard { + // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock` + // when called in a loop. + let mut spins = 0; + while self + .lock + .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + // Wait until the lock looks unlocked before retrying + while self.is_locked() { + R::relax(); + + // If we've been spinning for a while, switch to a fairer strategy that will prevent + // newer users from stealing our lock from us. + if spins > STARVATION_SPINS { + return self.starve().lock(); + } + spins += 1; + } + } + + FairMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + } + } +} + +impl FairMutex { + /// Returns `true` if the lock is currently held. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + #[inline(always)] + pub fn is_locked(&self) -> bool { + self.lock.load(Ordering::Relaxed) & LOCKED != 0 + } + + /// Force unlock this [`FairMutex`]. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the lock is not held by the current + /// thread. However, this can be useful in some instances for exposing the + /// lock to FFI that doesn't know how to deal with RAII. + #[inline(always)] + pub unsafe fn force_unlock(&self) { + self.lock.fetch_and(!LOCKED, Ordering::Release); + } + + /// Try to lock this [`FairMutex`], returning a lock guard if successful. 
+ /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// let maybe_guard = lock.try_lock(); + /// assert!(maybe_guard.is_some()); + /// + /// // `maybe_guard` is still held, so the second call fails + /// let maybe_guard2 = lock.try_lock(); + /// assert!(maybe_guard2.is_none()); + /// ``` + #[inline(always)] + pub fn try_lock(&self) -> Option> { + self.try_lock_starver().ok() + } + + /// Tries to lock this [`FairMutex`] and returns a result that indicates whether the lock was + /// rejected due to a starver or not. + #[inline(always)] + pub fn try_lock_starver(&self) -> Result, LockRejectReason> { + match self + .lock + .compare_exchange(0, LOCKED, Ordering::Acquire, Ordering::Relaxed) + .unwrap_or_else(|x| x) + { + 0 => Ok(FairMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + }), + LOCKED => Err(LockRejectReason::Locked), + _ => Err(LockRejectReason::Starved), + } + } + + /// Indicates that the current user has been waiting for the lock for a while + /// and that the lock should yield to this thread over a newly arriving thread. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// // Lock the mutex to simulate it being used by another user. + /// let guard1 = lock.lock(); + /// + /// // Try to lock the mutex. + /// let guard2 = lock.try_lock(); + /// assert!(guard2.is_none()); + /// + /// // Wait for a while. + /// wait_for_a_while(); + /// + /// // We are now starved, indicate as such. + /// let starve = lock.starve(); + /// + /// // Once the lock is released, another user trying to lock it will + /// // fail. + /// drop(guard1); + /// let guard3 = lock.try_lock(); + /// assert!(guard3.is_none()); + /// + /// // However, we will be able to lock it. + /// let guard4 = starve.try_lock(); + /// assert!(guard4.is_ok()); + /// + /// # fn wait_for_a_while() {} + /// ``` + pub fn starve(&self) -> Starvation<'_, T, R> { + // Add a new starver to the state. + if self.lock.fetch_add(STARVED, Ordering::Relaxed) > (core::isize::MAX - 1) as usize { + // In the event of a potential lock overflow, abort. + crate::abort(); + } + + Starvation { lock: self } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the [`FairMutex`] mutably, and a mutable reference is guaranteed to be exclusive in + /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As + /// such, this is a 'zero-cost' operation. + /// + /// # Example + /// + /// ``` + /// let mut lock = spin::mutex::FairMutex::<_>::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.lock(), 10); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner mutex. 
+ unsafe { &mut *self.data.get() } + } +} + +impl fmt::Debug for FairMutex { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + struct LockWrapper<'a, T: ?Sized + fmt::Debug>(Option>); + + impl fmt::Debug for LockWrapper<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self.0 { + Some(guard) => fmt::Debug::fmt(guard, f), + None => f.write_str(""), + } + } + } + + f.debug_struct("FairMutex") + .field("data", &LockWrapper(self.try_lock())) + .finish() + } +} + +impl Default for FairMutex { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl From for FairMutex { + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl<'a, T: ?Sized> FairMutexGuard<'a, T> { + /// Leak the lock guard, yielding a mutable reference to the underlying data. + /// + /// Note that this function will permanently lock the original [`FairMutex`]. + /// + /// ``` + /// let mylock = spin::mutex::FairMutex::<_>::new(0); + /// + /// let data: &mut i32 = spin::mutex::FairMutexGuard::leak(mylock.lock()); + /// + /// *data = 1; + /// assert_eq!(*data, 1); + /// ``` + #[inline(always)] + pub fn leak(this: Self) -> &'a mut T { + // Use ManuallyDrop to avoid stacked-borrow invalidation + let mut this = ManuallyDrop::new(this); + // We know statically that only we are referencing data + unsafe { &mut *this.data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for FairMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for FairMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Deref for FairMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + // We know statically that only we are referencing data + unsafe { &*self.data } + } +} + +impl<'a, T: ?Sized> DerefMut for FairMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + // We know statically that only we are referencing data + unsafe { &mut *self.data } + } +} + +impl<'a, T: ?Sized> Drop for FairMutexGuard<'a, T> { + /// The dropping of the MutexGuard will release the lock it was created from. + fn drop(&mut self) { + self.lock.fetch_and(!LOCKED, Ordering::Release); + } +} + +impl<'a, T: ?Sized, R> Starvation<'a, T, R> { + /// Attempts the lock the mutex if we are the only starving user. + /// + /// This allows another user to lock the mutex if they are starving as well. + pub fn try_lock_fair(self) -> Result, Self> { + // Try to lock the mutex. + if self + .lock + .lock + .compare_exchange( + STARVED, + STARVED | LOCKED, + Ordering::Acquire, + Ordering::Relaxed, + ) + .is_ok() + { + // We are the only starving user, lock the mutex. + Ok(FairMutexGuard { + lock: &self.lock.lock, + data: self.lock.data.get(), + }) + } else { + // Another user is starving, fail. + Err(self) + } + } + + /// Attempts to lock the mutex. + /// + /// If the lock is currently held by another thread, this will return `None`. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// // Lock the mutex to simulate it being used by another user. + /// let guard1 = lock.lock(); + /// + /// // Try to lock the mutex. + /// let guard2 = lock.try_lock(); + /// assert!(guard2.is_none()); + /// + /// // Wait for a while. + /// wait_for_a_while(); + /// + /// // We are now starved, indicate as such. 
+ /// let starve = lock.starve(); + /// + /// // Once the lock is released, another user trying to lock it will + /// // fail. + /// drop(guard1); + /// let guard3 = lock.try_lock(); + /// assert!(guard3.is_none()); + /// + /// // However, we will be able to lock it. + /// let guard4 = starve.try_lock(); + /// assert!(guard4.is_ok()); + /// + /// # fn wait_for_a_while() {} + /// ``` + pub fn try_lock(self) -> Result, Self> { + // Try to lock the mutex. + if self.lock.lock.fetch_or(LOCKED, Ordering::Acquire) & LOCKED == 0 { + // We have successfully locked the mutex. + // By dropping `self` here, we decrement the starvation count. + Ok(FairMutexGuard { + lock: &self.lock.lock, + data: self.lock.data.get(), + }) + } else { + Err(self) + } + } +} + +impl<'a, T: ?Sized, R: RelaxStrategy> Starvation<'a, T, R> { + /// Locks the mutex. + pub fn lock(mut self) -> FairMutexGuard<'a, T> { + // Try to lock the mutex. + loop { + match self.try_lock() { + Ok(lock) => return lock, + Err(starve) => self = starve, + } + + // Relax until the lock is released. + while self.lock.is_locked() { + R::relax(); + } + } + } +} + +impl<'a, T: ?Sized, R> Drop for Starvation<'a, T, R> { + fn drop(&mut self) { + // As there is no longer a user being starved, we decrement the starver count. + self.lock.lock.fetch_sub(STARVED, Ordering::Release); + } +} + +impl fmt::Display for LockRejectReason { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + LockRejectReason::Locked => write!(f, "locked"), + LockRejectReason::Starved => write!(f, "starved"), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for LockRejectReason {} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawMutex for FairMutex<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + fn lock(&self) { + // Prevent guard destructor running + core::mem::forget(Self::lock(self)); + } + + fn try_lock(&self) -> bool { + // Prevent guard destructor running + Self::try_lock(self).map(core::mem::forget).is_some() + } + + unsafe fn unlock(&self) { + self.force_unlock(); + } + + fn is_locked(&self) -> bool { + Self::is_locked(self) + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + type FairMutex = super::FairMutex; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = FairMutex::<_>::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + static M: FairMutex<()> = FairMutex::<_>::new(()); + static mut CNT: u32 = 0; + const J: u32 = 1000; + const K: u32 = 3; + + fn inc() { + for _ in 0..J { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(unsafe { CNT }, J * K * 2); + } + + #[test] + fn try_lock() { + let mutex = FairMutex::<_>::new(42); + + // First lock succeeds + let a = mutex.try_lock(); + assert_eq!(a.as_ref().map(|r| **r), Some(42)); + + // Additional lock fails + let b = mutex.try_lock(); + assert!(b.is_none()); + + // After dropping lock, it succeeds again + ::core::mem::drop(a); + let c = mutex.try_lock(); + assert_eq!(c.as_ref().map(|r| **r), Some(42)); + 
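+        // NOTE (editor): the lines below are an illustrative sketch added for
+        // exposition and are not part of upstream spin 0.9.5. They exercise the
+        // starvation API documented above (`try_lock_starver`, `starve`),
+        // assuming the signatures shown earlier in this file.
+        //
+        // While `c` still holds the lock, a starver-aware try-lock is rejected.
+        assert!(mutex.try_lock_starver().is_err());
+
+        // Register this user as starved; the lock is still held, so we cannot
+        // acquire it yet, but we keep our `Starvation` token.
+        let starver = match mutex.starve().try_lock() {
+            Ok(_) => panic!("lock should still be held by `c`"),
+            Err(starver) => starver,
+        };
+
+        // Once `c` is released, the starved user can acquire the lock.
+        ::core::mem::drop(c);
+        match starver.try_lock() {
+            Ok(guard) => assert_eq!(*guard, 42),
+            Err(_) => panic!("starved user should acquire the released lock"),
+        }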
} + + #[test] + fn test_into_inner() { + let m = FairMutex::<_>::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = FairMutex::<_>::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(FairMutex::<_>::new(1)); + let arc2 = Arc::new(FairMutex::<_>::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(FairMutex::<_>::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &FairMutex<[i32]> = &FairMutex::<_>::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn test_mutex_force_lock() { + let lock = FairMutex::<_>::new(()); + ::std::mem::forget(lock.lock()); + unsafe { + lock.force_unlock(); + } + assert!(lock.try_lock().is_some()); + } +} diff --git a/src/mutex/spin.rs b/src/mutex/spin.rs new file mode 100644 index 0000000..990af95 --- /dev/null +++ b/src/mutex/spin.rs @@ -0,0 +1,540 @@ +//! A naïve spinning mutex. +//! +//! Waiting threads hammer an atomic variable until it becomes available. Best-case latency is low, but worst-case +//! latency is theoretically infinite. + +use crate::{ + atomic::{AtomicBool, Ordering}, + RelaxStrategy, Spin, +}; +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, +}; + +/// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data. 
+/// +/// # Example +/// +/// ``` +/// use spin; +/// +/// let lock = spin::mutex::SpinMutex::<_>::new(0); +/// +/// // Modify the data +/// *lock.lock() = 2; +/// +/// // Read the data +/// let answer = *lock.lock(); +/// assert_eq!(answer, 2); +/// ``` +/// +/// # Thread safety example +/// +/// ``` +/// use spin; +/// use std::sync::{Arc, Barrier}; +/// +/// let thread_count = 1000; +/// let spin_mutex = Arc::new(spin::mutex::SpinMutex::<_>::new(0)); +/// +/// // We use a barrier to ensure the readout happens after all writing +/// let barrier = Arc::new(Barrier::new(thread_count + 1)); +/// +/// # let mut ts = Vec::new(); +/// for _ in (0..thread_count) { +/// let my_barrier = barrier.clone(); +/// let my_lock = spin_mutex.clone(); +/// # let t = +/// std::thread::spawn(move || { +/// let mut guard = my_lock.lock(); +/// *guard += 1; +/// +/// // Release the lock to prevent a deadlock +/// drop(guard); +/// my_barrier.wait(); +/// }); +/// # ts.push(t); +/// } +/// +/// barrier.wait(); +/// +/// let answer = { *spin_mutex.lock() }; +/// assert_eq!(answer, thread_count); +/// +/// # for t in ts { +/// # t.join().unwrap(); +/// # } +/// ``` +pub struct SpinMutex { + phantom: PhantomData, + pub(crate) lock: AtomicBool, + data: UnsafeCell, +} + +/// A guard that provides mutable data access. +/// +/// When the guard falls out of scope it will release the lock. +pub struct SpinMutexGuard<'a, T: ?Sized + 'a> { + lock: &'a AtomicBool, + data: *mut T, +} + +// Same unsafe impls as `std::sync::Mutex` +unsafe impl Sync for SpinMutex {} +unsafe impl Send for SpinMutex {} + +impl SpinMutex { + /// Creates a new [`SpinMutex`] wrapping the supplied data. + /// + /// # Example + /// + /// ``` + /// use spin::mutex::SpinMutex; + /// + /// static MUTEX: SpinMutex<()> = SpinMutex::<_>::new(()); + /// + /// fn demo() { + /// let lock = MUTEX.lock(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline(always)] + pub const fn new(data: T) -> Self { + SpinMutex { + lock: AtomicBool::new(false), + data: UnsafeCell::new(data), + phantom: PhantomData, + } + } + + /// Consumes this [`SpinMutex`] and unwraps the underlying data. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::SpinMutex::<_>::new(42); + /// assert_eq!(42, lock.into_inner()); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock. + let SpinMutex { data, .. } = self; + data.into_inner() + } + + /// Returns a mutable pointer to the underlying data. + /// + /// This is mostly meant to be used for applications which require manual unlocking, but where + /// storing both the lock and the pointer to the inner data gets inefficient. + /// + /// # Example + /// ``` + /// let lock = spin::mutex::SpinMutex::<_>::new(42); + /// + /// unsafe { + /// core::mem::forget(lock.lock()); + /// + /// assert_eq!(lock.as_mut_ptr().read(), 42); + /// lock.as_mut_ptr().write(58); + /// + /// lock.force_unlock(); + /// } + /// + /// assert_eq!(*lock.lock(), 58); + /// + /// ``` + #[inline(always)] + pub fn as_mut_ptr(&self) -> *mut T { + self.data.get() + } +} + +impl SpinMutex { + /// Locks the [`SpinMutex`] and returns a guard that permits access to the inner data. + /// + /// The returned value may be dereferenced for data access + /// and the lock will be dropped when the guard falls out of scope. 
+ /// + /// ``` + /// let lock = spin::mutex::SpinMutex::<_>::new(0); + /// { + /// let mut data = lock.lock(); + /// // The lock is now locked and the data can be accessed + /// *data += 1; + /// // The lock is implicitly dropped at the end of the scope + /// } + /// ``` + #[inline(always)] + pub fn lock(&self) -> SpinMutexGuard { + // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock` + // when called in a loop. + while self + .lock + .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + // Wait until the lock looks unlocked before retrying + while self.is_locked() { + R::relax(); + } + } + + SpinMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + } + } +} + +impl SpinMutex { + /// Returns `true` if the lock is currently held. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + #[inline(always)] + pub fn is_locked(&self) -> bool { + self.lock.load(Ordering::Relaxed) + } + + /// Force unlock this [`SpinMutex`]. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the lock is not held by the current + /// thread. However, this can be useful in some instances for exposing the + /// lock to FFI that doesn't know how to deal with RAII. + #[inline(always)] + pub unsafe fn force_unlock(&self) { + self.lock.store(false, Ordering::Release); + } + + /// Try to lock this [`SpinMutex`], returning a lock guard if successful. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::SpinMutex::<_>::new(42); + /// + /// let maybe_guard = lock.try_lock(); + /// assert!(maybe_guard.is_some()); + /// + /// // `maybe_guard` is still held, so the second call fails + /// let maybe_guard2 = lock.try_lock(); + /// assert!(maybe_guard2.is_none()); + /// ``` + #[inline(always)] + pub fn try_lock(&self) -> Option> { + // The reason for using a strong compare_exchange is explained here: + // https://github.com/Amanieu/parking_lot/pull/207#issuecomment-575869107 + if self + .lock + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + Some(SpinMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + }) + } else { + None + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the [`SpinMutex`] mutably, and a mutable reference is guaranteed to be exclusive in + /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As + /// such, this is a 'zero-cost' operation. + /// + /// # Example + /// + /// ``` + /// let mut lock = spin::mutex::SpinMutex::<_>::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.lock(), 10); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner mutex. 
+        unsafe { &mut *self.data.get() }
+    }
+}
+
+impl<T: ?Sized + fmt::Debug, R> fmt::Debug for SpinMutex<T, R> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self.try_lock() {
+            Some(guard) => write!(f, "Mutex {{ data: ")
+                .and_then(|()| (&*guard).fmt(f))
+                .and_then(|()| write!(f, "}}")),
+            None => write!(f, "Mutex {{ <locked> }}"),
+        }
+    }
+}
+
+impl<T: ?Sized + Default, R> Default for SpinMutex<T, R> {
+    fn default() -> Self {
+        Self::new(Default::default())
+    }
+}
+
+impl<T, R> From<T> for SpinMutex<T, R> {
+    fn from(data: T) -> Self {
+        Self::new(data)
+    }
+}
+
+impl<'a, T: ?Sized> SpinMutexGuard<'a, T> {
+    /// Leak the lock guard, yielding a mutable reference to the underlying data.
+    ///
+    /// Note that this function will permanently lock the original [`SpinMutex`].
+    ///
+    /// ```
+    /// let mylock = spin::mutex::SpinMutex::<_>::new(0);
+    ///
+    /// let data: &mut i32 = spin::mutex::SpinMutexGuard::leak(mylock.lock());
+    ///
+    /// *data = 1;
+    /// assert_eq!(*data, 1);
+    /// ```
+    #[inline(always)]
+    pub fn leak(this: Self) -> &'a mut T {
+        // Use ManuallyDrop to avoid stacked-borrow invalidation
+        let mut this = ManuallyDrop::new(this);
+        // We know statically that only we are referencing data
+        unsafe { &mut *this.data }
+    }
+}
+
+impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for SpinMutexGuard<'a, T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(&**self, f)
+    }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for SpinMutexGuard<'a, T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Display::fmt(&**self, f)
+    }
+}
+
+impl<'a, T: ?Sized> Deref for SpinMutexGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        // We know statically that only we are referencing data
+        unsafe { &*self.data }
+    }
+}
+
+impl<'a, T: ?Sized> DerefMut for SpinMutexGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        // We know statically that only we are referencing data
+        unsafe { &mut *self.data }
+    }
+}
+
+impl<'a, T: ?Sized> Drop for SpinMutexGuard<'a, T> {
+    /// The dropping of the MutexGuard will release the lock it was created from.
+ fn drop(&mut self) { + self.lock.store(false, Ordering::Release); + } +} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawMutex for SpinMutex<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + fn lock(&self) { + // Prevent guard destructor running + core::mem::forget(Self::lock(self)); + } + + fn try_lock(&self) -> bool { + // Prevent guard destructor running + Self::try_lock(self).map(core::mem::forget).is_some() + } + + unsafe fn unlock(&self) { + self.force_unlock(); + } + + fn is_locked(&self) -> bool { + Self::is_locked(self) + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + type SpinMutex = super::SpinMutex; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = SpinMutex::<_>::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + static M: SpinMutex<()> = SpinMutex::<_>::new(()); + static mut CNT: u32 = 0; + const J: u32 = 1000; + const K: u32 = 3; + + fn inc() { + for _ in 0..J { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + let mut ts = Vec::new(); + for _ in 0..K { + let tx2 = tx.clone(); + ts.push(thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + })); + let tx2 = tx.clone(); + ts.push(thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + })); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(unsafe { CNT }, J * K * 2); + + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn try_lock() { + let mutex = SpinMutex::<_>::new(42); + + // First lock succeeds + let a = mutex.try_lock(); + assert_eq!(a.as_ref().map(|r| **r), Some(42)); + + // Additional lock fails + let b = mutex.try_lock(); + assert!(b.is_none()); + + // After dropping lock, it succeeds again + ::core::mem::drop(a); + let c = mutex.try_lock(); + assert_eq!(c.as_ref().map(|r| **r), Some(42)); + } + + #[test] + fn test_into_inner() { + let m = SpinMutex::<_>::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = SpinMutex::<_>::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. 
+ let arc = Arc::new(SpinMutex::<_>::new(1)); + let arc2 = Arc::new(SpinMutex::<_>::new(arc)); + let (tx, rx) = channel(); + let t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(SpinMutex::<_>::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &SpinMutex<[i32]> = &SpinMutex::<_>::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn test_mutex_force_lock() { + let lock = SpinMutex::<_>::new(()); + ::std::mem::forget(lock.lock()); + unsafe { + lock.force_unlock(); + } + assert!(lock.try_lock().is_some()); + } +} diff --git a/src/mutex/ticket.rs b/src/mutex/ticket.rs new file mode 100644 index 0000000..c14869e --- /dev/null +++ b/src/mutex/ticket.rs @@ -0,0 +1,537 @@ +//! A ticket-based mutex. +//! +//! Waiting threads take a 'ticket' from the lock in the order they arrive and gain access to the lock when their +//! ticket is next in the queue. Best-case latency is slightly worse than a regular spinning mutex, but worse-case +//! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the +//! queue to finish. + +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + ops::{Deref, DerefMut}, +}; + +/// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data. +/// +/// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it +/// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the +/// next ticket will be processed. +/// +/// Ticket locks significantly reduce the worse-case performance of locking at the cost of slightly higher average-time +/// overhead. 
+/// +/// # Example +/// +/// ``` +/// use spin; +/// +/// let lock = spin::mutex::TicketMutex::<_>::new(0); +/// +/// // Modify the data +/// *lock.lock() = 2; +/// +/// // Read the data +/// let answer = *lock.lock(); +/// assert_eq!(answer, 2); +/// ``` +/// +/// # Thread safety example +/// +/// ``` +/// use spin; +/// use std::sync::{Arc, Barrier}; +/// +/// let thread_count = 1000; +/// let spin_mutex = Arc::new(spin::mutex::TicketMutex::<_>::new(0)); +/// +/// // We use a barrier to ensure the readout happens after all writing +/// let barrier = Arc::new(Barrier::new(thread_count + 1)); +/// +/// for _ in (0..thread_count) { +/// let my_barrier = barrier.clone(); +/// let my_lock = spin_mutex.clone(); +/// std::thread::spawn(move || { +/// let mut guard = my_lock.lock(); +/// *guard += 1; +/// +/// // Release the lock to prevent a deadlock +/// drop(guard); +/// my_barrier.wait(); +/// }); +/// } +/// +/// barrier.wait(); +/// +/// let answer = { *spin_mutex.lock() }; +/// assert_eq!(answer, thread_count); +/// ``` +pub struct TicketMutex { + phantom: PhantomData, + next_ticket: AtomicUsize, + next_serving: AtomicUsize, + data: UnsafeCell, +} + +/// A guard that protects some data. +/// +/// When the guard is dropped, the next ticket will be processed. +pub struct TicketMutexGuard<'a, T: ?Sized + 'a> { + next_serving: &'a AtomicUsize, + ticket: usize, + data: &'a mut T, +} + +unsafe impl Sync for TicketMutex {} +unsafe impl Send for TicketMutex {} + +impl TicketMutex { + /// Creates a new [`TicketMutex`] wrapping the supplied data. + /// + /// # Example + /// + /// ``` + /// use spin::mutex::TicketMutex; + /// + /// static MUTEX: TicketMutex<()> = TicketMutex::<_>::new(()); + /// + /// fn demo() { + /// let lock = MUTEX.lock(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline(always)] + pub const fn new(data: T) -> Self { + Self { + phantom: PhantomData, + next_ticket: AtomicUsize::new(0), + next_serving: AtomicUsize::new(0), + data: UnsafeCell::new(data), + } + } + + /// Consumes this [`TicketMutex`] and unwraps the underlying data. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::TicketMutex::<_>::new(42); + /// assert_eq!(42, lock.into_inner()); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + self.data.into_inner() + } + /// Returns a mutable pointer to the underying data. + /// + /// This is mostly meant to be used for applications which require manual unlocking, but where + /// storing both the lock and the pointer to the inner data gets inefficient. + /// + /// # Example + /// ``` + /// let lock = spin::mutex::SpinMutex::<_>::new(42); + /// + /// unsafe { + /// core::mem::forget(lock.lock()); + /// + /// assert_eq!(lock.as_mut_ptr().read(), 42); + /// lock.as_mut_ptr().write(58); + /// + /// lock.force_unlock(); + /// } + /// + /// assert_eq!(*lock.lock(), 58); + /// + /// ``` + #[inline(always)] + pub fn as_mut_ptr(&self) -> *mut T { + self.data.get() + } +} + +impl fmt::Debug for TicketMutex { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_lock() { + Some(guard) => write!(f, "Mutex {{ data: ") + .and_then(|()| (&*guard).fmt(f)) + .and_then(|()| write!(f, "}}")), + None => write!(f, "Mutex {{ }}"), + } + } +} + +impl TicketMutex { + /// Locks the [`TicketMutex`] and returns a guard that permits access to the inner data. + /// + /// The returned data may be dereferenced for data access + /// and the lock will be dropped when the guard falls out of scope. 
+ /// + /// ``` + /// let lock = spin::mutex::TicketMutex::<_>::new(0); + /// { + /// let mut data = lock.lock(); + /// // The lock is now locked and the data can be accessed + /// *data += 1; + /// // The lock is implicitly dropped at the end of the scope + /// } + /// ``` + #[inline(always)] + pub fn lock(&self) -> TicketMutexGuard { + let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed); + + while self.next_serving.load(Ordering::Acquire) != ticket { + R::relax(); + } + + TicketMutexGuard { + next_serving: &self.next_serving, + ticket, + // Safety + // We know that we are the next ticket to be served, + // so there's no other thread accessing the data. + // + // Every other thread has another ticket number so it's + // definitely stuck in the spin loop above. + data: unsafe { &mut *self.data.get() }, + } + } +} + +impl TicketMutex { + /// Returns `true` if the lock is currently held. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + #[inline(always)] + pub fn is_locked(&self) -> bool { + let ticket = self.next_ticket.load(Ordering::Relaxed); + self.next_serving.load(Ordering::Relaxed) != ticket + } + + /// Force unlock this [`TicketMutex`], by serving the next ticket. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the lock is not held by the current + /// thread. However, this can be useful in some instances for exposing the + /// lock to FFI that doesn't know how to deal with RAII. + #[inline(always)] + pub unsafe fn force_unlock(&self) { + self.next_serving.fetch_add(1, Ordering::Release); + } + + /// Try to lock this [`TicketMutex`], returning a lock guard if successful. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::TicketMutex::<_>::new(42); + /// + /// let maybe_guard = lock.try_lock(); + /// assert!(maybe_guard.is_some()); + /// + /// // `maybe_guard` is still held, so the second call fails + /// let maybe_guard2 = lock.try_lock(); + /// assert!(maybe_guard2.is_none()); + /// ``` + #[inline(always)] + pub fn try_lock(&self) -> Option> { + let ticket = self + .next_ticket + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |ticket| { + if self.next_serving.load(Ordering::Acquire) == ticket { + Some(ticket + 1) + } else { + None + } + }); + + ticket.ok().map(|ticket| TicketMutexGuard { + next_serving: &self.next_serving, + ticket, + // Safety + // We have a ticket that is equal to the next_serving ticket, so we know: + // - that no other thread can have the same ticket id as this thread + // - that we are the next one to be served so we have exclusive access to the data + data: unsafe { &mut *self.data.get() }, + }) + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the [`TicketMutex`] mutably, and a mutable reference is guaranteed to be exclusive in + /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As + /// such, this is a 'zero-cost' operation. + /// + /// # Example + /// + /// ``` + /// let mut lock = spin::mutex::TicketMutex::<_>::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.lock(), 10); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + // Safety: + // We know that there are no other references to `self`, + // so it's safe to return a exclusive reference to the data. 
+ unsafe { &mut *self.data.get() } + } +} + +impl Default for TicketMutex { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl From for TicketMutex { + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl<'a, T: ?Sized> TicketMutexGuard<'a, T> { + /// Leak the lock guard, yielding a mutable reference to the underlying data. + /// + /// Note that this function will permanently lock the original [`TicketMutex`]. + /// + /// ``` + /// let mylock = spin::mutex::TicketMutex::<_>::new(0); + /// + /// let data: &mut i32 = spin::mutex::TicketMutexGuard::leak(mylock.lock()); + /// + /// *data = 1; + /// assert_eq!(*data, 1); + /// ``` + #[inline(always)] + pub fn leak(this: Self) -> &'a mut T { + let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing + core::mem::forget(this); + unsafe { &mut *data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for TicketMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for TicketMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Deref for TicketMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.data + } +} + +impl<'a, T: ?Sized> DerefMut for TicketMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.data + } +} + +impl<'a, T: ?Sized> Drop for TicketMutexGuard<'a, T> { + fn drop(&mut self) { + let new_ticket = self.ticket + 1; + self.next_serving.store(new_ticket, Ordering::Release); + } +} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawMutex for TicketMutex<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + fn lock(&self) { + // Prevent guard destructor running + core::mem::forget(Self::lock(self)); + } + + fn try_lock(&self) -> bool { + // Prevent guard destructor running + Self::try_lock(self).map(core::mem::forget).is_some() + } + + unsafe fn unlock(&self) { + self.force_unlock(); + } + + fn is_locked(&self) -> bool { + Self::is_locked(self) + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + type TicketMutex = super::TicketMutex; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = TicketMutex::<_>::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + static M: TicketMutex<()> = TicketMutex::<_>::new(()); + static mut CNT: u32 = 0; + const J: u32 = 1000; + const K: u32 = 3; + + fn inc() { + for _ in 0..J { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(unsafe { CNT }, J * K * 2); + } + + #[test] + fn try_lock() { + let mutex = TicketMutex::<_>::new(42); + + // First lock succeeds + let a = mutex.try_lock(); + assert_eq!(a.as_ref().map(|r| **r), Some(42)); + + // Additional lock fails + let b = mutex.try_lock(); + assert!(b.is_none()); + + // After dropping lock, it succeeds again + ::core::mem::drop(a); + let c = mutex.try_lock(); + assert_eq!(c.as_ref().map(|r| 
**r), Some(42)); + } + + #[test] + fn test_into_inner() { + let m = TicketMutex::<_>::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = TicketMutex::<_>::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(TicketMutex::<_>::new(1)); + let arc2 = Arc::new(TicketMutex::<_>::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(TicketMutex::<_>::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &TicketMutex<[i32]> = &TicketMutex::<_>::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn is_locked() { + let mutex = TicketMutex::<_>::new(()); + assert!(!mutex.is_locked()); + let lock = mutex.lock(); + assert!(mutex.is_locked()); + drop(lock); + assert!(!mutex.is_locked()); + } +} diff --git a/src/once.rs b/src/once.rs new file mode 100644 index 0000000..22784af --- /dev/null +++ b/src/once.rs @@ -0,0 +1,731 @@ + +//! Synchronization primitives for one-time evaluation. + +use crate::{ + atomic::{AtomicU8, Ordering}, + RelaxStrategy, Spin, +}; +use core::{cell::UnsafeCell, fmt, marker::PhantomData, mem::MaybeUninit}; + +/// A primitive that provides lazy one-time initialization. +/// +/// Unlike its `std::sync` equivalent, this is generalized such that the closure returns a +/// value to be stored by the [`Once`] (`std::sync::Once` can be trivially emulated with +/// `Once`). +/// +/// Because [`Once::new`] is `const`, this primitive may be used to safely initialize statics. +/// +/// # Examples +/// +/// ``` +/// use spin; +/// +/// static START: spin::Once = spin::Once::new(); +/// +/// START.call_once(|| { +/// // run initialization here +/// }); +/// ``` +pub struct Once { + phantom: PhantomData, + status: AtomicStatus, + data: UnsafeCell>, +} + +impl Default for Once { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for Once { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.get() { + Some(s) => write!(f, "Once {{ data: ") + .and_then(|()| s.fmt(f)) + .and_then(|()| write!(f, "}}")), + None => write!(f, "Once {{ }}"), + } + } +} + +// Same unsafe impls as `std::sync::RwLock`, because this also allows for +// concurrent reads. +unsafe impl Sync for Once {} +unsafe impl Send for Once {} + +mod status { + use super::*; + + // SAFETY: This structure has an invariant, namely that the inner atomic u8 must *always* have + // a value for which there exists a valid Status. 
This means that users of this API must only + // be allowed to load and store `Status`es. + #[repr(transparent)] + pub struct AtomicStatus(AtomicU8); + + // Four states that a Once can be in, encoded into the lower bits of `status` in + // the Once structure. + #[repr(u8)] + #[derive(Clone, Copy, Debug, PartialEq)] + pub enum Status { + Incomplete = 0x00, + Running = 0x01, + Complete = 0x02, + Panicked = 0x03, + } + impl Status { + // Construct a status from an inner u8 integer. + // + // # Safety + // + // For this to be safe, the inner number must have a valid corresponding enum variant. + unsafe fn new_unchecked(inner: u8) -> Self { + core::mem::transmute(inner) + } + } + + impl AtomicStatus { + #[inline(always)] + pub const fn new(status: Status) -> Self { + // SAFETY: We got the value directly from status, so transmuting back is fine. + Self(AtomicU8::new(status as u8)) + } + #[inline(always)] + pub fn load(&self, ordering: Ordering) -> Status { + // SAFETY: We know that the inner integer must have been constructed from a Status in + // the first place. + unsafe { Status::new_unchecked(self.0.load(ordering)) } + } + #[inline(always)] + pub fn store(&self, status: Status, ordering: Ordering) { + // SAFETY: While not directly unsafe, this is safe because the value was retrieved from + // a status, thus making transmutation safe. + self.0.store(status as u8, ordering); + } + #[inline(always)] + pub fn compare_exchange( + &self, + old: Status, + new: Status, + success: Ordering, + failure: Ordering, + ) -> Result { + match self + .0 + .compare_exchange(old as u8, new as u8, success, failure) + { + // SAFETY: A compare exchange will always return a value that was later stored into + // the atomic u8, but due to the invariant that it must be a valid Status, we know + // that both Ok(_) and Err(_) will be safely transmutable. + Ok(ok) => Ok(unsafe { Status::new_unchecked(ok) }), + Err(err) => Err(unsafe { Status::new_unchecked(err) }), + } + } + #[inline(always)] + pub fn get_mut(&mut self) -> &mut Status { + // SAFETY: Since we know that the u8 inside must be a valid Status, we can safely cast + // it to a &mut Status. + unsafe { &mut *((self.0.get_mut() as *mut u8).cast::()) } + } + } +} +use self::status::{AtomicStatus, Status}; + +use core::hint::unreachable_unchecked as unreachable; + +impl Once { + /// Performs an initialization routine once and only once. The given closure + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). The + /// returned pointer will point to the result from the closure that was + /// run. + /// + /// # Panics + /// + /// This function will panic if the [`Once`] previously panicked while attempting + /// to initialize. This is similar to the poisoning behaviour of `std::sync`'s + /// primitives. + /// + /// # Examples + /// + /// ``` + /// use spin; + /// + /// static INIT: spin::Once = spin::Once::new(); + /// + /// fn get_cached_val() -> usize { + /// *INIT.call_once(expensive_computation) + /// } + /// + /// fn expensive_computation() -> usize { + /// // ... 
+ /// # 2 + /// } + /// ``` + pub fn call_once T>(&self, f: F) -> &T { + match self.try_call_once(|| Ok::(f())) { + Ok(x) => x, + Err(void) => match void {}, + } + } + + /// This method is similar to `call_once`, but allows the given closure to + /// fail, and lets the `Once` in a uninitialized state if it does. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns without error, it is guaranteed that some + /// initialization has run and completed (it may not be the closure + /// specified). The returned reference will point to the result from the + /// closure that was run. + /// + /// # Panics + /// + /// This function will panic if the [`Once`] previously panicked while attempting + /// to initialize. This is similar to the poisoning behaviour of `std::sync`'s + /// primitives. + /// + /// # Examples + /// + /// ``` + /// use spin; + /// + /// static INIT: spin::Once = spin::Once::new(); + /// + /// fn get_cached_val() -> Result { + /// INIT.try_call_once(expensive_fallible_computation).map(|x| *x) + /// } + /// + /// fn expensive_fallible_computation() -> Result { + /// // ... + /// # Ok(2) + /// } + /// ``` + pub fn try_call_once Result, E>(&self, f: F) -> Result<&T, E> { + // SAFETY: We perform an Acquire load because if this were to return COMPLETE, then we need + // the preceding stores done while initializing, to become visible after this load. + let mut status = self.status.load(Ordering::Acquire); + + if status == Status::Incomplete { + match self.status.compare_exchange( + Status::Incomplete, + Status::Running, + // SAFETY: Success ordering: We do not have to synchronize any data at all, as the + // value is at this point uninitialized, so Relaxed is technically sufficient. We + // will however have to do a Release store later. However, the success ordering + // must always be at least as strong as the failure ordering, so we choose Acquire + // here anyway. + Ordering::Acquire, + // SAFETY: Failure ordering: While we have already loaded the status initially, we + // know that if some other thread would have fully initialized this in between, + // then there will be new not-yet-synchronized accesses done during that + // initialization that would not have been synchronized by the earlier load. Thus + // we use Acquire to ensure when we later call force_get() in the last match + // statement, if the status was changed to COMPLETE, that those accesses will become + // visible to us. + Ordering::Acquire, + ) { + Ok(_must_be_state_incomplete) => { + // The compare-exchange succeeded, so we shall initialize it. + + // We use a guard (Finish) to catch panics caused by builder + let finish = Finish { + status: &self.status, + }; + let val = match f() { + Ok(val) => val, + Err(err) => { + // If an error occurs, clean up everything and leave. + core::mem::forget(finish); + self.status.store(Status::Incomplete, Ordering::Release); + return Err(err); + } + }; + unsafe { + // SAFETY: + // `UnsafeCell`/deref: currently the only accessor, mutably + // and immutably by cas exclusion. + // `write`: pointer comes from `MaybeUninit`. + (*self.data.get()).as_mut_ptr().write(val); + }; + // If there were to be a panic with unwind enabled, the code would + // short-circuit and never reach the point where it writes the inner data. 
+ // The destructor for Finish will run, and poison the Once to ensure that other + // threads accessing it do not exhibit unwanted behavior, if there were to be + // any inconsistency in data structures caused by the panicking thread. + // + // However, f() is expected in the general case not to panic. In that case, we + // simply forget the guard, bypassing its destructor. We could theoretically + // clear a flag instead, but this eliminates the call to the destructor at + // compile time, and unconditionally poisons during an eventual panic, if + // unwinding is enabled. + core::mem::forget(finish); + + // SAFETY: Release is required here, so that all memory accesses done in the + // closure when initializing, become visible to other threads that perform Acquire + // loads. + // + // And, we also know that the changes this thread has done will not magically + // disappear from our cache, so it does not need to be AcqRel. + self.status.store(Status::Complete, Ordering::Release); + + // This next line is mainly an optimization. + return unsafe { Ok(self.force_get()) }; + } + // The compare-exchange failed, so we know for a fact that the status cannot be + // INCOMPLETE, or it would have succeeded. + Err(other_status) => status = other_status, + } + } + + Ok(match status { + // SAFETY: We have either checked with an Acquire load, that the status is COMPLETE, or + // initialized it ourselves, in which case no additional synchronization is needed. + Status::Complete => unsafe { self.force_get() }, + Status::Panicked => panic!("Once panicked"), + Status::Running => self.poll().unwrap_or_else(|| { + if cfg!(debug_assertions) { + unreachable!("Encountered INCOMPLETE when polling Once") + } else { + // SAFETY: This poll is guaranteed never to fail because the API of poll + // promises spinning if initialization is in progress. We've already + // checked that initialisation is in progress, and initialisation is + // monotonic: once done, it cannot be undone. We also fetched the status + // with Acquire semantics, thereby guaranteeing that the later-executed + // poll will also agree with us that initialization is in progress. Ergo, + // this poll cannot fail. + unsafe { + unreachable(); + } + } + }), + + // SAFETY: The only invariant possible in addition to the aforementioned ones at the + // moment, is INCOMPLETE. However, the only way for this match statement to be + // reached, is if we lost the CAS (otherwise we would have returned early), in + // which case we know for a fact that the state cannot be changed back to INCOMPLETE as + // `Once`s are monotonic. + Status::Incomplete => unsafe { unreachable() }, + }) + } + + /// Spins until the [`Once`] contains a value. + /// + /// Note that in releases prior to `0.7`, this function had the behaviour of [`Once::poll`]. + /// + /// # Panics + /// + /// This function will panic if the [`Once`] previously panicked while attempting + /// to initialize. This is similar to the poisoning behaviour of `std::sync`'s + /// primitives. + pub fn wait(&self) -> &T { + loop { + match self.poll() { + Some(x) => break x, + None => R::relax(), + } + } + } + + /// Like [`Once::get`], but will spin if the [`Once`] is in the process of being + /// initialized. If initialization has not even begun, `None` will be returned. + /// + /// Note that in releases prior to `0.7`, this function was named `wait`. + /// + /// # Panics + /// + /// This function will panic if the [`Once`] previously panicked while attempting + /// to initialize. 
This is similar to the poisoning behaviour of `std::sync`'s + /// primitives. + pub fn poll(&self) -> Option<&T> { + loop { + // SAFETY: Acquire is safe here, because if the status is COMPLETE, then we want to make + // sure that all memory accessed done while initializing that value, are visible when + // we return a reference to the inner data after this load. + match self.status.load(Ordering::Acquire) { + Status::Incomplete => return None, + Status::Running => R::relax(), // We spin + Status::Complete => return Some(unsafe { self.force_get() }), + Status::Panicked => panic!("Once previously poisoned by a panicked"), + } + } + } +} + +impl Once { + /// Initialization constant of [`Once`]. + #[allow(clippy::declare_interior_mutable_const)] + pub const INIT: Self = Self { + phantom: PhantomData, + status: AtomicStatus::new(Status::Incomplete), + data: UnsafeCell::new(MaybeUninit::uninit()), + }; + + /// Creates a new [`Once`]. + pub const fn new() -> Self { + Self::INIT + } + + /// Creates a new initialized [`Once`]. + pub const fn initialized(data: T) -> Self { + Self { + phantom: PhantomData, + status: AtomicStatus::new(Status::Complete), + data: UnsafeCell::new(MaybeUninit::new(data)), + } + } + + /// Retrieve a pointer to the inner data. + /// + /// While this method itself is safe, accessing the pointer before the [`Once`] has been + /// initialized is UB, unless this method has already been written to from a pointer coming + /// from this method. + pub fn as_mut_ptr(&self) -> *mut T { + // SAFETY: + // * MaybeUninit always has exactly the same layout as T + self.data.get().cast::() + } + + /// Get a reference to the initialized instance. Must only be called once COMPLETE. + unsafe fn force_get(&self) -> &T { + // SAFETY: + // * `UnsafeCell`/inner deref: data never changes again + // * `MaybeUninit`/outer deref: data was initialized + &*(*self.data.get()).as_ptr() + } + + /// Get a reference to the initialized instance. Must only be called once COMPLETE. + unsafe fn force_get_mut(&mut self) -> &mut T { + // SAFETY: + // * `UnsafeCell`/inner deref: data never changes again + // * `MaybeUninit`/outer deref: data was initialized + &mut *(*self.data.get()).as_mut_ptr() + } + + /// Get a reference to the initialized instance. Must only be called once COMPLETE. + unsafe fn force_into_inner(self) -> T { + // SAFETY: + // * `UnsafeCell`/inner deref: data never changes again + // * `MaybeUninit`/outer deref: data was initialized + (*self.data.get()).as_ptr().read() + } + + /// Returns a reference to the inner value if the [`Once`] has been initialized. + pub fn get(&self) -> Option<&T> { + // SAFETY: Just as with `poll`, Acquire is safe here because we want to be able to see the + // nonatomic stores done when initializing, once we have loaded and checked the status. + match self.status.load(Ordering::Acquire) { + Status::Complete => Some(unsafe { self.force_get() }), + _ => None, + } + } + + /// Returns a reference to the inner value on the unchecked assumption that the [`Once`] has been initialized. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the `Once` has not already been initialized because a reference to uninitialized + /// memory will be returned, immediately triggering undefined behaviour (even if the reference goes unused). + /// However, this can be useful in some instances for exposing the `Once` to FFI or when the overhead of atomically + /// checking initialization is unacceptable and the `Once` has already been initialized. 
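+    // NOTE (editor): illustrative sketch, not part of upstream spin 0.9.5.
+    // As the documentation for `is_completed` below notes, a `true` result
+    // (obtained with `Acquire` ordering) makes it sound to read the value via
+    // `get_unchecked`. A hypothetical fast path (names are the editor's) could
+    // therefore look like:
+    //
+    //     static CONFIG: spin::Once<u32> = spin::Once::new();
+    //
+    //     fn cached_config() -> Option<u32> {
+    //         if CONFIG.is_completed() {
+    //             // SAFETY: `is_completed` returned true, so the value is
+    //             // initialized and its writes are visible to this thread.
+    //             Some(unsafe { *CONFIG.get_unchecked() })
+    //         } else {
+    //             None
+    //         }
+    //     }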
+ pub unsafe fn get_unchecked(&self) -> &T { + debug_assert_eq!( + self.status.load(Ordering::SeqCst), + Status::Complete, + "Attempted to access an uninitialized Once. If this was run without debug checks, this would be undefined behaviour. This is a serious bug and you must fix it.", + ); + self.force_get() + } + + /// Returns a mutable reference to the inner value if the [`Once`] has been initialized. + /// + /// Because this method requires a mutable reference to the [`Once`], no synchronization + /// overhead is required to access the inner value. In effect, it is zero-cost. + pub fn get_mut(&mut self) -> Option<&mut T> { + match *self.status.get_mut() { + Status::Complete => Some(unsafe { self.force_get_mut() }), + _ => None, + } + } + + /// Returns a the inner value if the [`Once`] has been initialized. + /// + /// Because this method requires ownership of the [`Once`], no synchronization overhead + /// is required to access the inner value. In effect, it is zero-cost. + pub fn try_into_inner(mut self) -> Option { + match *self.status.get_mut() { + Status::Complete => Some(unsafe { self.force_into_inner() }), + _ => None, + } + } + + /// Checks whether the value has been initialized. + /// + /// This is done using [`Acquire`](core::sync::atomic::Ordering::Acquire) ordering, and + /// therefore it is safe to access the value directly via + /// [`get_unchecked`](Self::get_unchecked) if this returns true. + pub fn is_completed(&self) -> bool { + // TODO: Add a similar variant for Relaxed? + self.status.load(Ordering::Acquire) == Status::Complete + } +} + +impl From for Once { + fn from(data: T) -> Self { + Self::initialized(data) + } +} + +impl Drop for Once { + fn drop(&mut self) { + // No need to do any atomic access here, we have &mut! + if *self.status.get_mut() == Status::Complete { + unsafe { + //TODO: Use MaybeUninit::assume_init_drop once stabilised + core::ptr::drop_in_place((*self.data.get()).as_mut_ptr()); + } + } + } +} + +struct Finish<'a> { + status: &'a AtomicStatus, +} + +impl<'a> Drop for Finish<'a> { + fn drop(&mut self) { + // While using Relaxed here would most likely not be an issue, we use SeqCst anyway. + // This is mainly because panics are not meant to be fast at all, but also because if + // there were to be a compiler bug which reorders accesses within the same thread, + // where it should not, we want to be sure that the panic really is handled, and does + // not cause additional problems. SeqCst will therefore help guarding against such + // bugs. 
+ self.status.store(Status::Panicked, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::mpsc::channel; + use std::thread; + + use super::*; + + #[test] + fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); + } + + #[test] + fn smoke_once_value() { + static O: Once = Once::new(); + let a = O.call_once(|| 1); + assert_eq!(*a, 1); + let b = O.call_once(|| 2); + assert_eq!(*b, 1); + } + + #[test] + fn stampede_once() { + static O: Once = Once::new(); + static mut RUN: bool = false; + + let (tx, rx) = channel(); + let mut ts = Vec::new(); + for _ in 0..10 { + let tx = tx.clone(); + ts.push(thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + tx.send(()).unwrap(); + })); + } + + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } + + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn get() { + static INIT: Once = Once::new(); + + assert!(INIT.get().is_none()); + INIT.call_once(|| 2); + assert_eq!(INIT.get().map(|r| *r), Some(2)); + } + + #[test] + fn get_no_wait() { + static INIT: Once = Once::new(); + + assert!(INIT.get().is_none()); + let t = thread::spawn(move || { + INIT.call_once(|| { + thread::sleep(std::time::Duration::from_secs(3)); + 42 + }); + }); + assert!(INIT.get().is_none()); + + t.join().unwrap(); + } + + #[test] + fn poll() { + static INIT: Once = Once::new(); + + assert!(INIT.poll().is_none()); + INIT.call_once(|| 3); + assert_eq!(INIT.poll().map(|r| *r), Some(3)); + } + + #[test] + fn wait() { + static INIT: Once = Once::new(); + + let t = std::thread::spawn(|| { + assert_eq!(*INIT.wait(), 3); + assert!(INIT.is_completed()); + }); + + for _ in 0..4 { + thread::yield_now() + } + + assert!(INIT.poll().is_none()); + INIT.call_once(|| 3); + + t.join().unwrap(); + } + + #[test] + fn panic() { + use std::panic; + + static INIT: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + INIT.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + INIT.call_once(|| {}); + }); + assert!(t.is_err()); + } + + #[test] + fn init_constant() { + static O: Once = Once::INIT; + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); + } + + static mut CALLED: bool = false; + + struct DropTest {} + + impl Drop for DropTest { + fn drop(&mut self) { + unsafe { + CALLED = true; + } + } + } + + // This is sort of two test cases, but if we write them as separate test methods + // they can be executed concurrently and then fail some small fraction of the + // time. + #[test] + fn drop_occurs_and_skip_uninit_drop() { + unsafe { + CALLED = false; + } + + { + let once = Once::<_>::new(); + once.call_once(|| DropTest {}); + } + + assert!(unsafe { CALLED }); + // Now test that we skip drops for the uninitialized case. 
+ unsafe { + CALLED = false; + } + + let once = Once::::new(); + drop(once); + + assert!(unsafe { !CALLED }); + } + + #[test] + fn call_once_test() { + for _ in 0..20 { + use std::sync::atomic::AtomicUsize; + use std::sync::Arc; + use std::time::Duration; + let share = Arc::new(AtomicUsize::new(0)); + let once = Arc::new(Once::<_, Spin>::new()); + let mut hs = Vec::new(); + for _ in 0..8 { + let h = thread::spawn({ + let share = share.clone(); + let once = once.clone(); + move || { + thread::sleep(Duration::from_millis(10)); + once.call_once(|| { + share.fetch_add(1, Ordering::SeqCst); + }); + } + }); + hs.push(h); + } + for h in hs { + h.join().unwrap(); + } + assert_eq!(1, share.load(Ordering::SeqCst)); + } + } +} diff --git a/src/relax.rs b/src/relax.rs new file mode 100644 index 0000000..8842f80 --- /dev/null +++ b/src/relax.rs @@ -0,0 +1,61 @@ +//! Strategies that determine the behaviour of locks when encountering contention. + +/// A trait implemented by spinning relax strategies. +pub trait RelaxStrategy { + /// Perform the relaxing operation during a period of contention. + fn relax(); +} + +/// A strategy that rapidly spins while informing the CPU that it should power down non-essential components via +/// [`core::hint::spin_loop`]. +/// +/// Note that spinning is a 'dumb' strategy and most schedulers cannot correctly differentiate it from useful work, +/// thereby misallocating even more CPU time to the spinning process. This is known as +/// ['priority inversion'](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html). +/// +/// If you see signs that priority inversion is occurring, consider switching to [`Yield`] or, even better, not using a +/// spinlock at all and opting for a proper scheduler-aware lock. Remember also that different targets, operating +/// systems, schedulers, and even the same scheduler with different workloads will exhibit different behaviour. Just +/// because priority inversion isn't occurring in your tests does not mean that it will not occur. Use a scheduler- +/// aware lock if at all possible. +pub struct Spin; + +impl RelaxStrategy for Spin { + #[inline(always)] + fn relax() { + // Use the deprecated spin_loop_hint() to ensure that we don't get + // a higher MSRV than we need to. + #[allow(deprecated)] + core::sync::atomic::spin_loop_hint(); + } +} + +/// A strategy that yields the current time slice to the scheduler in favour of other threads or processes. +/// +/// This is generally used as a strategy for minimising power consumption and priority inversion on targets that have a +/// standard library available. Note that such targets have scheduler-integrated concurrency primitives available, and +/// you should generally use these instead, except in rare circumstances. +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +pub struct Yield; + +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +impl RelaxStrategy for Yield { + #[inline(always)] + fn relax() { + std::thread::yield_now(); + } +} + +/// A strategy that rapidly spins, without telling the CPU to do any powering down. +/// +/// You almost certainly do not want to use this. Use [`Spin`] instead. It exists for completeness and for targets +/// that, for some reason, miscompile or do not support spin hint intrinsics despite attempting to generate code for +/// them (i.e: this is a workaround for possible compiler bugs). 
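+// NOTE (editor): illustrative sketch, not part of upstream spin 0.9.5. The
+// `RelaxStrategy` trait above is public (and, assuming the crate-root re-export
+// from `lib.rs`, reachable as `spin::RelaxStrategy`), so downstream crates can
+// plug in their own contention behaviour. A hypothetical fixed back-off
+// strategy (names are the editor's) might look like:
+//
+//     struct Backoff;
+//
+//     impl spin::RelaxStrategy for Backoff {
+//         #[inline(always)]
+//         fn relax() {
+//             // Issue a short burst of spin hints per retry instead of one
+//             // (or use the deprecated `spin_loop_hint`, as `Spin` above does,
+//             // on older toolchains).
+//             for _ in 0..16 {
+//                 core::hint::spin_loop();
+//             }
+//         }
+//     }
+//
+//     // Selected via the locks' relax-strategy type parameter, e.g.:
+//     type BackoffMutex<T> = spin::mutex::SpinMutex<T, Backoff>;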
+pub struct Loop; + +impl RelaxStrategy for Loop { + #[inline(always)] + fn relax() {} +} diff --git a/src/rwlock.rs b/src/rwlock.rs new file mode 100644 index 0000000..62d5ff3 --- /dev/null +++ b/src/rwlock.rs @@ -0,0 +1,1156 @@ +//! A lock that provides data access to either one writer or many readers. + +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + mem, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, +}; + +/// A lock that provides data access to either one writer or many readers. +/// +/// This lock behaves in a similar manner to its namesake `std::sync::RwLock` but uses +/// spinning for synchronisation instead. Unlike its namespace, this lock does not +/// track lock poisoning. +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). +/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies `Send` to be shared across tasks and `Sync` to +/// allow concurrent access through readers. The RAII guards returned from the +/// locking methods implement `Deref` (and `DerefMut` for the `write` methods) +/// to allow access to the contained of the lock. +/// +/// An [`RwLockUpgradableGuard`](RwLockUpgradableGuard) can be upgraded to a +/// writable guard through the [`RwLockUpgradableGuard::upgrade`](RwLockUpgradableGuard::upgrade) +/// [`RwLockUpgradableGuard::try_upgrade`](RwLockUpgradableGuard::try_upgrade) functions. +/// Writable or upgradeable guards can be downgraded through their respective `downgrade` +/// functions. +/// +/// Based on Facebook's +/// [`folly/RWSpinLock.h`](https://github.com/facebook/folly/blob/a0394d84f2d5c3e50ebfd0566f9d3acb52cfab5a/folly/synchronization/RWSpinLock.h). +/// This implementation is unfair to writers - if the lock always has readers, then no writers will +/// ever get a chance. Using an upgradeable lock guard can *somewhat* alleviate this issue as no +/// new readers are allowed when an upgradeable guard is held, but upgradeable guards can be taken +/// when there are existing readers. However if the lock is that highly contended and writes are +/// crucial then this implementation may be a poor choice. +/// +/// # Examples +/// +/// ``` +/// use spin; +/// +/// let lock = spin::RwLock::new(5); +/// +/// // many reader locks can be held at once +/// { +/// let r1 = lock.read(); +/// let r2 = lock.read(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // read locks are dropped at this point +/// +/// // only one write lock may be held, however +/// { +/// let mut w = lock.write(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // write lock is dropped here +/// ``` +pub struct RwLock { + phantom: PhantomData, + lock: AtomicUsize, + data: UnsafeCell, +} + +const READER: usize = 1 << 2; +const UPGRADED: usize = 1 << 1; +const WRITER: usize = 1; + +/// A guard that provides immutable data access. +/// +/// When the guard falls out of scope it will decrement the read count, +/// potentially releasing the lock. +pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { + lock: &'a AtomicUsize, + data: *const T, +} + +/// A guard that provides mutable data access. +/// +/// When the guard falls out of scope it will release the lock. 
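+// NOTE (editor): added summary, not part of upstream spin 0.9.5. The guards
+// below all share `RwLock`'s single `lock` word, which the constants above
+// pack as follows: bit 0 (`WRITER`) marks a writer, bit 1 (`UPGRADED`) marks
+// an upgradeable reader, and the remaining bits count plain readers in units
+// of `READER` (1 << 2). For example, two readers and no writer leave the word
+// at `2 * READER == 0b1000`, which `reader_count` (defined later in this file)
+// divides back down:
+//
+//     let lock = spin::RwLock::<_>::new(());
+//     let _r1 = lock.read();
+//     let _r2 = lock.read();
+//     assert_eq!(lock.reader_count(), 2); // lock word is 2 * READER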
+pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> { + phantom: PhantomData, + inner: &'a RwLock, + data: *mut T, +} + +/// A guard that provides immutable data access but can be upgraded to [`RwLockWriteGuard`]. +/// +/// No writers or other upgradeable guards can exist while this is in scope. New reader +/// creation is prevented (to alleviate writer starvation) but there may be existing readers +/// when the lock is acquired. +/// +/// When the guard falls out of scope it will release the lock. +pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized, R = Spin> { + phantom: PhantomData, + inner: &'a RwLock, + data: *const T, +} + +// Same unsafe impls as `std::sync::RwLock` +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} + +impl RwLock { + /// Creates a new spinlock wrapping the supplied data. + /// + /// May be used statically: + /// + /// ``` + /// use spin; + /// + /// static RW_LOCK: spin::RwLock<()> = spin::RwLock::new(()); + /// + /// fn demo() { + /// let lock = RW_LOCK.read(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline] + pub const fn new(data: T) -> Self { + RwLock { + phantom: PhantomData, + lock: AtomicUsize::new(0), + data: UnsafeCell::new(data), + } + } + + /// Consumes this `RwLock`, returning the underlying data. + #[inline] + pub fn into_inner(self) -> T { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock. + let RwLock { data, .. } = self; + data.into_inner() + } + /// Returns a mutable pointer to the underying data. + /// + /// This is mostly meant to be used for applications which require manual unlocking, but where + /// storing both the lock and the pointer to the inner data gets inefficient. + /// + /// While this is safe, writing to the data is undefined behavior unless the current thread has + /// acquired a write lock, and reading requires either a read or write lock. + /// + /// # Example + /// ``` + /// let lock = spin::RwLock::new(42); + /// + /// unsafe { + /// core::mem::forget(lock.write()); + /// + /// assert_eq!(lock.as_mut_ptr().read(), 42); + /// lock.as_mut_ptr().write(58); + /// + /// lock.force_write_unlock(); + /// } + /// + /// assert_eq!(*lock.read(), 58); + /// + /// ``` + #[inline(always)] + pub fn as_mut_ptr(&self) -> *mut T { + self.data.get() + } +} + +impl RwLock { + /// Locks this rwlock with shared read access, blocking the current thread + /// until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which + /// hold the lock. There may be other readers currently inside the lock when + /// this method returns. This method does not provide any guarantees with + /// respect to the ordering of whether contentious readers or writers will + /// acquire the lock first. + /// + /// Returns an RAII guard which will release this thread's shared access + /// once it is dropped. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// { + /// let mut data = mylock.read(); + /// // The lock is now locked and the data can be read + /// println!("{}", *data); + /// // The lock is dropped + /// } + /// ``` + #[inline] + pub fn read(&self) -> RwLockReadGuard { + loop { + match self.try_read() { + Some(guard) => return guard, + None => R::relax(), + } + } + } + + /// Lock this rwlock with exclusive write access, blocking the current + /// thread until it can be acquired. + /// + /// This function will not return while other writers or other readers + /// currently have access to the lock. 
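// Illustrative helper (not part of the crate) showing the pattern `read` is
// built from above: retry `try_read`, relaxing between attempts, but give up
// after a bounded number of tries instead of spinning forever.
fn try_read_bounded<'a, T>(
    lock: &'a spin::RwLock<T>,
    max_attempts: usize,
) -> Option<spin::RwLockReadGuard<'a, T>> {
    for _ in 0..max_attempts {
        if let Some(guard) = lock.try_read() {
            return Some(guard);
        }
        core::hint::spin_loop(); // the same CPU hint the default `Spin` strategy issues
    }
    None
}

fn main() {
    let lock = spin::RwLock::new(1);
    assert_eq!(*try_read_bounded(&lock, 8).unwrap(), 1);

    let _w = lock.write();
    // With a writer held, every attempt fails and the bounded retry gives up.
    assert!(try_read_bounded(&lock, 8).is_none());
}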
+ /// + /// Returns an RAII guard which will drop the write access of this rwlock + /// when dropped. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// { + /// let mut data = mylock.write(); + /// // The lock is now locked and the data can be written + /// *data += 1; + /// // The lock is dropped + /// } + /// ``` + #[inline] + pub fn write(&self) -> RwLockWriteGuard { + loop { + match self.try_write_internal(false) { + Some(guard) => return guard, + None => R::relax(), + } + } + } + + /// Obtain a readable lock guard that can later be upgraded to a writable lock guard. + /// Upgrades can be done through the [`RwLockUpgradableGuard::upgrade`](RwLockUpgradableGuard::upgrade) method. + #[inline] + pub fn upgradeable_read(&self) -> RwLockUpgradableGuard { + loop { + match self.try_upgradeable_read() { + Some(guard) => return guard, + None => R::relax(), + } + } + } +} + +impl RwLock { + // Acquire a read lock, returning the new lock value. + fn acquire_reader(&self) -> usize { + // An arbitrary cap that allows us to catch overflows long before they happen + const MAX_READERS: usize = core::usize::MAX / READER / 2; + + let value = self.lock.fetch_add(READER, Ordering::Acquire); + + if value > MAX_READERS * READER { + self.lock.fetch_sub(READER, Ordering::Relaxed); + panic!("Too many lock readers, cannot safely proceed"); + } else { + value + } + } + + /// Attempt to acquire this lock with shared read access. + /// + /// This function will never block and will return immediately if `read` + /// would otherwise succeed. Returns `Some` of an RAII guard which will + /// release the shared access of this thread when dropped, or `None` if the + /// access could not be granted. This method does not provide any + /// guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// { + /// match mylock.try_read() { + /// Some(data) => { + /// // The lock is now locked and the data can be read + /// println!("{}", *data); + /// // The lock is dropped + /// }, + /// None => (), // no cigar + /// }; + /// } + /// ``` + #[inline] + pub fn try_read(&self) -> Option> { + let value = self.acquire_reader(); + + // We check the UPGRADED bit here so that new readers are prevented when an UPGRADED lock is held. + // This helps reduce writer starvation. + if value & (WRITER | UPGRADED) != 0 { + // Lock is taken, undo. + self.lock.fetch_sub(READER, Ordering::Release); + None + } else { + Some(RwLockReadGuard { + lock: &self.lock, + data: unsafe { &*self.data.get() }, + }) + } + } + + /// Return the number of readers that currently hold the lock (including upgradable readers). + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + pub fn reader_count(&self) -> usize { + let state = self.lock.load(Ordering::Relaxed); + state / READER + (state & UPGRADED) / UPGRADED + } + + /// Return the number of writers that currently hold the lock. + /// + /// Because [`RwLock`] guarantees exclusive mutable access, this function may only return either `0` or `1`. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. 
However, it may be useful as a heuristic. + pub fn writer_count(&self) -> usize { + (self.lock.load(Ordering::Relaxed) & WRITER) / WRITER + } + + /// Force decrement the reader count. + /// + /// # Safety + /// + /// This is *extremely* unsafe if there are outstanding `RwLockReadGuard`s + /// live, or if called more times than `read` has been called, but can be + /// useful in FFI contexts where the caller doesn't know how to deal with + /// RAII. The underlying atomic operation uses `Ordering::Release`. + #[inline] + pub unsafe fn force_read_decrement(&self) { + debug_assert!(self.lock.load(Ordering::Relaxed) & !WRITER > 0); + self.lock.fetch_sub(READER, Ordering::Release); + } + + /// Force unlock exclusive write access. + /// + /// # Safety + /// + /// This is *extremely* unsafe if there are outstanding `RwLockWriteGuard`s + /// live, or if called when there are current readers, but can be useful in + /// FFI contexts where the caller doesn't know how to deal with RAII. The + /// underlying atomic operation uses `Ordering::Release`. + #[inline] + pub unsafe fn force_write_unlock(&self) { + debug_assert_eq!(self.lock.load(Ordering::Relaxed) & !(WRITER | UPGRADED), 0); + self.lock.fetch_and(!(WRITER | UPGRADED), Ordering::Release); + } + + #[inline(always)] + fn try_write_internal(&self, strong: bool) -> Option> { + if compare_exchange( + &self.lock, + 0, + WRITER, + Ordering::Acquire, + Ordering::Relaxed, + strong, + ) + .is_ok() + { + Some(RwLockWriteGuard { + phantom: PhantomData, + inner: self, + data: unsafe { &mut *self.data.get() }, + }) + } else { + None + } + } + + /// Attempt to lock this rwlock with exclusive write access. + /// + /// This function does not ever block, and it will return `None` if a call + /// to `write` would otherwise block. If successful, an RAII guard is + /// returned. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// { + /// match mylock.try_write() { + /// Some(mut data) => { + /// // The lock is now locked and the data can be written + /// *data += 1; + /// // The lock is implicitly dropped + /// }, + /// None => (), // no cigar + /// }; + /// } + /// ``` + #[inline] + pub fn try_write(&self) -> Option> { + self.try_write_internal(true) + } + + /// Tries to obtain an upgradeable lock guard. + #[inline] + pub fn try_upgradeable_read(&self) -> Option> { + if self.lock.fetch_or(UPGRADED, Ordering::Acquire) & (WRITER | UPGRADED) == 0 { + Some(RwLockUpgradableGuard { + phantom: PhantomData, + inner: self, + data: unsafe { &*self.data.get() }, + }) + } else { + // We can't unflip the UPGRADED bit back just yet as there is another upgradeable or write lock. + // When they unlock, they will clear the bit. + None + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `RwLock` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// let mut lock = spin::RwLock::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.read(), 10); + /// ``` + pub fn get_mut(&mut self) -> &mut T { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner lock. 
+ unsafe { &mut *self.data.get() } + } +} + +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_read() { + Some(guard) => write!(f, "RwLock {{ data: ") + .and_then(|()| (&*guard).fmt(f)) + .and_then(|()| write!(f, "}}")), + None => write!(f, "RwLock {{ }}"), + } + } +} + +impl Default for RwLock { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl From for RwLock { + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { + /// Leak the lock guard, yielding a reference to the underlying data. + /// + /// Note that this function will permanently lock the original lock for all but reading locks. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let data: &i32 = spin::RwLockReadGuard::leak(mylock.read()); + /// + /// assert_eq!(*data, 0); + /// ``` + #[inline] + pub fn leak(this: Self) -> &'rwlock T { + let this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &*this.data } + } +} + +impl<'rwlock, T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'rwlock, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'rwlock, T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'rwlock, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'rwlock, T: ?Sized, R: RelaxStrategy> RwLockUpgradableGuard<'rwlock, T, R> { + /// Upgrades an upgradeable lock guard to a writable lock guard. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let upgradeable = mylock.upgradeable_read(); // Readable, but not yet writable + /// let writable = upgradeable.upgrade(); + /// ``` + #[inline] + pub fn upgrade(mut self) -> RwLockWriteGuard<'rwlock, T, R> { + loop { + self = match self.try_upgrade_internal(false) { + Ok(guard) => return guard, + Err(e) => e, + }; + + R::relax(); + } + } +} + +impl<'rwlock, T: ?Sized, R> RwLockUpgradableGuard<'rwlock, T, R> { + #[inline(always)] + fn try_upgrade_internal(self, strong: bool) -> Result, Self> { + if compare_exchange( + &self.inner.lock, + UPGRADED, + WRITER, + Ordering::Acquire, + Ordering::Relaxed, + strong, + ) + .is_ok() + { + let inner = self.inner; + + // Forget the old guard so its destructor doesn't run (before mutably aliasing data below) + mem::forget(self); + + // Upgrade successful + Ok(RwLockWriteGuard { + phantom: PhantomData, + inner, + data: unsafe { &mut *inner.data.get() }, + }) + } else { + Err(self) + } + } + + /// Tries to upgrade an upgradeable lock guard to a writable lock guard. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// let upgradeable = mylock.upgradeable_read(); // Readable, but not yet writable + /// + /// match upgradeable.try_upgrade() { + /// Ok(writable) => /* upgrade successful - use writable lock guard */ (), + /// Err(upgradeable) => /* upgrade unsuccessful */ (), + /// }; + /// ``` + #[inline] + pub fn try_upgrade(self) -> Result, Self> { + self.try_upgrade_internal(true) + } + + #[inline] + /// Downgrades the upgradeable lock guard to a readable, shared lock guard. Cannot fail and is guaranteed not to spin. 
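// Illustrative check-then-write pattern using the upgradeable guard described
// above: an upgradeable read blocks other writers and upgraders but tolerates
// existing readers, and is only upgraded to a write lock when a change is
// actually needed. `bump_if_zero` is a made-up example function.
fn bump_if_zero(counter: &spin::RwLock<u32>) {
    let upgradeable = counter.upgradeable_read();
    if *upgradeable == 0 {
        let mut writable = upgradeable.upgrade();
        *writable = 1;
    }
}

fn main() {
    let lock = spin::RwLock::new(0u32);
    bump_if_zero(&lock);
    assert_eq!(*lock.read(), 1);
    bump_if_zero(&lock); // no upgrade happens on this call
    assert_eq!(*lock.read(), 1);
}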
+ /// + /// ``` + /// let mylock = spin::RwLock::new(1); + /// + /// let upgradeable = mylock.upgradeable_read(); + /// assert!(mylock.try_read().is_none()); + /// assert_eq!(*upgradeable, 1); + /// + /// let readable = upgradeable.downgrade(); // This is guaranteed not to spin + /// assert!(mylock.try_read().is_some()); + /// assert_eq!(*readable, 1); + /// ``` + pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> { + // Reserve the read guard for ourselves + self.inner.acquire_reader(); + + let inner = self.inner; + + // Dropping self removes the UPGRADED bit + mem::drop(self); + + RwLockReadGuard { + lock: &inner.lock, + data: unsafe { &*inner.data.get() }, + } + } + + /// Leak the lock guard, yielding a reference to the underlying data. + /// + /// Note that this function will permanently lock the original lock. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let data: &i32 = spin::RwLockUpgradableGuard::leak(mylock.upgradeable_read()); + /// + /// assert_eq!(*data, 0); + /// ``` + #[inline] + pub fn leak(this: Self) -> &'rwlock T { + let this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &*this.data } + } +} + +impl<'rwlock, T: ?Sized + fmt::Debug, R> fmt::Debug for RwLockUpgradableGuard<'rwlock, T, R> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'rwlock, T: ?Sized + fmt::Display, R> fmt::Display for RwLockUpgradableGuard<'rwlock, T, R> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> { + /// Downgrades the writable lock guard to a readable, shared lock guard. Cannot fail and is guaranteed not to spin. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let mut writable = mylock.write(); + /// *writable = 1; + /// + /// let readable = writable.downgrade(); // This is guaranteed not to spin + /// # let readable_2 = mylock.try_read().unwrap(); + /// assert_eq!(*readable, 1); + /// ``` + #[inline] + pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> { + // Reserve the read guard for ourselves + self.inner.acquire_reader(); + + let inner = self.inner; + + // Dropping self removes the UPGRADED bit + mem::drop(self); + + RwLockReadGuard { + lock: &inner.lock, + data: unsafe { &*inner.data.get() }, + } + } + + /// Downgrades the writable lock guard to an upgradable, shared lock guard. Cannot fail and is guaranteed not to spin. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let mut writable = mylock.write(); + /// *writable = 1; + /// + /// let readable = writable.downgrade_to_upgradeable(); // This is guaranteed not to spin + /// assert_eq!(*readable, 1); + /// ``` + #[inline] + pub fn downgrade_to_upgradeable(self) -> RwLockUpgradableGuard<'rwlock, T, R> { + debug_assert_eq!( + self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED), + WRITER + ); + + // Reserve the read guard for ourselves + self.inner.lock.store(UPGRADED, Ordering::Release); + + let inner = self.inner; + + // Dropping self removes the UPGRADED bit + mem::forget(self); + + RwLockUpgradableGuard { + phantom: PhantomData, + inner, + data: unsafe { &*inner.data.get() }, + } + } + + /// Leak the lock guard, yielding a mutable reference to the underlying data. + /// + /// Note that this function will permanently lock the original lock. 
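// Illustrative use of the `downgrade` methods above: initialise the data under
// a write lock, then downgrade so the same guard keeps read access while other
// readers are admitted again. `init_and_share` is a made-up example function.
fn init_and_share(lock: &spin::RwLock<Vec<u32>>) -> u32 {
    let mut w = lock.write();
    w.push(42);
    let r = w.downgrade(); // cannot fail and is guaranteed not to spin
    // From this point on, other threads may take read guards concurrently.
    r[0]
}

fn main() {
    let lock = spin::RwLock::new(Vec::new());
    assert_eq!(init_and_share(&lock), 42);
    assert!(lock.try_write().is_some()); // guard dropped, lock fully released
}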
+ /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let data: &mut i32 = spin::RwLockWriteGuard::leak(mylock.write()); + /// + /// *data = 1; + /// assert_eq!(*data, 1); + /// ``` + #[inline] + pub fn leak(this: Self) -> &'rwlock mut T { + let mut this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &mut *this.data } + } +} + +impl<'rwlock, T: ?Sized + fmt::Debug, R> fmt::Debug for RwLockWriteGuard<'rwlock, T, R> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'rwlock, T: ?Sized + fmt::Display, R> fmt::Display for RwLockWriteGuard<'rwlock, T, R> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> { + type Target = T; + + fn deref(&self) -> &T { + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } + } +} + +impl<'rwlock, T: ?Sized, R> Deref for RwLockUpgradableGuard<'rwlock, T, R> { + type Target = T; + + fn deref(&self) -> &T { + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } + } +} + +impl<'rwlock, T: ?Sized, R> Deref for RwLockWriteGuard<'rwlock, T, R> { + type Target = T; + + fn deref(&self) -> &T { + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } + } +} + +impl<'rwlock, T: ?Sized, R> DerefMut for RwLockWriteGuard<'rwlock, T, R> { + fn deref_mut(&mut self) -> &mut T { + // Safety: We know statically that only we are referencing data + unsafe { &mut *self.data } + } +} + +impl<'rwlock, T: ?Sized> Drop for RwLockReadGuard<'rwlock, T> { + fn drop(&mut self) { + debug_assert!(self.lock.load(Ordering::Relaxed) & !(WRITER | UPGRADED) > 0); + self.lock.fetch_sub(READER, Ordering::Release); + } +} + +impl<'rwlock, T: ?Sized, R> Drop for RwLockUpgradableGuard<'rwlock, T, R> { + fn drop(&mut self) { + debug_assert_eq!( + self.inner.lock.load(Ordering::Relaxed) & (WRITER | UPGRADED), + UPGRADED + ); + self.inner.lock.fetch_sub(UPGRADED, Ordering::AcqRel); + } +} + +impl<'rwlock, T: ?Sized, R> Drop for RwLockWriteGuard<'rwlock, T, R> { + fn drop(&mut self) { + debug_assert_eq!(self.inner.lock.load(Ordering::Relaxed) & WRITER, WRITER); + + // Writer is responsible for clearing both WRITER and UPGRADED bits. + // The UPGRADED bit may be set if an upgradeable lock attempts an upgrade while this lock is held. 
+ self.inner + .lock + .fetch_and(!(WRITER | UPGRADED), Ordering::Release); + } +} + +#[inline(always)] +fn compare_exchange( + atomic: &AtomicUsize, + current: usize, + new: usize, + success: Ordering, + failure: Ordering, + strong: bool, +) -> Result { + if strong { + atomic.compare_exchange(current, new, success, failure) + } else { + atomic.compare_exchange_weak(current, new, success, failure) + } +} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawRwLock for RwLock<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + #[inline(always)] + fn lock_exclusive(&self) { + // Prevent guard destructor running + core::mem::forget(self.write()); + } + + #[inline(always)] + fn try_lock_exclusive(&self) -> bool { + // Prevent guard destructor running + self.try_write().map(|g| core::mem::forget(g)).is_some() + } + + #[inline(always)] + unsafe fn unlock_exclusive(&self) { + drop(RwLockWriteGuard { + inner: self, + data: &mut (), + phantom: PhantomData, + }); + } + + #[inline(always)] + fn lock_shared(&self) { + // Prevent guard destructor running + core::mem::forget(self.read()); + } + + #[inline(always)] + fn try_lock_shared(&self) -> bool { + // Prevent guard destructor running + self.try_read().map(|g| core::mem::forget(g)).is_some() + } + + #[inline(always)] + unsafe fn unlock_shared(&self) { + drop(RwLockReadGuard { + lock: &self.lock, + data: &(), + }); + } + + #[inline(always)] + fn is_locked(&self) -> bool { + self.lock.load(Ordering::Relaxed) != 0 + } +} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawRwLockUpgrade for RwLock<(), R> { + #[inline(always)] + fn lock_upgradable(&self) { + // Prevent guard destructor running + core::mem::forget(self.upgradeable_read()); + } + + #[inline(always)] + fn try_lock_upgradable(&self) -> bool { + // Prevent guard destructor running + self.try_upgradeable_read() + .map(|g| core::mem::forget(g)) + .is_some() + } + + #[inline(always)] + unsafe fn unlock_upgradable(&self) { + drop(RwLockUpgradableGuard { + inner: self, + data: &(), + phantom: PhantomData, + }); + } + + #[inline(always)] + unsafe fn upgrade(&self) { + let tmp_guard = RwLockUpgradableGuard { + inner: self, + data: &(), + phantom: PhantomData, + }; + core::mem::forget(tmp_guard.upgrade()); + } + + #[inline(always)] + unsafe fn try_upgrade(&self) -> bool { + let tmp_guard = RwLockUpgradableGuard { + inner: self, + data: &(), + phantom: PhantomData, + }; + tmp_guard + .try_upgrade() + .map(|g| core::mem::forget(g)) + .is_ok() + } +} + +#[cfg(feature = "lock_api")] +unsafe impl lock_api_crate::RawRwLockDowngrade for RwLock<(), R> { + unsafe fn downgrade(&self) { + let tmp_guard = RwLockWriteGuard { + inner: self, + data: &mut (), + phantom: PhantomData, + }; + core::mem::forget(tmp_guard.downgrade()); + } +} + +#[cfg(feature = "lock_api1")] +unsafe impl lock_api::RawRwLockUpgradeDowngrade for RwLock<()> { + unsafe fn downgrade_upgradable(&self) { + let tmp_guard = RwLockUpgradableGuard { + inner: self, + data: &(), + phantom: PhantomData, + }; + core::mem::forget(tmp_guard.downgrade()); + } + + unsafe fn downgrade_to_upgradable(&self) { + let tmp_guard = RwLockWriteGuard { + inner: self, + data: &mut (), + phantom: PhantomData, + }; + core::mem::forget(tmp_guard.downgrade_to_upgradeable()); + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + type RwLock = super::RwLock; + + 
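// Illustrative sketch of driving this lock through the `lock_api` traits
// implemented above. It assumes the crate is built with its `lock_api` feature
// and that the external `lock_api` crate is available as a dependency; the
// `SpinBackedRwLock` alias is made up for this example. The raw lock is the
// zero-sized `spin::RwLock<()>`; `lock_api::RwLock` layers the typed, guarded
// API on top of it.
type SpinBackedRwLock<T> = lock_api::RwLock<spin::RwLock<()>, T>;

fn main() {
    let lock: SpinBackedRwLock<u32> = lock_api::RwLock::new(5);
    {
        let r = lock.read();
        assert_eq!(*r, 5);
    }
    *lock.write() += 1;
    assert_eq!(*lock.read(), 6);
}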
#[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let l = RwLock::new(()); + drop(l.read()); + drop(l.write()); + drop((l.read(), l.read())); + drop(l.write()); + } + + // TODO: needs RNG + //#[test] + //fn frob() { + // static R: RwLock = RwLock::new(); + // const N: usize = 10; + // const M: usize = 1000; + // + // let (tx, rx) = channel::<()>(); + // for _ in 0..N { + // let tx = tx.clone(); + // thread::spawn(move|| { + // let mut rng = rand::thread_rng(); + // for _ in 0..M { + // if rng.gen_weighted_bool(N) { + // drop(R.write()); + // } else { + // drop(R.read()); + // } + // } + // drop(tx); + // }); + // } + // drop(tx); + // let _ = rx.recv(); + // unsafe { R.destroy(); } + //} + + #[test] + fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + let t = thread::spawn(move || { + let mut lock = arc2.write(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read(); + assert_eq!(*lock, 10); + + assert!(t.join().is_ok()); + } + + #[test] + fn test_rw_access_in_unwind() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_rwlock_unsized() { + let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); + { + let b = &mut *rw.write(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*rw.read(), comp); + } + + #[test] + fn test_rwlock_try_write() { + use std::mem::drop; + + let lock = RwLock::new(0isize); + let read_guard = lock.read(); + + let write_result = lock.try_write(); + match write_result { + None => (), + Some(_) => assert!( + false, + "try_write should not succeed while read_guard is in scope" + ), + } + + drop(read_guard); + } + + #[test] + fn test_rw_try_read() { + let m = RwLock::new(0); + ::std::mem::forget(m.write()); + assert!(m.try_read().is_none()); + } + + #[test] + fn test_into_inner() { + let m = RwLock::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = RwLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_force_read_decrement() { + let m = RwLock::new(()); + ::std::mem::forget(m.read()); + ::std::mem::forget(m.read()); + ::std::mem::forget(m.read()); + assert!(m.try_write().is_none()); + unsafe { + m.force_read_decrement(); + m.force_read_decrement(); + } + assert!(m.try_write().is_none()); + unsafe { + m.force_read_decrement(); + } + 
assert!(m.try_write().is_some()); + } + + #[test] + fn test_force_write_unlock() { + let m = RwLock::new(()); + ::std::mem::forget(m.write()); + assert!(m.try_read().is_none()); + unsafe { + m.force_write_unlock(); + } + assert!(m.try_read().is_some()); + } + + #[test] + fn test_upgrade_downgrade() { + let m = RwLock::new(()); + { + let _r = m.read(); + let upg = m.try_upgradeable_read().unwrap(); + assert!(m.try_read().is_none()); + assert!(m.try_write().is_none()); + assert!(upg.try_upgrade().is_err()); + } + { + let w = m.write(); + assert!(m.try_upgradeable_read().is_none()); + let _r = w.downgrade(); + assert!(m.try_upgradeable_read().is_some()); + assert!(m.try_read().is_some()); + assert!(m.try_write().is_none()); + } + { + let _u = m.upgradeable_read(); + assert!(m.try_upgradeable_read().is_none()); + } + + assert!(m.try_upgradeable_read().unwrap().try_upgrade().is_ok()); + } +} -- 2.34.1
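// Illustrative sketch, based on the `force_*` methods and the tests above: keep
// the lock held across a manual-management (e.g. FFI-style) boundary by
// forgetting the guard, then release it explicitly later. `lend_out` and
// `give_back` are made-up names for this example.
fn lend_out(lock: &spin::RwLock<u32>) -> *const u32 {
    let guard = lock.read();
    let ptr: *const u32 = &*guard;
    core::mem::forget(guard); // keep the read count elevated
    ptr
}

unsafe fn give_back(lock: &spin::RwLock<u32>) {
    // Safety: must be called exactly once per forgotten read guard.
    lock.force_read_decrement();
}

fn main() {
    let lock = spin::RwLock::new(7);
    let p = lend_out(&lock);
    assert!(lock.try_write().is_none()); // still read-locked
    assert_eq!(unsafe { *p }, 7);
    unsafe { give_back(&lock) };
    assert!(lock.try_write().is_some()); // fully released
}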